uni farm

cloud composer(airflow)でbigqueryテーブルの集計結果をGCSへupload

cloud composer(airflow)でbigqueryテーブルの集計結果をGCSへupload

bigqueryの集計結果をGCSに出力したかった。しかし、どうやら1回でできそうなoperatorがなかったため、結果を一旦集計テーブルとして作成し、GCSにexportする形をとった

使用したoperatorはこちらのstack overflow回答まんまです

構成

  • python: 3.7
  • airflow: 1.10.6

実装

dagは日次実行する

from __future__ import print_function
from airflow.models import DAG
from airflow.contrib.operators import bigquery_operator
from airflow.contrib.operators.bigquery_to_gcs import BigQueryToCloudStorageOperator

import datetime


# 7/2~毎日0時(日本時間9時)に実行
default_args = {
    "owner": "Airflow",
    "depends_on_past": False,
    "start_date": datetime.datetime(2020, 7, 1),
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 0
}
dag = DAG(
    "bq_to_gcs",
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_args,
    catchup=False,
)

実行日ごとにテーブルを作成しておく。テーブルの定義はダミー

sql=f"""
select
  user_id
  ,item_id
  ,date(timestamp(hits_datetime)) as action_dt
  ,action_type
from `{PROJECT_ID}.table_name.raw_data`
where
 date_sub(current_date('Asia/Tokyo'), interval 30 day)
group_by action_dt, user_id, item_id, action_type
""".(PROJECT_ID)

bq_distination_table = bigquery_operator.BigQueryOperator(
    task_id="bq_distination_table",
    sql=sql,
    use_legacy_sql=False,
    destination_dataset_table= PROJECT_ID+".sandbox.agg_data_{{ds_nodash}}",
     write_disposition='WRITE_TRUNCATE',
    dag=dag
)

結果をGCSへ

# export to gcs a dataset generated by pre dag
upload_to_gcs = BigQueryToCloudStorageOperator(
    task_id="upload_to_gcs",
    source_project_dataset_table=PROJECT_ID+".sandbox.agg_data_{{ds_nodash}}",
    destination_cloud_storage_uris="gs://bukcet/export_{{ds_nodash}}.csv",
    compression="GZIP", # or NONE
    export_format="CSV",
    field_delimiter=",",
    print_header=True,
    bigquery_conn_id="bigquery_default",
    delegate_to=None,
    labels=None,
    dag=dag
)

bq_distination_table >> upload_to_gcs

テーブル設計は

ワイルドカード テーブルを使用した複数テーブルに対するクエリ

を参考に実行日ごとでテーブルを作成することでスキャンするテーブル範囲を少なくなるようにした

パーティションテーブルでの実装も検討したが、 あらかじめテーブルを作らなければならない、クエリに条件指定を入れる必要がある。のがやや面倒だったので断念した

dagはこんな感じ(左2つのtask)。GCSファイルはai platformの学習データとして用いる予定

dag

参考

2023, Built with Gatsby. This site uses Google Analytics.