- 概要
- 基本 asset_checks関数
- 1つのasset_check関数内に複数のチェック項目を設ける
- asset関数内にcheckまで書く
- asset_check factory pattern
- AssetCheckResultをカスタマイズする
- 下流アセットのブロック
- asset_checkを含めた・含めない実行
- asset checkのサブセット化
- multi_assetの中で各assetに対してasset checkを行う
- AssetやSource Assetの鮮度チェックを行う
概要
Dagsterのver upに伴い、今までのverでは無かったAsset Checksの機能について調べました。
より正確に言うと、Asset Checksは既にありましたが、Asset Checksが落ちた場合にDAGを中止するオプションが無かったため使っていませんでした。
基本 asset_checks関数
asset_checksとは、assetがmaterializeされた時点で発火するassetのテストのことで、下記のように書くことが出来ます。
from dagster import AssetCheckResult, Definitions, asset, asset_check @asset def hoge(): return 1 @asset_check(asset=hoge) def hoge_check(): return AssetCheckResult( passed=True, ) defs = Definitions( assets=[hoge], asset_checks=[hoge_check], )
asset hogeをmaterializeすると、下記画面のようにasset_checkもhogeがmaterializeされた時点で発火するようになっています。
asset_checkのデフォルトの名前は[asset関数名]_[asset_check関数名]
になるようです。
asset関数の返り値を使いたい場合はasset_check関数の引数に取ります。
@asset def hoge(): return 1 @asset_check(asset=hoge) def hoge_check(hoge): return AssetCheckResult( passed=bool(hoge == 2), ) defs = Definitions( assets=[hoge], asset_checks=[hoge_check], )
この例では、AssetCheckResultをFalseになるように返していますが、AssetCheckの実行結果としては正常に終了したため実行結果の画面は緑で表現されます。(ややこしいです)
変わりに、ログに赤色でAssetCheckが失敗したことが表現されています。
1つのasset_check関数内に複数のチェック項目を設ける
1つの関数内で複数のチェック項目を設けたい場合、@multi_asset_check
を使います。
multi_asset_checkにチェック項目をまとめることで、無駄な計算ノードの起動・終了に伴う無駄を省くことができます。
また、同階層のアセットを一つのmulti_asset_checkの依存先にすることで、階層ごとにアセットが満たすべき制約を守れているか保証することができます。
@asset def my_asset_one(): return 1 @asset def my_asset_two(): return 2 @multi_asset_check( specs=[ AssetCheckSpec(name="asset_check_one", asset="my_asset_one"), AssetCheckSpec(name="asset_check_two", asset="my_asset_two"), ] ) def the_check(context: AssetCheckExecutionContext) -> Iterable[AssetCheckResult]: yield AssetCheckResult( passed=False, severity=AssetCheckSeverity.WARN, description="The asset is over 0.5", asset_key="my_asset_one", ) yield AssetCheckResult( passed=True, description="The asset is fresh.", asset_key="my_asset_two", ) defs = Definitions( assets=[my_asset_one, my_asset_two], asset_checks=[the_check], )
リネージとログは次のようになります。
asset関数内にcheckまで書く
asset_check関数を書かずに、asset関数内にチェック項目を書くことも出来ます。
こちらはDagsterドキュメントの例をそのまま使います。
import pandas as pd from dagster import ( AssetCheckResult, AssetCheckSpec, AssetExecutionContext, Definitions, Output, asset, ) @asset(check_specs=[AssetCheckSpec(name="orders_id_has_no_nulls", asset="orders")]) def orders(context: AssetExecutionContext): orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]}) # save the output and indicate that it's been saved orders_df.to_csv("orders") yield Output(value=None) # check it num_null_order_ids = orders_df["order_id"].isna().sum() yield AssetCheckResult( passed=bool(num_null_order_ids == 0), ) defs = Definitions(assets=[orders])
asset_check factory pattern
一つのasset_check関数のテンプレートに対して、複数の設定項目を使いまわしたいときに有効なfactory patternです。
公式ドキュメントの実装をそのまま記しました。
この例では、接続先DBのtable(SQL)を変えてテストを行っています。
@asset • def orders(): ... @asset def items(): ... def make_check(check_blob: Mapping[str, str]) -> AssetChecksDefinition: @asset_check( name=check_blob["name"], asset=check_blob["asset"], required_resource_keys={"db_connection"}, ) def _check(context): rows = context.resources.db_connection.execute(check_blob["sql"]) return AssetCheckResult(passed=len(rows) == 0, metadata={"num_rows": len(rows)}) return _check check_blobs = [ { "name": "orders_id_has_no_nulls", "asset": "orders", "sql": "select * from orders where order_id is null", }, { "name": "items_id_has_no_nulls", "asset": "items", "sql": "select * from items where item_id is null", }, ] defs = Definitions( assets=[orders, items], asset_checks=[make_check(check_blob) for check_blob in check_blobs], resources={"db_connection": MagicMock()}, )
AssetCheckResultをカスタマイズする
エラーレベル
ERRORやWARNINGなどエラーレベルを設定することができます。
asset_check関数が返すAssetCheckResultのseverityにエラーレベルを設定することができます。
is_serious = ... return AssetCheckResult( passed=False, severity=AssetCheckSeverity.ERROR if is_serious else AssetCheckSeverity.WARN, )
メタデータ
メタデータを付加することも可能です。
AssetCheckResultのmetadataに辞書を設定します。
return AssetCheckResult( passed=bool(num_null_order_ids == 0), metadata={ "num_null_order_ids": int(num_null_order_ids), }, )
下流アセットのブロック
上流asset Aに対するasset_check関数であるasset check Aがpassed=False
になると、asset Aに依存する下流assetはmaterializeされることがなくなります。
コードとしては、@asset_check
デコレータの引数にblocking=True
に設定することで実現します。
@asset def upstream_asset(): pass @asset_check(asset=upstream_asset, blocking=True) def check_upstream_asset(): return AssetCheckResult(passed=False) @asset(deps=[upstream_asset]) def downstream_asset(): pass defs = Definitions( assets=[upstream_asset, downstream_asset], asset_checks=[check_upstream_asset] )
asset_checkを含めた・含めない実行
from dagster import ( AssetSelection, Definitions, ScheduleDefinition, asset, asset_check, define_asset_job, ) @asset def my_asset(): ... @asset_check(asset=my_asset) def check_1(): ... @asset_check(asset=my_asset) def check_2(): ... # my_assetとそのassetに対するasset_checkをジョブに含める my_job = define_asset_job("my_job", selection=AssetSelection.assets(my_asset)) # asset_checkを含めずに, my_assetを含める my_asset_only_job = define_asset_job( "my_asset_only_job", selection=AssetSelection.assets(my_asset).without_checks(), ) # my_assetを含めずにasset_checkのみを含める checks_only_job = define_asset_job( "checks_only_job", selection=AssetSelection.checks_for_assets(my_asset) ) # my_assetを含めずに, asset_check check1 のみを含める check_1_job = define_asset_job("check_1_job", selection=AssetSelection.checks(check_1)) # schedule my_job to run every day at midnight basic_schedule = ScheduleDefinition(job=my_job, cron_schedule="0 0 * * *") defs = Definitions( assets=[my_asset], asset_checks=[check_1, check_2], jobs=[my_job, my_asset_only_job, checks_only_job, check_1_job], schedules=[basic_schedule], )
assetを含むjobのみがmaterializeボタンがあることが確認できます。
また、jobにassetが無いときに限り、asset_check関数がノードとして現れることが分かります。assetがある場合にはassetのノードの下にcheckという項目がつくことが分かります。
asset checkのサブセット化
multi_asset_check
ではcan_subset=True
を設定するとアセットの一部だけのチェックを実行できます。
@multi_asset_check( specs=[ AssetCheckSpec(name="asset_check_one", asset="asset_one"), AssetCheckSpec(name="asset_check_two", asset="asset_two"), ], can_subset=True, ) def the_check(context: AssetCheckExecutionContext) -> Iterable[AssetCheckResult]: if ( AssetCheckKey(AssetKey("asset_one"), "asset_check_one") in context.selected_asset_check_keys ): yield AssetCheckResult( passed=True, metadata={"foo": "bar"}, check_name="asset_check_one" ) if ( AssetCheckKey(AssetKey("asset_two"), "asset_check_two") in context.selected_asset_check_keys ): yield AssetCheckResult( passed=True, metadata={"foo": "bar"}, check_name="asset_check_two" )
上記のコードを解説します。
まず、AssetCheckKey
はAssetCheckKey(アセットキー, アセットチェックキー)
でassetに対するasset_checkを指定できます。
asset_checkは複数のassetに掛けることが出来ますので、assetまで指定してAssetCheckKeyオブジェクトとなります。
AssetCheckExecutionContext
のcontext.selected_asset_check_keys
はどのアセットチェックが選択されているかの情報が含まれているため下記のコードは、「asset_oneに対するasset_check_oneが現在のアセットチェックの実行に含まれていたら、asset_check_oneという名のAssetCheckResultを返す」という実装になっています。
if ( AssetCheckKey(AssetKey("asset_one"), "asset_check_one") in context.selected_asset_check_keys ): yield AssetCheckResult( passed=True, metadata={"foo": "bar"}, check_name="asset_check_one" )
このように実装することで、asset_oneまたは、asset_twoのどちらか一方のアセットがmaterializeされたときに一方のチェック結果を返すことができます。
multi_assetの中で各assetに対してasset checkを行う
@multi_asset
は複数のassetを出力する関数でした。
この出力のそれぞれに対してasset checkを行いたい場合の実装です。
@multi_asset( specs=[ AssetSpec("multi_asset_piece_1", group_name="asset_checks", skippable=True), AssetSpec("multi_asset_piece_2", group_name="asset_checks", skippable=True), ], check_specs=[AssetCheckSpec("my_check", asset="multi_asset_piece_1")], can_subset=True, ) def multi_asset_1_and_2(context: AssetExecutionContext): if AssetKey("multi_asset_piece_1") in context.selected_asset_keys: yield MaterializeResult(asset_key="multi_asset_piece_1") # The check will only execute when multi_asset_piece_1 is materialized if ( AssetCheckKey(AssetKey("multi_asset_piece_1"), "my_check") in context.selected_asset_check_keys ): yield AssetCheckResult(passed=True, metadata={"foo": "bar"}) if AssetKey("multi_asset_piece_2") in context.selected_asset_keys: # No check on multi_asset_piece_2 yield MaterializeResult(asset_key="multi_asset_piece_2")
1番目と3番目のifは、assetがmaterializeされている場合に発火しMaterializedResult
を返します。
2番目のifはmulti_asset_piece_1がmaterializeされて、それのasset_checkであるmy_checkが実行されている場合、AssetCheckResult
を返します。
結果はすべてこのようなグラフになる。
AssetやSource Assetの鮮度チェックを行う
Source AssetやAssetにいつ変更があったのかをチェックすることができます。
下記のように書くことで、source_tablesというsource_tablesというAssetに対して鮮度チェックを行うことができます。
from datetime import timedelta from dagster import build_last_update_freshness_checks, AssetCheckDefinition source_table_freshness_checks: AssetCheckDefinition = build_last_update_freshness_checks( assets=[source_tables], lower_bound_delta=timedelta(hours=2), )
source_tables assetは、ObserveResult
をyieldし、そのmetadataとして最後に更新された時刻の情報を載せます。
@multi_observable_source_asset(specs=asset_specs) def source_tables(snowflake: SnowflakeResource): with snowflake.get_connection() as conn: freshness_results = fetch_last_updated_timestamps( snowflake_connection=conn.cursor(), tables=table_names, schema=TABLE_SCHEMA, ) for table_name, last_updated in freshness_results.items(): yield ObserveResult( asset_key=table_name, metadata={ "dagster/last_updated_timestamp": MetadataValue.timestamp( last_updated ) }, )
なお、Snowflakeにはdagster_snowflake.fetch_last_updated_timestamps
というテーブルが最後に更新された時刻を簡単に取得してくれる関数があるようです。