概要
Dagsterのリセットポリシーについてまとめてみました。
Dagsterのリトライの単位は大きく2つあります。
- 同run内でop/asset単位のリトライ
- 異なるrunで失敗したop/assetのみをリトライ
同run内でop/asset単位のリトライ
同じrun内でop/asset単位でリトライを行うには、@op
, @asset
にretry_policy=RetryPolicy()
を設定します。
ResetPolicy
docs.dagster.io
RetryPolicyには4つの引数があります。
引数 | 型 | デフォルト | 意味 |
max_retries |
int |
1 | リトライの最大回数 |
delay |
Optional[Union[int,float]] |
None | 再試行が要求されてから次の試行が開始されるまでの待機時間 (秒単位) |
backoff |
Optional[Backoff] |
None | 再試行回数に応じた遅延の修飾子 |
jitter |
Optional[Jitter] |
None | バックオフ計算後に適用される、遅延のランダム化修飾子 |
Backoff
にはLINEAR
とEXPONENTIAL
があります。等間隔でリトライを行うか、リトライ回数が増えるほど指数的にリトライ間隔が長くなるようにリトライを行うかという違いがあります。
Jitter
は、更に間隔にランダム性をもたせるもので、FULL
であればリトライ間隔に乱数をかけます。PLUS_MINUS
であれば、乱数を使いjitterを適用する前までのリトライ間隔の前後になるようにリトライ間隔を調整します。
コード例
次のコードを用意しました。
first_opの1回目(dagsterのretryの数え方的には0回目)で必ず落ちるようにif context.retry_number == 0: raise Exception()
を行っています。
from dagster import Definitions, OpExecutionContext, RetryPolicy, job, op @op(retry_policy=RetryPolicy(max_retries=2)) def first_op(context: OpExecutionContext): if context.retry_number == 0: raise Exception() return "First op completed!" @op def second_op(first_op_result): return "Second op completed!" @job def my_job(): first_result = first_op() # first_opを実行し、その結果を変数に格納 second_op(first_result) # second_opがfirst_opの結果に依存して実行 defs = Definitions(jobs=[my_job])


この結果から分かる通り、first_opが1回目(0回目)で失敗し、job内で再実行され、後続のDAGを実行していることが分かります。
異なるrunで失敗したop/assetのみをリトライ
dagster dev
想定ではないですが、Dagster+やクラウドでホストすると、job単位で再実行が可能になります。
job内のasset/opが失敗すると、そのasset/op及び後続のasset/opはそのrunでは実行されません。
そのrunが終わると、異なるrunが立ち上がり、失敗したasset/op及び、その後続のasset/opがリトライされます。
この設定方法は次の2つどちらかの方法で実現できます。
docs.dagster.io
dagster.yaml
dagster.yaml
に下記の設定を記載して実行すると実現できます。
run_retries: enabled: true # Omit this key if using Dagster+, since run retries are enabled by default max_retries: 3
このYAMLを配置し、ローカルで開発するコマンドdagster dev
を行うと、実行を記録するDBが無いとのエラーが置きます。
そのため、ローカルでの開発とdagster.yaml
による設定反映の相性は良くないです。
jobのタグに設定
@job
にタグを設定する方法で実現できます。
dagster.yaml
に設定を書く方法は、こちらの方法と同じようにタグに展開されるので、結果的には同じことをしています。
from dagster import job @job(tags={"dagster/max_retries": 3}) def sample_job(): pass @job(tags={"dagster/max_retries": 3, "dagster/retry_strategy": "ALL_STEPS"}) def other_sample_sample_job(): pass