Op retries
If you are just getting started with Dagster, we strongly recommend you use assets rather than ops to build your data pipelines. The ops documentation is for Dagster users who need to manage existing ops, or who have complex use cases.
When an exception occurs during op execution, Dagster provides tools to retry that op within the same job run.
Relevant APIs
| Name | Description | 
|---|---|
| RetryRequested | An exception that can be raised from the body of an op to request a retry |
| RetryPolicy | A declarative policy attached to an op that requests a retry when an exception occurs |
| Backoff | Modification to the delay between retries based on the attempt number |
| Jitter | Random modification to the delay between retries |
Overview
In Dagster, code is executed within an op. Sometimes this code can fail for transient reasons, and the desired behavior is to retry and run the function again.
Dagster provides both a declarative RetryPolicy and a manual RetryRequested exception to enable this behavior.
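As a rough mental model, retrying within a run resembles a loop around the op body. The plain-Python sketch below is an illustration only, not Dagster's implementation: `run_with_retries` and `make_flaky` are hypothetical names, and the wait between attempts is left out.

```python
def run_with_retries(fn, max_retries=1):
    """Call fn, retrying up to max_retries times if it raises.

    Returns (result, attempts). Hypothetical helper for illustration only.
    """
    attempts = 0
    while True:
        attempts += 1
        try:
            return fn(), attempts
        except Exception:
            # The first call is not a retry, so max_retries=3 allows
            # up to 4 calls in total in this sketch.
            if attempts > max_retries:
                raise


def make_flaky(failures):
    """Build a function that raises `failures` times and then succeeds."""
    remaining = {"count": failures}

    def flaky():
        if remaining["count"] > 0:
            remaining["count"] -= 1
            raise ConnectionError("transient failure")
        return "ok"

    return flaky
```

Calling `run_with_retries(make_flaky(2), max_retries=3)` succeeds on the third call, while a function that keeps failing re-raises its exception once the retries are used up.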
Using op retries
We start with an op that forces us to rerun the whole job any time it fails.
import dagster as dg

@dg.op
def problematic():
    fails_sometimes()  # placeholder for code that fails intermittently
RetryPolicy
To get this op to retry when an exception occurs, we can attach a RetryPolicy.
@dg.op(retry_policy=dg.RetryPolicy())
def better():
    fails_sometimes()
This improves the situation, but we may need additional configuration to control how many times to retry and how long to wait between retries.
@dg.op(
    retry_policy=dg.RetryPolicy(
        max_retries=3,
        delay=0.2,  # 200ms
        backoff=dg.Backoff.EXPONENTIAL,
        jitter=dg.Jitter.PLUS_MINUS,
    )
)
def even_better():
    fails_sometimes()
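To get a feel for how delay, backoff, and jitter interact, the sketch below computes a wait time per retry attempt in plain Python. The formulas are assumptions chosen for illustration and may differ from what Dagster does internally; `sketch_delay` and its string-valued `backoff` and `jitter` arguments are hypothetical.

```python
import random


def sketch_delay(attempt, base_delay, backoff=None, jitter=None, rng=random):
    """Return the seconds to wait before retry number `attempt` (1-based).

    The formulas here are assumptions for illustration, not Dagster's code.
    """
    if backoff == "exponential":
        delay = base_delay * (2**attempt - 1)
    elif backoff == "linear":
        delay = base_delay * attempt
    else:
        delay = base_delay

    if jitter == "full":
        # Pick a wait anywhere between zero and the computed delay.
        delay = rng.random() * delay
    elif jitter == "plus_minus":
        # Shift the computed delay by up to +/- base_delay.
        delay = delay + (2 * rng.random() - 1) * base_delay
    return delay


for attempt in (1, 2, 3):
    print(attempt, round(sketch_delay(attempt, 0.2, backoff="exponential"), 3))
```

Under these assumed formulas and without jitter, a 0.2 second base delay with exponential backoff gives waits of 0.2, 0.6, and 1.4 seconds. Jitter spreads those values out so that many ops failing at once do not all retry at the same instant.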
Besides being set directly on the op definition, a retry policy can be set on a specific invocation of an op, or on a @dg.job to apply to all ops it contains.
default_policy = dg.RetryPolicy(max_retries=1)
flakey_op_policy = dg.RetryPolicy(max_retries=10)
@dg.job(op_retry_policy=default_policy)
def default_and_override_job():
    problematic.with_retry_policy(flakey_op_policy)()
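In the job above, the invocation of problematic uses flakey_op_policy rather than the job-wide default. The plain-Python sketch below models that resolution, assuming the most specific policy wins (invocation, then op definition, then job). `Policy` and `effective_policy` are hypothetical stand-ins, not Dagster APIs.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Policy:
    """Hypothetical stand-in for a retry policy; only max_retries is modeled."""

    max_retries: int = 1


def effective_policy(
    invocation: Optional[Policy] = None,
    definition: Optional[Policy] = None,
    job_default: Optional[Policy] = None,
) -> Optional[Policy]:
    """Return the most specific policy that is set, or None if there is none."""
    for candidate in (invocation, definition, job_default):
        if candidate is not None:
            return candidate
    return None


default_policy = Policy(max_retries=1)
flakey_op_policy = Policy(max_retries=10)

# Mirrors default_and_override_job: the invocation-level policy wins.
chosen = effective_policy(invocation=flakey_op_policy, job_default=default_policy)
print(chosen.max_retries)  # prints 10
```

An op with no policy of its own would fall back to the job default in this model, and an op in a job with no policy anywhere would not be retried.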
Retry policies also work for asset jobs.
import random
import dagster as dg
@dg.asset
def sample_asset():
    if random.choice([True, False]):
        raise Exception("failed")
sample_job = dg.define_asset_job(
    name="sample_job",
    selection="sample_asset",
    op_retry_policy=dg.RetryPolicy(max_retries=3),
)
RetryRequested
In more nuanced situations, we may need to inspect the failure before deciding whether to retry. For this, we can raise a RetryRequested exception manually.
@dg.op
def manual():
    try:
        fails_sometimes()
    except Exception as e:
        if should_retry(e):
            raise dg.RetryRequested(max_retries=1, seconds_to_wait=1) from e
        else:
            raise
Using raise from ensures that the original exception's information is captured by Dagster.
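The example above leaves should_retry undefined. The sketch below shows one possible shape for it and demonstrates what raise from preserves, using only the standard library. `RetrySignal` is a hypothetical stand-in for dg.RetryRequested so the snippet runs without Dagster, and the rule inside `should_retry` is an assumption to adapt to your own failure modes.

```python
class RetrySignal(Exception):
    """Hypothetical stand-in for dg.RetryRequested."""


def should_retry(exc):
    """Treat network-style errors as transient (an assumed rule)."""
    return isinstance(exc, (ConnectionError, TimeoutError))


def guarded(fn):
    """Run fn; turn transient failures into a retry signal, re-raise the rest."""
    try:
        return fn()
    except Exception as e:
        if should_retry(e):
            # `from e` stores the original exception on __cause__.
            raise RetrySignal("retry requested") from e
        raise


def times_out():
    raise TimeoutError("upstream took too long")


try:
    guarded(times_out)
except RetrySignal as signal:
    print(type(signal.__cause__).__name__)  # prints TimeoutError
```

Because the original exception travels along as `__cause__`, whatever handles the retry request can still report the underlying failure and its traceback.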