Obsidian Gitプラグインで学ぶ、Gitのロック機構の仕組み

※ この記事はAIによって執筆されています。

はじめに

Obsidianで自動バックアップのためにGitプラグインを使っていたところ、ターミナルからgit addを実行しようとしたら、以下のようなエラーが出て困ったことはありませんか?

fatal: Unable to create '.git/index.lock': File exists.

Another git process seems to be running in this repository.

この記事では、このエラーの背景にあるGitのロック機構について、技術的に深掘りして解説します。

目次

Gitのロック機構とは

Gitは、複数のプロセスが同時にリポジトリを変更することを防ぐため、ロックファイルという仕組みを使っています。

主要なロックファイル

Gitリポジトリには、以下のようなロックファイルが使われます:

  • .git/index.lock - git addgit commit実行時に作成
  • .git/refs/heads/<branch>.lock - ブランチ更新時に作成
  • .git/packed-refs.lock - 参照の最適化時に作成

これらのファイルは、Git操作中に一時的に作成され、操作完了後に自動的に削除されます。

ロックの動作フロー

Gitがどのようにロックファイルを使っているか、git addを例に見てみましょう。

  1. Gitコマンド実行開始
    • .git/index.lockファイルを作成(排他制御開始)
    • ファイルが既に存在する場合は即座にエラー
  2. 操作の実行
    • .git/index.lockに新しいインデックス情報を書き込み
  3. 操作完了
    • .git/index.lock.git/indexにアトミックにリネーム
    • ロックファイルが消える

重要なのは、Gitは待機せずに即座に失敗するという点です。これを「フェイル・ファスト」設計と呼びます。

なぜロックが必要なのか

データ破損の防止

もしロック機構がなかったら、以下のような問題が発生します:

# もしロックがなかったら...

# プロセスA: git add file1.txt
# - .git/indexファイルを読み込み
# - ファイル情報を追加
# - .git/indexに書き込み中...

# プロセスB: git add file2.txt (同時実行)
# - .git/indexファイルを読み込み (古いデータ)
# - ファイル情報を追加
# - .git/indexに書き込み
# → プロセスAの変更が消える!

このような競合状態(Race Condition)を防ぐために、ロック機構が必須なのです。

アトミック操作の保証

Gitは、ロックファイルを使ってアトミック(原子的)な書き込みを実現しています:

# Gitの安全な書き込み手順
1. .git/index.lock を作成 (排他制御開始)
2. .git/index.lock に新しいデータを書き込み
3. rename(.git/index.lock, .git/index)  # アトミック操作

ファイルのリネームは、ほとんどのファイルシステムで1命令で完了する操作です。そのため、以下のような中途半端な状態にはなりません:

  • 成功 ✓
  • 失敗 ✓
  • 中途半端 ✗(発生しない)

Obsidian Gitプラグインとの競合

プラグインの設定を確認

.obsidian/plugins/obsidian-git/data.jsonを見ると、以下のような設定があります:

{
  "commitMessage": "vault backup: {{date}}",
  "autoSaveInterval": 1,
  "autoPushInterval": 1,
  "autoPullInterval": 1,
  "autoPullOnBoot": true
}

この設定では、1分ごとに自動的にgit addgit commitgit pushが実行されます。

競合が発生する仕組み

Obsidian Gitプラグイン
  ├→ 1分ごとに自動実行 (autoSaveInterval: 1)
  │   └→ git add → git commit → git push
  │
  └→ .git/index.lock を保持中(数秒間)
       ↓
     ターミナルで git add 実行
       ↓
     エラー: "Unable to create '.git/index.lock'"

プラグインがロックを保持している間は、ターミナルからのGit操作が即座に失敗します。

技術的な実装の詳細

POSIX環境でのロック実装

Gitの内部では、以下のようなシステムコールを使ってロックを実現しています:

// Gitの内部実装(簡略版)
int lock_file(const char *path) {
    // O_CREAT | O_EXCL: ファイルが存在しなければ作成、
    // 既に存在すればエラー(アトミック操作)
    int fd = open(path, O_CREAT | O_EXCL | O_WRONLY, 0666);

    if (fd < 0) {
        if (errno == EEXIST) {
            // ロックファイルが既に存在
            return -1;  // ロック取得失敗
        }
    }
    return fd;  // ロック取得成功
}

O_CREAT | O_EXCLフラグの組み合わせにより、ファイルシステムレベルでアトミックなロック取得が可能になります。

なぜスピンロックではないのか

多くの並行処理システムでは、ロックが取れるまで待機する「スピンロック」を使いますが、Gitは即座に失敗します。その理由は:

  • Git操作は通常数秒で完了する
  • 長時間のロックは異常な状態(クラッシュなど)を示す
  • ユーザーに状況を知らせるべき

この設計により、デッドロックやハングアップを防いでいます。

認証処理の仕組み

Obsidian Gitプラグインには、認証情報を安全に扱うための工夫があります。

.obsidian/plugins/obsidian-git/obsidian_askpass.shというスクリプトを見ると:

#!/bin/sh

PROMPT="$1"
TEMP_FILE="$OBSIDIAN_GIT_CREDENTIALS_INPUT"

cleanup() {
    rm -f "$TEMP_FILE" "$TEMP_FILE.response"
}
trap cleanup EXIT

echo "$PROMPT" > "$TEMP_FILE"

while [ ! -e "$TEMP_FILE.response" ]; do
    if [ ! -e "$TEMP_FILE" ]; then
        echo "Trigger file got removed: Abort" >&2
        exit 1
    fi
    sleep 0.1
done

RESPONSE=$(cat "$TEMP_FILE.response")
echo "$RESPONSE"

この仕組みは以下のように動作します:

  1. Gitが認証情報を要求
  2. スクリプトが一時ファイルにプロンプトを書き込み
  3. Obsidianプラグインがそれを検出
  4. ユーザーにパスワード入力を促す
  5. 入力された認証情報を.responseファイルに書き込み
  6. スクリプトがそれを読み取ってGitに渡す

これにより、コマンドラインでパスワードを直接扱わずに済みます。

競合を回避する方法

方法1: プラグインを一時停止

.obsidian/plugins/obsidian-git/data.jsonを編集:

{
  "autoSaveInterval": 0,
  "autoPushInterval": 0,
  "autoPullInterval": 0
}

すべてを0にすることで、自動実行を完全に無効化できます。

方法2: タイミングをずらす

自動実行の間隔を長くする:

{
  "autoSaveInterval": 10,  // 10分に1回
  "autoPushInterval": 10
}

これにより、ターミナル操作の余地が増えます。

方法3: リトライロジックを実装

ターミナルでスクリプトを使う:

#!/bin/bash
# 最大10回リトライ
for i in {1..10}; do
  if git add .; then
    echo "Success!"
    break
  else
    echo "Locked, retrying in 2 seconds... ($i/10)"
    sleep 2
  fi
done

このスクリプトは、ロックされている場合に自動的にリトライします。

トラブルシューティング

ロックファイルが残ってしまった場合

プラグインやGitがクラッシュすると、ロックファイルが残ることがあります:

# 1. Gitプロセスが実行中でないことを確認
ps aux | grep git

# 2. ロックファイルを手動で削除
rm -f .git/index.lock

# 3. 再度Git操作を試す
git add .

⚠️ 警告: Gitプロセスが実行中の場合は削除しないでください。データ破損の原因になります。

まとめ

この記事では、以下について学びました:

  • Gitのロックファイルは、データ破損を防ぐための重要な仕組み
  • ロックファイルにより、アトミックな操作が保証される
  • Gitは「フェイル・ファスト」設計で、即座にエラーを返す
  • Obsidian Gitプラグインとの競合は、自動実行の設定で回避可能
  • ファイルシステムレベルのO_CREAT | O_EXCLフラグでロックを実現

Obsidian Gitプラグインのような自動化ツールは便利ですが、その背後にある技術を理解することで、より安全に使いこなせるようになります。

参考資料


この記事が役に立ったら、ぜひブックマークやシェアをお願いします!

Python BigQuery ClientのRowIteratorを理解する - ページングとデータ取得の仕組み

※ 本記事はAIによって書かれています。

PythonでBigQueryを使う際、クエリ結果を扱うRowIteratorの挙動を正しく理解していますか?この記事では、RowIteratorの内部実装を詳しく解説し、ネットワーク通信とローカルデータアクセスのタイミングを明らかにします。

はじめに

BigQueryのPythonクライアントライブラリを使ってクエリを実行すると、結果はRowIteratorオブジェクトとして返されます。

query_job = client.query("SELECT * FROM dataset.table")
results = query_job.result()  # RowIteratorを取得

for row in results:
    print(row.name, row.age)

このシンプルなコードの裏側で、実は複雑なページング処理が行われています。本記事では、その仕組みを徹底的に解説します。

RowIteratorの基本的な挙動

遅延評価とページング

RowIteratorは 遅延評価(lazy evaluation)ページング を組み合わせたイテレータです。重要なポイントは以下の通りです:

  1. 初回アクセス時にのみネットワーク通信が発生
  2. ページ単位でデータを取得
  3. 取得したページはメモリにキャッシュ
  4. 次のページが必要になったタイミングで追加取得
データ取得フローの可視化

RowIteratorがどのタイミングでネットワーク通信を行い、どのタイミングでローカルキャッシュを使用するかを図で見てみましょう。

フロー図

この図から、以下のことが分かります:

  1. 🌐 ネットワーク通信: 最初のイテレーションとページ境界でのみ発生
  2. 💾 ローカルキャッシュ: 同一ページ内では配列からの高速アクセス

クラス階層構造

RowIteratorは以下のような継承関係になっています:

google.api_core.page_iterator.Iterator
    ↓
google.api_core.page_iterator.HTTPIterator
    ↓
google.cloud.bigquery.table.RowIterator

この階層構造により、HTTP通信によるページング処理が実装されています。

重要な処理のコード解説

1. Iterator基底クラス - イテレーションの骨格

まず、基底となる`Iterator`クラスを見ていきます。

class Iterator:
    """ページングされたAPIメソッドのための基底イテレータ"""
    
    def __init__(
        self,
        client,
        item_to_value=_item_to_value_identity,
        page_token=None,
        max_results=None,
    ):
        self._started = False           # イテレーション開始フラグ
        self.__active_iterator = None   # 💾 現在アクティブなイテレータ
        self.page_number = 0            # 現在のページ番号
        self.next_page_token = page_token  # 🔑 次のページトークン
        self.num_results = 0            # 取得済み結果数
    
    def __iter__(self):
        """イテレータ自身を返す"""
        if self._started:
            raise ValueError("Iterator has already started")
        self._started = True
        
        # 💡 ページイテレータを取得して行イテレータを作成
        self.__active_iterator = self._items_iter()
        return self.__active_iterator
    
    def _items_iter(self):
        """各アイテムをイテレートするジェネレータ"""
        # 🌐 ページを順次取得しながら、各ページ内のアイテムをyield
        for page in self._page_iter(increment=True):
            for item in page:
                self.num_results += 1
                yield item
                
                if self.max_results and self.num_results >= self.max_results:
                    return

ポイント:

  • _items_iter(): 行単位でイテレートする最上位ジェネレータ
  • _page_iter(): ページ単位でイテレートする中間ジェネレータ
  • 再イテレート不可: _startedフラグで制御
2. HTTPIterator - ページング実装の核心

次に、HTTP通信を行うHTTPIterator_next_page()メソッドを見てみましょう。

class HTTPIterator(Iterator):
    """HTTP/JSON API用のイテレータ"""
    
    def _next_page(self):
        """🌐 次のページをHTTP経由で取得
        
        ⭐ これがネットワーク通信が発生するメソッド!
        """
        # 次のページトークンがなければ終了
        if self.next_page_token is None:
            return None
        
        # 🔧 リクエストパラメータを構築
        params = self.extra_params.copy() if self.extra_params else {}
        params[self._PAGE_TOKEN] = self.next_page_token
        
        # max_resultsを考慮してページサイズを調整
        if self.max_results is not None:
            remaining = self.max_results - self.num_results
            if self._page_size is not None:
                params[self._MAX_RESULTS] = min(remaining, self._page_size)
            else:
                params[self._MAX_RESULTS] = remaining
        
        # 🌐 HTTPリクエストを実行!
        response = self.api_request(
            method=self._HTTP_METHOD,  # "GET"
            path=self.path,
            query_params=params
        )
        
        # レスポンスからデータを抽出
        items = response.get(self._items_key, ())
        
        # 🔑 次のページトークンを更新
        self.next_page_token = response.get(self._next_token)
        
        # 💾 Pageオブジェクトを作成
        page = Page(self, items, self.item_to_value)
        
        return page

重要ポイント:

  • このメソッドだけがHTTPリクエストを送信
  • next_page_tokenでページング状態を管理
  • next_page_tokenNoneになったら全ページ取得完了
3. Pageクラス - ローカルデータのイテレーション

ページ内のデータは、単純な配列アクセスで取得されます。

class Page:
    """単一ページの結果を表すクラス"""
    
    def __init__(self, parent, items, item_to_value, raw_page=None):
        self._parent = parent
        self._items = items  # 💾 メモリ上のデータ
        self._item_to_value = item_to_value
        self._index = 0  # 現在の読み取り位置
    
    def __next__(self):
        """💾 次のアイテムをローカルから取得
        
        ⭐ ネットワーク通信は発生しない!
        """
        if self._index >= len(self._items):
            raise StopIteration
        
        # 💾 メモリ上の配列から取得
        item = self._items[self._index]
        self._index += 1
        
        # アイテムを変換して返す
        return self._item_to_value(self._parent, item)

ポイント:

  • _itemsは💾メモリ上の配列
  • __next__()は配列から順次取得するだけ(高速)
  • ネットワーク通信は一切発生しない
4. RowIterator - BigQuery固有の実装

BigQuery専用のRowIteratorクラスは、JSON行データをRowオブジェクトに変換します。

class RowIterator(HTTPIterator):
    """BigQueryの行イテレータ"""
    
    def __init__(
        self,
        client,
        api_request,
        path,
        schema,
        page_token=None,
        max_results=None,
        page_size=None,
        **kwargs
    ):
        # 親クラスを初期化
        super(RowIterator, self).__init__(
            client,
            api_request,
            path,
            item_to_value=_item_to_row,  # ⭐ JSON → Row変換
            items_key="rows",             # ⭐ BigQueryは"rows"キー
            page_token=page_token,
            max_results=max_results,
            page_size=page_size,
        )
        
        self._schema = schema
        self._field_to_index = self._make_field_to_index()


def _item_to_row(iterator, resource):
    """JSONアイテムをRowオブジェクトに変換"""
    return Row(
        _row_tuple_from_json(resource, iterator.schema),
        iterator._field_to_index
    )

DataFrameへの変換

to_dataframe() - 全データを一度に取得
def _to_dataframe_tabledata_list(self, dtypes):
    """tabledata.list APIでDataFrameを構築"""
    column_names = [field.name for field in self.schema]
    frames = []
    
    # 🌐 全ページをループで取得
    for page in self.pages:
        # 💾 ページからDataFrameを作成
        page_dataframe = self._page_to_dataframe(
            page, column_names, dtypes
        )
        frames.append(page_dataframe)
    
    # 💾 全DataFrameを結合
    if frames:
        return pandas.concat(frames, ignore_index=True)
    else:
        return pandas.DataFrame(columns=column_names)

⚠️ 注意: このメソッドは全ページを一度にメモリに読み込みます。大きなテーブルではメモリ不足の可能性があります。

to_dataframe_iterable() - ストリーム処理

メモリ効率的な処理には、`to_dataframe_iterable()`を使用します。

def to_dataframe_iterable(self, bqstorage_client=None, dtypes=None):
    """ページ単位でDataFrameをストリーム処理
    
    ✅ メモリ効率的!
    """
    column_names = [field.name for field in self.schema]
    
    # 🌐 ページを1つずつ取得
    for page in self.pages:
        # 💾 1ページ分のDataFrameを作成
        page_df = self._page_to_dataframe(page, column_names, dtypes)
        
        # ⭐ yieldで返す = メモリを保持しない
        yield page_df


# 使用例
for df_chunk in row_iterator.to_dataframe_iterable():
    # 💾 1ページ分だけメモリに存在
    process_chunk(df_chunk)
    # ループを抜けるとdf_chunkは破棄される

実行フローの可視化

パターンA: 行単位のイテレーション
ユーザーコード: for row in iterator
    ↓
Iterator.__iter__()
    ↓ _items_iter()を作成
_items_iter()
    ↓ _page_iter()を呼び出し
_page_iter()
    ↓ 🌐 _next_page()を呼び出し
HTTPIterator._next_page()
    ↓ 🌐 HTTPリクエスト送信
BigQuery API
    ↓ レスポンス返却
Page作成 💾
    ↓ Pageから1行取り出し
_item_to_row() → Row変換
    ↓
ユーザーコードにRow返却

[ページ内の残りの行]
    ↓ 💾 Pageから順次取り出し (ネットワーク通信なし)
    
[ページ終了]
    ↓ _page_iter()が次のページ要求
    ↓ 🌐 _next_page()再度呼び出し
パターンB: DataFrameへの一括変換
ユーザーコード: iterator.to_dataframe()
    ↓
_to_dataframe_tabledata_list()
    ↓
for page in self.pages:
    Page 1取得 🌐 → DataFrame変換 💾
    Page 2取得 🌐 → DataFrame変換 💾
    Page 3取得 🌐 → DataFrame変換 💾
    ↓
pandas.concat() 💾 # 全て結合
    ↓
完全なDataFrame返却

実践的な使用パターン

パターン1: 基本的なイテレーション(推奨)
# シンプルで効率的
query_job = client.query("SELECT * FROM table")
for row in query_job.result():
    print(f"{row.name}: {row.age}")
    # ページングは自動的に処理される
パターン2: 全データをDataFrameに
# ⚠️ 注意: 全データをメモリに読み込む
df = query_job.result().to_dataframe()
print(df.head())
パターン3: ストリーム処理(大量データ推奨)
# メモリ効率的
for df_chunk in query_job.result().to_dataframe_iterable():
    # 各ページを処理
    process(df_chunk)
    # 処理後、メモリは解放される
パターン4: ページサイズの制御
# 1ページ500行ずつ取得
results = query_job.result(page_size=500)
for row in results:
    process(row)

パフォーマンスのTips

ネットワーク通信が発生するタイミング 🌐
  1. `HTTPIterator._next_page()` - ページ取得時
  2. 初回イテレーション時
  3. ページ境界を超える時
ローカルデータのイテレーション 💾
  1. Page.__next__() - ページ内の行取得
  2. 同一ページ内でのforループ
  3. _items配列へのアクセス
最適化のポイント
      1. 適切なpage_sizeを設定

デフォルトは環境依存ですが、通常数千行です。

results = query_job.result(page_size=1000)
      1. BigQuery Storage APIを使用

高速化のために利用できます。

df = query_job.result().to_dataframe(create_bqstorage_client=True)
      1. 大量データは`to_dataframe_iterable()`を使用

メモリ効率的な処理が可能です。

      1. 再イテレート不可に注意

RowIteratorは1回しかイテレートできません。

# ❌ 2回目のイテレートは不可
iterator = query_job.result()
for row in iterator:
    print(row)  # 1回目: OK

for row in iterator:
    print(row)  # 2回目: StopIteration例外

# ✅ 複数回使う場合はリスト化
rows = list(query_job.result())
for row in rows:
    print(row)  # 何度でもOK

まとめ

RowIteratorの3層構造
Iterator._items_iter()     ← 行単位のイテレーション
    ↓
Iterator._page_iter()      ← ページ単位のイテレーション  
    ↓
HTTPIterator._next_page()  ← HTTPリクエスト実行
重要なクラスの役割
クラス名 役割
Iterator イテレーションの骨格を提供
HTTPIterator HTTP通信によるページング実装
Page 1ページ分のデータをメモリに保持
RowIterator BigQuery固有の行処理
キーポイント
  1. ネットワーク通信は_next_page()のみで発生
  2. ページ内は単純な配列アクセス(高速)
  3. 遅延評価により必要な時だけデータを取得
  4. ページングは自動で行われる

この構造により、BigQueryは大量データを効率的に処理できます。

参考リンク

NBAで学ぶスタースキーマ

※この記事はAIによって作成されています

はじめに

データウェアハウスの設計において、スタースキーマは最も重要な概念の一つです。本記事では、NBA(アメリカプロバスケットボールリーグ)のデータを例に、スタースキーマの基本概念から実装まで、実践的に学んでいきます。

スタースキーマとは

スタースキーマは、データウェアハウスで使用される多次元データモデルの一種です。その名前の通り、中央にファクトテーブルがあり、周りにディメンションテーブルが星のように配置された構造を持ちます。

スタースキーマの特徴

  • シンプルな構造: 理解しやすく、クエリ性能が優秀
  • 非正規化: データの重複を許容し、クエリ速度を優先
  • 分析に最適: OLAP(Online Analytical Processing)に適している

NBAデータでスタースキーマを設計する

1. ビジネス要件の定義

まず、どのような分析をしたいかを明確にします:

  • 選手の試合ごとのパフォーマンス分析
  • チーム別、シーズン別の成績比較
  • 時系列での傾向分析
  • 会場別のホームアドバンテージ分析

2. ファクトテーブルの設計

中核となるファクトテーブル game_stats を設計します:

CREATE TABLE game_stats (
    -- サロゲートキー
    game_stats_id BIGINT PRIMARY KEY,
    
    -- 外部キー(ディメンションテーブルへの参照)
    player_key INT NOT NULL,
    team_key INT NOT NULL,
    opponent_team_key INT NOT NULL,
    date_key INT NOT NULL,
    venue_key INT NOT NULL,
    game_key INT NOT NULL,
    
    -- メジャー(分析対象の数値データ)
    points INT,
    rebounds INT,
    assists INT,
    steals INT,
    blocks INT,
    turnovers INT,
    field_goals_made INT,
    field_goals_attempted INT,
    three_pointers_made INT,
    three_pointers_attempted INT,
    free_throws_made INT,
    free_throws_attempted INT,
    minutes_played DECIMAL(5,2),
    plus_minus INT,
    
    -- 計算済みメジャー
    field_goal_percentage DECIMAL(5,3),
    three_point_percentage DECIMAL(5,3),
    free_throw_percentage DECIMAL(5,3),
    efficiency_rating DECIMAL(6,2)
);

3. ディメンションテーブルの設計

選手ディメンション(Player Dimension)

CREATE TABLE dim_player (
    player_key INT PRIMARY KEY,
    player_id VARCHAR(20) NOT NULL, -- ナチュラルキー
    player_name VARCHAR(100) NOT NULL,
    position VARCHAR(20),
    height_cm INT,
    weight_kg INT,
    birth_date DATE,
    nationality VARCHAR(50),
    draft_year INT,
    draft_round INT,
    draft_pick INT,
    
    -- SCD Type 2 対応
    effective_date DATE NOT NULL,
    expiry_date DATE,
    is_current BOOLEAN DEFAULT TRUE,
    
    -- その他の属性
    college VARCHAR(100),
    years_of_experience INT
);

サンプルデータ例:

player_key player_id player_name position height_cm weight_kg birth_date nationality draft_year effective_date is_current
1001 lebron_001 LeBron James SF 206 113 1984-12-30 USA 2003 2018-07-01 TRUE
1002 curry_001 Stephen Curry PG 191 84 1988-03-14 USA 2009 2009-06-25 TRUE
1003 durant_001 Kevin Durant SF 211 109 1988-09-29 USA 2007 2023-02-08 TRUE
1004 giannis_001 Giannis Antetokounmpo PF 211 110 1994-12-06 Greece 2013 2013-06-27 TRUE

チームディメンション(Team Dimension)

CREATE TABLE dim_team (
    team_key INT PRIMARY KEY,
    team_id VARCHAR(10) NOT NULL,
    team_name VARCHAR(100) NOT NULL,
    team_city VARCHAR(50) NOT NULL,
    conference VARCHAR(20), -- Eastern/Western
    division VARCHAR(50),
    founded_year INT,
    arena_name VARCHAR(100),
    team_colors VARCHAR(100),
    
    -- SCD Type 2 対応
    effective_date DATE NOT NULL,
    expiry_date DATE,
    is_current BOOLEAN DEFAULT TRUE
);

サンプルデータ例:

team_key team_id team_name team_city conference division founded_year arena_name effective_date is_current
2001 LAL Los Angeles Lakers Los Angeles Western Pacific 1947 Crypto.com Arena 1999-10-17 TRUE
2002 GSW Golden State Warriors San Francisco Western Pacific 1946 Chase Center 2019-09-06 TRUE
2003 PHX Phoenix Suns Phoenix Western Pacific 1968 Footprint Center 1992-06-01 TRUE
2004 MIL Milwaukee Bucks Milwaukee Eastern Central 1968 Fiserv Forum 2018-08-26 TRUE

日付ディメンション(Date Dimension)

CREATE TABLE dim_date (
    date_key INT PRIMARY KEY, -- YYYYMMDD形式
    full_date DATE NOT NULL,
    day_of_week VARCHAR(20),
    day_of_month INT,
    day_of_year INT,
    week_of_year INT,
    month_name VARCHAR(20),
    month_number INT,
    quarter INT,
    year INT,
    
    -- NBA固有の属性
    nba_season VARCHAR(10), -- '2023-24'
    season_type VARCHAR(20), -- 'Regular Season', 'Playoffs', 'Preseason'
    is_holiday BOOLEAN,
    is_weekend BOOLEAN
);

サンプルデータ例:

date_key full_date day_of_week month_name quarter year nba_season season_type is_weekend
20231215 2023-12-15 Friday December 2 2023 2023-24 Regular Season FALSE
20240225 2024-02-25 Sunday February 1 2024 2023-24 Regular Season TRUE
20240415 2024-04-15 Monday April 2 2024 2023-24 Playoffs FALSE
20240620 2024-06-20 Thursday June 2 2024 2023-24 Playoffs FALSE

会場ディメンション(Venue Dimension)

CREATE TABLE dim_venue (
    venue_key INT PRIMARY KEY,
    venue_id VARCHAR(20) NOT NULL,
    venue_name VARCHAR(100) NOT NULL,
    city VARCHAR(50),
    state VARCHAR(50),
    country VARCHAR(50),
    capacity INT,
    opened_year INT,
    surface_type VARCHAR(20),
    
    -- 地理情報
    latitude DECIMAL(10,8),
    longitude DECIMAL(11,8),
    timezone VARCHAR(50)
);

サンプルデータ例:

venue_key venue_id venue_name city state capacity opened_year timezone
3001 crypto_arena Crypto.com Arena Los Angeles CA 20000 1999 America/Los_Angeles
3002 chase_center Chase Center San Francisco CA 18064 2019 America/Los_Angeles
3003 footprint_center Footprint Center Phoenix AZ 18422 1992 America/Phoenix
3004 fiserv_forum Fiserv Forum Milwaukee WI 17500 2018 America/Chicago

試合ディメンション(Game Dimension)

CREATE TABLE dim_game (
    game_key INT PRIMARY KEY,
    game_id VARCHAR(20) NOT NULL,
    season VARCHAR(10),
    game_type VARCHAR(20), -- 'Regular Season', 'Playoffs'
    playoff_round VARCHAR(30), -- 'First Round', 'Finals'等
    game_number INT, -- シリーズ内でのゲーム番号
    
    -- 試合結果
    home_team_score INT,
    away_team_score INT,
    overtime_periods INT,
    attendance INT,
    
    -- 試合状況
    game_status VARCHAR(20), -- 'Final', 'In Progress'
    broadcast_networks VARCHAR(200)
);

サンプルデータ例:

game_key game_id season game_type home_team_score away_team_score overtime_periods attendance
4001 0022300456 2023-24 Regular Season 128 123 1 20000
4002 0022300457 2023-24 Regular Season 115 108 0 18064
4003 0042300101 2023-24 Playoffs 118 112 0 18422
4004 0042300102 2023-24 Playoffs 105 98 0 17500

ファクトテーブルのサンプルデータ

game_stats テーブルの例:

game_stats_id player_key team_key date_key venue_key game_key points rebounds assists minutes_played field_goal_percentage
100001 1001 2001 20231215 3001 4001 35 8 12 42.5 0.562
100002 1002 2002 20231215 3002 4002 28 5 7 38.2 0.478
100003 1003 2003 20240225 3003 4003 31 9 6 40.1 0.521
100004 1004 2004 20240225 3004 4004 29 14 8 41.8 0.545

実際のデータ例で理解するスタースキーマ

上記のサンプルデータを使って、スタースキーマがどのように機能するかを具体的に見てみましょう。

データの関連性の確認

ファクトテーブルの1行目のデータ(game_stats_id: 100001)を例に取ると:

SELECT 
    gs.points,
    gs.rebounds,
    gs.assists,
    p.player_name,
    t.team_name,
    d.full_date,
    v.venue_name,
    g.home_team_score,
    g.away_team_score
FROM game_stats gs
JOIN dim_player p ON gs.player_key = p.player_key
JOIN dim_team t ON gs.team_key = t.team_key  
JOIN dim_date d ON gs.date_key = d.date_key
JOIN dim_venue v ON gs.venue_key = v.venue_key
JOIN dim_game g ON gs.game_key = g.game_key
WHERE gs.game_stats_id = 100001;

結果:

points rebounds assists player_name team_name full_date venue_name home_team_score away_team_score
35 8 12 LeBron James Los Angeles Lakers 2023-12-15 Crypto.com Arena 128 123

この結果から、「LeBron Jamesが2023年12月15日にCrypto.com Arenaで行われた試合で35点、8リバウンド、12アシストを記録し、Lakers が128-123で勝利した」という情報が分かります。

集計クエリの例

同じサンプルデータを使用した分析例:

-- 選手別の平均スタッツ(サンプルデータから)
SELECT 
    p.player_name,
    AVG(gs.points) as avg_points,
    AVG(gs.rebounds) as avg_rebounds,
    AVG(gs.assists) as avg_assists
FROM game_stats gs
JOIN dim_player p ON gs.player_key = p.player_key
GROUP BY p.player_name;

結果(サンプルデータベース):

player_name avg_points avg_rebounds avg_assists
LeBron James 35.0 8.0 12.0
Stephen Curry 28.0 5.0 7.0
Kevin Durant 31.0 9.0 6.0
Giannis Antetokounmpo 29.0 14.0 8.0

スタースキーマの実装ベストプラクティス

1. サロゲートキーの使用

各ディメンションテーブルには、ビジネスキーとは独立したサロゲートキーを使用します:

-- 良い例:サロゲートキーを使用
SELECT 
    p.player_name,
    SUM(gs.points) as total_points
FROM game_stats gs
JOIN dim_player p ON gs.player_key = p.player_key
WHERE p.is_current = TRUE;

-- 避けるべき例:ナチュラルキーの直接使用
-- ナチュラルキーは変更される可能性があるため非推奨

2. Slowly Changing Dimensions (SCD) の実装

選手の所属チーム変更などを追跡するためのSCD Type 2の実装例:

-- 選手のトレード処理例
-- 1. 現在のレコードを無効化
UPDATE dim_player 
SET expiry_date = '2024-02-08',
    is_current = FALSE
WHERE player_id = 'lebron_james_001' 
    AND is_current = TRUE;

-- 2. 新しいレコードを挿入
INSERT INTO dim_player (
    player_key, player_id, player_name, 
    current_team, effective_date, is_current
) VALUES (
    10001, 'lebron_james_001', 'LeBron James',
    'MIA', '2024-02-09', TRUE
);

3. 階層構造の実装

日付ディメンションでの階層構造の例:

-- 年→四半期→月→日の階層での集計
SELECT 
    d.year,
    d.quarter,
    d.month_name,
    AVG(gs.points) as avg_points
FROM game_stats gs
JOIN dim_date d ON gs.date_key = d.date_key
WHERE d.nba_season = '2023-24'
GROUP BY d.year, d.quarter, d.month_name
ORDER BY d.year, d.quarter, d.month_number;

実践的なクエリ例

1. 選手の月別パフォーマンス分析

SELECT 
    p.player_name,
    d.month_name,
    COUNT(*) as games_played,
    AVG(gs.points) as avg_points,
    AVG(gs.rebounds) as avg_rebounds,
    AVG(gs.assists) as avg_assists,
    AVG(gs.efficiency_rating) as avg_efficiency
FROM game_stats gs
JOIN dim_player p ON gs.player_key = p.player_key
JOIN dim_date d ON gs.date_key = d.date_key
WHERE d.nba_season = '2023-24'
    AND d.season_type = 'Regular Season'
    AND p.is_current = TRUE
GROUP BY p.player_name, d.month_name, d.month_number
ORDER BY p.player_name, d.month_number;

2. ホームアドバンテージ分析

SELECT 
    t.team_name,
    CASE 
        WHEN ht.team_key = gs.team_key THEN 'Home'
        ELSE 'Away'
    END as home_away,
    COUNT(*) as games,
    AVG(gs.points) as avg_points_scored,
    AVG(CASE WHEN g.home_team_score > g.away_team_score 
             AND ht.team_key = gs.team_key THEN 1
             WHEN g.away_team_score > g.home_team_score 
             AND at.team_key = gs.team_key THEN 1
             ELSE 0 END) as win_percentage
FROM game_stats gs
JOIN dim_team t ON gs.team_key = t.team_key
JOIN dim_game g ON gs.game_key = g.game_key
JOIN dim_team ht ON g.home_team_key = ht.team_key
JOIN dim_team at ON g.away_team_key = at.team_key
WHERE t.is_current = TRUE
GROUP BY t.team_name, home_away
ORDER BY t.team_name, home_away;

3. プレイオフパフォーマンス比較

SELECT 
    p.player_name,
    d.season_type,
    COUNT(*) as games,
    AVG(gs.points) as avg_points,
    AVG(gs.field_goal_percentage) as avg_fg_pct,
    AVG(gs.three_point_percentage) as avg_3p_pct
FROM game_stats gs
JOIN dim_player p ON gs.player_key = p.player_key
JOIN dim_date d ON gs.date_key = d.date_key
WHERE d.nba_season = '2023-24'
    AND p.is_current = TRUE
    AND gs.minutes_played >= 20  -- 最低出場時間でフィルタ
GROUP BY p.player_name, d.season_type
HAVING COUNT(*) >= 5  -- 最低試合数でフィルタ
ORDER BY p.player_name, d.season_type;

パフォーマンス最適化

1. インデックス戦略

-- ファクトテーブルのインデックス
CREATE INDEX idx_game_stats_player ON game_stats(player_key);
CREATE INDEX idx_game_stats_team ON game_stats(team_key);
CREATE INDEX idx_game_stats_date ON game_stats(date_key);
CREATE INDEX idx_game_stats_composite ON game_stats(date_key, team_key);

-- ディメンションテーブルのインデックス
CREATE INDEX idx_player_natural_key ON dim_player(player_id, is_current);
CREATE INDEX idx_team_natural_key ON dim_team(team_id, is_current);
CREATE INDEX idx_date_season ON dim_date(nba_season, season_type);

2. パーティショニング

大量データに対するパーティショニング戦略:

-- 日付によるパーティショニング
CREATE TABLE game_stats_partitioned (
    -- 同じカラム定義
) PARTITION BY RANGE (date_key);

-- 年度別パーティション作成
CREATE TABLE game_stats_2023 PARTITION OF game_stats_partitioned
    FOR VALUES FROM (20230101) TO (20240101);

CREATE TABLE game_stats_2024 PARTITION OF game_stats_partitioned
    FOR VALUES FROM (20240101) TO (20250101);

3. 集約テーブル

よく使用される集計データのマテリアライズドビュー:

CREATE MATERIALIZED VIEW mv_player_season_stats AS
SELECT 
    gs.player_key,
    d.nba_season,
    d.season_type,
    COUNT(*) as games_played,
    SUM(gs.points) as total_points,
    AVG(gs.points) as avg_points,
    SUM(gs.rebounds) as total_rebounds,
    AVG(gs.rebounds) as avg_rebounds,
    SUM(gs.assists) as total_assists,
    AVG(gs.assists) as avg_assists,
    AVG(gs.field_goal_percentage) as avg_fg_pct
FROM game_stats gs
JOIN dim_date d ON gs.date_key = d.date_key
GROUP BY gs.player_key, d.nba_season, d.season_type;

-- 定期的な更新
REFRESH MATERIALIZED VIEW mv_player_season_stats;

ETLプロセスの考慮事項

1. データ品質チェック

-- データ整合性チェックの例
-- 1. 参照整合性チェック
SELECT COUNT(*) as orphaned_records
FROM game_stats gs
LEFT JOIN dim_player p ON gs.player_key = p.player_key
WHERE p.player_key IS NULL;

-- 2. ビジネスルールチェック
SELECT game_key, SUM(minutes_played) as total_minutes
FROM game_stats
GROUP BY game_key
HAVING SUM(minutes_played) > 240 * 2; -- 1チーム最大240分 × 2チーム

2. 増分ロード戦略

-- 増分ロードのためのタイムスタンプ管理
ALTER TABLE game_stats ADD COLUMN 
    last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP;

-- 変更データのみ処理
SELECT * FROM source_game_stats 
WHERE last_modified > (
    SELECT MAX(last_updated) 
    FROM game_stats
);

まとめ

スタースキーマは、データウェアハウス設計の基本でありながら、強力な分析基盤を提供します。NBAデータの例を通じて見てきたように、以下の点が重要です:

設計の原則 - ビジネス要件を明確にしてからスキーマ設計を開始する - ファクトテーブルは分析対象のメジャーを中心に設計する - ディメンションテーブルは分析の軸となる属性を豊富に持つ

実装のベストプラクティス - サロゲートキーの一貫した使用 - SCD(Slowly Changing Dimensions)への適切な対応 - パフォーマンスを考慮したインデックスとパーティショニング

運用面での考慮事項 - データ品質チェックの自動化 - 効率的なETLプロセスの構築 - 集約テーブルによるクエリパフォーマンスの向上

スタースキーマの理解は、現代のデータ分析基盤において必須のスキルです。本記事で紹介した概念を基に、実際のプロジェクトでスタースキーマを活用してみてください。データの力を最大限に引き出し、価値ある洞察を得ることができるでしょう。


[asin:B08L39ZQ2N:detail]

[asin:B07H3TQ2XX:detail]

関連記事

タグ

データウェアハウス #スタースキーマ #SQL #NBA #データ分析 #OLAP

NBAで学ぶSCD(Slowly Changing Dimension)

※この記事はAIによって執筆されています。

はじめに

データウェアハウスにおいて、時間の経過とともに変化するデータをどのように管理するかは重要な課題です。SCD(Slowly Changing Dimension)は、このような変化するディメンションデータを効果的に処理するための手法です。

本記事では、NBA選手のデータを具体例として、SCD Type1からType7までの各手法を詳しく解説します。

SCDとは?

SCD(Slowly Changing Dimension)とは、時間の経過とともにゆっくりと変化するディメンションテーブルのデータを管理する手法です。例えば、選手の所属チーム、ポジション、住所などは時間とともに変化します。

前提:NBA選手ディメンションテーブルの初期状態

以下のNBA選手テーブルを例に各SCDタイプを説明します:

Players テーブル(初期状態)

player_key player_id name team position salary created_date updated_date
1 P001 LeBron James Lakers SF 44000000 2023-01-01 2023-01-01
2 P002 Stephen Curry Warriors PG 48000000 2023-01-01 2023-01-01

SCD Type 1: 上書き更新

概要: 既存のレコードを新しい値で上書きする最もシンプルな方法

特徴: - 履歴を保持しない - ストレージ効率が良い - 常に最新の状態のみ保持

適用例: LeBron Jamesの年俸が4400万ドルから4600万ドルに変更

更新後のテーブル:

player_key player_id name team position salary updated_date
1 P001 LeBron James Lakers SF 46000000 2024-01-15
2 P002 Stephen Curry Warriors PG 48000000 2023-01-01

SQL実装例:

UPDATE Players 
SET salary = 46000000, updated_date = '2024-01-15'
WHERE player_id = 'P001';

メリット: シンプル、ストレージ効率が良い
デメリット: 履歴が失われる、監査証跡なし

SCD Type 2: 履歴管理(新レコード追加)

概要: 変更時に新しいレコードを追加し、古いレコードを無効化

特徴: - 完全な履歴を保持 - 有効期間で管理 - 最も一般的なSCD手法

適用例: Stephen CurryがWarriorsからNetsに移籍

更新後のテーブル:

player_key player_id name team position salary start_date end_date is_current
1 P001 LeBron James Lakers SF 46000000 2023-01-01 9999-12-31 Y
2 P002 Stephen Curry Warriors PG 48000000 2023-01-01 2024-02-01 N
3 P002 Stephen Curry Nets PG 52000000 2024-02-01 9999-12-31 Y

SQL実装例:

-- 既存レコードを無効化
UPDATE Players 
SET end_date = '2024-02-01', is_current = 'N'
WHERE player_id = 'P002' AND is_current = 'Y';

-- 新レコード追加
INSERT INTO Players 
(player_id, name, team, position, salary, start_date, end_date, is_current)
VALUES 
('P002', 'Stephen Curry', 'Nets', 'PG', 52000000, '2024-02-01', '9999-12-31', 'Y');

メリット: 完全な履歴保持、時系列分析可能
デメリット: ストレージ使用量増加、クエリ複雑化

SCD Type 3: 限定履歴管理

概要: 現在値と以前の値を同一レコード内の異なるカラムで管理

特徴: - 直前の値のみ保持 - ファクトテーブルのキー変更不要 - 限定的な履歴管理

テーブル構造変更:

Column Type Description
previous_team VARCHAR(50) 直前の所属チーム
team_change_date DATE チーム変更日
ALTER TABLE Players 
ADD COLUMN previous_team VARCHAR(50),
ADD COLUMN team_change_date DATE;

適用例: LeBron JamesがLakersからHeatに移籍

更新後のテーブル:

player_key player_id name current_team previous_team team_change_date position
1 P001 LeBron James Heat Lakers 2024-03-01 SF
3 P002 Stephen Curry Nets NULL NULL PG

SQL実装例:

UPDATE Players 
SET previous_team = current_team,
    current_team = 'Heat',
    team_change_date = '2024-03-01'
WHERE player_id = 'P001';

メリット: シンプルなクエリ、ファクトテーブル影響なし
デメリット: 限定的な履歴、複数変更に対応困難

SCD Type 4: 履歴テーブル分離

概要: 現在データと履歴データを別テーブルで管理

特徴: - 現在テーブルと履歴テーブルに分離 - パフォーマンス最適化 - 明確な責務分離

テーブル設計:

Players_Current テーブル(現在データ):

player_key player_id name team position salary updated_date
1 P001 LeBron James Heat SF 46000000 2024-03-01
3 P002 Stephen Curry Nets PG 52000000 2024-02-01

Players_History テーブル(履歴データ):

history_key player_id name team position salary start_date end_date
1 P001 LeBron James Lakers SF 44000000 2023-01-01 2024-03-01
2 P002 Stephen Curry Warriors PG 48000000 2023-01-01 2024-02-01

更新処理例:

-- 履歴テーブルに移動
INSERT INTO Players_History 
SELECT player_key, player_id, name, team, position, salary, 
       updated_date AS start_date, CURRENT_DATE AS end_date
FROM Players_Current 
WHERE player_id = 'P001';

-- 現在テーブル更新
UPDATE Players_Current 
SET team = 'Heat', updated_date = CURRENT_DATE
WHERE player_id = 'P001';

メリット: 高パフォーマンス、明確な分離
デメリット: 複雑な管理、データ整合性確保が必要

SCD Type 5: ミニディメンション

概要: 頻繁に変化する属性を別ディメンションテーブルに分離

特徴: - 頻繁変化属性の分離 - ファクトテーブルの肥大化防止 - パフォーマンス向上

テーブル設計:

Players テーブル(メイン・安定属性):

player_key player_id name position height weight
1 P001 LeBron James SF 206 113
2 P002 Stephen Curry PG 191 84

Player_Contracts テーブル(ミニディメンション・変化頻繁):

contract_key contract_band salary_range team_type
1 High 40M-50M Large Market
2 Super Max 50M+ Large Market

Player_Contract_Bridge テーブル(ブリッジテーブル):

player_key contract_key start_date end_date
1 1 2023-01-01 2024-03-01
1 2 2024-03-01 9999-12-31
2 2 2023-01-01 9999-12-31

メリット: パフォーマンス向上、属性管理の柔軟性
デメリット: 設計複雑化、結合処理増加

SCD Type 6: ハイブリッド手法(Type 1 + Type 2 + Type 3)

概要: Type 1、Type 2、Type 3を組み合わせた包括的手法

特徴: - 完全な履歴保持(Type 2) - 現在値への高速アクセス(Type 1) - 直前値の比較(Type 3)

テーブル構造:

Players_Type6 テーブル:

player_key player_id name historical_team previous_team current_team start_date end_date is_current salary current_salary
1 P001 LeBron James Lakers Lakers Heat 2023-01-01 2024-03-01 N 44000000 46000000
2 P001 LeBron James Heat Lakers Heat 2024-03-01 9999-12-31 Y 46000000 46000000
3 P002 Stephen Curry Warriors Warriors Nets 2023-01-01 2024-02-01 N 48000000 52000000
4 P002 Stephen Curry Nets Warriors Nets 2024-02-01 9999-12-31 Y 52000000 52000000

メリット: 最大の柔軟性、多様な分析ニーズに対応
デメリット: 最も複雑、高いストレージコスト

SCD Type 7: ハイブリッド手法(Type 1 + Type 2 の統合キー)

概要: Type 1とType 2を組み合わせ、代理キーと自然キーの両方を提供

特徴: - 履歴管理(Type 2) - 現在データへの直接アクセス(Type 1) - 統合キーによる柔軟な結合

テーブル構造:

Players_Type7 テーブル:

surrogate_key natural_key current_key player_id name team start_date end_date is_current
1 1 2 P001 LeBron James Lakers 2023-01-01 2024-03-01 N
2 1 2 P001 LeBron James Heat 2024-03-01 9999-12-31 Y
3 3 4 P002 Stephen Curry Warriors 2023-01-01 2024-02-01 N
4 3 4 P002 Stephen Curry Nets 2024-02-01 9999-12-31 Y

クエリ例:

-- 履歴分析(natural_keyを使用)
SELECT * FROM Players_Type7 WHERE natural_key = 1;

-- 現在データのみ(current_keyを使用)
SELECT * FROM Players_Type7 WHERE surrogate_key = current_key;

メリット: 柔軟性と効率性のバランス
デメリット: 複雑なキー管理

まとめ:各SCDタイプの選択指針

SCDタイプ 適用場面 メリット デメリット
Type 1 履歴不要、最新データのみ必要 シンプル、高効率 履歴なし
Type 2 完全な履歴が必要 完全履歴、時系列分析可 ストレージ大、複雑
Type 3 直前値との比較が重要 シンプル、比較容易 限定履歴
Type 4 パフォーマンス重視 高速クエリ 管理複雑
Type 5 頻繁変化属性の分離 効率的、柔軟 設計複雑
Type 6 包括的な要件 最大柔軟性 最も複雑
Type 7 履歴と現在データ両方 バランス良好 キー管理複雑

実装時の考慮点

パフォーマンス:Type 2は大量データでクエリが重くなる可能性があります。適切なインデックス設計が重要です。

ストレージ:履歴を保持するタイプ(2,4,6,7)はストレージコストを考慮する必要があります。

ビジネス要件:法的要件や監査要件により履歴保持期間が決まる場合があります。

データ更新頻度:更新頻度が高い場合は、Type 5のミニディメンション手法を検討しましょう。

適切なSCDタイプの選択により、効率的で保守性の高いデータウェアハウスを構築できます。NBA選手データのような実世界のデータを通じて各手法を理解し、プロジェクトの要件に最適な手法を選択してください。

maturinによるPythonとRustの連携

前置き

sqlglotというSQLパーサーをいじっていたらバグを発見しました。
github.com

下記のように全角スペースを含むSQLをパースしようとするとパース出来ないエラーとなりました。(SELECTの後に全角スペースがあります。)

SELECT * FROM tbl;

しかし、以前試したときは特にエラーが起きないことを確認していたので違和感を覚えました。
verによる違いかなと考えましたが、changelogを見てもそれらしき差分が無かったためコードを深堀してみました。

結論としては、sqlglotのtokenizerは、Python実装のものとRust実装のものがあり、Rust実装のものは全角スペースを変数tokenと認識してしまっていたものによるものでした。
tokenizeの部分はPythonもRustもエラーを起こさずに通過しますが、Parseの部分でRustのtokenizerを通過したtokenはエラーとなるようです。
以前はPython実装のものを使っていたためエラーが出なかったようです。(インストールをGitHubに書かれているやり方ではなくPyPIでやったためこのような状況になったようです。)

Issueにあるようにvenvで環境を切り替えずともSQLGLOTRS_TOKENIZER環境変数で制御できるようです。
github.com



私がこの件で興味を持ったのは、PythonからRustを呼ぶ仕組みです。
この仕組みを習得すれば、Python言語のメリット(ライブラリが充実していること、簡単に書けること)を全面的に採用しつつ、パフォーマンスが要求される部分のみRust言語で書くという選択肢を得ることができそうです。

maturinの仕組み

調べたところmaturinというツールを使うことでPythonとRustを連携できるようです。
maturinとは、Rustで書かれたコードをPythonの拡張モジュールとしてビルド・配布するためのツールのようです。

こちらの記事にわかりやすい説明が記載されていました。
gihyo.jp

PyO3は、PythonとRustをバインディングするためのツールであり、maturinはそのPyO3を内部で利用し、RustプログラムをビルドしてPythonパッケージ化するツールという位置付けのようです。
また、Polarsやruffといった高速化を特徴とするツール(従来のpandasやblackに対抗するもの)も、PyO3を活用してRustとの連携が図られ、開発されているようです。

maturinの検証

venv環境を作成します。

$ cd project
$ python -m venv .venv
$ source .venv/bin/activate
$ pip install maturin

次にmaturinでプロジェクトの初期化を行います。

$ maturin init
$ tree
.
├── Cargo.toml
├── README.md
├── pyproject.toml
└── src
    └── lib.rs

1 directory, 4 files

src/lib.rsは下記のようになっています。

use pyo3::prelude::*;

/// Formats the sum of two numbers as string.
#[pyfunction]
fn sum_as_string(a: usize, b: usize) -> PyResult<String> {
    Ok((a + b).to_string())
}

/// A Python module implemented in Rust.
#[pymodule]
fn prime(m: &Bound<'_, PyModule>) -> PyResult<()> {
    m.add_function(wrap_pyfunction!(sum_as_string, m)?)?;
    Ok(())
}

続いてRustプログラムをビルドします。
ビルドにはdevelopを使い、リリース時には--releaseフラグをつけることでデバッグ情報などを除外した最適化された生成物をリリースできるようです。
この操作により、.venvにバイナリがインストールされるようです。

$ maturin develop
~~

$ ls .venv/lib/python3.13/site-packages/prime/
 __init__.py  󰌠 __pycache__   prime.cpython-313-x86_64-linux-gnu.so

ちなみにprime.cpython-313-x86_64-linux-gnu.soのファイル形式の見方としては、

prime      .cpython-313     -x86_64     -linux-gnu     .so
──────      ───────────     ───────     ──────────     ───
モジュール  CPythonの       CPU         OS            共有オブ
名          バージョン      アーキ       環境           ジェクト
            (Python 3.13) テクチャ     (Linux)      ファイル
  • prime: Pythonモジュールとしてimport primeのように使用可能
  • .cpython-313: Pythonのバージョン
  • -x86_64: 64ビットのx86アーキテクチャ向けにコンパイルされていることを示す
  • -linux-gnu: LinuxのGNU Cライブラリ(glibc)を使用する環境向けであることを示す
  • .so: 共有オブジェクトファイル(Shared Object)の拡張子

プログラムサイズの削減やライブラリ更新の一元化を目的として、プログラムが実行時に動的にライブラリを読み込んで使用する方式を動的リンクと呼びます。
また、この動的リンクされた形式のファイルは共有オブジェクトファイルと呼ばれます。

次のようなmain.pyを作るとRustで記述したモジュールを呼ぶことができました。

import prime

if __name__ == "__main__":
    print(prime.sum_as_string(1, 2))

少し複雑なメソッドを追加するにはこのようにすると出来ました。

use pyo3::prelude::*;

/// Formats the sum of two numbers as string.
#[pyfunction]
fn sum_as_string(a: usize, b: usize) -> PyResult<String> {
    Ok((a + b).to_string())
}

#[pyfunction]
fn is_prime(n: u64) -> PyResult<bool> {
    // 2未満の数は素数ではない
    if n < 2 {
        return Ok(false);
    }
    
    // 2は素数
    if n == 2 {
        return Ok(true);
    }
    
    // 偶数は2以外素数ではない
    if n % 2 == 0 {
        return Ok(false);
    }
    
    // 3から数の平方根までの奇数で割り切れるかチェック
    let sqrt = (n as f64).sqrt() as u64;
    for i in (3..=sqrt).step_by(2) {
        if n % i == 0 {
            return Ok(false);
        }
    }
    
    Ok(true)
}

/// A Python module implemented in Rust.
#[pymodule]
fn prime(m: &Bound<'_, PyModule>) -> PyResult<()> {
    m.add_function(wrap_pyfunction!(sum_as_string, m)?)?;
    m.add_function(wrap_pyfunction!(is_prime, m)?)?;
    Ok(())
}
import prime

if __name__ == "__main__":
    print(prime.sum_as_string(1, 2))
    print(prime.is_prime(7))  # True
    print(prime.is_prime(8))  # False

NBAスタッツデータ基盤を作ってた(る)話

概要

私はNBA(アメリカのプロバスケットボールリーグ)が大好きで、かれこれ10年近く情報を追い続けています。試合を観ることはもちろん好きですが、スタッツを分析することも一つの趣味となっています。

学生時代にはPythonを使った機械学習に取り組んでいたこともあり、「NBAのデータが手元にあれば、自分でスタッツを分析できるのになあ」と考えていました。エンジニアとしてのキャリアをスタートさせたのを機に、ついに自分自身でスタッツを分析するための基盤を構築することを決意しました。

これまで、スタッツ基盤を作っては改善し、また新しいアイデアを試すというサイクルを繰り返してきました。そして現在、第4次スタッツ基盤の構築に取り組んでいます。

この記事では、第3次までのスタッツ基盤の概要と、現在構想中の第4次スタッツ基盤について詳しくお伝えします。

第1次スタッツ基盤

はじめてNBAのスタッツ基盤を構築しようと思ったのは、エンジニア1年目のときでした。

学生時代、機械学習のデータ集めのためにPythonでWebスクレイピングをした経験はありましたが、データの管理方法としてはCSVやJSONといった静的ファイルに集約する方法しか知りませんでした。

業務で初めてMySQLを扱った際、データを効率的に管理できるその仕組みに感動しました。それまでCSVでしかデータを管理してこなかった私にとって、RDBの利便性は衝撃的でした。

この経験をきっかけに、RDBの学習を兼ねて第1次スタッツ基盤の構築に取り組むことになりました。

第1次スタッツ基盤は、下記のような構成です。

  • cronでPythonのスクレイピングプログラムを定期実行
    1. Seleniumでスタッツサイトを開く
    2. webスクレイピングしてスタッツ情報を取得
    3. ローカルに建てたMySQL tableにデータをSQLAlchemyで挿入
第1次スタッツ基盤

Seleniumを使っていた理由は、取得対象のスタッツサイトがフロントエンドレンダリングを行っていたためでした。
イメージとしては下記のようなテーブルをwebスクレイピングで情報を読み取っていました。

webページにこのようなテーブルが描画されるので、これをスクレイピングしてました

特に困難だったのは、webページによって表の自由度が高すぎてスクレイピングしづらいという問題です。下記の表をスクレイピングして、スタッツの行だけ取り出そうとすると大変だということが想像出来るかと思います。

表の自由度が高すぎてスクレイピングしづらい

また、想定していない行がいくつかあり、その度に対応できるようにプログラムを書き換えるなどの作業にも骨が折れました。

欠場は"Inactive"だけでなく"Did Not Dress"もある

第2次スタッツ基盤

この頃、YouTubeでは「説明動画」や「BarChartRace」と呼ばれるジャンルが非常に流行していました。これらのジャンルで驚くほどの再生回数を稼いでいる動画も多く見られました。また、NBAはYouTube上での素材利用に比較的寛容であることから、説明動画やBarChartRace動画との相性が良いと感じました。

そこで私は、「日々更新されるスタッツを基に動画を自動生成し、毎日投稿すればYouTubeドリームを楽して掴めるのではないか?」と本気で考え、動画作成パイプラインを重視したスタッツ基盤の再構築に着手しました。

スタッツ情報取得に関しては、nba_apiというものがあることを知り、webスクレイピングから切り替えました。
動画作成パイプラインについては、Adobe PhotoshopとAdobe Premiere ProをPythonで自動化するプログラムをつくり、手動で作ったテンプレート画像・動画のレイヤーを、入力スタッツに応じて書き換えられるようなプログラムを作りました。

第2次スタッツ基盤

動画生成パイプラインの詳細は下記の記事にまとめています。

動画に使う挿絵を自動で作る様子
BarChartRace動画をPremiere Proで自動作成する様子
BarChartRace動画のグラフの1位の時間ごとに表示する絵の置き換えをPremiere Proで制御している様子

出来上がった動画は下記のような感じです。今観るとつまらなすぎて我ながら恥ずかしいです。
各動画は動画作成パイプラインのおかげで、30分くらいで作成されています。
www.youtube.com
www.youtube.com
Fast API+D3.jsでLIVEダッシュボードを作り、試合のLIVE配信をしたりもしていました。
www.youtube.com

YouTubeドリームについては100動画を投稿してみて、38人しか登録者がつかなかったので夢観るのをやめました。
あと、動画のデザインセンスが壊滅的だったので、私には向いていないと思いました。

第3次スタッツ基盤

データエンジニアとなり、GCPでのデータ基盤の構築方法や、データレイクの概念を学びました。
そのため、データエンジニアリングの勉強という狙いを込め、下記の技術を使いました。

  • Terraform
  • BigQuery
  • Dagster
  • dbt
  • Looker Studio
第3次スタッツ基盤

BQのデータの様子はこんな感じになっています。

BQテーブルの様子

工夫した点としては、nba_apiを一般化して、(endpoint名, パラメタ辞書)引数のみですべてのendpointにアクセスできるようなラッパーを書きました。
また、デバッグ用とCloud Run関数用のAPIを切り替えられるようにもしました。

通常のnba_apiによるデータ取得は、欲しいAPIごとにインスタンスを変える必要がありました。

from nba_api.stats.endpoints import leaguegamelog, boxscoreadvancedv2
stats_leaguegamelog = leaguegamelog.LeagueGameLog().get_normalized_dict()
stats_boxscoreadvancedv2 = boxscoreadvancedv2.BoxScoreAdvancedV2(game_id="0022400635").get_normalized_dict()

print(stats_boxscoreadvancedv2["TeamStats"])
[{'GAME_ID': '0022400635', 'TEAM_ID': 1610612742, 'TEAM_NAME': 'Mavericks', 'TEAM_ABBREVIATION': 'DAL', 'TEAM_CITY': 'Dallas', 'MIN': '240.000000:00', 'E_OFF_RATING': 106.7, 'OFF_RATING': 110.3, 'E_DEF_RATING': 122.8, 'DEF_RATING': 124.5, 'E_NET_RATING': -16.1, 'NET_RATING': -14.2, 'AST_PCT': 0.462, 'AST_TOV': 1.38, 'AST_RATIO': 13.7, 'OREB_PCT': 0.346, 'DREB_PCT': 0.677, 'REB_PCT': 0.526, 'E_TM_TOV_PCT': 12.959, 'TM_TOV_PCT': 13.4, 'EFG_PCT': 0.489, 'TS_PCT': 0.533, 'USG_PCT': 1.0, 'E_USG_PCT': 0.199, 'E_PACE': 99.84, 'PACE': 97.5, 'PACE_PER40': 81.25, 'POSS': 97, 'PIE': 0.432}, {'GAME_ID': '0022400635', 'TEAM_ID': 1610612738, 'TEAM_NAME': 'Celtics', 'TEAM_ABBREVIATION': 'BOS', 'TEAM_CITY': 'Boston', 'MIN': '240.000000:00', 'E_OFF_RATING': 122.8, 'OFF_RATING': 124.5, 'E_DEF_RATING': 106.7, 'DEF_RATING': 110.3, 'E_NET_RATING': 16.1, 'NET_RATING': 14.2, 'AST_PCT': 0.75, 'AST_TOV': 6.6, 'AST_RATIO': 22.1, 'OREB_PCT': 0.323, 'DREB_PCT': 0.654, 'REB_PCT': 0.474, 'E_TM_TOV_PCT': 5.032, 'TM_TOV_PCT': 5.1, 'EFG_PCT': 0.524, 'TS_PCT': 0.548, 'USG_PCT': 1.0, 'E_USG_PCT': 0.198, 'E_PACE': 99.84, 'PACE': 97.5, 'PACE_PER40': 81.25, 'POSS': 98, 'PIE': 0.568}]

ラッパーによるデータ取得では、すべてrequestsモジュールからアクセスできます。また、URLをデバッグ用とCloudRun関数用に切り替えることができます。

import requests
response = requests.get(url="http://localhost:8080", json={"endpoint": "leaguegamelog"})
response = requests.get(url="http://localhost:8080", json={"endpoint": "boxscoreadvancedv2", "params": {"game_id": "0022400635"}})  # game_id以外のパラメタはデフォルトパラメタが適用される

print(response.json()["TeamStats"])
[{'AST_PCT': 0.462, 'AST_RATIO': 13.7, 'AST_TOV': 1.38, 'DEF_RATING': 124.5, 'DREB_PCT': 0.677, 'EFG_PCT': 0.489, 'E_DEF_RATING': 122.8, 'E_NET_RATING': -16.1, 'E_OFF_RATING': 106.7, 'E_PACE': 99.84, 'E_TM_TOV_PCT': 12.959, 'E_USG_PCT': 0.199, 'GAME_ID': '0022400635', 'MIN': '240.000000:00', 'NET_RATING': -14.2, 'OFF_RATING': 110.3, 'OREB_PCT': 0.346, 'PACE': 97.5, 'PACE_PER40': 81.25, 'PIE': 0.432, 'POSS': 97, 'REB_PCT': 0.526, 'TEAM_ABBREVIATION': 'DAL', 'TEAM_CITY': 'Dallas', 'TEAM_ID': 1610612742, 'TEAM_NAME': 'Mavericks', 'TM_TOV_PCT': 13.4, 'TS_PCT': 0.533, 'USG_PCT': 1.0}, {'AST_PCT': 0.75, 'AST_RATIO': 22.1, 'AST_TOV': 6.6, 'DEF_RATING': 110.3, 'DREB_PCT': 0.654, 'EFG_PCT': 0.524, 'E_DEF_RATING': 106.7, 'E_NET_RATING': 16.1, 'E_OFF_RATING': 122.8, 'E_PACE': 99.84, 'E_TM_TOV_PCT': 5.032, 'E_USG_PCT': 0.198, 'GAME_ID': '0022400635', 'MIN': '240.000000:00', 'NET_RATING': 14.2, 'OFF_RATING': 124.5, 'OREB_PCT': 0.323, 'PACE': 97.5, 'PACE_PER40': 81.25, 'PIE': 0.568, 'POSS': 98, 'REB_PCT': 0.474, 'TEAM_ABBREVIATION': 'BOS', 'TEAM_CITY': 'Boston', 'TEAM_ID': 1610612738, 'TEAM_NAME': 'Celtics', 'TM_TOV_PCT': 5.1, 'TS_PCT': 0.548, 'USG_PCT': 1.0}]


このラッパーのコアの実装は下記のようになっています。基本はnba_apiのデフォルトのパラメタが指定され、paramsが指定されていれば、そこだけ置き換わるようになります。

import inspect
from typing import Any

from nba_api.stats import endpoints


def request_nba_api(
    api_name: str, params: dict[str, Any] | None = None
) -> dict[str, Any] | None:
    """nba_apiのendpoint名とそのendpointへのリクエストに対するパラメタを受取り, nba_apiを叩く"""
    endpoint = getattr(endpoints, api_name)

    # endpointのクラス名を取得
    class_name: str = dir(endpoint)[[c.lower() for c in dir(endpoint)].index(api_name)]

    # Endpointクラスの__init__メソッドの引数とデフォルト値を取得
    _class = getattr(endpoint, class_name)
    signature = inspect.signature(_class.__init__)
    _params = dict()
    for k, v in signature.parameters.items():
        if k == "self":
            continue
        if params and k in params:
            _params[k] = params[k]
        else:
            _params[k] = v.default

    try:
        request: dict[str, Any] = _class(**_params).get_normalized_dict()
        return request
    except Exception as e:
        print(f"Error: {e}")

Dailyでパーティションを切っていましたが、1年で試合が約200日あり、それが1960年代から現代まで続いているため、単純計算でも60年 x 200日/年 = 1.2万パーティションとなります。そのため、BigQueryのテーブルを分割せざるを得ませんでした。

DagsterにおいてもDailyでパーティションを実装しましたが、1.2万のタスクが発生することでジョブごとの重複処理が無視できないレベルに達してしまいました。そこで、初期化ジョブとして、パーティション自体はDailyとしつつも、データの挿入単位をシーズン単位とするジョブを実装しました。

ただし、パーティションテーブルをWRITE_TRUNCATEで置き換える方法ではなく、増分更新を活用する形でもっと効率的に実現できたのではないかと考えています。

シーズン単位で入力できるジョブ

dbtは今回初めて使いましたが、非常に便利で感動しました。特にFastAPIのように、実装がそのままドキュメントに結びついているところや、ドキュメント化することで、descriptionやlineageが追える点です。
今後構築する際は、descriptionの伝播ができると言われているdbt-osmosisを採用したいと思っています。

dbtの様子

第4次スタッツ基盤

第4次スタッツ基盤では、第3次スタッツ基盤の課題を解決することをコンセプトにしています。
また、生成AIを使ったデータレイヤの構築を構想しています。現在は個々の技術に対してキャッチアップしているような状態です。

第4次スタッツ基盤で実現したいことは下記の通りです。

  • dbtのプラグインを活用する
  • Snowflakeを使う
  • 個人で運用できるような簡易的なワークフローにする
  • TerraformのCI/CDツールを導入する
  • DuckDBでローカル環境でのテストを行う
  • 生成AIによるスタッツ提案
  • Icebergの導入

dbtのプラグインを活用する

dbtのdocument化に力を入れたいです。とくにdbt-osmosisを使い、descriptionを可能な限り入力しておきたいです。description入力により、生成AIによるクエリ生成の恩恵を受けやすい環境を目指したいです。

Snowflakeを使う

BigQueryではDailyパーティションを切りましたが、パーティションを指定したクエリと指定しないクエリを比較しても、スキャン量に大きな違いが見られませんでした。

そのため、パーティションを意識せずにクエリを書くことができるDWHの機能を提供するSnowflakeの採用を検討しています。Snowflakeにはマイクロパーティションというデータの管理単位があり、この管理単位に対してメタデータが生成されます。クエリ実行時には、このメタデータを参照して、どのマイクロパーティションにアクセスするかを自動で判断してくれる仕組みがあるため、パーティションを意識せずとも自動的にスキャン量を減らしてくれます。

個人で運用できるような簡易的なワークフローにする

今回、ワークフローツールとして、Dagsterをローカルで運用していました。
ぶっちゃけ、Dagsterを個人で運用するのは、かなり面倒でした。
Cloud Composerを建てたり、GKEでホストすることも考えましたが、コスト面での心配も
GCPであればCloud Workflowなどより簡単に運用できそうなツールを代用していきたいです。

TerraformのCI/CDツールを導入する

今までは自身でplanとapplyを実行していましたが、少しでもラクにしたいという思いがあります。
Atlantisやtfcmtなどのツールを導入したいですが、Atlantisはサーバーを建てる必要があり個人開発では負担が大きそうなので、tfcmtを導入したいと考えています。

DuckDBでローカル環境でのテストを行う

第3次データ基盤のdbtでは本番環境しか持たなかったため、今後はdev環境も構築したいと考えています。最近では、dev用環境としてDuckDBを活用する記事を多く目にするようになりました。技術検証も兼ねて、試してみたいと思います。

ちなみに、ローカル環境でもParquetファイルを用いることで、140万行に対して高速にクエリを実行できることが確認できているため、問題なくローカルにdev環境を建てられるかと思います。

DuckDBならローカルで140万行あっても高速にクエリが実行できる

生成AIによるスタッツ提案

生成AIを使って、DMに該当するクエリを作れないか考えています。
この記事のように、その場限りでしか使わないようなスタッツデータを試合が終わるごとに生成できないか考えています。
memo.geso.site

astとtokenizeによるPythonコードの構造化

概要

下記のPythonコードをJSONに変換する方法を考えます。

from dataclasses import dataclass


@dataclass
class LogA:
    """Aの情報を表すログ"""

    __logname__ = "log_a"

    a1: int  # Aの1つ目の情報
    a2: str  # Aの2つ目の情報


@dataclass
class LogB:
    """Bの情報を表すログ"""

    __logname__ = "log_b"

    b1: int  # Bの1つ目の情報
    b2: str
    b3: float  # Bの3つ目の情報


@dataclass
class LogC:
    """Cの情報を表すログ"""

    __logname__ = "log_c"

    c1: int
    c2: str  # Cの2つ目の情報
    c3: float  # Cの3つ目の情報
    c4: bool  # Cの4つ目の情報
{
    "name": "log_a",
    "desc": "Aの情報を表すログ",
    "fields": [
        {"name": "a1", "desc": "Aの1つ目の情報"},
        {"name": "a2", "desc": "Aの2つ目の情報"},
    ]
}
{
    "name": "log_b",
    "desc": "Bの情報を表すログ",
    "fields": [
        {"name": "b1", "desc": "Bの1つ目の情報"},
        {"name": "b2", "desc": ""},
        {"name": "b3", "desc": "Bの3つ目の情報"},
    ]
}
{
    "name": "log_c",
    "desc": "Cの情報を表すログ",
    "fields": [
        {"name": "c1", "desc": ""},
        {"name": "c2", "desc": "Cの2つ目の情報"},
        {"name": "c3", "desc": "Cの3つ目の情報"},
        {"name": "c4", "desc": "Cの4つ目の情報"},
    ]
}

字句解析と構文解析

Pythonコードをプログラムとして動かす場合、ソースコードに対して字句解析が行われ、次に字句解析の結果に対して構文解析が行われます。

字句解析

字句解析とは、コードを入力として、トークンと呼ばれるコードの最小構成単位のリストに変換する処理です。
この時点ではクラスや関数などPythonコードにおける意味をなす単位でまとまっていません。

字句解析対象のPythonコード

# sample/sample.py
def plus_one(x: int) -> int:
    """引数に1を加えて返す"""
    return x + 1

a = 2  # aを初期化
plus_one(a)  # aに1を加える

字句解析を行うPythonコード

import tokenize

with tokenize.open("sample/sample.py") as f:
    tokens = tokenize.generate_tokens(f.readline)
    for token in tokens:
        print(token)

字句解析の結果

TokenInfo(type=1 (NAME), string='def', start=(1, 0), end=(1, 3), line='def plus_one(x: int) -> int:\n')
TokenInfo(type=1 (NAME), string='plus_one', start=(1, 4), end=(1, 12), line='def plus_one(x: int) -> int:\n')
TokenInfo(type=55 (OP), string='(', start=(1, 12), end=(1, 13), line='def plus_one(x: int) -> int:\n')
TokenInfo(type=1 (NAME), string='x', start=(1, 13), end=(1, 14), line='def plus_one(x: int) -> int:\n')
TokenInfo(type=55 (OP), string=':', start=(1, 14), end=(1, 15), line='def plus_one(x: int) -> int:\n')
TokenInfo(type=1 (NAME), string='int', start=(1, 16), end=(1, 19), line='def plus_one(x: int) -> int:\n')
TokenInfo(type=55 (OP), string=')', start=(1, 19), end=(1, 20), line='def plus_one(x: int) -> int:\n')
TokenInfo(type=55 (OP), string='->', start=(1, 21), end=(1, 23), line='def plus_one(x: int) -> int:\n')
TokenInfo(type=1 (NAME), string='int', start=(1, 24), end=(1, 27), line='def plus_one(x: int) -> int:\n')
TokenInfo(type=55 (OP), string=':', start=(1, 27), end=(1, 28), line='def plus_one(x: int) -> int:\n')
TokenInfo(type=4 (NEWLINE), string='\n', start=(1, 28), end=(1, 29), line='def plus_one(x: int) -> int:\n')
TokenInfo(type=5 (INDENT), string='    ', start=(2, 0), end=(2, 4), line='    """引数に1を加えて返す"""\n')
TokenInfo(type=3 (STRING), string='"""引数に1を加えて返す"""', start=(2, 4), end=(2, 20), line='    """引数に1を加えて返す"""\n')
TokenInfo(type=4 (NEWLINE), string='\n', start=(2, 20), end=(2, 21), line='    """引数に1を加えて返す"""\n')
TokenInfo(type=1 (NAME), string='return', start=(3, 4), end=(3, 10), line='    return x + 1\n')
TokenInfo(type=1 (NAME), string='x', start=(3, 11), end=(3, 12), line='    return x + 1\n')
TokenInfo(type=55 (OP), string='+', start=(3, 13), end=(3, 14), line='    return x + 1\n')
TokenInfo(type=2 (NUMBER), string='1', start=(3, 15), end=(3, 16), line='    return x + 1\n')
TokenInfo(type=4 (NEWLINE), string='\n', start=(3, 16), end=(3, 17), line='    return x + 1\n')
TokenInfo(type=63 (NL), string='\n', start=(4, 0), end=(4, 1), line='\n')
TokenInfo(type=63 (NL), string='\n', start=(5, 0), end=(5, 1), line='\n')
TokenInfo(type=6 (DEDENT), string='', start=(6, 0), end=(6, 0), line='a = 2  # aを初期化\n')
TokenInfo(type=1 (NAME), string='a', start=(6, 0), end=(6, 1), line='a = 2  # aを初期化\n')
TokenInfo(type=55 (OP), string='=', start=(6, 2), end=(6, 3), line='a = 2  # aを初期化\n')
TokenInfo(type=2 (NUMBER), string='2', start=(6, 4), end=(6, 5), line='a = 2  # aを初期化\n')
TokenInfo(type=62 (COMMENT), string='# aを初期化', start=(6, 7), end=(6, 14), line='a = 2  # aを初期化\n')
TokenInfo(type=4 (NEWLINE), string='\n', start=(6, 14), end=(6, 15), line='a = 2  # aを初期化\n')
TokenInfo(type=1 (NAME), string='plus_one', start=(7, 0), end=(7, 8), line='plus_one(a)  # aに1を加える\n')
TokenInfo(type=55 (OP), string='(', start=(7, 8), end=(7, 9), line='plus_one(a)  # aに1を加える\n')
TokenInfo(type=1 (NAME), string='a', start=(7, 9), end=(7, 10), line='plus_one(a)  # aに1を加える\n')
TokenInfo(type=55 (OP), string=')', start=(7, 10), end=(7, 11), line='plus_one(a)  # aに1を加える\n')
TokenInfo(type=62 (COMMENT), string='# aに1を加える', start=(7, 13), end=(7, 22), line='plus_one(a)  # aに1を加える\n')
TokenInfo(type=4 (NEWLINE), string='\n', start=(7, 22), end=(7, 23), line='plus_one(a)  # aに1を加える\n')
TokenInfo(type=0 (ENDMARKER), string='', start=(8, 0), end=(8, 0), line='')

上記を見ると、各トークンは以下の情報を持っていることがわかります。

  • type
    • NAME:識別子
    • OP:記号
    • COMMENT:コメント
  • string:トークンが表す文字列
  • start:文字列の始まり(y, x)
  • end:文字列の終わり(y, x)
  • line:トークンが属するPythonコードの1行

各トークンは自身の位置情報を持っているため、このトークン列からソースコードへは逆変換可能です。
また、トークンの状態では、コメント情報も保持されたままであることが分かります。

構文解析

構文解析は入力をトークンリストとし、出力を抽象構文木(AST: Abstract Syntax Tree)とする変換器です。
上記のトークンリストから抽象構文木を作っていきます。
defなどの予約語を解釈して、関数やクラスなどの単位ごとに木を構成していくイメージです。
また、ASTではプログラムの実行に直接関係ないコメントが廃棄されます。

構文解析を行うコード

import ast

def print_ast_nodes(node, indent=0):
    """ASTノードを再帰的に表示する"""
    print(
        "  " * indent
        + f"{type(node).__name__}: {ast.dump(node, annotate_fields=True, include_attributes=True)}"
    )
    for child in ast.iter_child_nodes(node):
        print_ast_nodes(child, indent + 1)

# ソースコードを取得
with open("sample/sample.py", "r") as f:
    source_code = f.read()

# ASTを作成
tree = ast.parse(source_code)

# ASTノードを表示
print_ast_nodes(tree)

構文解析結果

Module: Module(body=[FunctionDef(name='plus_one', args=arguments(args=[arg(arg='x', annotation=Name(id='int', ctx=Load(), lineno=1, col_offset=16, end_lineno=1, end_col_offset=19), lineno=1, col_offset=13, end_lineno=1, end_col_offset=19)]), body=[Expr(value=Constant(value='引数に1を加えて返す', lineno=2, col_offset=4, end_lineno=2, end_col_offset=38), lineno=2, col_offset=4, end_lineno=2, end_col_offset=38), Return(value=BinOp(left=Name(id='x', ctx=Load(), lineno=3, col_offset=11, end_lineno=3, end_col_offset=12), op=Add(), right=Constant(value=1, lineno=3, col_offset=15, end_lineno=3, end_col_offset=16), lineno=3, col_offset=11, end_lineno=3, end_col_offset=16), lineno=3, col_offset=4, end_lineno=3, end_col_offset=16)], returns=Name(id='int', ctx=Load(), lineno=1, col_offset=24, end_lineno=1, end_col_offset=27), lineno=1, col_offset=0, end_lineno=3, end_col_offset=16), Assign(targets=[Name(id='a', ctx=Store(), lineno=6, col_offset=0, end_lineno=6, end_col_offset=1)], value=Constant(value=2, lineno=6, col_offset=4, end_lineno=6, end_col_offset=5), lineno=6, col_offset=0, end_lineno=6, end_col_offset=5), Expr(value=Call(func=Name(id='plus_one', ctx=Load(), lineno=7, col_offset=0, end_lineno=7, end_col_offset=8), args=[Name(id='a', ctx=Load(), lineno=7, col_offset=9, end_lineno=7, end_col_offset=10)], lineno=7, col_offset=0, end_lineno=7, end_col_offset=11), lineno=7, col_offset=0, end_lineno=7, end_col_offset=11)])
  FunctionDef: FunctionDef(name='plus_one', args=arguments(args=[arg(arg='x', annotation=Name(id='int', ctx=Load(), lineno=1, col_offset=16, end_lineno=1, end_col_offset=19), lineno=1, col_offset=13, end_lineno=1, end_col_offset=19)]), body=[Expr(value=Constant(value='引数に1を加えて返す', lineno=2, col_offset=4, end_lineno=2, end_col_offset=38), lineno=2, col_offset=4, end_lineno=2, end_col_offset=38), Return(value=BinOp(left=Name(id='x', ctx=Load(), lineno=3, col_offset=11, end_lineno=3, end_col_offset=12), op=Add(), right=Constant(value=1, lineno=3, col_offset=15, end_lineno=3, end_col_offset=16), lineno=3, col_offset=11, end_lineno=3, end_col_offset=16), lineno=3, col_offset=4, end_lineno=3, end_col_offset=16)], returns=Name(id='int', ctx=Load(), lineno=1, col_offset=24, end_lineno=1, end_col_offset=27), lineno=1, col_offset=0, end_lineno=3, end_col_offset=16)
    arguments: arguments(args=[arg(arg='x', annotation=Name(id='int', ctx=Load(), lineno=1, col_offset=16, end_lineno=1, end_col_offset=19), lineno=1, col_offset=13, end_lineno=1, end_col_offset=19)])
      arg: arg(arg='x', annotation=Name(id='int', ctx=Load(), lineno=1, col_offset=16, end_lineno=1, end_col_offset=19), lineno=1, col_offset=13, end_lineno=1, end_col_offset=19)
        Name: Name(id='int', ctx=Load(), lineno=1, col_offset=16, end_lineno=1, end_col_offset=19)
          Load: Load()
    Expr: Expr(value=Constant(value='引数に1を加えて返す', lineno=2, col_offset=4, end_lineno=2, end_col_offset=38), lineno=2, col_offset=4, end_lineno=2, end_col_offset=38)
      Constant: Constant(value='引数に1を加えて返す', lineno=2, col_offset=4, end_lineno=2, end_col_offset=38)
    Return: Return(value=BinOp(left=Name(id='x', ctx=Load(), lineno=3, col_offset=11, end_lineno=3, end_col_offset=12), op=Add(), right=Constant(value=1, lineno=3, col_offset=15, end_lineno=3, end_col_offset=16), lineno=3, col_offset=11, end_lineno=3, end_col_offset=16), lineno=3, col_offset=4, end_lineno=3, end_col_offset=16)
      BinOp: BinOp(left=Name(id='x', ctx=Load(), lineno=3, col_offset=11, end_lineno=3, end_col_offset=12), op=Add(), right=Constant(value=1, lineno=3, col_offset=15, end_lineno=3, end_col_offset=16), lineno=3, col_offset=11, end_lineno=3, end_col_offset=16)
        Name: Name(id='x', ctx=Load(), lineno=3, col_offset=11, end_lineno=3, end_col_offset=12)
          Load: Load()
        Add: Add()
        Constant: Constant(value=1, lineno=3, col_offset=15, end_lineno=3, end_col_offset=16)
    Name: Name(id='int', ctx=Load(), lineno=1, col_offset=24, end_lineno=1, end_col_offset=27)
      Load: Load()
  Assign: Assign(targets=[Name(id='a', ctx=Store(), lineno=6, col_offset=0, end_lineno=6, end_col_offset=1)], value=Constant(value=2, lineno=6, col_offset=4, end_lineno=6, end_col_offset=5), lineno=6, col_offset=0, end_lineno=6, end_col_offset=5)
    Name: Name(id='a', ctx=Store(), lineno=6, col_offset=0, end_lineno=6, end_col_offset=1)
      Store: Store()
    Constant: Constant(value=2, lineno=6, col_offset=4, end_lineno=6, end_col_offset=5)
  Expr: Expr(value=Call(func=Name(id='plus_one', ctx=Load(), lineno=7, col_offset=0, end_lineno=7, end_col_offset=8), args=[Name(id='a', ctx=Load(), lineno=7, col_offset=9, end_lineno=7, end_col_offset=10)], lineno=7, col_offset=0, end_lineno=7, end_col_offset=11), lineno=7, col_offset=0, end_lineno=7, end_col_offset=11)
    Call: Call(func=Name(id='plus_one', ctx=Load(), lineno=7, col_offset=0, end_lineno=7, end_col_offset=8), args=[Name(id='a', ctx=Load(), lineno=7, col_offset=9, end_lineno=7, end_col_offset=10)], lineno=7, col_offset=0, end_lineno=7, end_col_offset=11)
      Name: Name(id='plus_one', ctx=Load(), lineno=7, col_offset=0, end_lineno=7, end_col_offset=8)
        Load: Load()
      Name: Name(id='a', ctx=Load(), lineno=7, col_offset=9, end_lineno=7, end_col_offset=10)
        Load: Load()

astによる構造化

astは構文解析結果にアクセスできるPythonモジュールです。
構文解析木は関数やクラスなどプログラムとしてのまとまりがある形で情報を提供してくれる代わりに、コメントが廃棄されるという特徴がありました。
従って、下記のようなコメントのないクラスを構造化するときに役に立ちます。

from sqlalchemy.schema import Column
from sqlalchemy.types import Integer, String, Date, Float
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()


class Product(Base):
    """製品を表すテーブル"""

    __tablename__ = "product"  # テーブル名を指定
    product_id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(255), nullable=False, doc="製品名")
    description = Column(String(500), doc="製品説明")
    price = Column(Float, nullable=False, doc="価格")
    stock_quantity = Column(Integer, default=0, doc="在庫数")


class Order(Base):
    """注文を表すテーブル"""

    __tablename__ = "order"  # テーブル名を指定
    order_id = Column(Integer, primary_key=True, autoincrement=True)
    user_id = Column(Integer, nullable=False, doc="ユーザーID")
    order_date = Column(Date, nullable=False, doc="注文日")
    total_amount = Column(Float, nullable=False, doc="合計金額")


class Review(Base):
    """レビューを表すテーブル"""

    __tablename__ = "review"  # テーブル名を指定
    review_id = Column(Integer, primary_key=True, autoincrement=True)
    product_id = Column(Integer, nullable=False, doc="製品ID")
    user_id = Column(Integer, nullable=False, doc="ユーザーID")
    rating = Column(Integer, nullable=False, doc="評価 (1-5)")
    comment = Column(String(500), doc="コメント")
    review_date = Column(Date, nullable=False, doc="レビュー日")


class Sample:
    """Baseを継承していないクラス"""

    hoge = 1

astにはvisitor patternというものがあり、ast.NodeVisitorを継承したクラスでvisit_ClassDef(self, node)というメソッドをオーバーライドすると、構文解析木の全ノードのうちclassのnodeのみにアクセスできるようになります。
visit_ClassDefメソッド内で、「Baseクラスを継承しているクラスか?」を判定し、対象としているクラスであればdocstringなどを取得するという処理を書くことでメタデータを取得することができます。

import ast
import json


class ModelVisitor(ast.NodeVisitor):
    def __init__(self):
        self.models = []

    def visit_ClassDef(self, node):
        # Baseクラスを継承しているか
        if not any(
            base.id == "Base" for base in node.bases if isinstance(base, ast.Name)
        ):
            return

        # 取得したい情報を初期化
        model_info = {
            "name": "",
            "description": ast.get_docstring(node) or "",
            "fields": [],
        }

        # クラス内のノードをたどる
        for class_node in node.body:
            # """で囲まれている部分はskip
            if isinstance(class_node, ast.Expr) and isinstance(
                class_node.value, ast.Str
            ):
                continue

            # __tablename__を取得
            if (
                isinstance(class_node, ast.Assign)
                and len(class_node.targets) > 0
                and isinstance(class_node.targets[0], ast.Name)
                and class_node.targets[0].id == "__tablename__"
                and isinstance(class_node.value, ast.Str)
            ):
                model_info["name"] = class_node.value.s

            # カラム情報を取得
            elif isinstance(class_node, ast.Assign):
                if (
                    len(class_node.targets) > 0
                    and isinstance(class_node.targets[0], ast.Name)
                    and isinstance(class_node.value, ast.Call)
                    and isinstance(class_node.value.func, ast.Name)
                    and class_node.value.func.id == "Column"
                ):

                    column_name = class_node.targets[0].id
                    column_desc = ""

                    for keyword in class_node.value.keywords:
                        if keyword.arg == "doc" and isinstance(keyword.value, ast.Str):
                            column_desc = keyword.value.s

                    model_info["fields"].append(
                        {"name": column_name, "desc": column_desc}
                    )

        # modelsに追加
        if model_info["name"]:
            self.models.append(model_info)


def extract_model_info(source_code):
    tree = ast.parse(source_code)
    visitor = ModelVisitor()
    visitor.visit(tree)
    return visitor.models


if __name__ == "__main__":
    with open("sample/models.py", "r", encoding="utf-8") as f:
        source_code = f.read()

    models = extract_model_info(source_code)

    print(json.dumps(models, ensure_ascii=False, indent=2))

結果は次のようになります。

[
  {
    "name": "product",
    "desc": "製品を表すテーブル",
    "fields": [
      {"name": "product_id", "desc": ""},
      {"name": "name", "desc": "製品名"},
      {"name": "description", "desc": "製品説明"},
      {"name": "price", "desc": "価格"},
      {"name": "stock_quantity", "desc": "在庫数"}
    ]
  },
  {
    "name": "order",
    "desc": "注文を表すテーブル",
    "fields": [
      {"name": "order_id", "desc": ""},
      {"name": "user_id", "desc": "ユーザーID"},
      {"name": "order_date", "desc": "注文日"},
      {"name": "total_amount", "desc": "合計金額"}
    ]
  },
  {
    "name": "review",
    "desc": "レビューを表すテーブル",
    "fields": [
      {"name": "review_id", "desc": ""},
      {"name": "product_id", "desc": "製品ID"},
      {"name": "user_id", "desc": "ユーザーID"},
      {"name": "rating", "desc": "評価 (1-5)"},
      {"name": "comment", "desc": "コメント"},
      {"name": "review_date", "desc": "レビュー日"}
    ]
  }
]

astとtokenizeによる構造化

astでの構造化ではコメント情報が取得できませんでした。
最初に示したようなコメントにフィールド情報が書かれているようなコードに対しては字句解析を扱うtokenizeモジュールを使う必要があります。

from dataclasses import dataclass


@dataclass
class LogA:
    """Aの情報を表すログ"""

    __logname__ = "log_a"

    a1: int  # Aの1つ目の情報
    a2: str  # Aの2つ目の情報


@dataclass
class LogB:
    """Bの情報を表すログ"""

    __logname__ = "log_b"

    b1: int  # Bの1つ目の情報
    b2: str
    b3: float  # Bの3つ目の情報


@dataclass
class LogC:
    """Cの情報を表すログ"""

    __logname__ = "log_c"

    c1: int
    c2: str  # Cの2つ目の情報
    c3: float  # Cの3つ目の情報
    c4: bool  # Cの4つ目の情報

メタデータ取得処理としては、まずtokenizeによってコメントが出現する行番号を記録します。
次にastのvisit patternによって、クラスごとにノードを辿っていきます。
各フィールドの行数に対して、同じ行にtokenizeで記録したコメントがあればそのコメントをフィールドの説明として用いるというものです。

import ast
import json
import tokenize
from io import StringIO


class LogVisitor(ast.NodeVisitor):
    def __init__(self, code):
        self.code = code
        self.tokens = list(tokenize.generate_tokens(StringIO(code).readline))
        self.logs = []
        self.current_line = None  # 現在処理している行番号
        self.comments = {}

        # トークンからコメントを抽出
        self.extract_comments()

    def extract_comments(self):
        """トークンを走査して、行ごとのコメントを抽出"""
        for token in self.tokens:
            if token.type == tokenize.COMMENT:
                self.comments[token.start[0]] = token.string.strip("# ").strip()

    def visit_ClassDef(self, node):
        """クラス定義を訪問して、クラス名とそのフィールド情報を収集"""
        print(ast.dump(node))

        # クラスの説明を取得
        class_desc = ""
        if isinstance(node.body[0], ast.Expr) and isinstance(
            node.body[0].value, ast.Str
        ):
            class_desc = node.body[0].value.s.strip()

        # クラス名とその説明
        log = {"name": node.name.lower(), "desc": class_desc, "fields": []}

        # クラスのフィールド(アサインメント)を処理
        for body_node in node.body:
            if isinstance(body_node, ast.AnnAssign):
                column_name = body_node.target.id
                line_no = body_node.lineno
                column_desc = self.get_comment_for_line(line_no)
                log["fields"].append({"name": column_name, "desc": column_desc})

        self.logs.append(log)
        self.generic_visit(node)

    def get_comment_for_line(self, line_number):
        """指定された行番号にあるコメントを取得"""
        return self.comments.get(line_number, "")


def extract_log_info():
    # 入力のPythonコード
    with open("sample/sample.py") as f:
        code = f.read()

    # AST解析とトークン化を行い、ログ情報を取得
    visitor = LogVisitor(code)
    visitor.visit(ast.parse(code))

    # 結果の出力
    print(json.dumps(visitor.logs, ensure_ascii=False, indent=2))


if __name__ == "__main__":
    extract_log_info()

結果はこのようになります。

[
  {
    "name": "loga",
    "desc": "Aの情報を表すログ",
    "fields": [
      {
        "name": "a1",
        "desc": "Aの1つ目の情報"
      },
      {
        "name": "a2",
        "desc": "Aの2つ目の情報"
      }
    ]
  },
  {
    "name": "logb",
    "desc": "Bの情報を表すログ",
    "fields": [
      {
        "name": "b1",
        "desc": "Bの1つ目の情報"
      },
      {
        "name": "b2",
        "desc": ""
      },
      {
        "name": "b3",
        "desc": "Bの3つ目の情報"
      }
    ]
  },
  {
    "name": "logc",
    "desc": "Cの情報を表すログ",
    "fields": [
      {
        "name": "c1",
        "desc": ""
      },
      {
        "name": "c2",
        "desc": "Cの2つ目の情報"
      },
      {
        "name": "c3",
        "desc": "Cの3つ目の情報"
      },
      {
        "name": "c4",
        "desc": "Cの4つ目の情報"
      }
    ]
  }

生成AIを活用して複雑なスタッツを分析する

概要

この記事では、生成AI(Chat GPT無料枠)を用いてNBAのスタッツを分析した際の知見を記しています。

NBAスタッツ分析について

NBAでは、トラッキング技術により試合のあらゆるイベントが記録されています。
例えば、八村塁選手のデータに関して下記のような情報にアクセスすることができます。

数年間NBAに在籍していた渡邊雄太選手曰く、試合前にチームから分厚いルールブックが渡され、
そこには「相手選手Aは、左手のシュートの成功率が低いため、左手でシュートを打たせるように右側にディフェンスせよ」などスタッツをもとにした戦術がいくつも書かれており、それを頭に叩き込んで試合に臨んでいたようです。

チームがスタッツに注目する一方で、メディアやファンもスタッツに注目をしています。
例えば「ステフィン・カリーが3P成功数でレイ・アレンの2973本を越して歴代1位に」(キャリア総合ランキング)や「ルカ・ドンチッチが73得点をとり1試合の得点数で歴代4位に」(1試合における最高得点のランキング)などは王道スタッツとして、これらが更新されると大いに盛り上がります。
www.youtube.com

その一方で、王道スタッツほど単純なスタッツではないですが、選手やチームが「なにかスゴイ活躍をした」際に「どうすごいのか?」を、複数のスタッツや期間などを組合せて説明するニッチなスタッツもNBAを楽しむ要素となります。
例えば、

  • クリスマスの日に40得点以上、15リバウンド以上、5回以上の3Pを決めたのはウェンバンヤマのみ

  • 1シーズンに60得点を3回以上達成した選手は歴代、リラードとチェンバレンのみ

  • ヨキッチとマレーは共に30得点のトリプルダブル(得点・アシスト・リバウンドが2桁)をした史上初のチームメイト

  • CavsはWarriors以来初めて開幕から無敗で10連勝した


などが挙げられます。

分析の動機

王道スタッツであれば、スタッツサイトで簡単に確認できます。
しかし、ニッチなスタッツの場合、複数の条件を組み合わせたり、グルーピングが必要になるため、スタッツサイトでは確認が困難です。
例えば、「1シーズンで60得点を3回以上達成した選手は3人だが、50得点の場合はどれくらいいるのか?」や「同じチームでトリプルダブルを3人が達成した試合はあるのだろうか?」といった疑問は、発表されたスタッツだけでは答えが得られません。
そこで、自分自身でスタッツ収集基盤を構築し、いつでも計算できる環境を整えました。

こうしたニッチなスタッツを計算するには、複雑なSQLが必要です。
今回は効率化を図るために、生成AIを活用してSQLを作成することにしました。
そのためには、まず対象となるテーブルの情報を生成AIに提供する必要があります。

スタッツの計算

ChatGPTへの事前入力内容 前提

今回、検証するのはleaguegamelog_playerleaguegamelog_teamというテーブルです。
このテーブルには、選手単位のスタッツ、チーム単位のスタッツが1960年から現在にかけて全試合分入っています。

ChatGPTでクエリを書いてもらう上で、まずテーブルについての事前情報を説明します。
説明内容は2つで

  • メタデータ
  • データ

になります。

メタデータ

  • カラム名
  • カラムの型
  • NULL許容
  • カラムの説明

などを言います。
データとは、実際にテーブルに入っているレコードを言います。すべてのレコードを入力するわけには行かないので、入力するのは数行で問題ないと思っています。

本来、メタデータの情報が充実していれば、事前知識として与えるのはメタデータのみで十分だと考えられています。
現にGemini in BigQueryなどのクエリジェネレータのための生成AIはメタデータのみしか参照しないようです。

ただし、メタデータの情報だけでは、AIに事前知識を十分に与えられない場合があります。
それは、実際に入る値がどのようなものか?という情報についてです。
例えば、「SEASON_ID」というカラムには「22022」や「42022」などの値が入っています。先頭の2はレギュラーシーズンを、4はプレーオフを表し、2022は2022-23シーズンを表しています。(NBAのシーズンは10月から始まり年を跨ぎます。)
このような値に対する事前知識がないと、「2022-23シーズンのレコードを抽出するクエリを作って」と指示しても、「WHERE SEASON_ID=”2022-23”」などと適当に予測した値の条件句を提案してくる場合があります。

そんなときは、データそのものを事前知識として入力してあげると、実際のデータの様子を伝えることが可能となります。
ただし、データそのものの入力はセキュリティやプライバシーの問題、トークンを必要以上に消費してしまう等の問題を抱えているため、業務において実施するのが難しい場合があります。
今回は、趣味の延長線上のため、メタデータに加えてデータ例も事前知識として入力し、生成AIでクエリを作成しました。

ChatGPTへの事前入力内容

まず、メタデータの知識を共有します。
まず事前知識として提供します。

現在、BigQueryにleaguegamelog_player、 leaguegamelog_teamの2つのテーブルがあります。
カラムのメタデータについて情報提供します。

```python
from google.cloud import bigquery

client = bigquery.Client()
table_id = "sample.nba.leaguegamelog_player"
table = client.get_table(table_id)
for schema_field in table.schema:
    print(f"{schema_field.name}\t{schema_field.field_type}\t{schema_field.mode}\t{schema_field.description}")
```

上記のPythonスクリプトを叩くと、下記の結果が出ます。


```
AST	INTEGER	NULLABLE	選手の合計アシスト数(例: 23)
BLK	INTEGER	NULLABLE	選手の合計ブロックショット数(例: 4)
DREB	INTEGER	NULLABLE	選手の合計ディフェンシブリバウンド数(例: 31)
FANTASY_PTS	FLOAT	NULLABLE	ファンタジーポイント - 各種統計を weighted sum した値(例: 245.5)
FG3A	INTEGER	NULLABLE	選手の3ポイントショット試投数(例: 29)
FG3M	INTEGER	NULLABLE	選手の3ポイントショット成功数(例: 10)
FG3_PCT	FLOAT	NULLABLE	選手の3ポイントショット成功率 - FG3M/FG3A(例: 0.345)
FGA	INTEGER	NULLABLE	選手のフィールドゴール試投数(例: 90)
FGM	INTEGER	NULLABLE	選手のフィールドゴール成功数(例: 41)
FG_PCT	FLOAT	NULLABLE	選手のフィールドゴール成功率 - FGM/FGA(例: 0.456)
FTA	INTEGER	NULLABLE	選手のフリースロー試投数(例: 20)
FTM	INTEGER	NULLABLE	選手のフリースロー成功数(例: 15)
FT_PCT	FLOAT	NULLABLE	選手のフリースロー成功率 - FTM/FTA(例: 0.75)
GAME_DATE	DATE	REQUIRED	試合日 YYYY-MM-DD形式(例: 2023-10-24)
GAME_ID	STRING	REQUIRED	試合を一意に識別するID(例: 0022300061)
MATCHUP	STRING	NULLABLE	対戦カード(例: LAL vs. DEN)
MIN	INTEGER	NULLABLE	選手の総プレイ時間(例: 240分)
OREB	INTEGER	NULLABLE	選手の合計オフェンシブリバウンド数(例: 13)
PF	INTEGER	NULLABLE	選手の合計ファウル数(例: 18)
PLAYER_ID	INTEGER	NULLABLE	選手を一意に識別するID(例: 2544)
PLAYER_NAME	STRING	NULLABLE	選手名(例: LeBron James)
PLUS_MINUS	INTEGER	NULLABLE	プラスマイナス(得点差)(例: -12)
PTS	INTEGER	NULLABLE	選手の合計得点(例: 107)
REB	INTEGER	NULLABLE	選手の合計リバウンド数 - OREB + DREB(例: 44)
SEASON_ID	STRING	NULLABLE	シーズンを示すID、先頭の2は2000年代を表す(例: 22023は2023-24シーズン)
STL	INTEGER	NULLABLE	選手の合計スティール数(例: 5)
TEAM_ABBREVIATION	STRING	NULLABLE	選手の略称(例: LAL)
TEAM_ID	INTEGER	NULLABLE	選手を一意に識別するID(例: 1610612747)
TEAM_NAME	STRING	NULLABLE	選手の正式名称(例: Los Angeles Lakers)
TOV	INTEGER	NULLABLE	選手の合計ターンオーバー数(例: 12)
VIDEO_AVAILABLE	BOOLEAN	NULLABLE	ビデオハイライトが利用可能かどうか(例: true)
WL	STRING	NULLABLE	試合の勝敗(例: L [W:勝利、L:敗北])
_SEASON_TYPE	STRING	REQUIRED	シーズンタイプ(例: Regular Season)
```

同様にsample.nba.leaguegamelog_teamテーブルに対して同じスクリプトを叩くと下記の結果になります。
```
AST	INTEGER	NULLABLE	チームが記録したアシスト数
BLK	INTEGER	NULLABLE	チームが記録したブロックショット数
DREB	INTEGER	NULLABLE	チームが記録したディフェンスリバウンド数
FG3A	INTEGER	NULLABLE	チームが試投した3ポイントシュートの本数
FG3M	INTEGER	NULLABLE	チームが成功させた3ポイントシュートの本数
FG3_PCT	FLOAT	NULLABLE	チームの3ポイントシュート成功率(%)
FGA	INTEGER	NULLABLE	チームが試投したフィールドゴールの本数
FGM	INTEGER	NULLABLE	ゲーム中に選手が成功させたフィールドゴールの本数
FG_PCT	FLOAT	NULLABLE	チームのフィールドゴール成功率(%)
FTA	INTEGER	NULLABLE	チームが試投したフリースローの本数
FTM	INTEGER	NULLABLE	チームが成功させたフリースローの本数
FT_PCT	FLOAT	NULLABLE	チームのフリースロー成功率(%)
GAME_DATE	DATE	REQUIRED	ゲームの日付
GAME_ID	STRING	REQUIRED	ゲームを識別する一意のID
MATCHUP	STRING	NULLABLE	試合の対戦カード(例:'LAL vs BOS')
MIN	INTEGER	NULLABLE	チームがプレーした分数
OREB	INTEGER	NULLABLE	チームが記録したオフェンスリバウンド数
PF	INTEGER	NULLABLE	チームが犯した個人ファウル数
PLUS_MINUS	INTEGER	NULLABLE	チームの得失点差
PTS	INTEGER	NULLABLE	チームが記録した得点
REB	INTEGER	NULLABLE	チームが記録したリバウンド総数
SEASON_ID	STRING	NULLABLE	シーズンを識別するID(例:'22023')
STL	INTEGER	NULLABLE	ゲーム中に選手が記録したスティール数
TEAM_ABBREVIATION	STRING	NULLABLE	チームの略称(例:'LAL')
TEAM_ID	INTEGER	NULLABLE	チームを識別する一意のID
TEAM_NAME	STRING	NULLABLE	チーム名(例:'Los Angeles Lakers')
TOV	INTEGER	NULLABLE	チームが記録したターンオーバー数
VIDEO_AVAILABLE	BOOLEAN	NULLABLE	ゲーム映像が利用可能かどうか(true/false)
WL	STRING	NULLABLE	ゲーム結果(勝利:'W'、敗北:'L')
_SEASON_TYPE	STRING	REQUIRED	シーズンの種類(例:'Regular Season', 'Playoffs')
```


次にデータを知識として入力します。
ここでは数行与えればよいため、なるべくスキャン量は減らしたいです。
データ量を絞るにはLIMIT句を使えば良さそうに見えます。
ただし、LIMIT句は出力レコード数を抑えているつもりでもフルスキャンをしてしまい破産してしまう恐れがあるため、SAMPLINGによるレコード数制限が鍵となります。
BQストレージはレコードが1GB単位でブロックとしてまとめられており、SAMPLINGはブロック全体に対してN%のブロックを選択する仕組みになっています。
特定のブロックのみをスキャンするため、フルスキャンの心配はありません。(全レコード数が1ブロック以下であればフルスキャンとなりますが、その場合はスキャン量が小さいため問題となりません。)

次にデータ例を提供します。 
```sql
SELECT * FROM nba.leaguegamelog_player TABLESAMPLE SYSTEM (0.01 PERCENT) WHERE rand() < 0.1 LIMIT 10
```
上記のBigQueryのSQLを叩くと下記の結果が出ます。
```
AST	BLK	DREB	FANTASY_PTS	FG3A	FG3M	FG3_PCT	FGA	FGM	FG_PCT	FTA	FTM	FT_PCT	GAME_DATE	GAME_ID	MATCHUP	MIN	OREB	PF	PLAYER_ID	PLAYER_NAME	PLUS_MINUS	PTS	REB	SEASON_ID	STL	TEAM_ABBREVIATION	TEAM_ID	TEAM_NAME	TOV	VIDEO_AVAILABLE	WL	_SEASON_TYPE
0	0	4	21.8	0	0		10	4	0.4	0	0		2013-03-04	0021200894	CHA @ POR	26	5	1	202687	Bismack Biyombo	-9	8	9	22012	1	CHA	1610612766	Charlotte Bobcats	0	true	L	Regular Season
0	0	0	0.0	0	0		0	0		0	0		2013-03-04	0021200892	MIA @ MIN	2	0	0	2034	Mike Miller	0	0	0	22012	0	MIA	1610612748	Miami Heat	0	false	W	Regular Season
5	0	0	8.5	1	0	0.0	4	1	0.25	0	0		2013-03-04	0021200891	ORL @ NOH	12	0	1	2757	Beno Udrih	5	2	0	22012	0	ORL	1610612753	Orlando Magic	1	true	W	Regular Season
0	0	0	0.0	1	0	0.0	4	0	0.0	0	0		2013-03-04	0021200895	TOR @ GSW	8	0	1	203082	Terrence Ross	-7	0	0	22012	0	TOR	1610612761	Toronto Raptors	0	true	L	Regular Season
1	0	1	2.7	0	0		1	0	0.0	0	0		2013-03-04	0021200895	TOR @ GSW	7	0	1	202361	Landry Fields	-13	0	1	22012	0	TOR	1610612761	Toronto Raptors	0	true	L	Regular Season
1	0	4	12.5	2	1	0.5	6	3	0.5	0	0		2013-03-04	0021200893	DEN vs. ATL	27	1	0	201568	Danilo Gallinari	9	7	5	22012	0	DEN	1610612743	Denver Nuggets	2	true	W	Regular Season
0	0	9	25.6	0	0		11	5	0.455	0	0		2013-03-04	0021200893	DEN vs. ATL	26	4	5	202702	Kenneth Faried	3	10	13	22012	1	DEN	1610612743	Denver Nuggets	3	true	W	Regular Season
0	0	1	2.2	0	0		3	1	0.333	0	0		2013-03-04	0021200892	MIN vs. MIA	6	0	2	202419	Chris Johnson	-1	2	1	22012	0	MIN	1610612750	Minnesota Timberwolves	1	false	L	Regular Season
1	0	1	2.7	0	0		1	0	0.0	0	0		2013-03-04	0021200892	MIN vs. MIA	9	0	2	201880	Greg Stiemsma	-6	0	1	22012	0	MIN	1610612750	Minnesota Timberwolves	0	false	L	Regular Season
1	0	2	5.1	0	0		0	0		0	0		2013-03-04	0021200891	NOH vs. ORL	7	1	0	202498	Lance Thomas	-2	0	3	22012	0	NOH	1610612740	New Orleans Hornets	0	true	L	Regular Season
```

同じようにチーム単位のスタッツのデータは次のクエリで確認できます。
```sql
SELECT * FROM nba.leaguegamelog_team TABLESAMPLE SYSTEM (1 PERCENT) WHERE rand() < 0.1 LIMIT 10
```

```
AST	BLK	DREB	FG3A	FG3M	FG3_PCT	FGA	FGM	FG_PCT	FTA	FTM	FT_PCT	GAME_DATE	GAME_ID	MATCHUP	MIN	OREB	PF	PLUS_MINUS	PTS	REB	SEASON_ID	STL	TEAM_ABBREVIATION	TEAM_ID	TEAM_NAME	TOV	VIDEO_AVAILABLE	WL	_SEASON_TYPE
19	2	31	27	9	0.333	72	31	0.431	27	24	0.889	2014-11-17	0021400149	MIA @ BKN	240	10	24	12	95	41	22014	6	MIA	1610612748	Miami Heat	15	true	W	Regular Season
27	8	46	26	10	0.385	81	36	0.444	21	18	0.857	2014-11-17	0021400152	SAS vs. PHI	240	6	17	25	100	52	22014	9	SAS	1610612759	San Antonio Spurs	18	true	W	Regular Season
22	4	33	21	7	0.333	81	42	0.519	18	11	0.611	2017-04-05	0021601166	DET vs. TOR	240	12	21	-3	102	45	22016	5	DET	1610612765	Detroit Pistons	14	true	L	Regular Season
24	3	33	31	11	0.355	86	38	0.442	18	14	0.778	2017-04-05	0021601172	DAL @ LAC	240	11	26	-11	101	44	22016	5	DAL	1610612742	Dallas Mavericks	16	true	L	Regular Season
22	0	35	31	12	0.387	75	38	0.507	31	24	0.774	2017-04-05	0021601172	LAC vs. DAL	240	5	19	11	112	40	22016	11	LAC	1610612746	LA Clippers	15	true	W	Regular Season
22	5	28	24	7	0.292	85	38	0.447	23	17	0.739	2015-11-02	0021500046	PHI vs. CLE	240	9	18	-7	100	37	22015	7	PHI	1610612755	Philadelphia 76ers	14	true	L	Regular Season
32	13	55	25	11	0.44	84	43	0.512	30	22	0.733	2015-11-02	0021500051	GSW vs. MEM	240	10	15	50	119	65	22015	8	GSW	1610612744	Golden State Warriors	14	true	W	Regular Season
16	1	30	17	4	0.235	70	25	0.357	19	10	0.526	2007-03-03	0020600886	IND @ LAC	240	7	20	-23	64	37	22006	4	IND	1610612754	Indiana Pacers	15	false	L	Regular Season
14	4	23	10	4	0.4	74	38	0.514	25	18	0.72	2007-03-03	0020600884	ORL @ DAL	240	10	22	-5	98	33	22006	4	ORL	1610612753	Orlando Magic	12	false	L	Regular Season
24	6	36	4	2	0.5	70	31	0.443	32	23	0.719	2007-03-03	0020600886	LAC vs. IND	240	11	26	23	87	47	22006	13	LAC	1610612746	Los Angeles Clippers	12	false	W	Regular Season
```

スタッツの計算1

クリスマスの日に40得点以上、15リバウンド以上、5回以上の3Pを決めたのはウェンバンヤマのみ

まず、簡単なクエリ生成から始めました。

クリスマスに40得点以上、15リバウンド以上、5回以上の3Pを決めた選手一覧を抽出するクエリを作ってください。

単純なWHERE程度のクエリは問題なく作れるようです。
「クリスマス」という言葉は、12/24と12/25で曖昧ですが、NBAでは歴史的に12/24は試合がなく、12/25にビッグなマッチアップが組まれ、これをクリスマスゲームと言います。
この知識がAI側にあったためWHERE句の指定を12/25と指定できたのかと思います。

SELECT
   GAME_DATE,
   MATCHUP,
   PLAYER_NAME,
   PTS AS POINTS,
   REB AS REBOUNDS,
   FG3M AS THREE_POINTERS_MADE
FROM
   `nba.leaguegamelog_player`
WHERE
   EXTRACT(MONTH FROM GAME_DATE) = 12
   AND EXTRACT(DAY FROM GAME_DATE) = 25
   AND PTS >= 40
   AND REB >= 15
   AND FG3M >= 5
ORDER BY
   GAME_DATE DESC;

結果は次のようになりました。

GAME_DATE MATCHUP PLAYER_NAME POINTS REBOUNDS THREE_POINTERS_MADE
2024-12-25 SAS @ NYK Victor Wembanyama 42 18 6

スタッツの計算2

1シーズンに60得点を3回以上成した選手は歴代、リラードとチェンバレンのみ

上記のスタッツを求めてもらうクエリを教えてもらいました。
事前知識を与えた上で、次の質問をしたら1回で正解のクエリが返ってきました。

60得点以上を1シーズンで3回以上達成した選手を抽出してください。 出力は、選手名、シーズンID、そのシーズンに60得点以上を何回達成したかの3カラムにしてください。

多少厳密さは欠くものの上出来なクエリを提案してもらえました。
PLAYER_NAMEでなく、PLAYER_IDでグルーピングして欲して欲しかったです。
AIが提案するクエリは、このように結果的に正しいものであっても、一般化できない場合があるため現時点ではやはり参考程度にすべきだと思います。

SELECT
   PLAYER_NAME,
   SEASON_ID,
   COUNT(*) AS games_with_60_plus_points
FROM
   `nba.leaguegamelog_player`
WHERE
   PTS >= 60
GROUP BY
   PLAYER_NAME, SEASON_ID
HAVING
   COUNT(*) >= 3
ORDER BY
   games_with_60_plus_points DESC;

クエリの結果は下記のようになりました。
Xのポストだと「60得点以上を1シーズンに3回以上取った選手は史上2人しかいない」ということしか分かりませんでしたが、そのうちの1人は2シーズンも達成していることや、3回を大きく超え15回、9回と達成しているというニュースでは分からない追加の情報を得ることもできました。

PLAYER_NAME SEASON_ID games_with_60_plus_points
Wilt Chamberlain 21961 15
Wilt Chamberlain 21962 9
Damian Lillard 22019 3

スタッツの計算3

ヨキッチとマレーは共に30得点とトリプルダブル(得点・アシスト・リバウンドが2桁)をした史上初のチームメイト

次に少し難しいスタッツを試しました。
1人単位のスタッツではなく、同チームで2人以上という単位のスタッツを求めました。

30得点以上のトリプルダブルを1試合で、同チームの2人が達成した試合を抽出してください。

生成されたクエリは次のようになりました。
TEAM_IDでグルーピングされていないため、一見クエリに「同一チーム」という制約が入っていないように思えましたが、MATCHUPは team Aとteam Bの試合で、teamAがHOMEの場合はteamAのレコードには「teamA vs. teamB」teamBのレコードには「teamB @ teamA」、team AがAWAYの場合はteamAのレコードには「teamA @ teamB」teamBのレコードには「teamB vs. teamA」となるため、MATHUPでチーム単位のグルーピングが出来ていることがクエリから学ぶことができました。
この制約はメタデータでは説明しきれていなかったため、データを渡したことによるメリットと捉えることができます。

SELECT
   GAME_DATE,
   MATCHUP,
   ARRAY_AGG(STRUCT(PLAYER_NAME AS name, PTS AS points, AST AS assists, REB AS rebounds)) AS players_stats
FROM
   nba.leaguegamelog_player
WHERE
   PTS >= 30
   AND AST >= 10
   AND REB >= 10
GROUP BY
   GAME_DATE, MATCHUP
HAVING
   COUNT(PLAYER_NAME) >= 2

クエリの結果は下記となります。
3列目はBQの関数でJSON型にまとめてくれたため、見やすくなっています。
クエリを生成してもらうことのメリットの一つはこのように、自身の想定以上の出力を見ることにより学びを得ることができることにあります。

GAME_DATE MATCHUP players_stats
2023-06-07 DEN @ MIA {players_stats: [{name: Nikola Jokić,points: 32,assists: 10,rebounds: 21}, {name: Jamal Murray,points: 34,assists: 10,rebounds: 10}]}

スタッツの計算4

Cavsは1シーズン73勝したWarriors以来初めて開幕から無敗で10連勝した

次に選手単位ではなくチーム単位のスタッツについてクエリを考えてもらいました。

leaguegamelog_teamテーブルから、(シーズン、チーム)ごとに開幕から一敗もせずに連勝した数を算出するクエリを作って下さい。

下記のクエリが提案されました。
Window関数を含むクエリの提案も問題なくできるようです。
「無敗のままの連勝数」をどう捉えるかが提案クエリを読むことで理解することができました。
複雑だったので、コメントを私が入れています。

-- (シーズン, チーム)ごとに試合日に順番を付ける
WITH GameResults AS (
 SELECT
   SEASON_ID,
   TEAM_ID,
   TEAM_NAME,
   GAME_DATE,
   WL,
   ROW_NUMBER() OVER (PARTITION BY SEASON_ID, TEAM_ID ORDER BY GAME_DATE) AS game_order
 FROM
   `nba.leaguegamelog_team`
 WHERE
   _SEASON_TYPE = "Regular Season"
),
-- 負けの数を計算
-- (シーズン, チーム)で一番最初の行から和を計算していく
-- 1回でも以前に負けたことがあると, その行は1以上になる
WinStreaks AS (
 SELECT
   SEASON_ID,
   TEAM_ID,
   TEAM_NAME,
   GAME_DATE,
   game_order,
   WL,
   CASE WHEN WL = 'L' THEN 0 ELSE 1 END AS win_flag,
   SUM(CASE WHEN WL = 'L' THEN 1 ELSE 0 END) OVER (
     PARTITION BY SEASON_ID, TEAM_ID ORDER BY game_order ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
   ) AS loss_count
 FROM
   GameResults
),
-- それまでの試合で負けが無い試合をフィルタ
FilteredStreaks AS (
 SELECT
   SEASON_ID,
   TEAM_ID,
   TEAM_NAME,
   game_order,
   win_flag,
   loss_count
 FROM
   WinStreaks
 WHERE
   loss_count = 0
),
-- それまでに1敗も無い試合数の行数をカウントすると, シーズン開幕からの連勝数と解釈できる
FinalResult AS (
 SELECT
   SEASON_ID,
   TEAM_ID,
   TEAM_NAME,
   COUNT(*) AS opening_win_streak
 FROM
   FilteredStreaks
 GROUP BY
   SEASON_ID, TEAM_ID, TEAM_NAME
)
SELECT
 SEASON_ID,
 TEAM_ID,
 TEAM_NAME,
 opening_win_streak
FROM
 FinalResult
ORDER BY
 opening_win_streak DESC,
 SEASON_ID
LIMIT 10;

結果は下記のようになりました。
今シーズン話題になった、Cleveland Cavaliersの開幕から15連勝は歴代で2位タイの記録のようです。
1位は2015-16シーズンのGolden State Warriorsでした。
この年のWarriorsは73勝9敗という驚異的な強さを誇っていたので、開幕24戦無敗というのも頷けます。
また、1996-97シーズンのChicago BullsはMichael Jordanのピーク時のものなので、それよりも今シーズンのCavaliersが高いということが分かり、歴史における今回の記録がどれだけ偉大であるかということが分かりました。

SEASON_ID TEAM_ID TEAM_NAME opening_win_streak
22015 1610612744 Golden State Warriors 24
21948 1610610036 Washington Capitols 15
21993 1610612745 Houston Rockets 15
22024 1610612739 Cleveland Cavaliers 15
21957 1610612738 Boston Celtics 14
22002 1610612742 Dallas Mavericks 14
21982 1610612760 Seattle SuperSonics 12
21996 1610612741 Chicago Bulls 12
21964 1610612738 Boston Celtics 11
21990 1610612757 Portland Trail Blazers 11

生成AIとastモジュールによるPythonコードの効率的な構造化

前回の記事で下記のPythonコードから、astとtokenizeモジュールを用いてJSONに構造化する方法を記しました。

from dataclasses import dataclass


@dataclass
class LogA:
    """Aの情報を表すログ"""

    __logname__ = "log_a"

    a1: int  # Aの1つ目の情報
    a2: str  # Aの2つ目の情報


@dataclass
class LogB:
    """Bの情報を表すログ"""

    __logname__ = "log_b"

    b1: int  # Bの1つ目の情報
    b2: str
    b3: float  # Bの3つ目の情報


@dataclass
class LogC:
    """Cの情報を表すログ"""

    __logname__ = "log_c"

    c1: int
    c2: str  # Cの2つ目の情報
    c3: float  # Cの3つ目の情報
    c4: bool  # Cの4つ目の情報

このPythonコードを次のJSONに変換する問題を考えます。

{
    "name": "log_a",
    "desc": "Aの情報を表すログ",
    "columns": [
        {"name": "a1", "desc": "Aの1つ目の情報"},
        {"name": "a2", "desc": "Aの2つ目の情報"},
    ]
}
{
    "name": "log_b",
    "desc": "Bの情報を表すログ",
    "columns": [
        {"name": "b1", "desc": "Bの1つ目の情報"},
        {"name": "b2", "desc": ""},
        {"name": "b3", "desc": "Bの3つ目の情報"},
    ]
}
{
    "name": "log_c",
    "desc": "Cの情報を表すログ",
    "columns": [
        {"name": "c1", "desc": ""},
        {"name": "c2", "desc": "Cの2つ目の情報"},
        {"name": "c3", "desc": "Cの3つ目の情報"},
        {"name": "c4", "desc": "Cの4つ目の情報"},
    ]
}

本記事では、これを生成AIで構造化する方法を記します。
最初に示したPythonコードの例のようにクラスのみがまとまっていれば構造化しやすそうですが、実際のPythonコードにはimport文や関数など構造化対象の部分以外の余計な記述がたくさんあります。
変換対象ではない余計な入力が多いと、生成AIを惑わせることになり、精度の悪い生成結果をもたらしてしまいます。

from dataclasses import dataclass
import xxx
import yyy
import zzz


@dataclass
class LogA:
    """Aの情報を表すログ"""

    __logname__ = "log_a"

    a1: int  # Aの1つ目の情報
    a2: str  # Aの2つ目の情報


def hoge():
    print(1)

def fuga():
    print(2)

@dataclass
class LogB:
    """Bの情報を表すログ"""

    __logname__ = "log_b"

    b1: int  # Bの1つ目の情報
    b2: str
    b3: float  # Bの3つ目の情報


def poyo():
    print(3)


@dataclass
class LogC:
    """Cの情報を表すログ"""

    __logname__ = "log_c"

    c1: int
    c2: str  # Cの2つ目の情報
    c3: float  # Cの3つ目の情報
    c4: bool  # Cの4つ目の情報

上記のようなコードからクラスの部分を切り取るのに再びastモジュールが役に立ちます。
astモジュールを用いることで、クラスごとのノードを取得でき、同時にノードの開始行数、終了行数を取得することができます。
これにより、上記のコードからクラスの先頭行、終了行を取得し、対象行数のスライスを取り、その部分を結合し下記を得ます。

@dataclass
class LogA:
    """Aの情報を表すログ"""

    __logname__ = "log_a"

    a1: int  # Aの1つ目の情報
    a2: str  # Aの2つ目の情報
@dataclass
class LogB:
    """Bの情報を表すログ"""

    __logname__ = "log_b"

    b1: int  # Bの1つ目の情報
    b2: str
    b3: float  # Bの3つ目の情報
@dataclass
class LogC:
    """Cの情報を表すログ"""

    __logname__ = "log_c"

    c1: int
    c2: str  # Cの2つ目の情報
    c3: float  # Cの3つ目の情報
    c4: bool  # Cの4つ目の情報

これを順に生成AIに入力していきます。
トークン作成には下記のような文字列を使います。

token="""
Pythonコードが入力として与えられます。
Pythonコードに書かれているclassの情報を解析して「ログ名」「ログの説明」、各フィールドに対して「フィールド名」「フィールドの説明」の情報を表すJSONを作成してください。

[コード例]が与えられた場合、[出力例]のようなJSONを返して下さい。
[コード例]
```python
@dataclass
class LogX:
    \"\"\"Xの情報を表すログ\"\"\"

    __logname__ = "log_x"

    x1: int  # Xの1つ目の情報
    x2: str  # Xの2つ目の情報
    x3: str  # Xの3つ目の情報
```

[出力例]
```
{
    "name": "log_x",
    "desc": "Xの情報を表すログ",
    "columns": [
        {"name": "x1", "desc": "Xの1つ目の情報"},
        {"name": "x2", "desc": "Xの2つ目の情報"},
        {"name": "x3", "desc": "Xの3つ目の情報"}
    ]
}
```

下記がJSON化してほしいPythonコードになります。
```python
@dataclass
class LogA:
    \"\"\"Aの情報を表すログ\"\"\"

    __logname__ = "log_a"

    a1: int  # Aの1つ目の情報
    a2: str  # Aの2つ目の情報
```

jsonの部分のみを返してください。
出力結果に```や```jsonは必要ありません。
columnsの最後の要素に,は必要ありません。
"""

key, valueが交互にあるリストから辞書に変換する方法

概要

key, valueが交互に現れるリスト

["key1", "value1", "key2", "value2", "key3", "value3"]

これを辞書にするアルゴリズムがエレガントだと思ったのでメモします。

{"key1": "value1", "key2": "value2", "key3": "value3"}

詳細

まず、どういうときにこのリストが現れるかを説明します。
Google スプレッドシートで1セルに改行ありのkey:valueを入れると次のようになります。

1セルに複数行入っているパターンを考える

スプレッドシートをTSVとしてダウンロードします。
ダウンロードしたTSVを端末で表示してみると、1セル内での改行はSpaceに置き換わっていることが分かります。

1セル内の改行はspaceに置き換わっていることが分かる

もともと改行であった空白と、: にある空白でsplit()すると最初に示したリストになりました。

import csv


def main():
    with open("無題のスプレッドシート - シート1 (1).tsv", "r") as f:
        reader = csv.reader(f, delimiter="\t")
        for row in reader:
            _, kvs = row
            print(kvs.replace(":", "").split(" "))


if __name__ == "__main__":
    main()


# ['key1', 'value1', 'key2', 'value2', 'kye3', 'value3']
# ['key4', 'value4', 'key5', 'value5']

次にこのリストを辞書にしてみます。
奇数はkeyに偶数はvalueにするというアルゴリズムを考えつきます。

a_list = ['key1', 'value1', 'key2', 'value2', 'kye3', 'value3']
k_list = [a for i, a in enumerate(a_list) if i % 2 == 0]
v_list = [a for i, a in enumerate(a_list) if i % 2 == 1]

# 下記2つは同じ
print({k: v for k, v in zip(k_list, v_list)})  # {'key1': 'value1', 'key2': 'value2', 'kye3': 'value3'}
print(dict(zip(k_list, v_list)))  # {'key1': 'value1', 'key2': 'value2', 'kye3': 'value3'}

しかし、これ以上にエレガントな解法を見つけたので記します。
イテレータを一つ宣言して、zipに同じイテレータを2つ渡すというやり方です。
片方はkey、片方はvalueを返すイテレータのように扱えます。

a_list = ['key1', 'value1', 'key2', 'value2', 'kye3', 'value3']
it = iter(a_list)
print(dict(zip(it, it)))  # {'key1': 'value1', 'key2': 'value2', 'kye3': 'value3'}

NBAのレギュラーシーズン開幕からの連勝数の導出

Cleveland Cavaliersがレギュラーシーズン開幕から15連勝を記録しました。
www.youtube.com

「歴代の開幕からの連勝数」を導出するクエリを考えたので防備録を残します。
前提として、全試合のチーム単位でのスタッツをnba.leaguegamelog_teamというテーブルに持っています。

SELECT * FROM `nba.leaguegamelog_team` WHERE SEASON_ID = "22023" LIMIT 5

|AST|BLK|DREB|FG3A|FG3M|FG3_PCT|FGA|FGM|FG_PCT|FTA|FTM|FT_PCT|GAME_DATE|GAME_ID|MATCHUP|MIN|OREB|PF|PLUS_MINUS|PTS|REB|SEASON_ID|STL|TEAM_ABBREVIATION|TEAM_ID|TEAM_NAME|TOV|VIDEO_AVAILABLE|WL|SEASON_TYPE|
|--|--|--|--|--|--|--|--|--|--|--|--|--|--|--|--|--|--|--|--|--|--|--|--|--|--|--|--|--|--|
|29|5|36|31|12|0.387|85|42|0.494|32|24|0.75|2024-03-01|0022300862|MIN vs. SAC|265|8|22|-4|120|44|22023|4|MIN|1610612750|Minnesota Timberwolves|13|true|L|Regular Season|
|29|6|31|29|11|0.379|100|50|0.5|24|13|0.542|2024-03-01|0022300862|SAC @ MIN|265|14|29|4|124|45|22023|8|SAC|1610612758|Sacramento Kings|8|true|W|Regular Season|
|25|6|41|41|18|0.439|83|33|0.398|30|26|0.867|2024-03-01|0022300857|CLE @ DET|240|12|20|10|110|53|22023|7|CLE|1610612739|Cleveland Cavaliers|16|true|W|Regular Season|
|24|7|33|36|10|0.278|89|33|0.371|28|24|0.857|2024-03-01|0022300857|DET vs. CLE|240|9|23|-10|100|42|22023|10|DET|1610612765|Detroit Pistons|9|true|L|Regular Season|
|24|6|25|33|14|0.424|84|43|0.512|17|14|0.824|2024-03-01|0022300858|CHA @ PHI|240|7|13|-7|114|32|22023|5|CHA|1610612766|Charlotte Hornets|5|true|L|Regular Season|


第一に、SEASON_ID, TEAM_IDごとに各試合日に対して試合の日付が早い順に一意の番号を付与します。

WITH GameResults AS (
  SELECT
    SEASON_ID,
    TEAM_ID,
    TEAM_NAME,
    GAME_DATE,
    WL,
    ROW_NUMBER() OVER (PARTITION BY SEASON_ID, TEAM_ID ORDER BY GAME_DATE) AS game_order
  FROM
    `nba.leaguegamelog_team`
  WHERE
    _SEASON_TYPE = "Regular Season"
)

次のように、(シーズン, チーム)ごとに試合の早い順に行に番号が振られます。

SELECT * FROM GameResults WHERE SEASON_ID = "22023" LIMIT 10

|SEASON_ID|TEAM_ID|TEAM_NAME|GAME_DATE|WL|game_order|
|--|--|--|--|--|--|
|22023|1610612756|Phoenix Suns|2023-10-24|W|1|
|22023|1610612756|Phoenix Suns|2023-10-26|L|2|
|22023|1610612756|Phoenix Suns|2023-10-28|W|3|
|22023|1610612756|Phoenix Suns|2023-10-31|L|4|
|22023|1610612756|Phoenix Suns|2023-11-02|L|5|
|22023|1610612756|Phoenix Suns|2023-11-04|L|6|
|22023|1610612756|Phoenix Suns|2023-11-05|W|7|
|22023|1610612756|Phoenix Suns|2023-11-08|W|8|
|22023|1610612756|Phoenix Suns|2023-11-10|L|9|
|22023|1610612756|Phoenix Suns|2023-11-12|L|10|

第二にWL (勝ち負け)を判定し、負けていたら0、勝っていたら1とするようなwin_flagという列を導入します。BOOL値を二値の整数値に変えるような変換です。
このwin_flagは次の列での計算を分かりやすくするための導入で、次の列が本質になります。
まず、OVER (PARTITION BY SEASON_ID, TEAM_ID ORDER BY game_order ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)の部分についてです。
「(SEASON_ID, TEAM_ID)でグルーピングし、それをgame_orderでソートし、その中から、現在の行と一番最初の行の間の全行に対して」という意味になります。
この範囲に対して、SUM()を適用します。SUM()の中身はwin_flagです。そこまでの行の負けの累計を計算しています。

WinStreaks AS (
  SELECT
    SEASON_ID,
    TEAM_ID,
    TEAM_NAME,
    GAME_DATE,
    game_order,
    WL,
    CASE WHEN WL = 'L' THEN 0 ELSE 1 END AS win_flag,
    SUM(CASE WHEN WL = 'L' THEN 1 ELSE 0 END) OVER (
      PARTITION BY SEASON_ID, TEAM_ID ORDER BY game_order ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    ) AS loss_count
  FROM
    GameResults
)

このWITH文を覗くと次のようになっています。
win_flagで0が登場したところでloss_countが+1されていることが分かります。

SELECT * FROM WinStreaks WHERE SEASON_ID = "22024" AND TEAM_NAME = "Boston Celtics" ORDER BY TEAM_NAME, GAME_DATE LIMIT 20

|SEASON_ID|TEAM_ID|TEAM_NAME|GAME_DATE|game_order|WL|win_flag|loss_count|
|--|--|--|--|--|--|--|--|
|22024|1610612738|Boston Celtics|2024-10-22|1|W|1|0|
|22024|1610612738|Boston Celtics|2024-10-24|2|W|1|0|
|22024|1610612738|Boston Celtics|2024-10-26|3|W|1|0|
|22024|1610612738|Boston Celtics|2024-10-28|4|W|1|0|
|22024|1610612738|Boston Celtics|2024-10-30|5|L|0|1|
|22024|1610612738|Boston Celtics|2024-11-01|6|W|1|1|
|22024|1610612738|Boston Celtics|2024-11-02|7|W|1|1|
|22024|1610612738|Boston Celtics|2024-11-04|8|W|1|1|
|22024|1610612738|Boston Celtics|2024-11-06|9|L|0|2|
|22024|1610612738|Boston Celtics|2024-11-08|10|W|1|2|
|22024|1610612738|Boston Celtics|2024-11-10|11|W|1|2|
|22024|1610612738|Boston Celtics|2024-11-12|12|L|0|3|
|22024|1610612738|Boston Celtics|2024-11-13|13|W|1|3|
|22024|1610612738|Boston Celtics|2024-11-16|14|W|1|3|
|22024|1610612738|Boston Celtics|2024-11-19|15|W|1|3|
|22024|1610612738|Boston Celtics|2024-11-22|16|W|1|3|
|22024|1610612738|Boston Celtics|2024-11-24|17|W|1|3|

第三に、loss_countが0のレコードのみを集めます。
loss_count=0ということは、「最初の行から負けがない」ことを意味します。
今回の場合、(TEAM, SEASON)という単位でグルーピングした上での最初の行(最初の試合日)でしたので、「チームのシーズン開幕からの負けるまでの連勝数」という解釈をすることができます。

FilteredStreaks AS (
  SELECT
    SEASON_ID,
    TEAM_ID,
    TEAM_NAME,
    game_order,
    win_flag,
    loss_count
  FROM
    WinStreaks
  WHERE
    loss_count = 0
),

上記WITH文を覗いてみると、各チームの「開幕から負けていないときのスタッツ」を取得できます。

SELECT * FROM FilteredStreaks WHERE SEASON_ID = "22024"

|SEASON_ID|TEAM_ID|TEAM_NAME|game_order|win_flag|loss_count|
|--|--|--|--|--|--|
|22024|1610612749|Milwaukee Bucks|1|1|0|
|22024|1610612737|Atlanta Hawks|1|1|0|
|22024|1610612737|Atlanta Hawks|2|1|0|
|22024|1610612747|Los Angeles Lakers|1|1|0|
|22024|1610612747|Los Angeles Lakers|2|1|0|
|22024|1610612747|Los Angeles Lakers|3|1|0|
|22024|1610612744|Golden State Warriors|1|1|0|
|22024|1610612744|Golden State Warriors|2|1|0|
|22024|1610612739|Cleveland Cavaliers|1|1|0|
|22024|1610612739|Cleveland Cavaliers|2|1|0|
|22024|1610612739|Cleveland Cavaliers|3|1|0|
|22024|1610612739|Cleveland Cavaliers|4|1|0|
|22024|1610612739|Cleveland Cavaliers|5|1|0|
|22024|1610612739|Cleveland Cavaliers|6|1|0|
|22024|1610612739|Cleveland Cavaliers|7|1|0|
|22024|1610612739|Cleveland Cavaliers|8|1|0|
|22024|1610612739|Cleveland Cavaliers|9|1|0|
|22024|1610612739|Cleveland Cavaliers|10|1|0|
|22024|1610612739|Cleveland Cavaliers|11|1|0|
|22024|1610612739|Cleveland Cavaliers|12|1|0|
|22024|1610612739|Cleveland Cavaliers|13|1|0|
|22024|1610612739|Cleveland Cavaliers|14|1|0|
|22024|1610612739|Cleveland Cavaliers|15|1|0|
|22024|1610612740|New Orleans Pelicans|1|1|0|
|22024|1610612740|New Orleans Pelicans|2|1|0|
|22024|1610612756|Phoenix Suns|1|1|0|
|22024|1610612766|Charlotte Hornets|1|1|0|
|22024|1610612763|Memphis Grizzlies|1|1|0|
|22024|1610612738|Boston Celtics|1|1|0|
|22024|1610612738|Boston Celtics|2|1|0|
|22024|1610612738|Boston Celtics|3|1|0|
|22024|1610612738|Boston Celtics|4|1|0|
|22024|1610612742|Dallas Mavericks|1|1|0|
|22024|1610612753|Orlando Magic|1|1|0|
|22024|1610612753|Orlando Magic|2|1|0|
|22024|1610612754|Indiana Pacers|1|1|0|
|22024|1610612760|Oklahoma City Thunder|1|1|0|
|22024|1610612760|Oklahoma City Thunder|2|1|0|
|22024|1610612760|Oklahoma City Thunder|3|1|0|
|22024|1610612760|Oklahoma City Thunder|4|1|0|
|22024|1610612760|Oklahoma City Thunder|5|1|0|
|22024|1610612760|Oklahoma City Thunder|6|1|0|
|22024|1610612760|Oklahoma City Thunder|7|1|0|

ここまで来たら、簡単です。
(TEAM, SEASON)で集約し、「開幕から負けていない試合のスタッツ」の数を数えればよいです。
すなわち、COUNT()をとればよいです。

FinalResult AS (
  SELECT
    SEASON_ID,
    TEAM_ID,
    TEAM_NAME,
    COUNT(*) AS opening_win_streak
  FROM
    FilteredStreaks
  GROUP BY
    SEASON_ID, TEAM_ID, TEAM_NAME
)

これを覗いてみると、今シーズンの「開幕からけていない試合の数」すなわち「開幕からの負けるまでの連勝数」が見やすく表示されます。

SELECT * FROM FinalResult WHERE SEASON_ID = "22024"

|SEASON_ID|TEAM_ID|TEAM_NAME|opening_win_streak|
|--|--|--|--|
|22024|1610612756|Phoenix Suns|1|
|22024|1610612763|Memphis Grizzlies|1|
|22024|1610612760|Oklahoma City Thunder|7|
|22024|1610612739|Cleveland Cavaliers|15|
|22024|1610612753|Orlando Magic|2|
|22024|1610612754|Indiana Pacers|1|
|22024|1610612747|Los Angeles Lakers|3|
|22024|1610612740|New Orleans Pelicans|2|
|22024|1610612738|Boston Celtics|4|
|22024|1610612737|Atlanta Hawks|2|
|22024|1610612742|Dallas Mavericks|1|
|22024|1610612744|Golden State Warriors|2|
|22024|1610612766|Charlotte Hornets|1|
|22024|1610612749|Milwaukee Bucks|1|

最後に、ソートして「開幕からの負けるまでの連勝数」を歴代で見てみましょう。

SELECT
  CONCAT( SUBSTR(SEASON_ID, 2), '-', LPAD(CAST(CAST(SUBSTR(SEASON_ID, 4) AS INT64) + 1 AS STRING), 2, '0') ) AS season,
  TEAM_ID,
  TEAM_NAME,
  opening_win_streak
FROM
  FinalResult
ORDER BY
  opening_win_streak DESC,
  SEASON_ID

|season|TEAM_ID|TEAM_NAME|opening_win_streak|
|--|--|--|--|
|2015-16|1610612744|Golden State Warriors|24|
|1948-49|1610610036|Washington Capitols|15|
|1993-94|1610612745|Houston Rockets|15|
|2024-25|1610612739|Cleveland Cavaliers|15|
|1957-58|1610612738|Boston Celtics|14|
|2002-03|1610612742|Dallas Mavericks|14|
|1982-83|1610612760|Seattle SuperSonics|12|
|1996-97|1610612741|Chicago Bulls|12|
|1964-65|1610612738|Boston Celtics|11|
|1990-91|1610612757|Portland Trail Blazers|11|

任意個のIterableを受け取り全通りをIterateする関数をREST APIのリクエストに応用する

from itertools import product

def iterate_combinations(*iterables):
    yield from product(*iterables)

これをすることで、本来3重ループを書かなければならないところを1重ループで書くことができ、きれいにコードを書くことができます。
また、任意の個数のイテラブルを受取ることが出来るため、3重ループでも4重ループでも5重ループでも同じように書くことが可能です。
全通りの列挙

a_list = [1, 2, 3]
b_list = [4, 5, 6]
c_list = [7, 8, 9]

for combination in iterate_combinations(a_list, b_list, c_list):
    print(combination)
(1, 4, 7)
(1, 4, 8)
(1, 4, 9)
(1, 5, 7)
(1, 5, 8)
(1, 5, 9)
(1, 6, 7)
(1, 6, 8)
(1, 6, 9)
(2, 4, 7)
(2, 4, 8)
(2, 4, 9)
(2, 5, 7)
(2, 5, 8)
(2, 5, 9)
(2, 6, 7)
(2, 6, 8)
(2, 6, 9)
(3, 4, 7)
(3, 4, 8)
(3, 4, 9)
(3, 5, 7)
(3, 5, 8)
(3, 5, 9)
(3, 6, 7)
(3, 6, 8)
(3, 6, 9)

これを、nba_apiのリクエストに応用できることに気づきました。

例えば、leaguegamelogエンドポイントに対して下記のパラメタすべての結果がほしいとします。

  • player_or_team_abbreviation
    • T
    • P
  • season
    • 1990-91
    • 1991-92
    • ...
    • 2024-25
  • season_type
    • Regular Season
    • Playoffs
from nba_api.stats.endpoints import leaguegamelog
from itertools import product

def iterate_combinations(*iterables):
    yield from product(*iterables)

player_or_team_abbreviation = ["T", "P"]
seasons = [f"{year}-{str(year + 1)[-2:]}" for year in range(1990, 2025)]
season_types = ["Regular Season", "Playoffs"]

results = []
for player_team, season, season_type in iterate_combinations(
    player_or_team_abbreviation, seasons, season_types
):
    response = leaguegamelog.LeagueGameLog(
        player_or_team_abbreviation=player_team,
        season=season,
        season_type_all_star=season_type
    )
    game_log = response.get_data_frames()[0]
    results.append((player_team, season, season_type, game_log))

print(results[0][0], results[0][1], results[0][2])
print(results[0][3].head())

DagsterでYearlyPartitionsDefinitionを実装する

概要

Dagsterでbackfillの単位として、パーティションという概念があります。
docs.dagster.io
その中でも時間単位で制御したい場合、下記が提供されています。

  • HourlyPartitionsDefinition
  • DailyPartitionsDefinition
  • WeeklyPartitionsDefinition
  • MonthlyPartitionsDefinition
  • TimeWindowPartitionsDefinition

時間、日、週、月までの時間単位はDagster側で提供されていますが、年次パーティションは現在実装されていませんでした。
年次パーティションはどのように実装したらよいか調べていたら同じ問題を抱えていた人のISSUEを見つけました。
github.com
年次単位のように長い期間のパーティション単位や2週間ごとのように中途半端な期間のパーティションなど、Dagsterが提供しているパーティション単位以上にカスタムしたい場合は、TimeWindowPartitionsDefinitionを使い、自身で設定する必要があるようです。

実装

下記の実装で1970年から1年ごとのパーティションが実装できました。

@asset(
    partitions_def=TimeWindowPartitionsDefinition(
        start=datetime(1970, 1, 1),  # 1970年1月1日を起点
        fmt="%Y",   # 表示は1970のように年単位で
        cron_schedule="0 0 1 1 *"  # 毎年1月1日0時0分ごとに
    )
)

上記assetを含むJobのbackfillは次のようになりました。
2024年は現在進行中なので2023年までしか出ていません。

実行している年のパーティション値がほしい場合は、次のように取得可能です。

partition_next_year: int = context.partition_time_window.start.year + 1

DagsterでBQテーブルの鮮度チェックを実装する

概要

下記記事について、SnowflakeのテーブルのSourceAssetに対して「2時間以内に更新があったか?」という鮮度チェックの例が載っていました。
個人の環境では、SnowflakeではなくBigQueryをメインに使っているため、BigQueryでも同じ処理をやりたいなと思い立ち、実装してみることにしました。
docs.dagster.io

実装

Snowflakeは(テーブル, 最終更新日)のタプルをイテレートしてくれる関数が用意されているようで、簡単に実装できそうでした。
BigQueryではそのメソッドが用意されていなかったため、INFORMATION_SCHEMAを参照して情報を取得しました。
コードはこちらになります。

import os
from datetime import datetime, timedelta

from dagster import (
    AssetSelection,
    AssetSpec,
    Definitions,
    EnvVar,
    MetadataValue,
    ObserveResult,
    ScheduleDefinition,
    build_last_update_freshness_checks,
    define_asset_job,
    multi_observable_source_asset,
)
from dotenv import load_dotenv
from google.cloud import bigquery

# BigQueryクライアントの初期化
client = bigquery.Client()

PROJECT = os.environ.get("PROJECT")
DATASET = os.environ.get("DATASET")
table_names = ["hoge", "salary"]
asset_specs = [AssetSpec(table_name) for table_name in table_names]


def fetch_last_updated_timestamps(project, dataset, tables):
    tables_in_sql = "(" + ", ".join(['"' + table + '"' for table in tables]) + ")"
    query = f"""
    SELECT table_id, TIMESTAMP_MILLIS(last_modified_time) as last_updated
    FROM `{project}.{dataset}.__TABLES__`
    WHERE table_id IN {tables_in_sql}
    """
    query_job = client.query(query)
    return {result.table_id: result.last_updated for result in query_job.result()}


@multi_observable_source_asset(specs=asset_specs)
def source_tables():
    freshness_results = fetch_last_updated_timestamps(
        project=PROJECT,
        dataset=DATASET,
        tables=table_names,
    )
    for table_name, last_updated in freshness_results.items():
        yield ObserveResult(
            asset_key=table_name,
            metadata={
                "dagster/last_updated_timestamp": MetadataValue.timestamp(last_updated)
            },
        )


source_tables_observation_schedule = ScheduleDefinition(
    job=define_asset_job(
        "source_tables_observation_job",
        selection=AssetSelection.assets(source_tables),
    ),
    cron_schedule="* * * * *",
)


source_table_freshness_checks = build_last_update_freshness_checks(
    assets=[source_tables],
    lower_bound_delta=timedelta(hours=2),
)


defs = Definitions(
    assets=[source_tables],
    asset_checks=source_table_freshness_checks,
    schedules=[source_tables_observation_schedule],
)
Assetとしてhogeとsalaryが表示される, materializeではなくobserveボタンになっている

実験

hogeテーブルのみを更新しobserveボタンを押しました。
結果はhogeテーブルのみがasset_checkを通過し、salaryは落ちることが分かりました。

hogeテーブルのasset_checkは通過し、salaryのasset_checkは落ちていることが分かる

考察

鮮度チェックの機能は現時点でEXPERIMENTALなので、本番環境では導入し難いです。
しかし、この概念が実装されていないがためにasset関数で「N時間以内に更新がなかったらエラー」という内容を書いてしまっているのが現状です。
この機能がGAになったら、是非取り入れたいです。

Dagster Asset Checksについて調べた

概要

Dagsterのver upに伴い、今までのverでは無かったAsset Checksの機能について調べました。
より正確に言うと、Asset Checksは既にありましたが、Asset Checksが落ちた場合にDAGを中止するオプションが無かったため使っていませんでした。

基本 asset_checks関数

asset_checksとは、assetがmaterializeされた時点で発火するassetのテストのことで、下記のように書くことが出来ます。

from dagster import AssetCheckResult, Definitions, asset, asset_check


@asset
def hoge():
    return 1


@asset_check(asset=hoge)
def hoge_check():
    return AssetCheckResult(
        passed=True,
    )


defs = Definitions(
    assets=[hoge],
    asset_checks=[hoge_check],
)

asset hogeをmaterializeすると、下記画面のようにasset_checkもhogeがmaterializeされた時点で発火するようになっています。
asset_checkのデフォルトの名前は[asset関数名]_[asset_check関数名]になるようです。

assetをmaterializeするとasset_checkが発火する

asset関数の返り値を使いたい場合はasset_check関数の引数に取ります。

@asset
def hoge():
    return 1


@asset_check(asset=hoge)
def hoge_check(hoge):
    return AssetCheckResult(
        passed=bool(hoge == 2),
    )


defs = Definitions(
    assets=[hoge],
    asset_checks=[hoge_check],
)

この例では、AssetCheckResultをFalseになるように返していますが、AssetCheckの実行結果としては正常に終了したため実行結果の画面は緑で表現されます。(ややこしいです)
変わりに、ログに赤色でAssetCheckが失敗したことが表現されています。

asset, asset_checkの実行結果としては正常終了したため緑で表される
ログには赤でasset_checkが失敗したことが表される

1つのasset_check関数内に複数のチェック項目を設ける

1つの関数内で複数のチェック項目を設けたい場合、@multi_asset_checkを使います。
multi_asset_checkにチェック項目をまとめることで、無駄な計算ノードの起動・終了に伴う無駄を省くことができます。
また、同階層のアセットを一つのmulti_asset_checkの依存先にすることで、階層ごとにアセットが満たすべき制約を守れているか保証することができます。

@asset
def my_asset_one():
    return 1

@asset
def my_asset_two():
    return 2


@multi_asset_check(
    specs=[
        AssetCheckSpec(name="asset_check_one", asset="my_asset_one"),
        AssetCheckSpec(name="asset_check_two", asset="my_asset_two"),
    ]
)
def the_check(context: AssetCheckExecutionContext) -> Iterable[AssetCheckResult]:
    yield AssetCheckResult(
        passed=False,
        severity=AssetCheckSeverity.WARN,
        description="The asset is over 0.5",
        asset_key="my_asset_one",
    )

    yield AssetCheckResult(
        passed=True,
        description="The asset is fresh.",
        asset_key="my_asset_two",
    )


defs = Definitions(
    assets=[my_asset_one, my_asset_two],
    asset_checks=[the_check],
)

リネージとログは次のようになります。

multi_asset_checkのDAG
multi_asset_checkのログ

asset関数内にcheckまで書く

asset_check関数を書かずに、asset関数内にチェック項目を書くことも出来ます。
こちらはDagsterドキュメントの例をそのまま使います。

import pandas as pd

from dagster import (
    AssetCheckResult,
    AssetCheckSpec,
    AssetExecutionContext,
    Definitions,
    Output,
    asset,
)


@asset(check_specs=[AssetCheckSpec(name="orders_id_has_no_nulls", asset="orders")])
def orders(context: AssetExecutionContext):
    orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})

    # save the output and indicate that it's been saved
    orders_df.to_csv("orders")
    yield Output(value=None)

    # check it
    num_null_order_ids = orders_df["order_id"].isna().sum()
    yield AssetCheckResult(
        passed=bool(num_null_order_ids == 0),
    )


defs = Definitions(assets=[orders])
asset関数の実行結果(asset_check関数のノードは表示されない)
asset関数の実行ログにAssetCheckResultを返した跡が確認できる

asset_check factory pattern

一つのasset_check関数のテンプレートに対して、複数の設定項目を使いまわしたいときに有効なfactory patternです。
公式ドキュメントの実装をそのまま記しました。
この例では、接続先DBのtable(SQL)を変えてテストを行っています。

@assetdef orders(): ...


@asset
def items(): ...


def make_check(check_blob: Mapping[str, str]) -> AssetChecksDefinition:
    @asset_check(
        name=check_blob["name"],
        asset=check_blob["asset"],
        required_resource_keys={"db_connection"},
    )
    def _check(context):
        rows = context.resources.db_connection.execute(check_blob["sql"])
        return AssetCheckResult(passed=len(rows) == 0, metadata={"num_rows": len(rows)})

    return _check


check_blobs = [
    {
        "name": "orders_id_has_no_nulls",
        "asset": "orders",
        "sql": "select * from orders where order_id is null",
    },
    {
        "name": "items_id_has_no_nulls",
        "asset": "items",
        "sql": "select * from items where item_id is null",
    },
]

defs = Definitions(
    assets=[orders, items],
    asset_checks=[make_check(check_blob) for check_blob in check_blobs],
    resources={"db_connection": MagicMock()},
)

AssetCheckResultをカスタマイズする

エラーレベル

ERRORやWARNINGなどエラーレベルを設定することができます。
asset_check関数が返すAssetCheckResultのseverityにエラーレベルを設定することができます。

is_serious = ...
return AssetCheckResult(
    passed=False,
    severity=AssetCheckSeverity.ERROR if is_serious else AssetCheckSeverity.WARN,
)

メタデータ

メタデータを付加することも可能です。
AssetCheckResultのmetadataに辞書を設定します。

return AssetCheckResult(
    passed=bool(num_null_order_ids == 0),
    metadata={
        "num_null_order_ids": int(num_null_order_ids),
    },
)

下流アセットのブロック

上流asset Aに対するasset_check関数であるasset check Aがpassed=Falseになると、asset Aに依存する下流assetはmaterializeされることがなくなります。
コードとしては、@asset_checkデコレータの引数にblocking=Trueに設定することで実現します。

@asset
def upstream_asset():
    pass


@asset_check(asset=upstream_asset, blocking=True)
def check_upstream_asset():
    return AssetCheckResult(passed=False)


@asset(deps=[upstream_asset])
def downstream_asset():
    pass


defs = Definitions(
    assets=[upstream_asset, downstream_asset], asset_checks=[check_upstream_asset]
)
上記コードのリネージ、asset_checkはassetの追加情報として表現される
上流assetのasset_checkが落ちたため、下流assetがmaterializeされない

asset_checkを含めた・含めない実行

from dagster import (
    AssetSelection,
    Definitions,
    ScheduleDefinition,
    asset,
    asset_check,
    define_asset_job,
)


@asset
def my_asset(): ...


@asset_check(asset=my_asset)
def check_1(): ...


@asset_check(asset=my_asset)
def check_2(): ...


# my_assetとそのassetに対するasset_checkをジョブに含める
my_job = define_asset_job("my_job", selection=AssetSelection.assets(my_asset))


# asset_checkを含めずに, my_assetを含める
my_asset_only_job = define_asset_job(
    "my_asset_only_job",
    selection=AssetSelection.assets(my_asset).without_checks(),
)

# my_assetを含めずにasset_checkのみを含める
checks_only_job = define_asset_job(
    "checks_only_job", selection=AssetSelection.checks_for_assets(my_asset)
)

# my_assetを含めずに, asset_check check1 のみを含める
check_1_job = define_asset_job("check_1_job", selection=AssetSelection.checks(check_1))

# schedule my_job to run every day at midnight
basic_schedule = ScheduleDefinition(job=my_job, cron_schedule="0 0 * * *")

defs = Definitions(
    assets=[my_asset],
    asset_checks=[check_1, check_2],
    jobs=[my_job, my_asset_only_job, checks_only_job, check_1_job],
    schedules=[basic_schedule],
)
job一覧
check_1_jobのリネージ
my_asset_only_jobのリネージ
checks_only_jobのリネージ
my_jobのリネージ

assetを含むjobのみがmaterializeボタンがあることが確認できます。
また、jobにassetが無いときに限り、asset_check関数がノードとして現れることが分かります。assetがある場合にはassetのノードの下にcheckという項目がつくことが分かります。

asset checkのサブセット化

multi_asset_checkではcan_subset=Trueを設定するとアセットの一部だけのチェックを実行できます。

@multi_asset_check(
    specs=[
        AssetCheckSpec(name="asset_check_one", asset="asset_one"),
        AssetCheckSpec(name="asset_check_two", asset="asset_two"),
    ],
    can_subset=True,
)
def the_check(context: AssetCheckExecutionContext) -> Iterable[AssetCheckResult]:
    if (
        AssetCheckKey(AssetKey("asset_one"), "asset_check_one")
        in context.selected_asset_check_keys
    ):
        yield AssetCheckResult(
            passed=True, metadata={"foo": "bar"}, check_name="asset_check_one"
        )
    if (
        AssetCheckKey(AssetKey("asset_two"), "asset_check_two")
        in context.selected_asset_check_keys
    ):
        yield AssetCheckResult(
            passed=True, metadata={"foo": "bar"}, check_name="asset_check_two"
        )

上記のコードを解説します。
まず、AssetCheckKeyAssetCheckKey(アセットキー, アセットチェックキー)でassetに対するasset_checkを指定できます。
asset_checkは複数のassetに掛けることが出来ますので、assetまで指定してAssetCheckKeyオブジェクトとなります。
AssetCheckExecutionContextcontext.selected_asset_check_keysはどのアセットチェックが選択されているかの情報が含まれているため下記のコードは、「asset_oneに対するasset_check_oneが現在のアセットチェックの実行に含まれていたら、asset_check_oneという名のAssetCheckResultを返す」という実装になっています。

    if (
        AssetCheckKey(AssetKey("asset_one"), "asset_check_one")
        in context.selected_asset_check_keys
    ):
        yield AssetCheckResult(
            passed=True, metadata={"foo": "bar"}, check_name="asset_check_one"
        )

このように実装することで、asset_oneまたは、asset_twoのどちらか一方のアセットがmaterializeされたときに一方のチェック結果を返すことができます。

multi_assetの中で各assetに対してasset checkを行う

@multi_assetは複数のassetを出力する関数でした。
この出力のそれぞれに対してasset checkを行いたい場合の実装です。

@multi_asset(
    specs=[
        AssetSpec("multi_asset_piece_1", group_name="asset_checks", skippable=True),
        AssetSpec("multi_asset_piece_2", group_name="asset_checks", skippable=True),
    ],
    check_specs=[AssetCheckSpec("my_check", asset="multi_asset_piece_1")],
    can_subset=True,
)
def multi_asset_1_and_2(context: AssetExecutionContext):
    if AssetKey("multi_asset_piece_1") in context.selected_asset_keys:
        yield MaterializeResult(asset_key="multi_asset_piece_1")
    # The check will only execute when multi_asset_piece_1 is materialized
    if (
        AssetCheckKey(AssetKey("multi_asset_piece_1"), "my_check")
        in context.selected_asset_check_keys
    ):
        yield AssetCheckResult(passed=True, metadata={"foo": "bar"})
    if AssetKey("multi_asset_piece_2") in context.selected_asset_keys:
        # No check on multi_asset_piece_2
        yield MaterializeResult(asset_key="multi_asset_piece_2")

1番目と3番目のifは、assetがmaterializeされている場合に発火しMaterializedResultを返します。
2番目のifはmulti_asset_piece_1がmaterializeされて、それのasset_checkであるmy_checkが実行されている場合、AssetCheckResultを返します。

multi_asset_piece_1のみをmaterializeするとasset_checkが実行される
multi_asset_piece_2のみをmaterializeするとasset_checkが実行されない
multi_asset_piece_1とmulti_asset_piece_2をmaterializeするとasset_checkが実行される

結果はすべてこのようなグラフになる。

AssetやSource Assetの鮮度チェックを行う

Source AssetやAssetにいつ変更があったのかをチェックすることができます。
下記のように書くことで、source_tablesというsource_tablesというAssetに対して鮮度チェックを行うことができます。

from datetime import timedelta
from dagster import build_last_update_freshness_checks, AssetCheckDefinition

source_table_freshness_checks: AssetCheckDefinition = build_last_update_freshness_checks(
    assets=[source_tables],
    lower_bound_delta=timedelta(hours=2),
)

source_tables assetは、ObserveResultをyieldし、そのmetadataとして最後に更新された時刻の情報を載せます。

@multi_observable_source_asset(specs=asset_specs)
def source_tables(snowflake: SnowflakeResource):
    with snowflake.get_connection() as conn:
        freshness_results = fetch_last_updated_timestamps(
            snowflake_connection=conn.cursor(),
            tables=table_names,
            schema=TABLE_SCHEMA,
        )
        for table_name, last_updated in freshness_results.items():
            yield ObserveResult(
                asset_key=table_name,
                metadata={
                    "dagster/last_updated_timestamp": MetadataValue.timestamp(
                        last_updated
                    )
                },
            )

なお、Snowflakeにはdagster_snowflake.fetch_last_updated_timestampsというテーブルが最後に更新された時刻を簡単に取得してくれる関数があるようです。