Python BigQuery ClientのRowIteratorを理解する - ページングとデータ取得の仕組み

※ 本記事はAIによって書かれています。

PythonでBigQueryを使う際、クエリ結果を扱うRowIteratorの挙動を正しく理解していますか?この記事では、RowIteratorの内部実装を詳しく解説し、ネットワーク通信とローカルデータアクセスのタイミングを明らかにします。

はじめに

BigQueryのPythonクライアントライブラリを使ってクエリを実行すると、結果はRowIteratorオブジェクトとして返されます。

query_job = client.query("SELECT * FROM dataset.table")
results = query_job.result()  # RowIteratorを取得

for row in results:
    print(row.name, row.age)

このシンプルなコードの裏側で、実は複雑なページング処理が行われています。本記事では、その仕組みを徹底的に解説します。

RowIteratorの基本的な挙動

遅延評価とページング

RowIteratorは 遅延評価(lazy evaluation)ページング を組み合わせたイテレータです。重要なポイントは以下の通りです:

  1. 初回アクセス時にのみネットワーク通信が発生
  2. ページ単位でデータを取得
  3. 取得したページはメモリにキャッシュ
  4. 次のページが必要になったタイミングで追加取得
データ取得フローの可視化

RowIteratorがどのタイミングでネットワーク通信を行い、どのタイミングでローカルキャッシュを使用するかを図で見てみましょう。

フロー図

この図から、以下のことが分かります:

  1. 🌐 ネットワーク通信: 最初のイテレーションとページ境界でのみ発生
  2. 💾 ローカルキャッシュ: 同一ページ内では配列からの高速アクセス

クラス階層構造

RowIteratorは以下のような継承関係になっています:

google.api_core.page_iterator.Iterator
    ↓
google.api_core.page_iterator.HTTPIterator
    ↓
google.cloud.bigquery.table.RowIterator

この階層構造により、HTTP通信によるページング処理が実装されています。

重要な処理のコード解説

1. Iterator基底クラス - イテレーションの骨格

まず、基底となる`Iterator`クラスを見ていきます。

class Iterator:
    """ページングされたAPIメソッドのための基底イテレータ"""
    
    def __init__(
        self,
        client,
        item_to_value=_item_to_value_identity,
        page_token=None,
        max_results=None,
    ):
        self._started = False           # イテレーション開始フラグ
        self.__active_iterator = None   # 💾 現在アクティブなイテレータ
        self.page_number = 0            # 現在のページ番号
        self.next_page_token = page_token  # 🔑 次のページトークン
        self.num_results = 0            # 取得済み結果数
    
    def __iter__(self):
        """イテレータ自身を返す"""
        if self._started:
            raise ValueError("Iterator has already started")
        self._started = True
        
        # 💡 ページイテレータを取得して行イテレータを作成
        self.__active_iterator = self._items_iter()
        return self.__active_iterator
    
    def _items_iter(self):
        """各アイテムをイテレートするジェネレータ"""
        # 🌐 ページを順次取得しながら、各ページ内のアイテムをyield
        for page in self._page_iter(increment=True):
            for item in page:
                self.num_results += 1
                yield item
                
                if self.max_results and self.num_results >= self.max_results:
                    return

ポイント:

  • _items_iter(): 行単位でイテレートする最上位ジェネレータ
  • _page_iter(): ページ単位でイテレートする中間ジェネレータ
  • 再イテレート不可: _startedフラグで制御
2. HTTPIterator - ページング実装の核心

次に、HTTP通信を行うHTTPIterator_next_page()メソッドを見てみましょう。

class HTTPIterator(Iterator):
    """HTTP/JSON API用のイテレータ"""
    
    def _next_page(self):
        """🌐 次のページをHTTP経由で取得
        
        ⭐ これがネットワーク通信が発生するメソッド!
        """
        # 次のページトークンがなければ終了
        if self.next_page_token is None:
            return None
        
        # 🔧 リクエストパラメータを構築
        params = self.extra_params.copy() if self.extra_params else {}
        params[self._PAGE_TOKEN] = self.next_page_token
        
        # max_resultsを考慮してページサイズを調整
        if self.max_results is not None:
            remaining = self.max_results - self.num_results
            if self._page_size is not None:
                params[self._MAX_RESULTS] = min(remaining, self._page_size)
            else:
                params[self._MAX_RESULTS] = remaining
        
        # 🌐 HTTPリクエストを実行!
        response = self.api_request(
            method=self._HTTP_METHOD,  # "GET"
            path=self.path,
            query_params=params
        )
        
        # レスポンスからデータを抽出
        items = response.get(self._items_key, ())
        
        # 🔑 次のページトークンを更新
        self.next_page_token = response.get(self._next_token)
        
        # 💾 Pageオブジェクトを作成
        page = Page(self, items, self.item_to_value)
        
        return page

重要ポイント:

  • このメソッドだけがHTTPリクエストを送信
  • next_page_tokenでページング状態を管理
  • next_page_tokenNoneになったら全ページ取得完了
3. Pageクラス - ローカルデータのイテレーション

ページ内のデータは、単純な配列アクセスで取得されます。

class Page:
    """単一ページの結果を表すクラス"""
    
    def __init__(self, parent, items, item_to_value, raw_page=None):
        self._parent = parent
        self._items = items  # 💾 メモリ上のデータ
        self._item_to_value = item_to_value
        self._index = 0  # 現在の読み取り位置
    
    def __next__(self):
        """💾 次のアイテムをローカルから取得
        
        ⭐ ネットワーク通信は発生しない!
        """
        if self._index >= len(self._items):
            raise StopIteration
        
        # 💾 メモリ上の配列から取得
        item = self._items[self._index]
        self._index += 1
        
        # アイテムを変換して返す
        return self._item_to_value(self._parent, item)

ポイント:

  • _itemsは💾メモリ上の配列
  • __next__()は配列から順次取得するだけ(高速)
  • ネットワーク通信は一切発生しない
4. RowIterator - BigQuery固有の実装

BigQuery専用のRowIteratorクラスは、JSON行データをRowオブジェクトに変換します。

class RowIterator(HTTPIterator):
    """BigQueryの行イテレータ"""
    
    def __init__(
        self,
        client,
        api_request,
        path,
        schema,
        page_token=None,
        max_results=None,
        page_size=None,
        **kwargs
    ):
        # 親クラスを初期化
        super(RowIterator, self).__init__(
            client,
            api_request,
            path,
            item_to_value=_item_to_row,  # ⭐ JSON → Row変換
            items_key="rows",             # ⭐ BigQueryは"rows"キー
            page_token=page_token,
            max_results=max_results,
            page_size=page_size,
        )
        
        self._schema = schema
        self._field_to_index = self._make_field_to_index()


def _item_to_row(iterator, resource):
    """JSONアイテムをRowオブジェクトに変換"""
    return Row(
        _row_tuple_from_json(resource, iterator.schema),
        iterator._field_to_index
    )

DataFrameへの変換

to_dataframe() - 全データを一度に取得
def _to_dataframe_tabledata_list(self, dtypes):
    """tabledata.list APIでDataFrameを構築"""
    column_names = [field.name for field in self.schema]
    frames = []
    
    # 🌐 全ページをループで取得
    for page in self.pages:
        # 💾 ページからDataFrameを作成
        page_dataframe = self._page_to_dataframe(
            page, column_names, dtypes
        )
        frames.append(page_dataframe)
    
    # 💾 全DataFrameを結合
    if frames:
        return pandas.concat(frames, ignore_index=True)
    else:
        return pandas.DataFrame(columns=column_names)

⚠️ 注意: このメソッドは全ページを一度にメモリに読み込みます。大きなテーブルではメモリ不足の可能性があります。

to_dataframe_iterable() - ストリーム処理

メモリ効率的な処理には、`to_dataframe_iterable()`を使用します。

def to_dataframe_iterable(self, bqstorage_client=None, dtypes=None):
    """ページ単位でDataFrameをストリーム処理
    
    ✅ メモリ効率的!
    """
    column_names = [field.name for field in self.schema]
    
    # 🌐 ページを1つずつ取得
    for page in self.pages:
        # 💾 1ページ分のDataFrameを作成
        page_df = self._page_to_dataframe(page, column_names, dtypes)
        
        # ⭐ yieldで返す = メモリを保持しない
        yield page_df


# 使用例
for df_chunk in row_iterator.to_dataframe_iterable():
    # 💾 1ページ分だけメモリに存在
    process_chunk(df_chunk)
    # ループを抜けるとdf_chunkは破棄される

実行フローの可視化

パターンA: 行単位のイテレーション
ユーザーコード: for row in iterator
    ↓
Iterator.__iter__()
    ↓ _items_iter()を作成
_items_iter()
    ↓ _page_iter()を呼び出し
_page_iter()
    ↓ 🌐 _next_page()を呼び出し
HTTPIterator._next_page()
    ↓ 🌐 HTTPリクエスト送信
BigQuery API
    ↓ レスポンス返却
Page作成 💾
    ↓ Pageから1行取り出し
_item_to_row() → Row変換
    ↓
ユーザーコードにRow返却

[ページ内の残りの行]
    ↓ 💾 Pageから順次取り出し (ネットワーク通信なし)
    
[ページ終了]
    ↓ _page_iter()が次のページ要求
    ↓ 🌐 _next_page()再度呼び出し
パターンB: DataFrameへの一括変換
ユーザーコード: iterator.to_dataframe()
    ↓
_to_dataframe_tabledata_list()
    ↓
for page in self.pages:
    Page 1取得 🌐 → DataFrame変換 💾
    Page 2取得 🌐 → DataFrame変換 💾
    Page 3取得 🌐 → DataFrame変換 💾
    ↓
pandas.concat() 💾 # 全て結合
    ↓
完全なDataFrame返却

実践的な使用パターン

パターン1: 基本的なイテレーション(推奨)
# シンプルで効率的
query_job = client.query("SELECT * FROM table")
for row in query_job.result():
    print(f"{row.name}: {row.age}")
    # ページングは自動的に処理される
パターン2: 全データをDataFrameに
# ⚠️ 注意: 全データをメモリに読み込む
df = query_job.result().to_dataframe()
print(df.head())
パターン3: ストリーム処理(大量データ推奨)
# メモリ効率的
for df_chunk in query_job.result().to_dataframe_iterable():
    # 各ページを処理
    process(df_chunk)
    # 処理後、メモリは解放される
パターン4: ページサイズの制御
# 1ページ500行ずつ取得
results = query_job.result(page_size=500)
for row in results:
    process(row)

パフォーマンスのTips

ネットワーク通信が発生するタイミング 🌐
  1. `HTTPIterator._next_page()` - ページ取得時
  2. 初回イテレーション時
  3. ページ境界を超える時
ローカルデータのイテレーション 💾
  1. Page.__next__() - ページ内の行取得
  2. 同一ページ内でのforループ
  3. _items配列へのアクセス
最適化のポイント
      1. 適切なpage_sizeを設定

デフォルトは環境依存ですが、通常数千行です。

results = query_job.result(page_size=1000)
      1. BigQuery Storage APIを使用

高速化のために利用できます。

df = query_job.result().to_dataframe(create_bqstorage_client=True)
      1. 大量データは`to_dataframe_iterable()`を使用

メモリ効率的な処理が可能です。

      1. 再イテレート不可に注意

RowIteratorは1回しかイテレートできません。

# ❌ 2回目のイテレートは不可
iterator = query_job.result()
for row in iterator:
    print(row)  # 1回目: OK

for row in iterator:
    print(row)  # 2回目: StopIteration例外

# ✅ 複数回使う場合はリスト化
rows = list(query_job.result())
for row in rows:
    print(row)  # 何度でもOK

まとめ

RowIteratorの3層構造
Iterator._items_iter()     ← 行単位のイテレーション
    ↓
Iterator._page_iter()      ← ページ単位のイテレーション  
    ↓
HTTPIterator._next_page()  ← HTTPリクエスト実行
重要なクラスの役割
クラス名 役割
Iterator イテレーションの骨格を提供
HTTPIterator HTTP通信によるページング実装
Page 1ページ分のデータをメモリに保持
RowIterator BigQuery固有の行処理
キーポイント
  1. ネットワーク通信は_next_page()のみで発生
  2. ページ内は単純な配列アクセス(高速)
  3. 遅延評価により必要な時だけデータを取得
  4. ページングは自動で行われる

この構造により、BigQueryは大量データを効率的に処理できます。

参考リンク