Dagsterのjob内のasset/opのリセットの仕組み

概要

Dagsterのリセットポリシーについてまとめてみました。
Dagsterのリトライの単位は大きく2つあります。

  • 同run内でop/asset単位のリトライ
  • 異なるrunで失敗したop/assetのみをリトライ

同run内でop/asset単位のリトライ

同じrun内でop/asset単位でリトライを行うには、@op, @assetretry_policy=RetryPolicy()を設定します。

ResetPolicy

docs.dagster.io
RetryPolicyには4つの引数があります。

引数 デフォルト 意味
max_retries int 1 リトライの最大回数
delay Optional[Union[int,float]] None 再試行が要求されてから次の試行が開始されるまでの待機時間 (秒単位)
backoff Optional[Backoff] None 再試行回数に応じた遅延の修飾子
jitter Optional[Jitter] None バックオフ計算後に適用される、遅延のランダム化修飾子

BackoffにはLINEAREXPONENTIALがあります。等間隔でリトライを行うか、リトライ回数が増えるほど指数的にリトライ間隔が長くなるようにリトライを行うかという違いがあります。

Jitterは、更に間隔にランダム性をもたせるもので、FULLであればリトライ間隔に乱数をかけます。PLUS_MINUSであれば、乱数を使いjitterを適用する前までのリトライ間隔の前後になるようにリトライ間隔を調整します。

コード例

次のコードを用意しました。
first_opの1回目(dagsterのretryの数え方的には0回目)で必ず落ちるようにif context.retry_number == 0: raise Exception()を行っています。

from dagster import Definitions, OpExecutionContext, RetryPolicy, job, op

@op(retry_policy=RetryPolicy(max_retries=2))
def first_op(context: OpExecutionContext):
    if context.retry_number == 0:
        raise Exception()
    return "First op completed!"

@op
def second_op(first_op_result):
    return "Second op completed!"

@job
def my_job():
    first_result = first_op()  # first_opを実行し、その結果を変数に格納
    second_op(first_result)  # second_opがfirst_opの結果に依存して実行

defs = Definitions(jobs=[my_job])
jobのDAG
jobの実行結果

この結果から分かる通り、first_opが1回目(0回目)で失敗し、job内で再実行され、後続のDAGを実行していることが分かります。

異なるrunで失敗したop/assetのみをリトライ

dagster dev想定ではないですが、Dagster+やクラウドでホストすると、job単位で再実行が可能になります。
job内のasset/opが失敗すると、そのasset/op及び後続のasset/opはそのrunでは実行されません。
そのrunが終わると、異なるrunが立ち上がり、失敗したasset/op及び、その後続のasset/opがリトライされます。
この設定方法は次の2つどちらかの方法で実現できます。
docs.dagster.io

dagster.yaml

dagster.yamlに下記の設定を記載して実行すると実現できます。

run_retries:
  enabled: true # Omit this key if using Dagster+, since run retries are enabled by default
  max_retries: 3

このYAMLを配置し、ローカルで開発するコマンドdagster devを行うと、実行を記録するDBが無いとのエラーが置きます。
そのため、ローカルでの開発とdagster.yamlによる設定反映の相性は良くないです。

jobのタグに設定

@jobにタグを設定する方法で実現できます。
dagster.yamlに設定を書く方法は、こちらの方法と同じようにタグに展開されるので、結果的には同じことをしています。

from dagster import job


@job(tags={"dagster/max_retries": 3})
def sample_job():
    pass


@job(tags={"dagster/max_retries": 3, "dagster/retry_strategy": "ALL_STEPS"})
def other_sample_sample_job():
    pass