bigqueryの集計結果をGCSに出力したかった。しかし、どうやら1回でできそうなoperatorがなかったため、結果を一旦集計テーブルとして作成し、GCSにexportする形をとった
使用したoperatorはこちらのstack overflow回答まんまです
構成
- python: 3.7
- airflow: 1.10.6
実装
dagは日次実行する
from __future__ import print_function
from airflow.models import DAG
from airflow.contrib.operators import bigquery_operator
from airflow.contrib.operators.bigquery_to_gcs import BigQueryToCloudStorageOperator
import datetime
# 7/2~毎日0時(日本時間9時)に実行
default_args = {
"owner": "Airflow",
"depends_on_past": False,
"start_date": datetime.datetime(2020, 7, 1),
"email_on_failure": False,
"email_on_retry": False,
"retries": 0
}
dag = DAG(
"bq_to_gcs",
schedule_interval=datetime.timedelta(days=1),
default_args=default_args,
catchup=False,
)
実行日ごとにテーブルを作成しておく。テーブルの定義はダミー
sql=f"""
select
user_id
,item_id
,date(timestamp(hits_datetime)) as action_dt
,action_type
from `{PROJECT_ID}.table_name.raw_data`
where
date_sub(current_date('Asia/Tokyo'), interval 30 day)
group_by action_dt, user_id, item_id, action_type
""".(PROJECT_ID)
bq_distination_table = bigquery_operator.BigQueryOperator(
task_id="bq_distination_table",
sql=sql,
use_legacy_sql=False,
destination_dataset_table= PROJECT_ID+".sandbox.agg_data_{{ds_nodash}}",
write_disposition='WRITE_TRUNCATE',
dag=dag
)
結果をGCSへ
# export to gcs a dataset generated by pre dag
upload_to_gcs = BigQueryToCloudStorageOperator(
task_id="upload_to_gcs",
source_project_dataset_table=PROJECT_ID+".sandbox.agg_data_{{ds_nodash}}",
destination_cloud_storage_uris="gs://bukcet/export_{{ds_nodash}}.csv",
compression="GZIP", # or NONE
export_format="CSV",
field_delimiter=",",
print_header=True,
bigquery_conn_id="bigquery_default",
delegate_to=None,
labels=None,
dag=dag
)
bq_distination_table >> upload_to_gcs
テーブル設計は
を参考に実行日ごとでテーブルを作成することでスキャンするテーブル範囲を少なくなるようにした
パーティションテーブルでの実装も検討したが、 あらかじめテーブルを作らなければならない、クエリに条件指定を入れる必要がある。のがやや面倒だったので断念した
dagはこんな感じ(左2つのtask)。GCSファイルはai platformの学習データとして用いる予定