ブログ記事のテキストデータとして色々分析や可視化をしたくなってきた。記事データはgithub上にmarkdownファイル形式でおいてあるので、それらを抽出し、bigqueryのテーブルとしてロードする。全体の流れとしてはprefectからdltにてデータロードを。データ加工はdbtな感じの流れにしている。この記事ではdlt周りについて書いておく。記事の最後にリポジトリのリンクをはってある
元々gatsbyjsで構築されているのもあり、graphqlを通して取得や加工は少しやっていたが、GAなどとまとめて集計してしまいたい欲や、可視化を別なツールでやりたい欲が出てきたため、bigqueryに集約することにした
環境
python 3.11
dlt 1.5
dlt pipelineの定義
dlt(data load tool)はpython製のデータ転送向けのツール。取得から格納までの処理(接続情報や抽出定義、格納定義の設定)を簡単にかける。設定項目や考慮することが多いため楽にはなるが、色々な副産物やパラメータについて覚えることが多くて大変
github resource周り
取得から、格納するカラムの定義まで。1テーブル1resource、なイメージ。複数のときはsourceという名称が使われている
dltは主要な取得元であればモジュールを用意しており、rest apiなんかもある。今回行いたい処理は、単純な返り値を格納するなどでは済まない部分もあったので、取得ロジックはgithub apiを使って独自で書いている。取得部分の実装、GitHubMarkdownFetcherはclaudeに書いてもらった。特定のパス以下のファイルからmarkdownに絞って、それらのファイルの中身と最終更新日(コミット日時)を取得している
./dlt_project/blog_content.py
from prefect_github import GitHubCredentials
import base64
import dataclasses
from dataclasses import dataclass
import requests
import dlt
from typing import Generator, List, Optional
from datetime import datetime
owner = "uni-3"
repo = "gatsby-blog"
@dataclass
class MarkdownFile:
path: str
content: str
last_modified: datetime
class GitHubMarkdownFetcher:
def __init__(self, owner: str, repo: str, path: Optional[str] = "", token: Optional[str] = None):
"""
GitHubのMarkdownファイルを取得するためのクラス
Args:
owner: GitHubのユーザー名またはオーガニゼーション名
repo: リポジトリ名
path: 精査するパス
token: GitHubのパーソナルアクセストークン(オプション)
"""
self.owner = owner
self.repo = repo
self.path = path
self.base_url = "https://api.github.com"
self.headers = {
"Accept": "application/vnd.github.v3+json"
}
if token:
self.headers["Authorization"] = f"Bearer {token}"
def get_all_files(self) -> Generator[dict, None, None]:
"""
指定されたパス以下の全Markdownファイルを一度に取得
Args:
path: 取得を開始するパス(デフォルトはリポジトリのルート)
Returns:
MarkdownFileオブジェクトのリスト
"""
try:
url = f"{self.base_url}/repos/{self.owner}/{self.repo}/git/trees/main"
# recursive=1 を追加してすべてのファイルを一度に取得
params = {"recursive": "1"}
response = requests.get(url, headers=self.headers, params=params)
response.raise_for_status()
tree = response.json()["tree"]
files = []
# pathが指定されている場合は、そのパス以下のファイルのみをフィルタ
for item in tree:
if (item["type"] == "blob" and
item["path"].endswith(".md") and
item["path"].startswith(self.path)):
content = self._get_file_content(item["path"])
last_modified = self._get_file_last_modified(item["path"])
file = MarkdownFile(
path=item["path"],
content=content,
last_modified=last_modified
)
yield dataclasses.asdict(file)
except requests.exceptions.RequestException as e:
print(f"Error fetching files: {e}")
raise
def _get_file_content(self, file_path: str) -> str:
"""
ファイルの内容を取得
Args:
file_path: ファイルのパス
Returns:
ファイルの内容(文字列)
"""
url = f"{self.base_url}/repos/{self.owner}/{self.repo}/contents/{file_path}"
response = requests.get(url, headers=self.headers)
response.raise_for_status()
content = response.json()
if content.get("encoding") == "base64":
return base64.b64decode(content["content"]).decode('utf-8')
else:
response = requests.get(
content["download_url"], headers=self.headers)
response.raise_for_status()
return response.text
def _get_file_last_modified(self, file_path: str) -> datetime:
"""
ファイルの最終更新日を取得
Args:
file_path: ファイルのパス
Returns:
最終更新日のdatetimeオブジェクト
"""
url = f"{self.base_url}/repos/{self.owner}/{self.repo}/commits"
params = {
"path": file_path,
"per_page": 1 # 最新のコミットのみ取得
}
response = requests.get(url, headers=self.headers, params=params)
response.raise_for_status()
commits = response.json()
if commits:
# コミットの日時を取得してdatetimeオブジェクトに変換
last_modified_str = commits[0]["commit"]["committer"]["date"]
return datetime.strptime(last_modified_str, "%Y-%m-%dT%H:%M:%SZ")
else:
return datetime.min # コミットが見つからない場合のフォールバック
@dlt.resource(
name="blog_content",
write_disposition="replace"
)
def get_resources(fetcher: GitHubMarkdownFetcher):
# print(f"fmarkdown sile, {next(fetcher.get_all_files())}")
yield from fetcher.get_all_files()
bigquery転送周り
resourceをどこに、どのような手法で転送するか
取得定義をinstanceとして生成して、リソースの取得をしている。最初はクラスメソッドで書こうとしたが、@dlt.resourceデコレータが使えなかったのでリソース取得用関数を適当に書いている
デフォルトで転送するだけならbigquery_adapterを使わなくとも良いが、descriptonやpartitionなど、(おそらく各種宛先の)独自の設定を入れたい場合はadapterでの指定が必要になってくる
./tasks/blog_content.py
def load() -> dlt.common.pipeline.LoadInfo:
fetcher = blog_content.GitHubMarkdownFetcher(
owner=owner,
repo=repo,
path="content",
token=dlt.secrets["sources.rest_api_pipeline.github_source"]
)
pipeline = dlt.pipeline(
pipeline_name='blog_content',
destination='bigquery',
dataset_name='blog_info',
)
load_info = pipeline.run(
bigquery_adapter(
blog_content.get_resources(fetcher),
table_description='blog content',
)
)
row_counts = pipeline.last_trace.last_normalize_info.row_counts
print(f"row count: {row_counts}")
return load_info
実行ログ。prefect flowとして動かしている。取得行数なんかも表示できる
python flows/blog_content.py
16:58:14.266 | INFO | prefect.engine - Created flow run 'gabby-mongrel' for flow 'blog_content'
16:58:15.728 | INFO | Flow run 'gabby-mongrel' - start load
/workspaces/prefect/.venv/lib/python3.11/site-packages/google/cloud/bigquery/client.py:589: UserWarning: Cannot create BigQuery Storage client, the dependency google-cloud-bigquery-storage is not installed.
warnings.warn(
16:58:46.080 | INFO | Task run 'load_blog_content-ad3' - row count: {'blog_content': 114}
16:59:58.553 | INFO | Task run 'load_blog_content-ad3' - load info Pipeline blog_content load step completed in 18.05 seconds
1 load package(s) were loaded to destination bigquery and into dataset blog_info
The bigquery destination used xxxxx.iam.gserviceaccount.com@xxxx location to store data
Load package 1735837097.1531174 is LOADED and contains no failed jobs
16:59:58.556 | INFO | Task run 'load_blog_content-ad3' - Finished in state Completed()
16:59:58.557 | INFO | Flow run 'gabby-mongrel' - loaded Pipeline blog_content load step completed in 18.05 seconds
...
17:00:16.263 | INFO | Flow run 'gabby-mongrel' - Finished in state Completed()
転送先のデータセットにはdltのpipeline jobの履歴管理用のテーブルがいくつか、ロード先のテーブルにはロード時の_dlt_id、_dlt_load_idなどのカラムが追加される。要するにいつのロードジョブにて格納されたデータなのかが辿れるようになっている。一回壊れたら修正や確認が大変そうである
気にしたところ
stagingのデータセット名
デフォルトで
{table_name}_stating
# .dlt/config.toml
[destination.bigquery]
location = "US"
staging_dataset_name_layout="%s_dlt_staging"
dlt secretの格納
値は基本的にprefect cloudに保持しておいて、実行時にロードして、dltのsecretとして設定するようにしている
次点では.envを使うとよさそう。いい感じにsecrets.tomlを生成できるようなモジュールもなさそうだった
dlt.secrets["sources.rest_api_pipeline.github_source"] = GitHubCredentials.load(
"github-credentials-block").token.get_secret_value()
とにかく格納先や取得元についてconfigで設定できてしまうのは便利。scd2などの自前実装が大変そうなものも対応しているので、細かい要件にも対応できそうな予感。反面、pipelineの状態管理などは暗黙的になってしまう節があるので、実運用は大変そう。実際ローカルで開発している際、キャッシュ的なフォルダを削除
rm -r ~/.dlt
参考
- ロード戦略。mergeなどもできる: https://dlthub.com/docs/general-usage/incremental-loading#merge-incremental_loading
- source: https://dlthub.com/docs/general-usage/source
- dlt secret: https://dlthub.com/docs/general-usage/credentials/setup#naming-convention
あまりまとまってないが、リポジトリはこちら