bigqueryを使っていると、権限管理の都合上、ローカルでの実行ではユーザー認証、サーバー上などではサービスカウント認証、などといった認証方式の切り替えを行うケースが出てくる
PrefectDbtRunnerを用いたとき、実現するのにやりづらい部分があったので思いついた方法をまとめた。特にdbt profiles.ymlは認証方法に加え、スキーマや接続先を切り替える設定項目(target)もあるので、そのあたりのケアに注意する必要がでてくる
有用そうだと思った方法は実装して動作確認をしてみた。ついでにPrefectDbtRunnerの利点も記載
だいぶマニアックな記事になった
環境
- prefect: 3.6
- prefect-dbt: 0.7.9
- dbt-core: 1.9
方針ぎめ
前提・要件
-
ルートリポジトリのdbt_project以下にdbtプロジェクトを配置している
-
targetはdev/prod
違いとしては宛先のスキーマ(データセット)が異なる
- target名はprodにしたい
カスタムスキーマの指定にgenerate_schema_name_for_envを使いたい。この関数内では、target名はprod前提で動くので、本番環境向けにはtarget prodとする
- cli経由でもtarget prodで実行したい
実際の運用では実行できない(もしくはデフォルトではエラーになる)ようにするほうがベターだが、analysisのクエリ実行や、compileをしたいときもあるので許可したい
- ローカルからflowを実行した際にはoauthの認証も許可したい
実行環境とアカウントを明確に分けておきたいので、ローカル(スクリプト経由で)実行した際はデフォルトの認証を使いたい。必須ではないが、
deployment(work pool)上ではサービスアカウント実行にする
方針
思いついたのが以下
envを使って認証方式を切り替えつつ、flowではblocks経由でサービスアカウントファイルを生成して、一時ファイルとして設置する方法
- cli経由ではoauth。オプションとして、.envに値を設定することでサービスアカウントでも実行可能
- flowをローカル実行する場合、oauth経由で実行できる。deployment(work pool)上での実行時はサービスアカウント(blocks読み取り)で実行する
まとめるとこう。このパターンを実現したい
AIにまとめさせた、ので項目に重複もあるがこんな感じ
| シナリオ | DBT_TARGET | DBT_AUTH_METHOD | 選ばれる outputs | 認証 | BIGQUERY_KEYFILE | 実行方法 |
|---|---|---|---|---|---|---|
| ローカルdev | dev | oauth | dev | OAuth | 不要 | python flows/dbt.py or DBT CLI |
| ローカルでprod確認 | prod | oauth | prod | OAuth | 不要 | python flows/dbt.py or DBT CLI |
| ローカルでprod SA | prod | service-account | prod | SA | 必要 ローカルに設置 or blocksから生成 | python flows/dbt.py or DBT CLI |
| Prefect dev | dev | oauth | dev | OAuth | 不要 | Prefect |
| Prefect prod | prod | service-account | prod | SA | 必要 blocksから生成 | Prefect |
他に考慮したこと
他に考えたことと却下したこと
- ローカルではtarget devのみ動かす運用にする target prodに対してanalysis実行したいときもあるのでやめておく
- profiles.ymlをflow用とlocal実行用で分ける 同じターゲットの内容で認証方法だけ変えたファイルを用意するのはちょっとな、という個人的な好みで不採用 こちらのprofiles.ymlを分ける方法は別解としてこの記事に記載しておく
- ローカル/deployment上での実行でtarget名を分ける(prod/prod-flowなどとする)
こちらはカスタムスキーマの指定に
generate_schema_name_for_envを使う場合はマクロのロジックに手を入れることになるので却下した
正直どの方法も認証用にカスタム設定するので見づらい(仕組みを把握しづらい)ので大差はないな、という印象
実装
profiles.yml
中身は以下prod targetのみ記載(devもほぼ同じなため)。envへの設定の有無で実行環境が変わる
methodがoauthのとき、keyfileは無視されるので値が設定されてなくとも、問題なく動く
blog:
target: dev
outputs:
prod:
type: bigquery
method: "{{ env_var('DBT_AUTH_METHOD', 'oauth') }}"
project: "{{ env_var('BIGQUERY_PROJECT_ID', 'dummy-project') }}"
keyfile: "{{ env_var('BIGQUERY_KEYFILE', '') }}"
schema: blog
timeout_seconds: 300
location: USprefect flow
flowといってるが実態はtaskとして実装した。実行確認用のスクリプトも仕込んでいる
run_dbt_commandは任意のコマンドを実行するので、タスク名には引数の値を設定している task_run_name="run_dbt_{command[0]} の部分
from prefect import task
from prefect_dbt.core.runner import PrefectDbtRunner
from prefect_dbt.core.settings import PrefectDbtSettings
from prefect_gcp import GcpCredentials
from dotenv import load_dotenv
import json
import tempfile
import os
from contextlib import contextmanager
load_dotenv()
@contextmanager
def temporary_gcp_credentials(block_name: str):
"""
GCP Credentialsブロックからサービスアカウントキーを取得し、一時ファイルとして保存するコンテキストマネージャ
"""
tmp_path = None
try:
gcp_creds = GcpCredentials.load(block_name)
creds_dict = gcp_creds.service_account_info.get_secret_value()
with tempfile.NamedTemporaryFile(mode="w+", delete=False, suffix=".json") as tmp:
json.dump(creds_dict, tmp)
tmp_path = tmp.name
# キーファイル環境変数をセット
os.environ["BIGQUERY_KEYFILE"] = tmp_path
# プロジェクトID環境変数をセット
if "project_id" in creds_dict:
os.environ["BIGQUERY_PROJECT_ID"] = creds_dict["project_id"]
# 認証方法セット
os.environ["DBT_AUTH_METHOD"] = "service-account"
yield tmp_path
finally:
# クリーンアップ
if tmp_path and os.path.exists(tmp_path):
os.remove(tmp_path)
os.environ.pop("BIGQUERY_KEYFILE", None)
os.environ.pop("BIGQUERY_PROJECT_ID", None)
os.environ.pop("DBT_AUTH_METHOD", None)
@task(retries=1, retry_delay_seconds=5, log_prints=True, task_run_name="run_dbt_{command[0]}")
def run_dbt_command(command: list[str], settings: PrefectDbtSettings) -> None:
"""
dbtコマンドを実行する
Args:
command: dbtコマンドの引数リスト(例: ["run", "--select", "pokemon"])
settings: PrefectDbtSettings
"""
print(f"Running dbt command: {' '.join(command)}\n")
runner = PrefectDbtRunner(settings=settings, raise_on_failure=True)
print(f"Executing: dbt {' '.join(command)}")
runner.invoke(command)
print(f"Completed: dbt {' '.join(command)}\n")
@task(retries=0, log_prints=True)
def transform_data_with_dbt(
profile_name: str = "blog",
target: str = "prod",
project_dir: str = "dbt_project",
profiles_dir: str = "dbt_project",
select: str | None = None
) -> str:
"""
dbt実行(PrefectDbtRunner方式)
Args:
profile_name: dbt profile名
target: dbt target名
project_dir: dbtプロジェクトディレクトリ
profiles_dir: dbt profilesディレクトリ
select: dbt selectオプション(モデル指定)
"""
print(f"Starting dbt transformation with profile: {profile_name}, target: {target}")
settings = PrefectDbtSettings(
project_dir=project_dir,
profiles_dir=profiles_dir,
)
# dbt deps実行
print("Running dbt deps...")
run_dbt_command(["deps"], settings)
# dbt run実行
command = ["run", "--profile", profile_name, "--target", target]
if select:
command.extend(["--select", select])
print(f"Running dbt with command: {' '.join(command)}")
run_dbt_command(command, settings)
return f"dbt transformation completed successfully for profile: {profile_name}"
if __name__ == "__main__":
# using blocks for key file
with temporary_gcp_credentials("dbt-bigquery"):
transform_data_with_dbt(
profile_name='blog',
profiles_dir='dbt_project',
target='prod',
select='blog'
)
この設定であれば環境変数を設定しない限り、dbt cliや、ローカル実行ではoauth認証が適用される
また、task内でload_envを実行しているため、.envにて環境変数に認証方法の項目を設定することで認証方法の切り替えを行うことができる。このときはblocksに登録したGCPの認証キーファイルがtemporary_gcp_credentialsにて適用され、サービスアカウントでコマンドが実行される
以下のようなあたいを.envに入れると、認証方法が変わる
DBT_AUTH_METHOD=service-account実行
prefect taskとして
認証情報だけみたいのでdbt debugの出力で確認する
↓をtransform_data_with_dbt()内の適当な行に追加
+ run_dbt_command(["debug", "--profile", profile_name, "--target", target], settings)- oauth
Task run 'run_dbt_debug'
...
03:08:06.102 | INFO | Task run 'run_dbt_debug' - Configuration:
03:08:06.169 | INFO | Task run 'run_dbt_debug' - profiles.yml file [OK found and valid]
03:08:06.236 | INFO | Task run 'run_dbt_debug' - dbt_project.yml file [OK found and valid]
03:08:06.301 | INFO | Task run 'run_dbt_debug' - Required dependencies:
03:08:06.365 | INFO | Task run 'run_dbt_debug' - - git [OK found]
03:08:06.426 | INFO | Task run 'run_dbt_debug' - Connection:
03:08:06.490 | INFO | Task run 'run_dbt_debug' - method: oauth
03:08:06.541 | INFO | Task run 'run_dbt_debug' - database: xxxx-xxxxxx
03:08:06.592 | INFO | Task run 'run_dbt_debug' - execution_project: xxxx-xxxxx
03:08:06.644 | INFO | Task run 'run_dbt_debug' - schema: blog
03:08:06.697 | INFO | Task run 'run_dbt_debug' - location: US
03:08:06.749 | INFO | Task run 'run_dbt_debug' - priority: None
...
03:08:07.958 | INFO | Task run 'run_dbt_debug' - Connection test: [OK connection ok]- service account
.envファイルに以下を追加
+ DBT_AUTH_METHOD=service-accountTask run 'run_dbt_debug'
...
03:09:19.416 | INFO | Task run 'run_dbt_debug' - Configuration:
03:09:19.490 | INFO | Task run 'run_dbt_debug' - profiles.yml file [OK found and valid]
03:09:19.563 | INFO | Task run 'run_dbt_debug' - dbt_project.yml file [OK found and valid]
03:09:19.636 | INFO | Task run 'run_dbt_debug' - Required dependencies:
03:09:19.710 | INFO | Task run 'run_dbt_debug' - - git [OK found]
03:09:19.848 | INFO | Task run 'run_dbt_debug' - Connection:
03:09:19.908 | INFO | Task run 'run_dbt_debug' - method: service-account
03:09:19.967 | INFO | Task run 'run_dbt_debug' - database: xxxx-xxxxxx
03:09:20.027 | INFO | Task run 'run_dbt_debug' - execution_project: xxxx-xxxxxx
03:09:20.086 | INFO | Task run 'run_dbt_debug' - schema: blog
03:09:20.145 | INFO | Task run 'run_dbt_debug' - location: US
03:09:20.204 | INFO | Task run 'run_dbt_debug' - priority: None
...
03:09:21.593 | INFO | Task run 'run_dbt_debug' - All checks passed!dbt cli実行
ターミナルからdbt cli実行
oauth認証のみ確認。理論上は環境変数に DBT_AUTH_METHOD BIGQUERY_KEYFILEを設定してやればservice-account認証で動かすことは可能
uv run dbt debug --target prod --profile blog
18:11:08 Running with dbt=1.9.1
18:11:08 dbt version: 1.9.1
...
18:11:11 adapter type: bigquery
18:11:11 adapter version: 1.9.2
18:11:11 Configuration:
18:11:11 profiles.yml file [OK found and valid]
18:11:11 dbt_project.yml file [OK found and valid]
18:11:11 Required dependencies:
18:11:11 - git [OK found]
18:11:11 Connection:
18:11:11 method: oauth
18:11:11 database: xxxx-xxxxxx
18:11:11 execution_project: xxxx-xxxxxx
18:11:11 schema: blog
18:11:11 location: US
18:11:11 priority: None別解 flow実行用profilesの作成
flowの実行用にprofile_dirを作ってそちらを指定するのがいいよなってなったのでそうする
profilesの記載には公式ドキュメントにある{{ prefect.blocks… }}の構文を使ってみる。prefect上の実行前提のためblock読み取りなど、認証周りのsetupをしなくてよくなる
prefect.blocks構文のgcp-credentialsはblockの種類、dbt-bigqueryはblockの名前を指している
service_account_infoが_区切りなのを発見する部分でだいぶ詰まった
# dbt_project/profiles/flow/profiles.yml
blog:
target: prod
outputs:
prod:
type: bigquery
method: service-account-json
project: "{{ prefect.blocks.gcp-credentials.dbt-bigquery.project }}"
schema: blog
keyfile_json:
project_id: "{{ prefect.blocks.gcp-credentials.dbt-bigquery.service_account_info.project_id }}"
private_key_id: "{{ prefect.blocks.gcp-credentials.dbt-bigquery.service_account_info.private_key_id }}"
private_key: "{{ prefect.blocks.gcp-credentials.dbt-bigquery.service_account_info.private_key }}"
client_email: "{{ prefect.blocks.gcp-credentials.dbt-bigquery.service_account_info.client_email }}"
client_id: "{{ prefect.blocks.gcp-credentials.dbt-bigquery.service_account_info.client_id }}"
auth_uri: "{{ prefect.blocks.gcp-credentials.dbt-bigquery.service_account_info.auth_uri }}"
token_uri: "{{ prefect.blocks.gcp-credentials.dbt-bigquery.service_account_info.token_uri }}"
auth_provider_x509_cert_url: "{{ prefect.blocks.gcp-credentials.dbt-bigquery.service_account_info.auth_provider_x509_cert_url }}"
client_x509_cert_url: "{{ prefect.blocks.gcp-credentials.dbt-bigquery.service_account_info.client_x509_cert_url }}"
timeout_seconds: 300
location: USprofiles dirを指定して実行してみる
@@ -97,10 +101,11 @@ def transform_data_with_dbt(
if __name__ == "__main__":
# using blocks for key file
- with temporary_gcp_credentials("dbt-bigquery"):
transform_data_with_dbt(
profile_name='blog',
+ profiles_dir='dbt_project/profiles/flow',
target='prod',
select='blog'
)認証は通ったのでよさそう。debugの実行ログにはclient_idなどの値は表示されなかった
uv run tasks/dbt_tasks.py
...
04:11:19.598 | INFO | Task run 'transform_data_with_dbt' - Starting dbt transformation with profile: blog, target: prod
04:11:19.603 | INFO | Task run 'run_dbt_debug' - Running dbt command: debug --profile blog --target prod
...
04:11:23.644 | INFO | Task run 'run_dbt_debug' - Required dependencies:
04:11:23.711 | INFO | Task run 'run_dbt_debug' - - git [OK found]
04:11:23.777 | INFO | Task run 'run_dbt_debug' - Connection:
04:11:23.845 | INFO | Task run 'run_dbt_debug' - method: service-account-json
04:11:23.993 | INFO | Task run 'run_dbt_debug' - database: xxxx-xxxxxx
04:11:24.067 | INFO | Task run 'run_dbt_debug' - execution_project: xxxx-xxxxxx
04:11:24.130 | INFO | Task run 'run_dbt_debug' - schema: blog
04:11:24.192 | INFO | Task run 'run_dbt_debug' - location: US
04:11:24.293 | INFO | Task run 'run_dbt_debug' - priority: None
...
04:11:24.872 | INFO | Task run 'run_dbt_debug' - client_id: None
04:11:24.940 | INFO | Task run 'run_dbt_debug' - token_uri: None
...
04:11:25.359 | INFO | Task run 'run_dbt_debug' - Connection test: [OK connection ok]おまけ PrefectDbtRunnerのイケてるところ
従来のDbtCoreOperationと比較したときのいいところ
ほとんど公式ドキュメントで有用性は説明されているが、まとめておこう
https://docs.prefect.io/integrations/prefect-dbt#dbt-core
- 柔軟なコマンド実行 PrefectDbtRunnerはrunnerで実体化するので、runnerの経由でdbtの様々なコマンドを(オプションも変えて)実行可能
- エラーハンドリングしやすい PrefectDbtRunnerの引数 raise_on_failure=True/False で失敗時の挙動を指定できる。この記事では試さなかったが、dbt testが落ちても後続の処理を進めたい場合などに重宝する dbt testはすべてのテストが成功したらステータスコード0を、テストが失敗したらステータスコード1を返す。DbtCoreOperationではテストに失敗したらflowもエラー発生とされるため、後続の処理や、flowのstatusを調整する必要があった
- モデルごとの実行をprefect taskとして認識する DbtCoreOperationではdbtコマンド自体をtaskで分けるのみなので、特定のモデルのエラー時に実行全体のログを読み解く必要がある。PrefectDbtRunnerではmodelごと(クエリごと?)にprefect taskとして分けるため、デバッグが容易になる
dbt run実行時のタスクごとのログ。dbt run自体のタスクでは全体のサマリとしてログがでてきて、便利
- run_dbt_runのログ
12:03:26 AM Info Running dbt command: run --profile blog --target prod
12:03:26 AM Info Executing: dbt run --profile blog --target prod
12:03:26 AM Info Running with dbt=1.9.1
12:03:26 AM Info Registered adapter: bigquery=1.10.3
12:03:27 AM Info Unable to do partial parsing because saved manifest not found. Starting full parse.
12:03:32 AM Info Found 39 models, 1 analysis, 2 operations, 16 sources, 0 macros
12:03:32 AM Info Concurrency: 1 threads (target='prod')
12:06:44 AM Info Finished running 1 incremental model, 2 project hooks, 6 table models in 0 hours 3 minutes and 11.37 seconds (191.37s).
12:06:44 AM Info Completed successfully 12:06:44 AM
Info Done. PASS=9 WARN=0 ERROR=0 SKIP=0 NO-OP=0 TOTAL=9
12:06:44 AM Info Completed: dbt run --profile blog --target prod
12:06:44 AM Info Finished in state Completed()- model blog_content_embeddingsのログ
各モデルのログ。こちらは自動生成されるタスク
12:06:39 AM Info 7 of 7 START sql incremental model blog_marts.blog_content_embeddings ..... [RUN]
12:06:43 AM Info 7 of 7 OK created sql incremental model blog_marts.blog_content_embeddings [MERGE (0.0 rows, 1.7 MiB processed) in 3.88s]
12:06:43 AM Info Finished in state Completed()prefect uiに表示されるタスクのグラフはこんな感じ。依存関係を矢印でつなぐなどしている

PrefectDbtRunnerいいところばっかり書いてしまったので、DbtCoreOperationのいいところも。対応しているdbt関連のblocksが豊富なのでconfig(認証含む)の統合などは利便性が高い