めもちょー

メモ帳代わりに使っています。

Dagster Asset Checksについて調べた

概要

Dagsterのver upに伴い、今までのverでは無かったAsset Checksの機能について調べました。
より正確に言うと、Asset Checksは既にありましたが、Asset Checksが落ちた場合にDAGを中止するオプションが無かったため使っていませんでした。

基本 asset_checks関数

asset_checksとは、assetがmaterializeされた時点で発火するassetのテストのことで、下記のように書くことが出来ます。

from dagster import AssetCheckResult, Definitions, asset, asset_check


@asset
def hoge():
    return 1


@asset_check(asset=hoge)
def hoge_check():
    return AssetCheckResult(
        passed=True,
    )


defs = Definitions(
    assets=[hoge],
    asset_checks=[hoge_check],
)

asset hogeをmaterializeすると、下記画面のようにasset_checkもhogeがmaterializeされた時点で発火するようになっています。
asset_checkのデフォルトの名前は[asset関数名]_[asset_check関数名]になるようです。

assetをmaterializeするとasset_checkが発火する

asset関数の返り値を使いたい場合はasset_check関数の引数に取ります。

@asset
def hoge():
    return 1


@asset_check(asset=hoge)
def hoge_check(hoge):
    return AssetCheckResult(
        passed=bool(hoge == 2),
    )


defs = Definitions(
    assets=[hoge],
    asset_checks=[hoge_check],
)

この例では、AssetCheckResultをFalseになるように返していますが、AssetCheckの実行結果としては正常に終了したため実行結果の画面は緑で表現されます。(ややこしいです)
変わりに、ログに赤色でAssetCheckが失敗したことが表現されています。

asset, asset_checkの実行結果としては正常終了したため緑で表される
ログには赤でasset_checkが失敗したことが表される

1つのasset_check関数内に複数のチェック項目を設ける

1つの関数内で複数のチェック項目を設けたい場合、@multi_asset_checkを使います。
multi_asset_checkにチェック項目をまとめることで、無駄な計算ノードの起動・終了に伴う無駄を省くことができます。
また、同階層のアセットを一つのmulti_asset_checkの依存先にすることで、階層ごとにアセットが満たすべき制約を守れているか保証することができます。

@asset
def my_asset_one():
    return 1

@asset
def my_asset_two():
    return 2


@multi_asset_check(
    specs=[
        AssetCheckSpec(name="asset_check_one", asset="my_asset_one"),
        AssetCheckSpec(name="asset_check_two", asset="my_asset_two"),
    ]
)
def the_check(context: AssetCheckExecutionContext) -> Iterable[AssetCheckResult]:
    yield AssetCheckResult(
        passed=False,
        severity=AssetCheckSeverity.WARN,
        description="The asset is over 0.5",
        asset_key="my_asset_one",
    )

    yield AssetCheckResult(
        passed=True,
        description="The asset is fresh.",
        asset_key="my_asset_two",
    )


defs = Definitions(
    assets=[my_asset_one, my_asset_two],
    asset_checks=[the_check],
)

リネージとログは次のようになります。

multi_asset_checkのDAG
multi_asset_checkのログ

asset関数内にcheckまで書く

asset_check関数を書かずに、asset関数内にチェック項目を書くことも出来ます。
こちらはDagsterドキュメントの例をそのまま使います。

import pandas as pd

from dagster import (
    AssetCheckResult,
    AssetCheckSpec,
    AssetExecutionContext,
    Definitions,
    Output,
    asset,
)


@asset(check_specs=[AssetCheckSpec(name="orders_id_has_no_nulls", asset="orders")])
def orders(context: AssetExecutionContext):
    orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})

    # save the output and indicate that it's been saved
    orders_df.to_csv("orders")
    yield Output(value=None)

    # check it
    num_null_order_ids = orders_df["order_id"].isna().sum()
    yield AssetCheckResult(
        passed=bool(num_null_order_ids == 0),
    )


defs = Definitions(assets=[orders])
asset関数の実行結果(asset_check関数のノードは表示されない)
asset関数の実行ログにAssetCheckResultを返した跡が確認できる

asset_check factory pattern

一つのasset_check関数のテンプレートに対して、複数の設定項目を使いまわしたいときに有効なfactory patternです。
公式ドキュメントの実装をそのまま記しました。
この例では、接続先DBのtable(SQL)を変えてテストを行っています。

@assetdef orders(): ...


@asset
def items(): ...


def make_check(check_blob: Mapping[str, str]) -> AssetChecksDefinition:
    @asset_check(
        name=check_blob["name"],
        asset=check_blob["asset"],
        required_resource_keys={"db_connection"},
    )
    def _check(context):
        rows = context.resources.db_connection.execute(check_blob["sql"])
        return AssetCheckResult(passed=len(rows) == 0, metadata={"num_rows": len(rows)})

    return _check


check_blobs = [
    {
        "name": "orders_id_has_no_nulls",
        "asset": "orders",
        "sql": "select * from orders where order_id is null",
    },
    {
        "name": "items_id_has_no_nulls",
        "asset": "items",
        "sql": "select * from items where item_id is null",
    },
]

defs = Definitions(
    assets=[orders, items],
    asset_checks=[make_check(check_blob) for check_blob in check_blobs],
    resources={"db_connection": MagicMock()},
)

AssetCheckResultをカスタマイズする

エラーレベル

ERRORやWARNINGなどエラーレベルを設定することができます。
asset_check関数が返すAssetCheckResultのseverityにエラーレベルを設定することができます。

is_serious = ...
return AssetCheckResult(
    passed=False,
    severity=AssetCheckSeverity.ERROR if is_serious else AssetCheckSeverity.WARN,
)

メタデータ

メタデータを付加することも可能です。
AssetCheckResultのmetadataに辞書を設定します。

return AssetCheckResult(
    passed=bool(num_null_order_ids == 0),
    metadata={
        "num_null_order_ids": int(num_null_order_ids),
    },
)

下流アセットのブロック

上流asset Aに対するasset_check関数であるasset check Aがpassed=Falseになると、asset Aに依存する下流assetはmaterializeされることがなくなります。
コードとしては、@asset_checkデコレータの引数にblocking=Trueに設定することで実現します。

@asset
def upstream_asset():
    pass


@asset_check(asset=upstream_asset, blocking=True)
def check_upstream_asset():
    return AssetCheckResult(passed=False)


@asset(deps=[upstream_asset])
def downstream_asset():
    pass


defs = Definitions(
    assets=[upstream_asset, downstream_asset], asset_checks=[check_upstream_asset]
)
上記コードのリネージ、asset_checkはassetの追加情報として表現される
上流assetのasset_checkが落ちたため、下流assetがmaterializeされない

asset_checkを含めた・含めない実行

from dagster import (
    AssetSelection,
    Definitions,
    ScheduleDefinition,
    asset,
    asset_check,
    define_asset_job,
)


@asset
def my_asset(): ...


@asset_check(asset=my_asset)
def check_1(): ...


@asset_check(asset=my_asset)
def check_2(): ...


# my_assetとそのassetに対するasset_checkをジョブに含める
my_job = define_asset_job("my_job", selection=AssetSelection.assets(my_asset))


# asset_checkを含めずに, my_assetを含める
my_asset_only_job = define_asset_job(
    "my_asset_only_job",
    selection=AssetSelection.assets(my_asset).without_checks(),
)

# my_assetを含めずにasset_checkのみを含める
checks_only_job = define_asset_job(
    "checks_only_job", selection=AssetSelection.checks_for_assets(my_asset)
)

# my_assetを含めずに, asset_check check1 のみを含める
check_1_job = define_asset_job("check_1_job", selection=AssetSelection.checks(check_1))

# schedule my_job to run every day at midnight
basic_schedule = ScheduleDefinition(job=my_job, cron_schedule="0 0 * * *")

defs = Definitions(
    assets=[my_asset],
    asset_checks=[check_1, check_2],
    jobs=[my_job, my_asset_only_job, checks_only_job, check_1_job],
    schedules=[basic_schedule],
)
job一覧
check_1_jobのリネージ
my_asset_only_jobのリネージ
checks_only_jobのリネージ
my_jobのリネージ

assetを含むjobのみがmaterializeボタンがあることが確認できます。
また、jobにassetが無いときに限り、asset_check関数がノードとして現れることが分かります。assetがある場合にはassetのノードの下にcheckという項目がつくことが分かります。

asset checkのサブセット化

multi_asset_checkではcan_subset=Trueを設定するとアセットの一部だけのチェックを実行できます。

@multi_asset_check(
    specs=[
        AssetCheckSpec(name="asset_check_one", asset="asset_one"),
        AssetCheckSpec(name="asset_check_two", asset="asset_two"),
    ],
    can_subset=True,
)
def the_check(context: AssetCheckExecutionContext) -> Iterable[AssetCheckResult]:
    if (
        AssetCheckKey(AssetKey("asset_one"), "asset_check_one")
        in context.selected_asset_check_keys
    ):
        yield AssetCheckResult(
            passed=True, metadata={"foo": "bar"}, check_name="asset_check_one"
        )
    if (
        AssetCheckKey(AssetKey("asset_two"), "asset_check_two")
        in context.selected_asset_check_keys
    ):
        yield AssetCheckResult(
            passed=True, metadata={"foo": "bar"}, check_name="asset_check_two"
        )

上記のコードを解説します。
まず、AssetCheckKeyAssetCheckKey(アセットキー, アセットチェックキー)でassetに対するasset_checkを指定できます。
asset_checkは複数のassetに掛けることが出来ますので、assetまで指定してAssetCheckKeyオブジェクトとなります。
AssetCheckExecutionContextcontext.selected_asset_check_keysはどのアセットチェックが選択されているかの情報が含まれているため下記のコードは、「asset_oneに対するasset_check_oneが現在のアセットチェックの実行に含まれていたら、asset_check_oneという名のAssetCheckResultを返す」という実装になっています。

    if (
        AssetCheckKey(AssetKey("asset_one"), "asset_check_one")
        in context.selected_asset_check_keys
    ):
        yield AssetCheckResult(
            passed=True, metadata={"foo": "bar"}, check_name="asset_check_one"
        )

このように実装することで、asset_oneまたは、asset_twoのどちらか一方のアセットがmaterializeされたときに一方のチェック結果を返すことができます。

multi_assetの中で各assetに対してasset checkを行う

@multi_assetは複数のassetを出力する関数でした。
この出力のそれぞれに対してasset checkを行いたい場合の実装です。

@multi_asset(
    specs=[
        AssetSpec("multi_asset_piece_1", group_name="asset_checks", skippable=True),
        AssetSpec("multi_asset_piece_2", group_name="asset_checks", skippable=True),
    ],
    check_specs=[AssetCheckSpec("my_check", asset="multi_asset_piece_1")],
    can_subset=True,
)
def multi_asset_1_and_2(context: AssetExecutionContext):
    if AssetKey("multi_asset_piece_1") in context.selected_asset_keys:
        yield MaterializeResult(asset_key="multi_asset_piece_1")
    # The check will only execute when multi_asset_piece_1 is materialized
    if (
        AssetCheckKey(AssetKey("multi_asset_piece_1"), "my_check")
        in context.selected_asset_check_keys
    ):
        yield AssetCheckResult(passed=True, metadata={"foo": "bar"})
    if AssetKey("multi_asset_piece_2") in context.selected_asset_keys:
        # No check on multi_asset_piece_2
        yield MaterializeResult(asset_key="multi_asset_piece_2")

1番目と3番目のifは、assetがmaterializeされている場合に発火しMaterializedResultを返します。
2番目のifはmulti_asset_piece_1がmaterializeされて、それのasset_checkであるmy_checkが実行されている場合、AssetCheckResultを返します。

multi_asset_piece_1のみをmaterializeするとasset_checkが実行される
multi_asset_piece_2のみをmaterializeするとasset_checkが実行されない
multi_asset_piece_1とmulti_asset_piece_2をmaterializeするとasset_checkが実行される

結果はすべてこのようなグラフになる。

AssetやSource Assetの鮮度チェックを行う

Source AssetやAssetにいつ変更があったのかをチェックすることができます。
下記のように書くことで、source_tablesというsource_tablesというAssetに対して鮮度チェックを行うことができます。

from datetime import timedelta
from dagster import build_last_update_freshness_checks, AssetCheckDefinition

source_table_freshness_checks: AssetCheckDefinition = build_last_update_freshness_checks(
    assets=[source_tables],
    lower_bound_delta=timedelta(hours=2),
)

source_tables assetは、ObserveResultをyieldし、そのmetadataとして最後に更新された時刻の情報を載せます。

@multi_observable_source_asset(specs=asset_specs)
def source_tables(snowflake: SnowflakeResource):
    with snowflake.get_connection() as conn:
        freshness_results = fetch_last_updated_timestamps(
            snowflake_connection=conn.cursor(),
            tables=table_names,
            schema=TABLE_SCHEMA,
        )
        for table_name, last_updated in freshness_results.items():
            yield ObserveResult(
                asset_key=table_name,
                metadata={
                    "dagster/last_updated_timestamp": MetadataValue.timestamp(
                        last_updated
                    )
                },
            )

なお、Snowflakeにはdagster_snowflake.fetch_last_updated_timestampsというテーブルが最後に更新された時刻を簡単に取得してくれる関数があるようです。