uni-3 log

    Search by

    cloud composer(airflow)でbigqueryテーブルの集計結果をGCSへupload

    bigqueryの集計結果をGCSに出力したかった。しかし、どうやら1回でできそうなoperatorがなかったため、結果を一旦集計テーブルとして作成し、GCSにexportする形をとった

    使用したoperatorはこちらのstack overflow回答まんまです

    構成

    • python: 3.7
    • airflow: 1.10.6

    実装

    dagは日次実行する

    from __future__ import print_function
    from airflow.models import DAG
    from airflow.contrib.operators import bigquery_operator
    from airflow.contrib.operators.bigquery_to_gcs import BigQueryToCloudStorageOperator
    
    import datetime
    
    
    # 7/2~毎日0時(日本時間9時)に実行
    default_args = {
        "owner": "Airflow",
        "depends_on_past": False,
        "start_date": datetime.datetime(2020, 7, 1),
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": 0
    }
    dag = DAG(
        "bq_to_gcs",
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_args,
        catchup=False,
    )

    実行日ごとにテーブルを作成しておく。テーブルの定義はダミー

    sql=f"""
    select
      user_id
      ,item_id
      ,date(timestamp(hits_datetime)) as action_dt
      ,action_type
    from `{PROJECT_ID}.table_name.raw_data`
    where
     date_sub(current_date('Asia/Tokyo'), interval 30 day)
    group_by action_dt, user_id, item_id, action_type
    """.(PROJECT_ID)
    
    bq_distination_table = bigquery_operator.BigQueryOperator(
        task_id="bq_distination_table",
        sql=sql,
        use_legacy_sql=False,
        destination_dataset_table= PROJECT_ID+".sandbox.agg_data_{{ds_nodash}}",
         write_disposition='WRITE_TRUNCATE',
        dag=dag
    )

    結果をGCSへ

    # export to gcs a dataset generated by pre dag
    upload_to_gcs = BigQueryToCloudStorageOperator(
        task_id="upload_to_gcs",
        source_project_dataset_table=PROJECT_ID+".sandbox.agg_data_{{ds_nodash}}",
        destination_cloud_storage_uris="gs://bukcet/export_{{ds_nodash}}.csv",
        compression="GZIP", # or NONE
        export_format="CSV",
        field_delimiter=",",
        print_header=True,
        bigquery_conn_id="bigquery_default",
        delegate_to=None,
        labels=None,
        dag=dag
    )
    
    bq_distination_table >> upload_to_gcs

    テーブル設計は

    ワイルドカード テーブルを使用した複数テーブルに対するクエリ

    を参考に実行日ごとでテーブルを作成することでスキャンするテーブル範囲を少なくなるようにした

    パーティションテーブルでの実装も検討したが、 あらかじめテーブルを作らなければならない、クエリに条件指定を入れる必要がある。のがやや面倒だったので断念した

    dagはこんな感じ(左2つのtask)。GCSファイルはai platformの学習データとして用いる予定

    dag

    参考

    2021, Built with Gatsby. This site uses Google Analytics.