prefectのdbt runner、PrefectDbtRunnerでbigqueryの認証方法を切り替える


bigqueryを使っていると、権限管理の都合上、ローカルでの実行ではユーザー認証、サーバー上などではサービスカウント認証、などといった認証方式の切り替えを行うケースが出てくる

PrefectDbtRunnerを用いたとき、実現するのにやりづらい部分があったので思いついた方法をまとめた。特にdbt profiles.ymlは認証方法に加え、スキーマや接続先を切り替える設定項目(target)もあるので、そのあたりのケアに注意する必要がでてくる

有用そうだと思った方法は実装して動作確認をしてみた。ついでにPrefectDbtRunnerの利点も記載

だいぶマニアックな記事になった

環境

  • prefect: 3.6
  • prefect-dbt: 0.7.9
  • dbt-core: 1.9

方針ぎめ

前提・要件

  • ルートリポジトリのdbt_project以下にdbtプロジェクトを配置している

  • targetはdev/prod

違いとしては宛先のスキーマ(データセット)が異なる

  • target名はprodにしたい

カスタムスキーマの指定にgenerate_schema_name_for_envを使いたい。この関数内では、target名はprod前提で動くので、本番環境向けにはtarget prodとする

https://docs.getdbt.com/docs/build/custom-schemas#a-built-in-alternative-pattern-for-generating-schema-names

  • cli経由でもtarget prodで実行したい

実際の運用では実行できない(もしくはデフォルトではエラーになる)ようにするほうがベターだが、analysisのクエリ実行や、compileをしたいときもあるので許可したい

  • ローカルからflowを実行した際にはoauthの認証も許可したい

実行環境とアカウントを明確に分けておきたいので、ローカル(スクリプト経由で)実行した際はデフォルトの認証を使いたい。必須ではないが、

deployment(work pool)上ではサービスアカウント実行にする

方針

思いついたのが以下

envを使って認証方式を切り替えつつ、flowではblocks経由でサービスアカウントファイルを生成して、一時ファイルとして設置する方法

  • cli経由ではoauth。オプションとして、.envに値を設定することでサービスアカウントでも実行可能
  • flowをローカル実行する場合、oauth経由で実行できる。deployment(work pool)上での実行時はサービスアカウント(blocks読み取り)で実行する

まとめるとこう。このパターンを実現したい

AIにまとめさせた、ので項目に重複もあるがこんな感じ

シナリオDBT_TARGETDBT_AUTH_METHOD選ばれる outputs認証BIGQUERY_KEYFILE実行方法
ローカルdevdevoauthdevOAuth不要python flows/dbt.py or DBT CLI
ローカルでprod確認prodoauthprodOAuth不要python flows/dbt.py or DBT CLI
ローカルでprod SAprodservice-accountprodSA必要 ローカルに設置 or blocksから生成python flows/dbt.py or DBT CLI
Prefect devdevoauthdevOAuth不要Prefect
Prefect prodprodservice-accountprodSA必要 blocksから生成Prefect

他に考慮したこと

他に考えたことと却下したこと

  • ローカルではtarget devのみ動かす運用にする target prodに対してanalysis実行したいときもあるのでやめておく
  • profiles.ymlをflow用とlocal実行用で分ける 同じターゲットの内容で認証方法だけ変えたファイルを用意するのはちょっとな、という個人的な好みで不採用 こちらのprofiles.ymlを分ける方法は別解としてこの記事に記載しておく
  • ローカル/deployment上での実行でtarget名を分ける(prod/prod-flowなどとする) こちらはカスタムスキーマの指定にgenerate_schema_name_for_envを使う場合はマクロのロジックに手を入れることになるので却下した

正直どの方法も認証用にカスタム設定するので見づらい(仕組みを把握しづらい)ので大差はないな、という印象

実装

profiles.yml

中身は以下prod targetのみ記載(devもほぼ同じなため)。envへの設定の有無で実行環境が変わる

methodがoauthのとき、keyfileは無視されるので値が設定されてなくとも、問題なく動く

blog:
  target: dev
  outputs:
    prod:
      type: bigquery
      method: "{{ env_var('DBT_AUTH_METHOD', 'oauth') }}"
      project: "{{ env_var('BIGQUERY_PROJECT_ID', 'dummy-project') }}"
      keyfile: "{{ env_var('BIGQUERY_KEYFILE', '') }}"
      schema: blog
      timeout_seconds: 300
      location: US

prefect flow

flowといってるが実態はtaskとして実装した。実行確認用のスクリプトも仕込んでいる

run_dbt_commandは任意のコマンドを実行するので、タスク名には引数の値を設定している task_run_name="run_dbt_{command[0]} の部分

from prefect import task
from prefect_dbt.core.runner import PrefectDbtRunner
from prefect_dbt.core.settings import PrefectDbtSettings
from prefect_gcp import GcpCredentials
from dotenv import load_dotenv
import json
import tempfile
import os
from contextlib import contextmanager
 
load_dotenv()
 
@contextmanager
def temporary_gcp_credentials(block_name: str):
    """
    GCP Credentialsブロックからサービスアカウントキーを取得し、一時ファイルとして保存するコンテキストマネージャ
    """
    tmp_path = None
    try:
        gcp_creds = GcpCredentials.load(block_name)
        creds_dict = gcp_creds.service_account_info.get_secret_value()
 
        with tempfile.NamedTemporaryFile(mode="w+", delete=False, suffix=".json") as tmp:
            json.dump(creds_dict, tmp)
            tmp_path = tmp.name
 
        # キーファイル環境変数をセット
        os.environ["BIGQUERY_KEYFILE"] = tmp_path
        # プロジェクトID環境変数をセット
        if "project_id" in creds_dict:
            os.environ["BIGQUERY_PROJECT_ID"] = creds_dict["project_id"]
 
        # 認証方法セット
        os.environ["DBT_AUTH_METHOD"] = "service-account"
 
        yield tmp_path
 
    finally:
        # クリーンアップ
        if tmp_path and os.path.exists(tmp_path):
            os.remove(tmp_path)
 
        os.environ.pop("BIGQUERY_KEYFILE", None)
        os.environ.pop("BIGQUERY_PROJECT_ID", None)
        os.environ.pop("DBT_AUTH_METHOD", None)
 
@task(retries=1, retry_delay_seconds=5, log_prints=True, task_run_name="run_dbt_{command[0]}")
def run_dbt_command(command: list[str], settings: PrefectDbtSettings) -> None:
    """
    dbtコマンドを実行する
    Args:
        command: dbtコマンドの引数リスト(例: ["run", "--select", "pokemon"])
        settings: PrefectDbtSettings
    """
    print(f"Running dbt command: {' '.join(command)}\n")
 
    runner = PrefectDbtRunner(settings=settings, raise_on_failure=True)
 
    print(f"Executing: dbt {' '.join(command)}")
    runner.invoke(command)
    print(f"Completed: dbt {' '.join(command)}\n")
 
@task(retries=0, log_prints=True)
def transform_data_with_dbt(
    profile_name: str = "blog",
    target: str = "prod",
    project_dir: str = "dbt_project",
    profiles_dir: str = "dbt_project",
    select: str | None = None
) -> str:
    """
    dbt実行(PrefectDbtRunner方式)
    Args:
        profile_name: dbt profile名
        target: dbt target名
        project_dir: dbtプロジェクトディレクトリ
        profiles_dir: dbt profilesディレクトリ
        select: dbt selectオプション(モデル指定)
    """
    print(f"Starting dbt transformation with profile: {profile_name}, target: {target}")
 
    settings = PrefectDbtSettings(
        project_dir=project_dir,
        profiles_dir=profiles_dir,
    )
 
    # dbt deps実行
    print("Running dbt deps...")
    run_dbt_command(["deps"], settings)
 
    # dbt run実行
    command = ["run", "--profile", profile_name, "--target", target]
    if select:
        command.extend(["--select", select])
 
    print(f"Running dbt with command: {' '.join(command)}")
    run_dbt_command(command, settings)
 
    return f"dbt transformation completed successfully for profile: {profile_name}"
 
if __name__ == "__main__":
    # using blocks for key file
    with temporary_gcp_credentials("dbt-bigquery"):
        transform_data_with_dbt(
            profile_name='blog',
            profiles_dir='dbt_project',
            target='prod',
            select='blog'
        )
 

この設定であれば環境変数を設定しない限り、dbt cliや、ローカル実行ではoauth認証が適用される

また、task内でload_envを実行しているため、.envにて環境変数に認証方法の項目を設定することで認証方法の切り替えを行うことができる。このときはblocksに登録したGCPの認証キーファイルがtemporary_gcp_credentialsにて適用され、サービスアカウントでコマンドが実行される

以下のようなあたいを.envに入れると、認証方法が変わる

DBT_AUTH_METHOD=service-account

実行

prefect taskとして

認証情報だけみたいのでdbt debugの出力で確認する

↓をtransform_data_with_dbt()内の適当な行に追加

+    run_dbt_command(["debug", "--profile", profile_name, "--target", target], settings)
  • oauth
Task run 'run_dbt_debug'
...
03:08:06.102 | INFO    | Task run 'run_dbt_debug' - Configuration:
03:08:06.169 | INFO    | Task run 'run_dbt_debug' -   profiles.yml file [OK found and valid]
03:08:06.236 | INFO    | Task run 'run_dbt_debug' -   dbt_project.yml file [OK found and valid]
03:08:06.301 | INFO    | Task run 'run_dbt_debug' - Required dependencies:
03:08:06.365 | INFO    | Task run 'run_dbt_debug' -  - git [OK found]
 
03:08:06.426 | INFO    | Task run 'run_dbt_debug' - Connection:
03:08:06.490 | INFO    | Task run 'run_dbt_debug' -   method: oauth
03:08:06.541 | INFO    | Task run 'run_dbt_debug' -   database: xxxx-xxxxxx
03:08:06.592 | INFO    | Task run 'run_dbt_debug' -   execution_project: xxxx-xxxxx
03:08:06.644 | INFO    | Task run 'run_dbt_debug' -   schema: blog
03:08:06.697 | INFO    | Task run 'run_dbt_debug' -   location: US
03:08:06.749 | INFO    | Task run 'run_dbt_debug' -   priority: None
...
03:08:07.958 | INFO    | Task run 'run_dbt_debug' -   Connection test: [OK connection ok]
  • service account

.envファイルに以下を追加

+ DBT_AUTH_METHOD=service-account
Task run 'run_dbt_debug'
...
03:09:19.416 | INFO    | Task run 'run_dbt_debug' - Configuration:
03:09:19.490 | INFO    | Task run 'run_dbt_debug' -   profiles.yml file [OK found and valid]
03:09:19.563 | INFO    | Task run 'run_dbt_debug' -   dbt_project.yml file [OK found and valid]
03:09:19.636 | INFO    | Task run 'run_dbt_debug' - Required dependencies:
03:09:19.710 | INFO    | Task run 'run_dbt_debug' -  - git [OK found]
 
03:09:19.848 | INFO    | Task run 'run_dbt_debug' - Connection:
03:09:19.908 | INFO    | Task run 'run_dbt_debug' -   method: service-account
03:09:19.967 | INFO    | Task run 'run_dbt_debug' -   database: xxxx-xxxxxx
03:09:20.027 | INFO    | Task run 'run_dbt_debug' -   execution_project: xxxx-xxxxxx
03:09:20.086 | INFO    | Task run 'run_dbt_debug' -   schema: blog
03:09:20.145 | INFO    | Task run 'run_dbt_debug' -   location: US
03:09:20.204 | INFO    | Task run 'run_dbt_debug' -   priority: None
...
03:09:21.593 | INFO    | Task run 'run_dbt_debug' - All checks passed!

dbt cli実行

ターミナルからdbt cli実行

oauth認証のみ確認。理論上は環境変数に DBT_AUTH_METHOD BIGQUERY_KEYFILEを設定してやればservice-account認証で動かすことは可能

uv run dbt debug --target prod --profile blog
18:11:08  Running with dbt=1.9.1
18:11:08  dbt version: 1.9.1
...
18:11:11  adapter type: bigquery
18:11:11  adapter version: 1.9.2
18:11:11  Configuration:
18:11:11    profiles.yml file [OK found and valid]
18:11:11    dbt_project.yml file [OK found and valid]
18:11:11  Required dependencies:
18:11:11   - git [OK found]
 
18:11:11  Connection:
18:11:11    method: oauth
18:11:11    database: xxxx-xxxxxx
18:11:11    execution_project: xxxx-xxxxxx
18:11:11    schema: blog
18:11:11    location: US
18:11:11    priority: None

別解 flow実行用profilesの作成

flowの実行用にprofile_dirを作ってそちらを指定するのがいいよなってなったのでそうする

profilesの記載には公式ドキュメントにある{{ prefect.blocks… }}の構文を使ってみる。prefect上の実行前提のためblock読み取りなど、認証周りのsetupをしなくてよくなる

prefect.blocks構文のgcp-credentialsはblockの種類、dbt-bigqueryはblockの名前を指している

service_account_infoが_区切りなのを発見する部分でだいぶ詰まった

# dbt_project/profiles/flow/profiles.yml
blog:
  target: prod
  outputs:
 
    prod:
      type: bigquery
      method: service-account-json
      project: "{{ prefect.blocks.gcp-credentials.dbt-bigquery.project }}"
      schema: blog
      keyfile_json:
        project_id: "{{ prefect.blocks.gcp-credentials.dbt-bigquery.service_account_info.project_id }}"
        private_key_id: "{{ prefect.blocks.gcp-credentials.dbt-bigquery.service_account_info.private_key_id }}"
        private_key: "{{ prefect.blocks.gcp-credentials.dbt-bigquery.service_account_info.private_key }}"
        client_email: "{{ prefect.blocks.gcp-credentials.dbt-bigquery.service_account_info.client_email }}"
        client_id: "{{ prefect.blocks.gcp-credentials.dbt-bigquery.service_account_info.client_id }}"
        auth_uri: "{{ prefect.blocks.gcp-credentials.dbt-bigquery.service_account_info.auth_uri }}"
        token_uri: "{{ prefect.blocks.gcp-credentials.dbt-bigquery.service_account_info.token_uri }}"
        auth_provider_x509_cert_url: "{{ prefect.blocks.gcp-credentials.dbt-bigquery.service_account_info.auth_provider_x509_cert_url }}"
        client_x509_cert_url: "{{ prefect.blocks.gcp-credentials.dbt-bigquery.service_account_info.client_x509_cert_url }}"
      timeout_seconds: 300
      location: US

profiles dirを指定して実行してみる

 
@@ -97,10 +101,11 @@ def transform_data_with_dbt(
 
 
 if __name__ == "__main__":
     # using blocks for key file
-     with temporary_gcp_credentials("dbt-bigquery"):
         transform_data_with_dbt(
             profile_name='blog',
+            profiles_dir='dbt_project/profiles/flow',
             target='prod',
             select='blog'
         )

認証は通ったのでよさそう。debugの実行ログにはclient_idなどの値は表示されなかった

uv run tasks/dbt_tasks.py
...
04:11:19.598 | INFO    | Task run 'transform_data_with_dbt' - Starting dbt transformation with profile: blog, target: prod
04:11:19.603 | INFO    | Task run 'run_dbt_debug' - Running dbt command: debug --profile blog --target prod
...
04:11:23.644 | INFO    | Task run 'run_dbt_debug' - Required dependencies:
04:11:23.711 | INFO    | Task run 'run_dbt_debug' -  - git [OK found]
 
04:11:23.777 | INFO    | Task run 'run_dbt_debug' - Connection:
04:11:23.845 | INFO    | Task run 'run_dbt_debug' -   method: service-account-json
04:11:23.993 | INFO    | Task run 'run_dbt_debug' -   database: xxxx-xxxxxx
04:11:24.067 | INFO    | Task run 'run_dbt_debug' -   execution_project: xxxx-xxxxxx
04:11:24.130 | INFO    | Task run 'run_dbt_debug' -   schema: blog
04:11:24.192 | INFO    | Task run 'run_dbt_debug' -   location: US
04:11:24.293 | INFO    | Task run 'run_dbt_debug' -   priority: None
...
04:11:24.872 | INFO    | Task run 'run_dbt_debug' -   client_id: None
04:11:24.940 | INFO    | Task run 'run_dbt_debug' -   token_uri: None
...
04:11:25.359 | INFO    | Task run 'run_dbt_debug' -   Connection test: [OK connection ok]

おまけ PrefectDbtRunnerのイケてるところ

従来のDbtCoreOperationと比較したときのいいところ

ほとんど公式ドキュメントで有用性は説明されているが、まとめておこう

https://docs.prefect.io/integrations/prefect-dbt#dbt-core

  1. 柔軟なコマンド実行 PrefectDbtRunnerはrunnerで実体化するので、runnerの経由でdbtの様々なコマンドを(オプションも変えて)実行可能
  2. エラーハンドリングしやすい PrefectDbtRunnerの引数 raise_on_failure=True/False で失敗時の挙動を指定できる。この記事では試さなかったが、dbt testが落ちても後続の処理を進めたい場合などに重宝する dbt testはすべてのテストが成功したらステータスコード0を、テストが失敗したらステータスコード1を返す。DbtCoreOperationではテストに失敗したらflowもエラー発生とされるため、後続の処理や、flowのstatusを調整する必要があった
  3. モデルごとの実行をprefect taskとして認識する DbtCoreOperationではdbtコマンド自体をtaskで分けるのみなので、特定のモデルのエラー時に実行全体のログを読み解く必要がある。PrefectDbtRunnerではmodelごと(クエリごと?)にprefect taskとして分けるため、デバッグが容易になる

dbt run実行時のタスクごとのログ。dbt run自体のタスクでは全体のサマリとしてログがでてきて、便利

  • run_dbt_runのログ
12:03:26 AM Info Running dbt command: run --profile blog --target prod
12:03:26 AM Info Executing: dbt run --profile blog --target prod
12:03:26 AM Info Running with dbt=1.9.1
12:03:26 AM Info Registered adapter: bigquery=1.10.3
 
12:03:27 AM Info Unable to do partial parsing because saved manifest not found. Starting full parse.
12:03:32 AM Info Found 39 models, 1 analysis, 2 operations, 16 sources, 0 macros
12:03:32 AM Info Concurrency: 1 threads (target='prod')
12:06:44 AM Info Finished running 1 incremental model, 2 project hooks, 6 table models in 0 hours 3 minutes and 11.37 seconds (191.37s).
12:06:44 AM Info Completed successfully 12:06:44 AM
            Info Done. PASS=9 WARN=0 ERROR=0 SKIP=0 NO-OP=0 TOTAL=9
12:06:44 AM Info Completed: dbt run --profile blog --target prod
12:06:44 AM Info Finished in state Completed()
  • model blog_content_embeddingsのログ

各モデルのログ。こちらは自動生成されるタスク

12:06:39 AM Info 7 of 7 START sql incremental model blog_marts.blog_content_embeddings ..... [RUN]
12:06:43 AM Info 7 of 7 OK created sql incremental model blog_marts.blog_content_embeddings  [MERGE (0.0 rows, 1.7 MiB processed) in 3.88s]
12:06:43 AM Info Finished in state Completed()

prefect uiに表示されるタスクのグラフはこんな感じ。依存関係を矢印でつなぐなどしている

run-dbt-task-graph

PrefectDbtRunnerいいところばっかり書いてしまったので、DbtCoreOperationのいいところも。対応しているdbt関連のblocksが豊富なのでconfig(認証含む)の統合などは利便性が高い

参考