uni memo

dlt(data load tool)でgithub上のファイルをbigqueryに転送する

dlt(data load tool)でgithub上のファイルをbigqueryに転送する

ブログ記事のテキストデータとして色々分析や可視化をしたくなってきた。記事データはgithub上にmarkdownファイル形式でおいてあるので、それらを抽出し、bigqueryのテーブルとしてロードする。全体の流れとしてはprefectからdltにてデータロードを。データ加工はdbtな感じの流れにしている。この記事ではdlt周りについて書いておく。記事の最後にリポジトリのリンクをはってある

元々gatsbyjsで構築されているのもあり、graphqlを通して取得や加工は少しやっていたが、GAなどとまとめて集計してしまいたい欲や、可視化を別なツールでやりたい欲が出てきたため、bigqueryに集約することにした

環境

python 3.11

dlt 1.5

dlt pipelineの定義

dlt(data load tool)はpython製のデータ転送向けのツール。取得から格納までの処理(接続情報や抽出定義、格納定義の設定)を簡単にかける。設定項目や考慮することが多いため楽にはなるが、色々な副産物やパラメータについて覚えることが多くて大変

github resource周り

取得から、格納するカラムの定義まで。1テーブル1resource、なイメージ。複数のときはsourceという名称が使われている

dltは主要な取得元であればモジュールを用意しており、rest apiなんかもある。今回行いたい処理は、単純な返り値を格納するなどでは済まない部分もあったので、取得ロジックはgithub apiを使って独自で書いている。取得部分の実装、GitHubMarkdownFetcherはclaudeに書いてもらった。特定のパス以下のファイルからmarkdownに絞って、それらのファイルの中身と最終更新日(コミット日時)を取得している

./dlt_project/blog_content.py

from prefect_github import GitHubCredentials
import base64
import dataclasses
from dataclasses import dataclass
import requests
import dlt
from typing import Generator, List,  Optional
from datetime import datetime

owner = "uni-3"
repo = "gatsby-blog"

@dataclass
class MarkdownFile:
    path: str
    content: str
    last_modified: datetime

class GitHubMarkdownFetcher:
    def __init__(self, owner: str, repo: str, path: Optional[str] = "", token: Optional[str] = None):
        """
        GitHubのMarkdownファイルを取得するためのクラス

        Args:
            owner: GitHubのユーザー名またはオーガニゼーション名
            repo: リポジトリ名
            path: 精査するパス
            token: GitHubのパーソナルアクセストークン(オプション)
        """
        self.owner = owner
        self.repo = repo
        self.path = path
        self.base_url = "https://api.github.com"
        self.headers = {
            "Accept": "application/vnd.github.v3+json"
        }
        if token:
            self.headers["Authorization"] = f"Bearer {token}"

    def get_all_files(self) -> Generator[dict, None, None]:
        """
        指定されたパス以下の全Markdownファイルを一度に取得

        Args:
            path: 取得を開始するパス(デフォルトはリポジトリのルート)

        Returns:
            MarkdownFileオブジェクトのリスト
        """
        try:
            url = f"{self.base_url}/repos/{self.owner}/{self.repo}/git/trees/main"
            # recursive=1 を追加してすべてのファイルを一度に取得
            params = {"recursive": "1"}

            response = requests.get(url, headers=self.headers, params=params)
            response.raise_for_status()

            tree = response.json()["tree"]
            files = []

            # pathが指定されている場合は、そのパス以下のファイルのみをフィルタ
            for item in tree:
                if (item["type"] == "blob" and
                    item["path"].endswith(".md") and
                        item["path"].startswith(self.path)):

                    content = self._get_file_content(item["path"])
                    last_modified = self._get_file_last_modified(item["path"])
                    file = MarkdownFile(
                        path=item["path"],
                        content=content,
                        last_modified=last_modified
                    )
                    yield dataclasses.asdict(file)

        except requests.exceptions.RequestException as e:
            print(f"Error fetching files: {e}")
            raise

    def _get_file_content(self, file_path: str) -> str:
        """
        ファイルの内容を取得

        Args:
            file_path: ファイルのパス

        Returns:
            ファイルの内容(文字列)
        """
        url = f"{self.base_url}/repos/{self.owner}/{self.repo}/contents/{file_path}"
        response = requests.get(url, headers=self.headers)
        response.raise_for_status()

        content = response.json()
        if content.get("encoding") == "base64":
            return base64.b64decode(content["content"]).decode('utf-8')
        else:
            response = requests.get(
                content["download_url"], headers=self.headers)
            response.raise_for_status()
            return response.text

    def _get_file_last_modified(self, file_path: str) -> datetime:
        """
        ファイルの最終更新日を取得

        Args:
            file_path: ファイルのパス

        Returns:
            最終更新日のdatetimeオブジェクト
        """
        url = f"{self.base_url}/repos/{self.owner}/{self.repo}/commits"
        params = {
            "path": file_path,
            "per_page": 1  # 最新のコミットのみ取得
        }

        response = requests.get(url, headers=self.headers, params=params)
        response.raise_for_status()

        commits = response.json()
        if commits:
            # コミットの日時を取得してdatetimeオブジェクトに変換
            last_modified_str = commits[0]["commit"]["committer"]["date"]
            return datetime.strptime(last_modified_str, "%Y-%m-%dT%H:%M:%SZ")
        else:
            return datetime.min  # コミットが見つからない場合のフォールバック

@dlt.resource(
    name="blog_content",
    write_disposition="replace"
)
def get_resources(fetcher: GitHubMarkdownFetcher):
    # print(f"fmarkdown sile, {next(fetcher.get_all_files())}")
    yield from fetcher.get_all_files()

bigquery転送周り

resourceをどこに、どのような手法で転送するか

取得定義をinstanceとして生成して、リソースの取得をしている。最初はクラスメソッドで書こうとしたが、@dlt.resourceデコレータが使えなかったのでリソース取得用関数を適当に書いている

デフォルトで転送するだけならbigquery_adapterを使わなくとも良いが、descriptonやpartitionなど、(おそらく各種宛先の)独自の設定を入れたい場合はadapterでの指定が必要になってくる

./tasks/blog_content.py

def load() -> dlt.common.pipeline.LoadInfo:
    fetcher = blog_content.GitHubMarkdownFetcher(
        owner=owner,
        repo=repo,
        path="content",
        token=dlt.secrets["sources.rest_api_pipeline.github_source"]
    )

    pipeline = dlt.pipeline(
        pipeline_name='blog_content',
        destination='bigquery',
        dataset_name='blog_info',
    )
    load_info = pipeline.run(
        bigquery_adapter(
            blog_content.get_resources(fetcher),
            table_description='blog content',
        )
    )

    row_counts = pipeline.last_trace.last_normalize_info.row_counts
    print(f"row count: {row_counts}")

    return load_info

実行ログ。prefect flowとして動かしている。取得行数なんかも表示できる

python flows/blog_content.py 
16:58:14.266 | INFO    | prefect.engine - Created flow run 'gabby-mongrel' for flow 'blog_content'
16:58:15.728 | INFO    | Flow run 'gabby-mongrel' - start load
/workspaces/prefect/.venv/lib/python3.11/site-packages/google/cloud/bigquery/client.py:589: UserWarning: Cannot create BigQuery Storage client, the dependency google-cloud-bigquery-storage is not installed.
  warnings.warn(
16:58:46.080 | INFO    | Task run 'load_blog_content-ad3' - row count: {'blog_content': 114}
16:59:58.553 | INFO    | Task run 'load_blog_content-ad3' - load info Pipeline blog_content load step completed in 18.05 seconds
1 load package(s) were loaded to destination bigquery and into dataset blog_info
The bigquery destination used xxxxx.iam.gserviceaccount.com@xxxx location to store data
Load package 1735837097.1531174 is LOADED and contains no failed jobs
16:59:58.556 | INFO    | Task run 'load_blog_content-ad3' - Finished in state Completed()
16:59:58.557 | INFO    | Flow run 'gabby-mongrel' - loaded Pipeline blog_content load step completed in 18.05 seconds
...
17:00:16.263 | INFO    | Flow run 'gabby-mongrel' - Finished in state Completed()

転送先のデータセットにはdltのpipeline jobの履歴管理用のテーブルがいくつか、ロード先のテーブルにはロード時の_dlt_id、_dlt_load_idなどのカラムが追加される。要するにいつのロードジョブにて格納されたデータなのかが辿れるようになっている。一回壊れたら修正や確認が大変そうである

気にしたところ

stagingのデータセット名

デフォルトで

{table_name}_stating
の名称でキャッシュ用のデータセットが作成されるが、ぱっと見dbtのstagingとかぶるので、 configで名称を設定した

# .dlt/config.toml
[destination.bigquery]
location = "US"
staging_dataset_name_layout="%s_dlt_staging"

dlt secretの格納

値は基本的にprefect cloudに保持しておいて、実行時にロードして、dltのsecretとして設定するようにしている

次点では.envを使うとよさそう。いい感じにsecrets.tomlを生成できるようなモジュールもなさそうだった

    dlt.secrets["sources.rest_api_pipeline.github_source"] = GitHubCredentials.load(
        "github-credentials-block").token.get_secret_value()

とにかく格納先や取得元についてconfigで設定できてしまうのは便利。scd2などの自前実装が大変そうなものも対応しているので、細かい要件にも対応できそうな予感。反面、pipelineの状態管理などは暗黙的になってしまう節があるので、実運用は大変そう。実際ローカルで開発している際、キャッシュ的なフォルダを削除 

rm -r ~/.dlt
して再実行したらエラーがなおったりもした

参考

あまりまとまってないが、リポジトリはこちら

https://github.com/uni-3/prefect/

2025, Built with Gatsby. This site uses Google Analytics.