※ 本記事はAIによって書かれています。
PythonでBigQueryを使う際、クエリ結果を扱うRowIteratorの挙動を正しく理解していますか?この記事では、RowIteratorの内部実装を詳しく解説し、ネットワーク通信とローカルデータアクセスのタイミングを明らかにします。
- はじめに
- RowIteratorの基本的な挙動
- クラス階層構造
- 重要な処理のコード解説
- DataFrameへの変換
- 実行フローの可視化
- 実践的な使用パターン
- パフォーマンスのTips
- まとめ
- 参考リンク
はじめに
BigQueryのPythonクライアントライブラリを使ってクエリを実行すると、結果はRowIteratorオブジェクトとして返されます。
query_job = client.query("SELECT * FROM dataset.table") results = query_job.result() # RowIteratorを取得 for row in results: print(row.name, row.age)
このシンプルなコードの裏側で、実は複雑なページング処理が行われています。本記事では、その仕組みを徹底的に解説します。
RowIteratorの基本的な挙動
遅延評価とページング
RowIteratorは 遅延評価(lazy evaluation) と ページング を組み合わせたイテレータです。重要なポイントは以下の通りです:
- 初回アクセス時にのみネットワーク通信が発生
- ページ単位でデータを取得
- 取得したページはメモリにキャッシュ
- 次のページが必要になったタイミングで追加取得
データ取得フローの可視化
RowIteratorがどのタイミングでネットワーク通信を行い、どのタイミングでローカルキャッシュを使用するかを図で見てみましょう。

この図から、以下のことが分かります:
- 🌐 ネットワーク通信: 最初のイテレーションとページ境界でのみ発生
- 💾 ローカルキャッシュ: 同一ページ内では配列からの高速アクセス
クラス階層構造
RowIteratorは以下のような継承関係になっています:
google.api_core.page_iterator.Iterator
↓
google.api_core.page_iterator.HTTPIterator
↓
google.cloud.bigquery.table.RowIteratorこの階層構造により、HTTP通信によるページング処理が実装されています。
重要な処理のコード解説
1. Iterator基底クラス - イテレーションの骨格
まず、基底となる`Iterator`クラスを見ていきます。
class Iterator: """ページングされたAPIメソッドのための基底イテレータ""" def __init__( self, client, item_to_value=_item_to_value_identity, page_token=None, max_results=None, ): self._started = False # イテレーション開始フラグ self.__active_iterator = None # 💾 現在アクティブなイテレータ self.page_number = 0 # 現在のページ番号 self.next_page_token = page_token # 🔑 次のページトークン self.num_results = 0 # 取得済み結果数 def __iter__(self): """イテレータ自身を返す""" if self._started: raise ValueError("Iterator has already started") self._started = True # 💡 ページイテレータを取得して行イテレータを作成 self.__active_iterator = self._items_iter() return self.__active_iterator def _items_iter(self): """各アイテムをイテレートするジェネレータ""" # 🌐 ページを順次取得しながら、各ページ内のアイテムをyield for page in self._page_iter(increment=True): for item in page: self.num_results += 1 yield item if self.max_results and self.num_results >= self.max_results: return
ポイント:
_items_iter(): 行単位でイテレートする最上位ジェネレータ_page_iter(): ページ単位でイテレートする中間ジェネレータ- 再イテレート不可:
_startedフラグで制御
2. HTTPIterator - ページング実装の核心
次に、HTTP通信を行うHTTPIteratorの_next_page()メソッドを見てみましょう。
class HTTPIterator(Iterator): """HTTP/JSON API用のイテレータ""" def _next_page(self): """🌐 次のページをHTTP経由で取得 ⭐ これがネットワーク通信が発生するメソッド! """ # 次のページトークンがなければ終了 if self.next_page_token is None: return None # 🔧 リクエストパラメータを構築 params = self.extra_params.copy() if self.extra_params else {} params[self._PAGE_TOKEN] = self.next_page_token # max_resultsを考慮してページサイズを調整 if self.max_results is not None: remaining = self.max_results - self.num_results if self._page_size is not None: params[self._MAX_RESULTS] = min(remaining, self._page_size) else: params[self._MAX_RESULTS] = remaining # 🌐 HTTPリクエストを実行! response = self.api_request( method=self._HTTP_METHOD, # "GET" path=self.path, query_params=params ) # レスポンスからデータを抽出 items = response.get(self._items_key, ()) # 🔑 次のページトークンを更新 self.next_page_token = response.get(self._next_token) # 💾 Pageオブジェクトを作成 page = Page(self, items, self.item_to_value) return page
重要ポイント:
- このメソッドだけがHTTPリクエストを送信
next_page_tokenでページング状態を管理next_page_tokenがNoneになったら全ページ取得完了
3. Pageクラス - ローカルデータのイテレーション
ページ内のデータは、単純な配列アクセスで取得されます。
class Page: """単一ページの結果を表すクラス""" def __init__(self, parent, items, item_to_value, raw_page=None): self._parent = parent self._items = items # 💾 メモリ上のデータ self._item_to_value = item_to_value self._index = 0 # 現在の読み取り位置 def __next__(self): """💾 次のアイテムをローカルから取得 ⭐ ネットワーク通信は発生しない! """ if self._index >= len(self._items): raise StopIteration # 💾 メモリ上の配列から取得 item = self._items[self._index] self._index += 1 # アイテムを変換して返す return self._item_to_value(self._parent, item)
ポイント:
_itemsは💾メモリ上の配列__next__()は配列から順次取得するだけ(高速)- ネットワーク通信は一切発生しない
4. RowIterator - BigQuery固有の実装
BigQuery専用のRowIteratorクラスは、JSON行データをRowオブジェクトに変換します。
class RowIterator(HTTPIterator): """BigQueryの行イテレータ""" def __init__( self, client, api_request, path, schema, page_token=None, max_results=None, page_size=None, **kwargs ): # 親クラスを初期化 super(RowIterator, self).__init__( client, api_request, path, item_to_value=_item_to_row, # ⭐ JSON → Row変換 items_key="rows", # ⭐ BigQueryは"rows"キー page_token=page_token, max_results=max_results, page_size=page_size, ) self._schema = schema self._field_to_index = self._make_field_to_index() def _item_to_row(iterator, resource): """JSONアイテムをRowオブジェクトに変換""" return Row( _row_tuple_from_json(resource, iterator.schema), iterator._field_to_index )
DataFrameへの変換
to_dataframe() - 全データを一度に取得
def _to_dataframe_tabledata_list(self, dtypes): """tabledata.list APIでDataFrameを構築""" column_names = [field.name for field in self.schema] frames = [] # 🌐 全ページをループで取得 for page in self.pages: # 💾 ページからDataFrameを作成 page_dataframe = self._page_to_dataframe( page, column_names, dtypes ) frames.append(page_dataframe) # 💾 全DataFrameを結合 if frames: return pandas.concat(frames, ignore_index=True) else: return pandas.DataFrame(columns=column_names)
⚠️ 注意: このメソッドは全ページを一度にメモリに読み込みます。大きなテーブルではメモリ不足の可能性があります。
to_dataframe_iterable() - ストリーム処理
メモリ効率的な処理には、`to_dataframe_iterable()`を使用します。
def to_dataframe_iterable(self, bqstorage_client=None, dtypes=None): """ページ単位でDataFrameをストリーム処理 ✅ メモリ効率的! """ column_names = [field.name for field in self.schema] # 🌐 ページを1つずつ取得 for page in self.pages: # 💾 1ページ分のDataFrameを作成 page_df = self._page_to_dataframe(page, column_names, dtypes) # ⭐ yieldで返す = メモリを保持しない yield page_df # 使用例 for df_chunk in row_iterator.to_dataframe_iterable(): # 💾 1ページ分だけメモリに存在 process_chunk(df_chunk) # ループを抜けるとdf_chunkは破棄される
実行フローの可視化
パターンA: 行単位のイテレーション
ユーザーコード: for row in iterator
↓
Iterator.__iter__()
↓ _items_iter()を作成
_items_iter()
↓ _page_iter()を呼び出し
_page_iter()
↓ 🌐 _next_page()を呼び出し
HTTPIterator._next_page()
↓ 🌐 HTTPリクエスト送信
BigQuery API
↓ レスポンス返却
Page作成 💾
↓ Pageから1行取り出し
_item_to_row() → Row変換
↓
ユーザーコードにRow返却
[ページ内の残りの行]
↓ 💾 Pageから順次取り出し (ネットワーク通信なし)
[ページ終了]
↓ _page_iter()が次のページ要求
↓ 🌐 _next_page()再度呼び出し
パターンB: DataFrameへの一括変換
ユーザーコード: iterator.to_dataframe()
↓
_to_dataframe_tabledata_list()
↓
for page in self.pages:
Page 1取得 🌐 → DataFrame変換 💾
Page 2取得 🌐 → DataFrame変換 💾
Page 3取得 🌐 → DataFrame変換 💾
↓
pandas.concat() 💾 # 全て結合
↓
完全なDataFrame返却
実践的な使用パターン
パターン1: 基本的なイテレーション(推奨)
# シンプルで効率的 query_job = client.query("SELECT * FROM table") for row in query_job.result(): print(f"{row.name}: {row.age}") # ページングは自動的に処理される
パターン2: 全データをDataFrameに
# ⚠️ 注意: 全データをメモリに読み込む df = query_job.result().to_dataframe() print(df.head())
パターン3: ストリーム処理(大量データ推奨)
# メモリ効率的 for df_chunk in query_job.result().to_dataframe_iterable(): # 各ページを処理 process(df_chunk) # 処理後、メモリは解放される
パターン4: ページサイズの制御
# 1ページ500行ずつ取得 results = query_job.result(page_size=500) for row in results: process(row)
パフォーマンスのTips
ネットワーク通信が発生するタイミング 🌐
- `HTTPIterator._next_page()` - ページ取得時
- 初回イテレーション時
- ページ境界を超える時
ローカルデータのイテレーション 💾
Page.__next__()- ページ内の行取得- 同一ページ内でのforループ
_items配列へのアクセス
最適化のポイント
-
-
- 適切なpage_sizeを設定
-
デフォルトは環境依存ですが、通常数千行です。
results = query_job.result(page_size=1000)
-
-
- BigQuery Storage APIを使用
-
高速化のために利用できます。
df = query_job.result().to_dataframe(create_bqstorage_client=True)
-
-
- 大量データは`to_dataframe_iterable()`を使用
-
メモリ効率的な処理が可能です。
-
-
- 再イテレート不可に注意
-
RowIteratorは1回しかイテレートできません。
# ❌ 2回目のイテレートは不可 iterator = query_job.result() for row in iterator: print(row) # 1回目: OK for row in iterator: print(row) # 2回目: StopIteration例外 # ✅ 複数回使う場合はリスト化 rows = list(query_job.result()) for row in rows: print(row) # 何度でもOK
まとめ
RowIteratorの3層構造
Iterator._items_iter() ← 行単位のイテレーション
↓
Iterator._page_iter() ← ページ単位のイテレーション
↓
HTTPIterator._next_page() ← HTTPリクエスト実行
重要なクラスの役割
| クラス名 | 役割 |
|---|---|
| Iterator | イテレーションの骨格を提供 |
| HTTPIterator | HTTP通信によるページング実装 |
| Page | 1ページ分のデータをメモリに保持 |
| RowIterator | BigQuery固有の行処理 |
キーポイント
- ネットワーク通信は
_next_page()のみで発生 - ページ内は単純な配列アクセス(高速)
- 遅延評価により必要な時だけデータを取得
- ページングは自動で行われる
この構造により、BigQueryは大量データを効率的に処理できます。
*1:https://github.com/googleapis/python-bigquery/blob/main/google/cloud/bigquery/table.py:title=RowIteratorソースコード
*2:https://googleapis.dev/python/google-api-core/latest/_modules/google/api_core/page_iterator.html:title=HTTPIteratorソースコード
*3:https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.table.RowIterator:title=公式ドキュメント