BigQueryのsamplingについて調べた

サンプリングとは

BigQueryのサンプリングについて理解を深めたので記します。
サンプリングとは、すべてのレコードを対象にするまでもないが、任意に抽出したn%のレコードに対してクエリを適用したいときに使う技術です。
クエリの最後にTABLESAMPLE SYSTEM (N PERCENT)(Nは小数でもOK)をつけることによってサンプリングの指示をすることができます。
cloud.google.com

主な使用用途として、Dataplexによる品質測定やデータプロファイルが挙げられます。
品質測定では、すべてのレコードでテストするには料金がかかりすぎるので、任意に抽出したレコードでテストを通過すれば全体で通過したと見なすという考えで使われます。
cloud.google.com
データプロファイルでは、標本の統計値を測定して母集団の統計値とする考えで使われています。
cloud.google.com
TerraformのDataplexの項目にもsampling_percentという設定項目があります。
registry.terraform.io

サンプリングに関する誤解

サンプリングは全体のレコードから指定した%のレコードを取得するものではありません。

それを示すために実験を行います。
まず50個のレコードを持つテーブルを用意します。

SELECT
  COUNT(*)
FROM
  `sample-project.sample_dataset.sample_table`
-- 50

このテーブルに対してTABLESAMPLE SYSTEを使用しても結果は5ではなく、50のままです。

SELECT
  COUNT(*)
FROM
  `sample-project.sample_dataset.sample_table`
TABLESAMPLE SYSTEM (10 PERCENT)
-- 50

この現象はなぜ起こるのかと言うと、サンプリング%はストレージブロックの割合を示しているからです。
下記の資料に書かれている通り、BigQueryのテーブル及びパーティションテーブルは1GBごとにストレージブロックに自動的に分けられています。

通常、BigQuery は、テーブルまたはテーブル パーティションが約 1 GB より大きい場合、ブロックに分割します。小さいテーブルは、1 つのデータブロックで構成されている場合があります。その場合、TABLESAMPLE 句はテーブル全体を読み取ります。サンプリングの割合がゼロより大きく、テーブルが空でない場合、テーブル サンプリングで常に結果が返されます。

cloud.google.com
上記の実験で使ったテーブルは50レコードと小さすぎるため、1つのストレージブロックの中に全レコードがあると考えられます。
そのため、「全体で1つのストレージブロックの中から、10%分のストレージブロックを選択して」と命令しても、対象は1つのストレージブロックのままだったというわけです。

サンプリングの料金は?

BigQueryはスキャンした分に応じて料金がかかります。
サンプリングの場合、抽出されたストレージブロック分のみがスキャンの対象になるようです。
パーティションをまたいでいても、スキャンしたブロック単位の料金となるようです。
ユーザーがクエリで物理領域に関して指定できる最小の単位がパーティションだと思っていましたが、サンプリングを使うと更に小さい単位であるブロックまでを抽出単位にすることが出来るようです。
ただ、ブロックはパーティションと違い、IDで指定することがは不可能なため、正確な物理領域の指定は出来ません。

下図のように赤い部分のみがスキャンされ、その分のスキャン料金が加算されるようです。(全体のうち50%が指定されるのか、パーティションがあった場合は、各パーティションのうち50%が指定されるのかは分かっていません。)

20個ブロックがある場合に50%サンプリングすると10個ブロックが選択される

Dagsterのjob factory patternを通常のasset関数から徐々に理解する

概要

Dagsterのデザインパターンであるjobクラス+asset_factoryメソッドのパターンを理解するために、最も基本的な概念であるasset関数から徐々に理解していくための解説を書きました。

asset

もっともシンプルなasset関数1つを含んだasset_jobの実装になります。
weather_dataはopen wheather apiにリクエストし、結果を返すassetになります。

import os

import requests
from dagster import (
    AssetExecutionContext,
    Config,
    Definitions,
    EnvVar,
    MetadataValue,
    asset,
    define_asset_job,
)


class WeatherApiConfig(Config):
    api_key: str
    city: str


@asset(name="weather_data")
def weather_data(context: AssetExecutionContext, config: WeatherApiConfig):
    """
    OpenWeatherMap APIから天気情報を取得するDagsterアセット
    """
    base_url = "http://api.openweathermap.org/data/2.5/weather"

    params = {
        "q": config.city,
        "appid": config.api_key,
        "units": "metric",  # 摂氏温度で取得
    }

    response = requests.get(base_url, params=params)
    response.raise_for_status()  # エラーチェック

    weather_data = response.json()

    # 結果を表示
    context.log.info(weather_data)


asset_job = define_asset_job(
    name="weather_data_download_job",
    selection="weather_data",
    config={
        "ops": {
            "weather_data": {
                "config": {
                    "api_key": os.environ.get("OPENWEATHERMAP_API_KEY"),
                    "city": "Tokyo",
                }
            }
        }
    },
)


defs = Definitions(
    assets=[weather_data],
    jobs=[asset_job],
)

asset factory pattern

前章の天気取得対象は東京のみでしたが、複数都市(東京、ニューヨーク)の天気を取得対象としたいと思います。
weather_dataのasset関数は使いまわせそうなので、configのみを変えれば良いことが分かります。
API_KEYは全体で使うためconfigから外し、変わりに摂氏で取得するか?華氏で取得するか?のGETパラメタをconfigに入れることにします。

import os

import requests
from dagster import (
    AssetExecutionContext,
    Config,
    Definitions,
    EnvVar,
    MetadataValue,
    asset,
    define_asset_job,
)


cities = ["Tokyo", "New York"]


class WeatherApiConfig(Config):
    city: str
    units: str


def weather_data_asset_factory(city_name: str):
    @asset(name=f"weather_data_in_{city_name}")
    def weather_data(context: AssetExecutionContext, config: WeatherApiConfig):
        """
        OpenWeatherMap APIから天気情報を取得するDagsterアセット
        """
        base_url = "http://api.openweathermap.org/data/2.5/weather"

        params = {
            "q": config.city,
            "appid": os.environ.get("OPENWEATHERMAP_API_KEY"),
            "units": config.units,  # 摂氏温度で取得
        }

        response = requests.get(base_url, params=params)
        response.raise_for_status()  # エラーチェック

        weather_data = response.json()

        # 結果を表示
        context.log.info(weather_data)

    return weather_data


# asset関数を作成
weather_datas = [weather_data_asset_factory(city.replace(" ", "_")) for city in cities]

# config作成
config = {
    "ops": {
        f"weather_data_in_{city.replace(" ", "_")}": {
            "config": {
                "city": city,
                "units": {"Tokyo": "metric", "New York": "imperial"}[city],
            }
        }
        for city in cities
    }
}

asset_job = define_asset_job(
    name="major_cities_weather_data_download_job",
    selection=weather_datas,
    config=config,
)

defs = Definitions(
    assets=weather_datas,
    jobs=[asset_job],
)
jobに2つのassetがあることが確認できる
launchpadでは、2つのassetに対してconfigを設定していることがわかる
job実行画面では2つのassetがmaterializeされていることがわかる

job factory pattern

最後にjob facotryのクラスを作成し、そのメソッドとしてasset factoryを定義していきます。
また、configの情報を別途JSONファイルにまとめたいと思います。

{
  "ops": {
    "weather_in_Tokyo": {
      "config": {
        "city": "Tokyo",
        "units": "metric"
      }
    },
    "weather_in_New_York": {
      "config": {
        "city": "New York",
        "units": "imperial"
      }
    }
  }
}

JobFactoryクラスに必要なのは、Jobを生成するcreate_jobメソッドと、Job内で仕様するasset factory関数、Asset一覧情報であるassetsのpropertyになります。

import json
import os
from pathlib import Path

import requests
from dagster import (
    AssetExecutionContext,
    AssetsDefinition,
    Config,
    Definitions,
    EnvVar,
    MetadataValue,
    asset,
    define_asset_job,
)

cities = ["Tokyo", "New York"]


class WeatherApiConfig(Config):
    city: str
    units: str


class WeatherJobFactory:

    def __init__(self, config_path: Path):
        self._config = json.loads(config_path.read_text())

        # job内のasset
        self._weather_assets: list[AssetsDefinition] = [
            self._weather_asset_factory(city) for city in ["Tokyo", "New_York"]
        ]

    @property
    def assets(self) -> AssetsDefinition:
        return self._weather_assets

    def create_asset_job(self):
        return define_asset_job(
            name="weather_data",
            selection=self.assets,
            config=self._config,
        )

    def _weather_asset_factory(self, city: str):
        @asset(name=f"weather_in_{city}")
        def weather_data(context: AssetExecutionContext, config: WeatherApiConfig):
            """
            OpenWeatherMap APIから天気情報を取得するDagsterアセット
            """
            base_url = "http://api.openweathermap.org/data/2.5/weather"

            params = {
                "q": config.city,
                "appid": os.environ.get("OPENWEATHERMAP_API_KEY"),
                "units": config.units,  # 摂氏温度で取得
            }

            response = requests.get(base_url, params=params)
            response.raise_for_status()  # エラーチェック

            weather_data = response.json()

            # 結果を表示
            context.log.info(weather_data)

        return weather_data


weather_job_factory = WeatherJobFactory(Path("./weather_config.json"))
defs = Definitions(
    assets=weather_job_factory.assets,
    jobs=[weather_job_factory.create_asset_job()],
)

こちらのコードのmeterialize結果もこのようになります。

job factory patternで生成したjobの実行結果

GitHub ActionsのScript Injectionを利用した攻撃

下記資料でGitHub Actionsのセキュリティの勉強をしていたら、GitHubリポジトリに設定されているsecretや環境変数をPull Requestによって盗めることが分かったので実験してみました。
speakerdeck.com
なお、本記事はGitHub Actionsを書く際のセキュリティ意識を高めるために記したものであり、悪用は厳禁です。


.github/workflows/print_pr_title.yamlを用意します。
これはジョブ単位でPASSWORDという環境変数をセットしています。PASSWORDはsecretsから取得しています。ステップ単位の説明ですが、まずパスワードを使った認証などの作業を行い、最後にレポートを出力するためにPull Requestのタイトル名を取得するような流れを簡略的に書いています。

name: Display PR Title

on:
  pull_request:
    types: [opened, edited, reopened]

jobs:
  display-pr-title:
    runs-on: ubuntu-latest
    env:
      PASSWORD: ${{ secrets.PASSWORD }}
    steps:
      - name: Checkout repository
        uses: actions/checkout@v4

      - name: Connect with password
        run: echo "passwordを使った処理";

      - name: Display Pull Request Title
        run: echo ${{ github.event.pull_request.title }}

secrets.PASSWORDは次のように設定されているとします。


通常の善意あるPRであれば、次のように表示されます。

このリポジトリに"${PASSWORD:0:1} ${PASSWORD#?}"というタイトルをつけてPull Requestを送ってみます。
下記画像の通り、d ummy_secretとsecretsの値が表示されてしまっています。
1文字目と2文字目に空白がありますが、実質すべて表示されているようなものです。

GitHub Actionsでは、ログの出力時にsecretsの値をマスクしますが、上記のように加工して出力されるとマスクの対象から外れてしまうためマスクの効果はあまり期待しないほうが良いです。


また、環境変数であればprintenvを使うと一覧を表示できます。
"a"; printenv; #というタイトルをつけると、一覧が表示されてしまいます。

dbt-osmosimのカラムレベルdescription伝播をSQLパーサーを用いてスクラッチ実装してみる

下記のtableとviewがあります。
salary viewはemployee tableとsalary tableを結合しています。
tableにはカラムレベルでdescriptionがありますが、viewには無い状態です。

employee table
salary table
salary view

viewは分析者が簡易的に作ることが多く、descriptionなどのメタデータが設定されないガチです。
そこで、依存tableのメタデータを依存先のviewに伝播させてしまおうという実験を行いました。

現状の状態 viewにカラムのdescriptionが設定されていない
やりたいこと viewのカラムdescriptionを依存テーブルのカラムdescriptionから引き継ぐ

dbt管理であればdbt-osmosisというプラグインにより、同じ名称のカラムに対して依存テーブルのdescriptionを伝播できるようです。
今回はdbt管理をしていない場合に、descriptionが伝播させるようスクラッチ実装してみました。

import argparse
import functools
import itertools
from dataclasses import dataclass

import sqlglot
from google.cloud import bigquery

client = bigquery.Client()


def validate_table_id(table_id: str):
    try:
        _, _, _ = table_id.split(".")
    except Exception as e:
        print(e)


def is_view(table_id: str) -> bool:
    table = get_table_ref(table_id)
    return table.table_type == "VIEW"


@functools.lru_cache
def get_table_ref(table_id: str) -> bigquery.table.Table:
    validate_table_id(table_id)
    table = client.get_table(table_id)
    return table


def get_dependencies(view_id: str) -> set[bigquery.table.Table]:
    """viewに依存しているtable一覧を抽出する"""
    view: bigquery.table.Table = get_table_ref(view_id)
    parsed = sqlglot.parse_one(view.view_query, read="bigquery")
    depending_tables = set()
    for node in parsed.find_all(sqlglot.exp.Table):
        if not node.parts or not node.db:
            continue
        # dataset.table
        if len(node.parts) == 2:
            table_id = (
                f'{view_id.split(".")[0]}.{node.parts[0].this}{node.parts[1].this}'
            )
        # project.dataset.table
        elif len(node.parts) == 3:
            table_id = ".".join([p.this for p in node.parts])
        else:
            raise ValueError()
        depending_tables.add(table_id)
    return depending_tables


def update_view_description(view_id: str):
    # 依存テーブル一覧
    depending_tables = get_dependencies(view_id)
    # 各依存テーブルのスキーマ
    schema_by_table_id: dict[str, list[bigquery.schema.SchemaField]] = {
        table_id: get_table_ref(table_id).schema for table_id in depending_tables
    }
    # カラム名: descriptionの辞書を作って平坦化
    depending_column_description_by_name: dict[str, str] = {
        column.name: column.description
        for column in itertools.chain.from_iterable(schema_by_table_id.values())
        if column.description
    }

    # view自身のスキーマ
    view = get_table_ref(view_id)
    view_schema: list[bigquery.schema.SchemaField] = view.schema

    # 新しく設定するカラムを設定
    new_schema = []
    for view_column in view_schema:
        # view columnのdescriptionが既にある or 依存テーブルの同じカラムにもdescriptionが無い
        if (
            view_column.description
            or view_column.name not in depending_column_description_by_name
        ):
            new_schema.append(view_column)
        else:
            # viewの対象カラムのdescriptionを追加してupdate
            new_description = depending_column_description_by_name[view_column.name]
            new_schema.append(
                bigquery.schema.SchemaField(
                    name=view_column.name,
                    field_type=view_column.field_type,
                    mode=view_column.mode,
                    description=new_description,
                )
            )

    # update
    view.schema = new_schema
    client.update_table(view, ["schema"])


def main(view_id: str):
    update_view_description(view_id)


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Update column descriptions for a BigQuery view."
    )
    parser.add_argument(
        "--view_id",
        required=True,
        help="The ID of the BigQuery view (project.dataset.view).",
    )

    args = parser.parse_args()
    main(args.view_id)

下記のように実行することでviewの各カラムのdescriptionが依存しているtableのカラムのdescriptionを引き継ぐことができます。

python update_view.py --view_id sample_project.sample_dataset.target_view
salary viewのスキーマ、descriptionが設定されたのが分かる

DataformとBigQueryでコメントアウトの記法が異なる罠を踏んだ

結論

BigQueryのSQLとDataformのsqlxはコメントの適用が異なることが分かりました。

これらはBigQueryでもDataformでもコメントとして認識されます。

-- SELECT 1

/*
SELECT 1
*/

#によるコメントは、BigQueryでは(標準SQL導入前のコメント記法で)コメントとして認識され実行できませんが、Dataformではクエリとして認識されるようです。

# SELECT 1

ブロックコメントの打ち消しは、BigQueryではクエリとして認識され実行できますが、Dataformではコメントとして認識されSQLにコンパイルできないようです。

# /*
SELECT 1
# */

実験

Dataform CLIで初期化を行い、下記のようなファイル構成を用意します。

   ~/src/tmp/dataform-exp2/sample-project   main ≡   23:00:33   0ms⠀
🦄  ls
 compile.json  📂 definitions  📂 includes   workflow_settings.yaml
   ~/src/tmp/dataform-exp2/sample-project   main ≡   23:00:35   11ms⠀
🦄  tree definitions/
definitions/
├── sample_view_00.sqlx
├── sample_view_01.sqlx
├── sample_view_02.sqlx
├── sample_view_03.sqlx
└── sample_view_04.sqlx

0 directories, 5 files
   ~/src/tmp/dataform-exp2/sample-project   main ≡   23:00:38   10ms⠀
🦄  bat definitions/*
───────┬───────────────────────────────────────────────────────────────────────────
       │ File: definitions/sample_view_00.sqlx
───────┼───────────────────────────────────────────────────────────────────────────
   1   │ config {
   2   │     type: 'view'
   3}
   45   │ SELECT 1
───────┴───────────────────────────────────────────────────────────────────────────
───────┬───────────────────────────────────────────────────────────────────────────
       │ File: definitions/sample_view_01.sqlx
───────┼───────────────────────────────────────────────────────────────────────────
   1   │ config {
   2   │     type: 'view'
   3}
   45   │ /*
   6   │ SELECT ${ref("sample_view_00")}
   7   │ */
───────┴───────────────────────────────────────────────────────────────────────────
───────┬───────────────────────────────────────────────────────────────────────────
       │ File: definitions/sample_view_02.sqlx
───────┼───────────────────────────────────────────────────────────────────────────
   1   │ config {
   2   │     type: 'view'
   3}
   45# /*
   6   │ SELECT ${ref("sample_view_00")}
   7# */
───────┴───────────────────────────────────────────────────────────────────────────
───────┬───────────────────────────────────────────────────────────────────────────
       │ File: definitions/sample_view_03.sqlx
───────┼───────────────────────────────────────────────────────────────────────────
   1   │ config {
   2   │     type: 'view'
   3}
   45# SELECT ${ref("sample_view_00")}
───────┴───────────────────────────────────────────────────────────────────────────
───────┬───────────────────────────────────────────────────────────────────────────
       │ File: definitions/sample_view_04.sqlx
───────┼───────────────────────────────────────────────────────────────────────────
   1   │ config {
   2   │     type: 'view'
   3}
   45-- SELECT ${ref("sample_view_00")}
───────┴───────────────────────────────────────────────────────────────────────────
   ~/src/tmp/dataform-exp2/sample-project   main ≡   23:00:41   23ms⠀
🦄

dataformコンソールでのエディタ表示

sample_view_00.sqlx (BQでもDataformでも実行可)
sample_view_01.sqlx (BQでもDataformでもコメントとして認識される)
sample_view_02.sqlx (BQでは実行可、Dataformではコメントとして認識される)
sample_view_03.sqlx (BQではコメントとして認識される、DataformではSQLにコンパイル可)
sample_view_04.sqlx (BQでもDataformでもコメントとして認識される)

compile結果

sqlx記法であるref関数が変換されているか否かを確認します。
sample_view_03.sqlxのみがsqlxとして認識され、SQLへコンパイルされていることが分かります。
その他はコメントとして認識され、コメントとしてコンパイルされています。

   ~/src/tmp/dataform-exp2/sample-project   main ≡   23:00:22   0ms⠀
🦄  dataform compile --json > compile.json
   ~/src/tmp/dataform-exp2/sample-project   main ≡   23:00:56   1.263s⠀
🦄  jq '.tables[] | {name: .target.name, query: .query}' compile.json
{
  "name": "sample_view_00",
  "query": "\n\nSELECT 1\n"
}
{
  "name": "sample_view_01",
  "query": "\n\n/*\nSELECT ${ref(\"sample_view_00\")}\n*/\n"
}
{
  "name": "sample_view_02",
  "query": "\n\n# /*\nSELECT ${ref(\"sample_view_00\")}\n# */\n"
}
{
  "name": "sample_view_03",
  "query": "\n\n# SELECT `dataform-sandbox.sample_dataset.sample_view_00`\n"
}
{
  "name": "sample_view_04",
  "query": "\n\n-- SELECT ${ref(\"sample_view_00\")}\n"
}
   ~/src/tmp/dataform-exp2/sample-project   main ≡   23:01:01   36ms⠀
🦄

dataformコンソールでのリネージ表示

リネージ

さらにヤバさがわかるように

今度は、さらにコトの重大さがわかるように、「意図していないクエリとして認識される」例を実験してみます。
BQでは、sample_view_02のSQLだけUNIONが有効となります。

🦄  bat definitions/*
───────┬────────────────────────────────────────────────────────────────────────────────────────────────
       │ File: definitions/sample_view_00.sqlx
───────┼────────────────────────────────────────────────────────────────────────────────────────────────
   1   │ config {
   2   │     type: 'view'
   3}
   45   │ SELECT 1
───────┴────────────────────────────────────────────────────────────────────────────────────────────────
───────┬────────────────────────────────────────────────────────────────────────────────────────────────
       │ File: definitions/sample_view_01.sqlx
───────┼────────────────────────────────────────────────────────────────────────────────────────────────
   1   │ config {
   2   │     type: 'view'
   3}
   45 + │ SELECT ${ref("sample_view_00")}
   6   │ /*
   7 + │ UNION ALL
   8   │ SELECT ${ref("sample_view_00")}
   9   │ */
───────┴────────────────────────────────────────────────────────────────────────────────────────────────
───────┬────────────────────────────────────────────────────────────────────────────────────────────────
       │ File: definitions/sample_view_02.sqlx
───────┼────────────────────────────────────────────────────────────────────────────────────────────────
   1   │ config {
   2   │     type: 'view'
   3}
   45 + │ SELECT ${ref("sample_view_00")}
   6# /*
   7 + │ UNION ALL
   8   │ SELECT ${ref("sample_view_00")}
   9# */
───────┴────────────────────────────────────────────────────────────────────────────────────────────────
───────┬────────────────────────────────────────────────────────────────────────────────────────────────
       │ File: definitions/sample_view_03.sqlx
───────┼────────────────────────────────────────────────────────────────────────────────────────────────
   1   │ config {
   2   │     type: 'view'
   3}
   45 + │ SELECT ${ref("sample_view_00")}
   6 + │ # UNION ALL
   7# SELECT ${ref("sample_view_00")}
───────┴────────────────────────────────────────────────────────────────────────────────────────────────
───────┬────────────────────────────────────────────────────────────────────────────────────────────────
       │ File: definitions/sample_view_04.sqlx
───────┼────────────────────────────────────────────────────────────────────────────────────────────────
   1   │ config {
   2   │     type: 'view'
   3}
   45 + │ SELECT ${ref("sample_view_00")}
   6 + │ -- UNION ALL
   7-- SELECT ${ref("sample_view_00")}
───────┴───────────────────────────────────────────────────────────────────────────────────────────────

これをdataform compileしてみると...

{
  "name": "sample_view_00",
  "query": "

SELECT 1
"
}
{
  "name": "sample_view_01",
  "query": "

SELECT `dataform-sandbox.sample_dataset.sample_view_00`
/*
UNION ALL
SELECT ${ref(\"sample_view_00\")}
*/
"
}
{
  "name": "sample_view_02",
  "query": "

SELECT `dataform-sandbox.sample_dataset.sample_view_00`
# /*
UNION ALL
SELECT ${ref(\"sample_view_00\")}
# */
"
}
{
  "name": "sample_view_03",
  "query": "

SELECT `dataform-sandbox.sample_dataset.sample_view_00`
# UNION ALL
# SELECT `dataform-sandbox.sample_dataset.sample_view_00`
"
}
{
  "name": "sample_view_04",
  "query": "

SELECT `dataform-sandbox.sample_dataset.sample_view_00`
-- UNION ALL
-- SELECT ${ref(\"sample_view_00\")}
"
}

おわかりいただけたであろうか
sample_view_02のUNIONはうまくコンパイルできず、逆にsample_view_03はUNION以降がコンパイルできます。
つまり、BQからDataformへ移植しようと思っていたクエリが別のクエリになってしまうのです。
以上の罠を気にしないと、BQとDataform間でのクエリ変換に躓くことになります。

さらに罠が...

上記はsqlxからSQLへの変換ですが、そのSQLが正しく動く保証はありません。
sample_view_02では、UNION以降が正しくコンパイルされなかったため実際にSQLを実行すると「refが不正です。」というエラーになります。
sample_view_03では、UNION以降で正しくコンパイルはできましたが、SQLを実行するとコメントのため実行はされません。

sqlglotを使いSQLのパーサーの仕組みを調べた

概要

sqlglotというSQLのパーサーツールを用いてSQLのパースの内部処理を体験してみました。
SQLパーサーの内部処理は大きく分けて次の3つがあります。

  • 字句解析
  • 構文解析
  • 意味解析


このうち意味解析はSQL中のテーブルが実際にあるか?やユニーク制約が満たされているか?など実際のテーブルの状態と照合する処理なので、SQL文字列のみを対象とするsqlglotは字句解析・構文解析を担当します。

準備

sqlglotを使います。
公式リポジトリで案内されるやり方だと、sqlglotrsも入り、tokenizerにRust実装のものを使うことになり本実験と若干結果(Tokenの構造)が変わってしまうので注意です。

pip install sqlglot==25.23.1

字句解析(Lexical Analysis)

SQL文字列をトークンに分解する処理です。

SELECT name, age FROM users WHERE age > 30;

例えば上記のSQLがあった場合、次のようにトークンが生成されます。
各トークンには、sqlglot内で定められているトークンタイプ(SELECTやVAR, COMMAなど)と、SQLのどの位置で現れるか(line, col, start, end)の情報が載っています。
TokenTypeはenum型でv25.23.1の時点で362種類もあるようです。

[
  <Token token_type: TokenType.SELECT, text: SELECT, line: 1, col: 6, start: 0, end: 5, comments: []>, 
  <Token token_type: TokenType.VAR, text: name, line: 1, col: 11, start: 7, end: 10, comments: []>, 
  <Token token_type: TokenType.COMMA, text: ,, line: 1, col: 12, start: 11, end: 11, comments: []>, 
  <Token token_type: TokenType.VAR, text: age, line: 1, col: 16, start: 13, end: 15, comments: []>, 
  <Token token_type: TokenType.FROM, text: FROM, line: 1, col: 21, start: 17, end: 20, comments: []>, 
  <Token token_type: TokenType.VAR, text: users, line: 1, col: 27, start: 22, end: 26, comments: []>, 
  <Token token_type: TokenType.WHERE, text: WHERE, line: 1, col: 33, start: 28, end: 32, comments: []>, 
  <Token token_type: TokenType.VAR, text: age, line: 1, col: 37, start: 34, end: 36, comments: []>, 
  <Token token_type: TokenType.GT, text: >, line: 1, col: 39, start: 38, end: 38, comments: []>, 
  <Token token_type: TokenType.NUMBER, text: 30, line: 1, col: 42, start: 40, end: 41, comments: []>, 
  <Token token_type: TokenType.SEMICOLON, text: ;, line: 1, col: 43, start: 42, end: 42, comments: []>
]

sqlglotではtokenize()でSQL文字列からToken型のリストにする作業を行っているようです。
ちなみにsqlglotrsをインストールしていると、Rustで実装されたTokenizerが働き上記と多少異なる結果になるようです。
github.com
具体的な解析アルゴリズムは_scan()で行っているようです。
github.com

まずSQL文字列全体をwhileで回しつつ、そのループの中でスペースをスキップする処理を入れます。
これによって、ループポインタを空白文字以外の先頭に持ってくる役割をしています。

while current < self.size:
    char = self.sql[current]
    if char.isspace() and (char == " " or char == "\t"):
        current += 1
    else:
        break

次にスキャン対象の位置を更新します。

offset = current - self._current if current > self._current else 1
self._start = current
self._advance(offset)

最後に文字の種類に応じた処理を行います。
上から、数値、identifier(カラム名やテーブル名)、SQLキーワード(SELECTやFROMやカンマ)という順でトークナイズされていきます。
各メソッドでは更にTokenTypeというEnum型で分類されていきます。

if not self._char.isspace():
    if self._char.isdigit():
        self._scan_number()
    elif self._char in self._IDENTIFIERS:
        self._scan_identifier(self._IDENTIFIERS[self._char])
    else:
        self._scan_keywords()

下記はClaudeに作ってもらった字句解析の流れを示した図になります。

字句解析では意味までは解釈してません。
そのため、下記のような文字列でも字句解析は通過します。

SELECTTTT SELECT FROM *, *name, age > 4 FROM aaa;

上記SQL(SQLではない)を字句解析に通すと次の結果になります。

[
  <Token token_type: TokenType.VAR, text: SELECTTTT, line: 1, col: 9, start: 0, end: 8, comments: []>, 
  <Token token_type: TokenType.SELECT, text: SELECT, line: 1, col: 16, start: 10, end: 15, comments: []>, 
  <Token token_type: TokenType.FROM, text: FROM, line: 1, col: 21, start: 17, end: 20, comments: []>, 
  <Token token_type: TokenType.STAR, text: *, line: 1, col: 23, start: 22, end: 22, comments: []>, 
  <Token token_type: TokenType.COMMA, text: ,, line: 1, col: 24, start: 23, end: 23, comments: []>, 
  <Token token_type: TokenType.STAR, text: *, line: 1, col: 26, start: 25, end: 25, comments: []>, 
  <Token token_type: TokenType.VAR, text: name, line: 1, col: 30, start: 26, end: 29, comments: []>, 
  <Token token_type: TokenType.COMMA, text: ,, line: 1, col: 31, start: 30, end: 30, comments: []>, 
  <Token token_type: TokenType.VAR, text: age, line: 1, col: 35, start: 32, end: 34, comments: []>, 
  <Token token_type: TokenType.GT, text: >, line: 1, col: 37, start: 36, end: 36, comments: []>, 
  <Token token_type: TokenType.NUMBER, text: 4, line: 1, col: 39, start: 38, end: 38, comments: []>, 
  <Token token_type: TokenType.FROM, text: FROM, line: 1, col: 44, start: 40, end: 43, comments: []>, 
  <Token token_type: TokenType.VAR, text: aaa, line: 1, col: 48, start: 45, end: 47, comments: []>, 
  <Token token_type: TokenType.SEMICOLON, text: ;, line: 1, col: 49, start: 48, end: 48, comments: []>
]

構文解析(Syntax Analysis)

構文解析では、字句解析で生成されたトークンと元のSQLを受け取り、SQL文が文法的に正しいかどうかをチェックします。
構文解析はsqlglot/parser.py_parser()メソッドで行われています。
github.com

構文解析の結果は次のようになります。

[
  Select(
    expressions=[
      Column(
        this=Identifier(this=name, quoted=False)),
      Column(
        this=Identifier(this=age, quoted=False))
    ],
    from=From(
      this=Table(
        this=Identifier(this=users, quoted=False)
      )
    ),
    where=Where(
      this=GT(
        this=Column(
          this=Identifier(this=age, quoted=False)
        ),
        expression=Literal(this=30, is_string=False)
      )
    )
  )
]

このPythonオブジェクトは下図のような構文木を表しています。

字句解析でエラーにならなかった下記文字列も構文解析ではエラーになります。

SELECTTTT SELECT FROM *, *name, age > 4 FROM aaa;
[
  <Token token_type: TokenType.VAR, text: SELECTTTT, line: 1, col: 9, start: 0, end: 8, comments: []>, 
  <Token token_type: TokenType.SELECT, text: SELECT, line: 1, col: 16, start: 10, end: 15, comments: []>, 
  <Token token_type: TokenType.FROM, text: FROM, line: 1, col: 21, start: 17, end: 20, comments: []>, 
  <Token token_type: TokenType.STAR, text: *, line: 1, col: 23, start: 22, end: 22, comments: []>, 
  <Token token_type: TokenType.COMMA, text: ,, line: 1, col: 24, start: 23, end: 23, comments: []>, 
  <Token token_type: TokenType.STAR, text: *, line: 1, col: 26, start: 25, end: 25, comments: []>, 
  <Token token_type: TokenType.VAR, text: name, line: 1, col: 30, start: 26, end: 29, comments: []>, 
  <Token token_type: TokenType.COMMA, text: ,, line: 1, col: 31, start: 30, end: 30, comments: []>, 
  <Token token_type: TokenType.VAR, text: age, line: 1, col: 35, start: 32, end: 34, comments: []>, 
  <Token token_type: TokenType.GT, text: >, line: 1, col: 37, start: 36, end: 36, comments: []>, 
  <Token token_type: TokenType.NUMBER, text: 4, line: 1, col: 39, start: 38, end: 38, comments: []>, 
  <Token token_type: TokenType.FROM, text: FROM, line: 1, col: 44, start: 40, end: 43, comments: []>, 
  <Token token_type: TokenType.VAR, text: aaa, line: 1, col: 48, start: 45, end: 47, comments: []>, 
  <Token token_type: TokenType.SEMICOLON, text: ;, line: 1, col: 49, start: 48, end: 48, comments: []>
]

Pythonの関数で単一プロセス内と複数インスタンス間でのキャッシュ

概要

Pytyhonの関数をキャッシュするデコレータについてです。
キャッシュを考える場合、次の2つの状況を考える必要があります。

  • 単一プロセス内でキャッシュする場合
  • 複数のインスタンス間で一つのPythonファイルを実行する場合

単一プロセスでのキャッシュ

プロセス内キャッシュは、メモリに保存する方法が有効です。
キャッシュの書き方は主に2つあります。

functools

デフォルトで使えるのが、functools.cachefunctools.lru_cacheです。
lruとはLeast Recently Usedの意で、maxsizeのキャッシュ個数を超えたときに、使われていないものから削除していくアルゴリズムになります。
max_sizeはデフォルトで128です。

from functools import lru_cache

@lru_cache(maxsize=64)  # maxsizeはキャッシュの最大サイズ
def expensive_function(x):
    print(f"Computing {x}")
    return x * x

print(expensive_function(4))  # 1回目はキャッシュされていないため時間がかかる
print(expensive_function(4))  # 同じ引数を再度入力すると、キャッシュがあるため結果が即得られる

cachetools

cachetoolsを用いると、より柔軟なキャッシュを実現できます。

from cachetools import cached, LRUCache

# LRUCacheインスタンスを作成
cache = LRUCache(maxsize=128)

@cached(cache)
def expensive_function(x):
    print(f"Computing {x}")
    return x * x

print(expensive_function(4))  # 1回目はキャッシュされていないため時間がかかる
print(expensive_function(4))  # 同じ引数を再度入力すると、キャッシュがあるため結果が即得られる

functoolsだと、キャッシュアルゴリズムとしてLRUしかありませんでしたが、cachetoolsを用いると下記表のキャッシュが使用できます。

cachetoolsで使用できるキャッシュ戦略 説明 使用シチュエーション 使用シチュエーションの例
LRU (Least Recently Used) 最も長い間使用されていないデータを優先して削除するキャッシュ戦略、データのアクセス頻度に基づいてキャッシュが管理される ユーザーが頻繁に同じデータにアクセスするが、古いデータは使われなくなる場合 最近の検索結果
TTL (Time-To-Live) 各キャッシュアイテムに期限(時間)を設定し、期限が切れると自動的に削除される戦略、アイテムの生存期間が決まっている 一定時間ごとに更新が必要なデータ APIから取得する外部サービスのレスポンス、ニュースや為替レートのデータ
LFU (Least Frequently Used) 使用頻度が最も低いデータを削除するキャッシュ戦略、データがどれくらい頻繁に使われているかに基づいてキャッシュが管理される データアクセスの偏りがある場合 一部の項目が頻繁にアクセスされ、他の項目があまり使われないデータベースキャッシュ

その他のfunctoolsのcacheに比べて、cachetoolsが優れいている点としては、引数の制約が緩いことが挙げられます。
functoolsのcacheは、hashableな引数しか受取ることができません。
一方で、cachetoolsはリスト型や辞書型のようなnon-hashableな引数でも受取ることが可能になります。

複数インスタンス間でのキャッシュ

複数インスタンス間で、それぞれが特定のPythonファイルを実行する場合、外部に記憶機構を設ける必要があります。
標準機能で実装するには、ファイルストレージにpickle形式として保存することかと思いますが、ファイルの命名規則などを決める必要があったり、IOに時間がかかったり、トランザクション管理が出来ていないことなど、不安が残ります。

KVSを使うことで、上記の問題がある程度緩和出来ます。
ここではRedisを使うことにします。
functoolsやcachetoolsと同様にデコレータで実装されていると、同じ使用感で利用できるため、デコレータで実装を行います。

import redis
import functools

# Redis接続を作成
r = redis.Redis(host='localhost', port=6379, db=0)

# Redisキャッシュデコレータを定義
def redis_cache(expiration=60):
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # 引数からキャッシュキーを作成
            # "expensive_function:(4,):{}" がkeyとなる
            key = f"{func.__name__}:{args}:{kwargs}"
            
            # Redisからキャッシュを取得
            cached_value = r.get(key)
            if cached_value:
                print(f"Using cached value for {args}")
                return int(cached_value)
            
            # キャッシュがなければ関数を実行して結果をキャッシュ
            result = func(*args, **kwargs)
            r.setex(key, expiration, result)  # 有効期限を設けることでTTLキャッシュが可能
            print(f"Caching result for {args}")
            return result
        
        return wrapper
    return decorator

@redis_cache(expiration=300)  # キャッシュの有効期限: 300秒
def expensive_function(x):
    print(f"Computing {x}")
    return x * x

print(expensive_function(4))
print(expensive_function(4)) 

上記ファイルを動かすためにローカルにRedisサーバーを建てる必要があります。

docker run -d --name redis-container -p 6379:6379 redis

DagsterにおけるYahooオークションのスクレイピングパターンを考えた

問題設定

ヤフオクの特定の出品者の商品一覧を毎日決まった時刻に取得したいと考えます。

設計

時間による情報の変化をopに落とし込む

上記の問題を考えた場合、訪れるページの種類は主に3つ挙げられます。

No ページの種類 得たい情報 得たい情報に対してのURLの冪等性(時間によってURL先の内容が変化するか?) URL
A 出品者のトップページ ページ数 ページ数は出品している数に依存するため時間によって異なる Yahoo!オークション - barbarkunisanさんの出品リスト
B 出品者のNページ目 商品のリンク オークションの終了によってページNに掲載される商品は変わる Yahoo!オークション - barbarkunisanさんの出品リスト
C 商品のページ 商品の名前や画像 URL先の内容は変わらない(商品の詳細が変わる場合は考えない) Yahoo!オークション - メキシコ 1ペソ 硬貨1971年3枚 1970年1枚

一般的に、Data Workflowシステムにおいて、操作が冪等か否かを考慮する必要があります。
操作が冪等であれば、例えば通信エラーによってスクレイピングが失敗したとしても、再実行することにより失敗したときに得たかった情報を得ることができます。
ヤフオクの例で言うと、Cのみが時間によらず同じ情報を得られることになります。
つまり、URLを一度取得してしまえば、どのタイミングでリクエスト&スクレイピングを行っても同じ情報を得ることができるのです。

逆にA, Bは時間によって情報が変わってしまうため、リトライによって同じ情報が取得できる保証が無いということになります。

opの構成を考える

上節の通り、AとBはリクエストするタイミングによって得られる情報が変わってしまうため、なるべく同じ時刻にリクエストを行ったほうが良いです。
操作が失敗したとしても、実行時刻による結果の冪等性が無いため、一貫性の観点から「すべて情報(出品者の全商品リンク)を取得できるか?すべて情報を取得できないか?」を重点におき、一つのopとして設計したほうが良いです。

次にCは、リンクさえあればリトライで同じ情報(商品の詳細情報)が取得できるため、実行時刻による冪等性があります。
また、ABの結果(出品者の出品数)次第で量が変わるため、動的にopを生成する必要があります。
さらに、リクエストするopとレスポンスで帰ってきた情報をクラウドストレージなどに保存するopを分けると更に確かなものとなりそうです。

以上を踏まえて次のように設計しました。

DAG
import requests
from bs4 import BeautifulSoup
from dagster import (
    DynamicOut,
    DynamicOutput,
    Field,
    OpExecutionContext,
    Output,
    configured,
    job,
    op,
)
from typing import Any

# 定数
BASE_URL = "https://auctions.yahoo.co.jp/seller"  # 出品者のトップページ
ITEMS_PER_PAGE = 20  # 1ページあたりの掲載アイテム数


# 出品者のトップページから、すべてのアイテムURLを取得するasset
@op(out=DynamicOut(), config_schema={"seller": Field(str, is_required=True)})
def fetch_all_item_urls(context: OpExecutionContext) -> list[str]:
    auction_ids = []
    page_number = 1

    seller_url: str = f'{BASE_URL}/{context.op_config["seller"]}'
    context.log.info(f"seller_url: {seller_url}")

    while True:
        page_url = f"{seller_url}?sid={context.op_config['seller']}&is_postage_mode=1&dest_pref_code=13&b={(page_number-1)*50+1}&n=50"
        response = requests.get(page_url)
        soup = BeautifulSoup(response.content, "html.parser")
        context.log.info(f"soup: {soup}")

        # ページ内の商品リンクを抽出
        items_on_page = soup.find_all(
            "a", class_="Product__titleLink"
        )  # 適切なセレクターを指定
        if not items_on_page:  # 商品がない場合はループを終了
            break

        for item in items_on_page:
            auction_ids.append(item["data-auction-id"])

        # 次のページへ移動
        page_number += 1

    for auction_id in auction_ids:
        context.log.info(f"auction_id: {auction_id}")
        yield DynamicOutput(auction_id, mapping_key=auction_id)


# 各商品ページに対して情報を取得するdynamicアセット
@op
def fetch_item_detail(context: OpExecutionContext, auction_id: str):
    url = f"https://page.auctions.yahoo.co.jp/jp/auction/{auction_id}"
    response = requests.get(url)
    soup = BeautifulSoup(response.content, "html.parser")

    # 商品詳細情報を抽出する
    title = soup.find("h1", class_="ProductTitle__text").get_text(strip=True)
    price = soup.find("dd", class_="Price__value").get_text(strip=True)
    # context.log.info({"url": url, "title": title, "price": price})

    return {"url": url, "title": title, "price": price}


@op
def save_item_detail(context: OpExecutionContext, detail: dict[str, Any]):
    context.log.info(detail)

# 商品URLを使用して、各商品の詳細情報を取得するジョブ
@job
def fetch_all_items():
    item_urls = configured(fetch_all_item_urls, name="item_urls")(
        {"seller": "barbarkunisan"}
    )()
    results = item_urls.map(fetch_item_detail).map(save_item_detail)

こちらのopの実行は下記のようになります。
まず、出品者の全商品URL一覧を取得します。
次に商品分のみopが動的に生成されます。

この動的に生成された、商品URLから詳細情報をスクレイピングするopは冪等になっています。

opのスクレイピング処理が終わると、各opに対して保存を行うopが1対1で生成され実行されます。


全体の実行時間としては下記のようになります。

BigQuery 時間関数メモ

集約

  • Time Columnに関する関数
    • 1日を3時間単位に丸めるTIMESTAMP_BUCKET(・, INTERVAL 3 HOUR)
  • Value Columnに対する関数
    • 平均・最大・最小

3時間ごとに丸める

下記は「各郵便番号のある時刻における最低気温と最高気温」のテーブルです。
timeがTime Column、zip_code(住所)がPartitioning Column、他がValue Columnになります。

time zip_code temperature_min temperature_max
2020-09-08 00:00:00 60606 60 66
2020-09-08 03:00:00 60606 57 59
2020-09-08 06:00:00 60606 55 56
2020-09-08 00:00:00 94105 71 74
2020-09-08 03:00:00 94105 66 69
2020-09-08 06:00:00 94105 64 65

Time ColumnとPartition Columnを集約します。
その際にTime ColumnにTIMESTAMP_BUCKET(・, INTERVAL 3 HOUR)関数をかまします。

SELECT
  TIMESTAMP_BUCKET(time, INTERVAL 3 HOUR) AS time,
  zip_code,
  MIN(temperature) AS temperature_min,
  MAX(temperature) AS temperature_max,
FROM mydataset.environmental_data_hourly
GROUP BY zip_code, time
ORDER BY zip_code, time;
time zip_code temperature_min temperature_max
2020-09-08 00:00:00 60606 60 66
2020-09-08 03:00:00 60606 57 59
2020-09-08 06:00:00 60606 55 56
2020-09-08 09:00:00 60606 55 56
2020-09-08 12:00:00 60606 56 59
2020-09-08 15:00:00 60606 63 68
2020-09-08 18:00:00 60606 67 69
2020-09-08 21:00:00 60606 63 67


上記のように、1日を3時間ごとにまとめて集計します。
TIMESTAMP_BUCKET(time, INTERVAL 3 HOUR) AS time列には1日を3時間ごとに区切った時間でまとめてくれます。

3時間で丸める(カスタム調整)

単純にTIMESTAMP_BUCKET(・, INTERVAL 3 HOUR)と書くと、0時→3時→6時→9時→12時→15時→18時→21時という区切りになります。
オリジンを設定することで2時スタートの区切りを作れます。

SELECT
  TIMESTAMP_BUCKET(time, INTERVAL 3 HOUR, TIMESTAMP '2020-01-01 02:00:00') AS time,
  zip_code,
  CAST(AVG(aqi) AS INT64) AS aqi,
  CAST(AVG(temperature) AS INT64) AS temperature
FROM mydataset.environmental_data_hourly
GROUP BY zip_code, time
ORDER BY zip_code, time;
time zip_code aqi temperature
2020-09-07 23:00:00 60606 23 65
2020-09-08 02:00:00 60606 21 59
2020-09-08 05:00:00 60606 24 56
2020-09-08 08:00:00 60606 33 55
2020-09-08 11:00:00 60606 38 56
2020-09-08 14:00:00 60606 43 62
2020-09-08 17:00:00 60606 37 68
2020-09-08 20:00:00 60606 27 66
2020-09-08 23:00:00 60606 22 63

集約のイメージとしては下記の通りです。

充填

  • 欠損している時間帯に対してレコードを追加し、Value Columnの値を他のレコードの情報を用いて補完
    • locf充填
      • 指定範囲の前のレコードの値を用いる
    • 線型充填
      • 指定範囲にあるレコードの平均値にする

結合

  • Window結合
    • 2つのテーブルのTIME列を同じ時刻に丸めて結合
    • ON TIMESTAMP_BUCKET(t1.ts, INTERVAL 15 SECOND) = TIMESTAMP_BUCKET(t2.ts, INTERVAL 15 SECOND)
  • AS OF 結合
    • RANGE型列とTIME列を結合


RANGE型の時間に対する操作(RANGE型からRANGE型への計算)

  • 範囲データ結合
    • 重複期間のあるRANGE型カラムを結合
  • 範囲データ分割
    • RANGE型のデータを一定期間ごとに分割

OTFトーク 第1回〜第3回 聴講まとめ

Iceberg界隈で有名な方が、SpotifyにてOpen Table Formatの日本語Podcastを提供して下さっていたため勉強しました。
第1回から第3回まで聞いた内容のまとめを記します。

第1回 OTF誕生の背景

open.spotify.com
Icebergが生まれた背景

  • データレイクに対してTransactionの仕組みを導入したい
    • Netflixは2018年時点で60PBのデータをS3に保有
    • そのデータに対してhiveなど分散クエリエンジンで処理
    • Transactionの無さが問題に
      • 複数のデータ処理エンジンが同時に同じファイルに読み書きしてしまう
        • ex) 1テーブルを表す100万個のファイルに対して, Aのエンジンが更新している間にBのエンジンが削除してしまう
  • データ操作の効率化を目指していた
    • 従来のhive形式はシンプルだが難しい処理が出来なかった
    • 新機能を導入
      • タイムトラベル
      • schema evolution
    • エンジンに機能を載せるのではなく、ストレージ側に機能を載せ、ツール横断で利用できるようにした
  • パフォーマンス向上を目指した
    • Netflixでは2018年explainクエリだけで10分かかるようなレベル
    • メタデータなどを導入してより早くクエリを実行できるようにした

Hudiが生まれた背景

  • 上記の誕生理由に加えて、バッチとストリーミングを統合したかった
  • 従来のバッチとストリーミングの統合アーキテクチャ
    • lambdaアーキテクチャ
      • 本質的に同じデータに対して、ストリーミングとバッチで入ったデータをどう整合性つけるか?
    • kappaアーキテクチャ

第2回 OTFのこれから

open.spotify.com
3つの代表的なOTFの概要

Delta Lake

  • データレイクとデータウェアハウスを如何に統合するかという思想
  • 「データレイクハウス」というワードも提唱している

Hudi

  • ストリーミング処理を行いたいという思想
  • configuableな項目が多い(細かくチューニングできる)

Iceberg

  • SQLの延長で操作できるという思想
    • 今までIcebergに触れたことがなくても自然に操作できるという思想で作られている

歴史

  • 3つとも同じ時期に発表された
    • もともと同じ時期に作られていた
    • 各社とも同じ課題を抱えていた

OTFは本質には「フォーマット」
Iceberg:単一テーブルへのTransactionはあるが、複数テーブルをまたぐTransactionはまだない

第3回

open.spotify.com

OTFのこれまでと潮流
相互運用性 (inter operability)

  • 従来, 分散クエリエンジン(Spark, Presto)で共有できるTransactionが求められていた
  • オープンソース化により, それ以外のエンジンに対しても適用が求められていった
  • データプラットフォームにおけるuniversal standardを期待されている

カタログ(Iceberg)

  • 「データがどこに置かれているか?」
    • ファイル置き場自体はどこでもよい
      • S3
      • HDFS
      • ローカルストレージ
  • Transaction制御(ロックマネージャー)
    • どこか一箇所で制御をするものがある
    • エンジンは複数のものを使えるが、ロックマネージャだけは中央に一箇所必要
  • メタデータ
    • 複数のカタログが乱立したが、複数のカタログに対応するのは大変
    • RESTカタログの成立
      • APIのみ規定して裏は自由
        • →様々な機能をカタログに持たせようとする動き

SnowflakeでORDER BY LIMIT句を実行して良い理由

下記記事を読み、SnowflakeでORDER BY句を利用しても問題ない理由をSnowflakeのアーキテクチャを学びながら理解することができたのでまとめました。
zenn.dev

Snowflake マイクロパーティション

一般的にSQLは「ORDER BY LIMIT句は遅い」と言われています。
その理由としては、ORDERでソートをする際に、一旦結合が必要であるからと言われています。
しかし、Snowflakeではマイクロパーティションと呼ばれるデータ格納アーキテクチャを用いるため、ORDER BY LIMIT句をしてもパフォーマンスが落ちないようです。

マイクロパーティションとはなにか?を知り、ORDER BY LIMITが問題ない理由を勉強しました。

マイクロパーティションとは

  • テーブルデータのストレージへの格納形式
    • 行グループに分割→列ごとに圧縮
    • メタデータを計算
      • 自身のデータのmin, maxなどの値を保持
    • データを格納時に自動的にマイクロパーティションが構築される
  • プルーニング
    • クエリプラン生成時に不要なマイクロパーティションにアクセスしないようにする
    • 各マイクロパーティションのメタデータを覗き、WHERE句の条件に該当しないマイクロパーティションにはアクセスしないようにする
  • クラスタリング
    • レコード格納にあたり、日付カラムは順番に蓄積されることが多いため、マイクロパーティションと相性がよい
      • ex)
        • 古いマクロパーティションのメタデータ 2023-01-01 <= date <= 2023-01-02
        • 新しいマクロパーティションのメタデータ 2024-01-01 <= date <= 2024-01-02
    • 購入履歴テーブルで商品IDはバラバラのマイクロパーティションに格納されてしまう
    • 効率的に商品IDを探したい場合(プルーニングで効率的にプランを生成したい場合)、商品IDを基準にマイクロパーティションを再構築可能

SnowflakeでORDER BY LIMIT(Top-k)が速い理由

  • ex) 最も小さい値(Top-k)を探す場合
    • 戦闘から順次マイクロパーティションを読み込む
    • Top-kを保存
    • 次のマイクロパーティションを読み込んだ際に、メタデーったのminが保存しているTop-kより
      • 小さい場合はマイクロパーティションを読み込む
      • 大きい場合はマイクロパーティションを読み込まない
ORDER BYが早い理由

感想

LIMITが付くことについて

一般的にSQLで「ORDER BYは(なるべく)辞めよう」と言われることがあるが、「ORDER BY LIMITは辞めよう」とLIMIT句までセットで言われることはないです。
しかしながら、Snowflakeのマイクロパーティションの概念を学び、LIMIT句までセットに含めるべき理由が分かりました。
LIMIT句を付けずに「ORDER BY」だけではすべてのマイクロパーティションを参照してしまうことになるので、早くはならないことが分かりました。

BigQueryとSnowflakeの違い

普段私が使っているBigQueryと比べると、同じワードでも概念が若干異なることに気づきました。
例えば、「プルーニング」はBigQueryにも機能としてあります。
WHERE句でパーティションカラムの条件を指定することで、対象以外のブロックを検索対象から外すというものでした。
cloud.google.com

一方でSnowflakeの場合は、パーティション定義やWHERE句でのパーティションカラム条件指定はありません。
Snowflakeのマイクロパーティションでは、自らのマイクロパーティション中の各列のメタデータを保持しているため、すべての列がプルーニングの対象となり得ます。


また、「クラスタリング」という言葉は、BigQueryではパーティション内で特定のカラムをソートするようにすることでした。
一方、Snowflakeの場合は、全体の行の中で特定カラムの近い値が同じマイクロパーティションに配置されるようにすることでした。
「クラスタリング」については、Snowflakeでの概念はRDBのIndex(B-Treeの構築)に近い概念なのかなと思います。

全体的な感想

Snowflakeは、「クラウドベンダーに縛られない」ことが優位性の一つだと理解していましたが、アーキテクチャを学ぶことで「どのようなデータを保持するのに向いているか?」という視点も得られました。
マイクロパーティションは、自身が保持する列の値が近しいほどスキャンの際に効率的なプルーニングを提供します。
そのため、時刻列があり、INSERTされていく順に時刻列の値が単調増加していくようなデータを蓄積したいシナリオで優位性がありそうです。
BQのように、パーティションテーブル単位でTRUNCATEしたりLOADしたりという使い方は、Snowflakeでは向いていなそうです。

Dagster1.8 リリースノート

2024.08.09にDagster1.8がリリースされ, リリースノートから気になった箇所をまとめました.
dagster.io

Un-experimentalizing Pipes

Pipes APIはLambda, kubernetes, databricks上で動いているコードにdagster moduleをimportしログを吐くように実装すると, client側のdagsterコンソールでログをキャッチすることができるという機能です.
以前まではexperimentでしたがreleaseになるようです.

SDF Integration

SDFとはデータウェアハウスとオーケストレーションツールなどを一つの実行環境で操作できるようにするCLIツールのようです.
www.sdf.com
従来はデータオーケストレーションツールとしてはairflowのみの対応だったようですが、Dagsterでも扱えるようになったそうです.
blog.sdf.com

Data Catalog Improvements

メタデータの表示が下記の通りリッチになるようです.

  • アセットに関連するDBやテーブルをメタデータから追いやすくなる
  • アセットのソースコードを参照できるようになる

Data Quality and Reliability Improvements

メタデータの範囲チェックができるようになるそうです.
アセットのメタデータとは下記のようにアセットの値を返す際に, 行数などの付加情報をつけるものでした.

import json
import requests
import pandas as pd
from dagster import AssetExecutionContext, MetadataValue, asset, MaterializeResult


@asset(name="my_asset_1", deps=[fuga])
def hoge(context: AssetExecutionContext) -> MaterializeResult:
    ...
    return MaterializeResult(
        metadata={
            "num_records": 5000,
        }
    )

この行数などの数値範囲がチェックできるようになるようです.

from dagster import AssetKey, build_metadata_bounds_checks, AssetCheckSeverity

# アセットの定義
assets = [
    AssetKey("my_asset_1"),
    AssetKey("my_asset_2"),
]

# メタデータの範囲チェックを作成
checks = build_metadata_bounds_checks(
    assets=assets,
    severity=AssetCheckSeverity.WARN,
    metadata_key="row_count",
    min_value=1000,
    max_value=10000,
    exclusive_min=False,  # min_valueと等しい場合にチェックが失敗, デフォルトはFalse
    exclusive_max=False,
)

アセットが関数内でデータフレームを作成し, その行数をメタデータにするような下記リンクの例の場合は, メタデータのチェックではなくアセットそのもののチェックになるのでasset_checkを使うべきなのかと思います.

Stable Support: External Assets

外部プロセスをリネージに組み込みたい場合, 従来はSource Assetを使っていましたが, 今後はSource Assetが廃止されExternal Assetが使えるようになるようです.
Source Assetと違う点としては, 通常のAssetの下流にも配置できるみたいです.

Dagster now offers stable APIs for specifying assets that can’t be materialized from Dagster, but which are still part of the lineage graph. For example, this enables Dagster to know about a dashboard in a business intelligence tool that’s downstream of Dagster-orchestrated assets. You can specify a non-materializable asset by constructing an AssetSpec and passing it to a Definitions object.

ただ, External Assetの説明ページには「External AssetはExternal Assetにしか依存しない」と記述されているので実際はどう違うのかよく分かっていません.

External assets can depend only on other external assets.

docs.dagster.io

Merging Definitions

Definition()を結合し, より大きなジョブの集合を作ることができるようになるようです.

Declarative Automation

FreshnessPolicyが廃止されAutomationConditionになるようです.
assetがいつマテリアライズされるべきなのかを指定するインスタンスのようです.
以前よりカスタマイズが可能になり|のようにbool式の演算子が使えるようです.
下記の例では「一日単位」のcronと「コードが変更された」場合のセンサーによりマテリアライズされるようです.

@asset(automation_condition=AutomationCondition.eager())

def a(): ...

@asset(
    automation_condition=(
        AutomationCondition.on_cron("@daily") |
AutomationCondition.code_version_changed()
    )

)
def b(): ...

Jobless Automations

下記にあるようにDefinitionsにassetを登録する必要なくload_assets_from_modulesから参照できるようになるようです.
github.com

from dagster import (
    Definitions,
    ScheduleDefinition,
    load_assets_from_modules,
)

from . import assets

defs = Definitions(
    schedules=[
        ScheduleDefinition(
            name="hackernews_schedule",
            target=load_assets_from_modules([assets]),
            cron_schedule="0 * * * *",  # every hour
        )
    ]
)

Timeline Page Grouping

タイムラインのページでセンサーによるジョブなのかスケジュールによるジョブなのかを選択表示できるようになるようです.
ユーザー設定で「experimental navigation feature flag」を選択するとこの機能が使えるようになるそうです.

NetflixのWorkflow Engine Maestro を調査した

下記記事を読み、Maestroが他のData Orchestration(主にDagster)と異なる(であろう)点をまとめました.
atmarkit.itmedia.co.jp
netflixtechblog.com

  • 巡回ワークフローもサポートしている
  • ワークフロー定義はJSONで記述
  • サブワークフロー
    • ワークフローのステップで別のワークフローを実行できる機能
    • 例)条件分岐とサブワークフローの機能を利用したジョブの自動再実行ワークフロー

  • IDEのようなコードレベルのブレークポイントの設置
  • 集約view
    • 個々のワークフローインスタンスを集約してステップの状況を可視化できる
    • 例) Run1でstep3がfailし, Run2でstep3,4,5が実行された場合, 集約viewではstep1から通しで見ることが可能

  • ロールアップ
    • ワークフローインスタンスのステータスやステップ数を提供
    • リーフステップのみがカウントされる
    • サブワークフローは具体的なワークフローへのポインタとして機能

Data Catalog Tag Templateの値の初期入力と更新をPythonで行う

概要

下記ドキュメントを参考にして、タグの値に常に最新状態を載せるためのPythonプログラムを書きました。
フィールド値はドキュメントにある例を使ったため、複数のタグを作ったり、複数のテーブルに適用する場合は定義ファイルを先に作っておくと良いかと思います。
cloud.google.com

タグテンプレートの作成

まず.envを用意します。

PROJECT=sample-project
DATASET=sample_dataset
TABLE=sample_table
import os
from google.cloud import datacatalog_v1
from dotenv import load_dotenv
from google.api_core.exceptions import GoogleAPIError

load_dotenv()

project_id = os.getenv("PROJECT")
location = "us-central1"
tag_template_id = "example_tag_template_00"

datacatalog_client = datacatalog_v1.DataCatalogClient()

# タグテンプレートの作成
tag_template = datacatalog_v1.types.TagTemplate()
tag_template.display_name = "Demo Tag Template 00"

# source フィールドにSTRING型の"Source of data asset"というdisplay nameを設定
tag_template.fields["source"] = datacatalog_v1.types.TagTemplateField()
tag_template.fields["source"].display_name = "Source of data asset"
tag_template.fields["source"].type_.primitive_type = datacatalog_v1.types.FieldType.PrimitiveType.STRING

# num_rows フィールドにDOUBLE型の"Number of rows in data asset"というdisplay nameを設定
tag_template.fields["num_rows"] = datacatalog_v1.types.TagTemplateField()
tag_template.fields["num_rows"].display_name = "Number of rows in data asset"
tag_template.fields["num_rows"].type_.primitive_type = datacatalog_v1.types.FieldType.PrimitiveType.DOUBLE

# has_pii フィールドにBOOL型の"Has PII"というdisplay nameを設定
tag_template.fields["has_pii"] = datacatalog_v1.types.TagTemplateField()
tag_template.fields["has_pii"].display_name = "Has PII"
tag_template.fields["has_pii"].type_.primitive_type = datacatalog_v1.types.FieldType.PrimitiveType.BOOL

# pii_type フィールドにENUM型で("EMAIL", "SOCIAL SECURITY NUMBER", "NONE")という選択肢を設定
tag_template.fields["pii_type"] = datacatalog_v1.types.TagTemplateField()
tag_template.fields["pii_type"].display_name = "PII type"
tag_template.fields["pii_type"].type_.enum_type.allowed_values = [
    datacatalog_v1.types.FieldType.EnumType.EnumValue(display_name=dn)
    for dn in ["EMAIL", "SOCIAL SECURITY NUMBER", "NONE"]
]

# 指定したプロジェクトのlocationにタグテンプレートを作成
expected_template_name = datacatalog_v1.DataCatalogClient.tag_template_path(
    project_id, location, tag_template_id
)

try:
    tag_template = datacatalog_client.create_tag_template(
        parent=f"projects/{project_id}/locations/{location}",
        tag_template_id=tag_template_id,
        tag_template=tag_template,
    )
    print(f"Created template: {tag_template.name}")
except GoogleAPIError as e:
    print(f"Cannot create template: {expected_template_name}")
    print(f"Error: {e}")
タグテンプレートが作成される
テンプレートの詳細

テーブルへタグテンプレートをアタッチ

import os
from google.cloud import datacatalog_v1
from dotenv import load_dotenv

load_dotenv()

project_id = os.getenv("PROJECT")
dataset_id = os.getenv("DATASET")
table_id = os.getenv("TABLE")
location = "us-central1"
tag_template_id = "example_tag_template_00"

datacatalog_client = datacatalog_v1.DataCatalogClient()

# タグテンプレートの名前
tag_template_name = datacatalog_v1.DataCatalogClient.tag_template_path(
    project_id, location, tag_template_id
)

# Data Catalogでテーブルのエントリを取得
resource_name = (
    f"//bigquery.googleapis.com/projects/{project_id}"
    f"/datasets/{dataset_id}/tables/{table_id}"
)
table_entry = datacatalog_client.lookup_entry(
    request={"linked_resource": resource_name}
)

# テーブルにタグを付与
tag = datacatalog_v1.types.Tag()
tag.template = tag_template_name

# タグフィールドに値を設定
tag.fields["source"] = datacatalog_v1.types.TagField()
tag.fields["source"].string_value = "Copied from tlc_yellow_trips_2018"

tag.fields["num_rows"] = datacatalog_v1.types.TagField()
tag.fields["num_rows"].double_value = 113496874

tag.fields["has_pii"] = datacatalog_v1.types.TagField()
tag.fields["has_pii"].bool_value = False

tag.fields["pii_type"] = datacatalog_v1.types.TagField()
tag.fields["pii_type"].enum_value.display_name = "NONE"

# テーブルにタグを作成
tag = datacatalog_client.create_tag(parent=table_entry.name, tag=tag)
print(f"Created tag: {tag.name}")
テンプレートタグの値を初期入力して指定したテーブルにタグ付け

テーブルにアタッチしたタグの値の変更

import os
from dotenv import load_dotenv
from google.cloud import datacatalog_v1
load_dotenv()

project_id = os.getenv("PROJECT")
dataset_id = os.getenv("DATASET")
table_id = os.getenv("TABLE")

# Data Catalog クライアントの初期化
datacatalog_client = datacatalog_v1.DataCatalogClient()

# タグが付与されているリソースの名前を設定(BigQuery テーブルなど)
resource_name = (
    f"//bigquery.googleapis.com/projects/{project_id}"
    f"/datasets/{dataset_id}/tables/{table_id}"
)

# リソースに関連付けられているタグのリストを取得
tags = datacatalog_client.list_tags(parent=table_entry.name)

# 例えば最初のタグを取得
for tag in tags:
    # tag: <class 'google.cloud.datacatalog_v1.types.tags.Tag'>
    # 特定のタグ名に基づいてフィルタリングすることも可能
    if tag.template_display_name == "Demo Tag Template 00":  # タグテンプレートID
        # タグフィールドの値を変更する
        tag.fields["source"].string_value = "Updated Source Value"
        tag.fields["num_rows"].double_value = 2000000
        tag.fields["has_pii"].bool_value = True
        tag.fields["pii_type"].enum_value.display_name = "EMAIL"
        
        # 変更を反映する
        updated_tag = datacatalog_client.update_tag(tag=tag)
        print(f"Updated tag: {updated_tag.name}")
アタッチしたタグテンプレートの値を更新

Dagsterのjob内のasset/opのリセットの仕組み

概要

Dagsterのリセットポリシーについてまとめてみました。
Dagsterのリトライの単位は大きく2つあります。

  • 同run内でop/asset単位のリトライ
  • 異なるrunで失敗したop/assetのみをリトライ

同run内でop/asset単位のリトライ

同じrun内でop/asset単位でリトライを行うには、@op, @assetretry_policy=RetryPolicy()を設定します。

ResetPolicy

docs.dagster.io
RetryPolicyには4つの引数があります。

引数 デフォルト 意味
max_retries int 1 リトライの最大回数
delay Optional[Union[int,float]] None 再試行が要求されてから次の試行が開始されるまでの待機時間 (秒単位)
backoff Optional[Backoff] None 再試行回数に応じた遅延の修飾子
jitter Optional[Jitter] None バックオフ計算後に適用される、遅延のランダム化修飾子

BackoffにはLINEAREXPONENTIALがあります。等間隔でリトライを行うか、リトライ回数が増えるほど指数的にリトライ間隔が長くなるようにリトライを行うかという違いがあります。

Jitterは、更に間隔にランダム性をもたせるもので、FULLであればリトライ間隔に乱数をかけます。PLUS_MINUSであれば、乱数を使いjitterを適用する前までのリトライ間隔の前後になるようにリトライ間隔を調整します。

コード例

次のコードを用意しました。
first_opの1回目(dagsterのretryの数え方的には0回目)で必ず落ちるようにif context.retry_number == 0: raise Exception()を行っています。

from dagster import Definitions, OpExecutionContext, RetryPolicy, job, op

@op(retry_policy=RetryPolicy(max_retries=2))
def first_op(context: OpExecutionContext):
    if context.retry_number == 0:
        raise Exception()
    return "First op completed!"

@op
def second_op(first_op_result):
    return "Second op completed!"

@job
def my_job():
    first_result = first_op()  # first_opを実行し、その結果を変数に格納
    second_op(first_result)  # second_opがfirst_opの結果に依存して実行

defs = Definitions(jobs=[my_job])
jobのDAG
jobの実行結果

この結果から分かる通り、first_opが1回目(0回目)で失敗し、job内で再実行され、後続のDAGを実行していることが分かります。

異なるrunで失敗したop/assetのみをリトライ

dagster dev想定ではないですが、Dagster+やクラウドでホストすると、job単位で再実行が可能になります。
job内のasset/opが失敗すると、そのasset/op及び後続のasset/opはそのrunでは実行されません。
そのrunが終わると、異なるrunが立ち上がり、失敗したasset/op及び、その後続のasset/opがリトライされます。
この設定方法は次の2つどちらかの方法で実現できます。
docs.dagster.io

dagster.yaml

dagster.yamlに下記の設定を記載して実行すると実現できます。

run_retries:
  enabled: true # Omit this key if using Dagster+, since run retries are enabled by default
  max_retries: 3

このYAMLを配置し、ローカルで開発するコマンドdagster devを行うと、実行を記録するDBが無いとのエラーが置きます。
そのため、ローカルでの開発とdagster.yamlによる設定反映の相性は良くないです。

jobのタグに設定

@jobにタグを設定する方法で実現できます。
dagster.yamlに設定を書く方法は、こちらの方法と同じようにタグに展開されるので、結果的には同じことをしています。

from dagster import job


@job(tags={"dagster/max_retries": 3})
def sample_job():
    pass


@job(tags={"dagster/max_retries": 3, "dagster/retry_strategy": "ALL_STEPS"})
def other_sample_sample_job():
    pass