Failure Handling in Apache Airflow DAGs
Learn strategies for graceful failures and efficient monitoring in your DAGs.
Apache Airflow is a powerful tool for orchestrating complex data pipelines. However, no matter how well you design your directed acyclic graphs (DAGs), failures are inevitable. The key is to make your DAGs fail gracefully so you can minimize impact.
In this blog, we take a deep dive into various strategies and look at code snippets that’ll help you make your data pipelines more fault-tolerant, and maintainable.
Variable Retry Mechanisms
Exponentially increase the delay between task retries
Airflow’s built-in retry mechanism is useful, but it often makes sense to exponentially increase the delay between the tries instead of having a constant delay between retries.
Imagine you have a task that calls an external API. If the API goes down temporarily due to rate-limiting or maintenance, the exponential backoff strategy gives the API more time to recover, rather than bombarding it with repeated calls.
You can achieve this with some custom Python logic:
from datetime import timedelta
def custom_retry_delay(context):
var_factor = context['ti'].try_number ** 2 factor
return timedelta(minutes=5 * var_factor)
default_args = {
'retries': 3,
'retry_delay': custom_retry_delay,
}
So in the above code, we define a custom retry delay function. The try_number is squared to get an exponential factor. This is then used to calculate the new delay time, multiplying it with a base delay.
Dynamic retry delays
Similar to exponential delay, dynamic delay introduces a variable retry interval. The difference is with dynamic delay, you can delay times using a pre-defined list. There are no dynamic calculations involved, so there is no dynamic progression or increase. You can set dynamic retry delays by setting retry_delay
to a list of timedelta
objects.
This is useful when you know the exact intervals you’d like between your retries. For example, when dealing with multiple services with known recovery times, you can set the delays accordingly.
default_args = {
'retries': 3,
'retry_delay': [timedelta(minutes=5), timedelta(minutes=10), timedelta(minutes=20)]
}
State Management and Conditional Logic
Using custom XComs for State
For those not familiar, XComs is short for “cross-communications”. It’s a feature that allows you to pass state in form of messages of small amounts of data between tasks. This can be useful when you have tasks that depend on the state of previous tasks. XComs are especially useful in making your DAGs fail gracefully by letting subsequent tasks know if prior task has failed, partially succeeded, or produced a certain output.
def push_xcom_value(**kwargs):
ti = kwargs['ti']
ti.xcom_push(key='my_key', value='my_value')
One of the arguments to push_xcom_value
is ti
, which tasks for task instance. The ti
object has methods for pushing and pulling XCom values in form of key-pairs, like my_key
and my_value
.
When this task is run, it will push the key-pair into XCom. Other tasks within the same DAG can pull this value and make decisions based on it.
def pull_xcom_and_act(**kwargs):
ti = kwargs['ti']
value = ti.xcom_pull(key='my_key')
if value == 'my_value':
do_something()
else:
do_something_else()
Imagine you are asked to set up a data pipeline where the first task is to fetch data from an API. If the API returns an empty dataset, there is no point running the rest of the pipeline. Using XCom, you can pass the state of the data from the first task to subsequent tasks, which can then decide whether to continue execution or skip themselves.
Conditional skipping of tasks
In come cases, you may want to conditionally skip tasks based on custom logic or the state of other tasks. Skipping tasks can help you make your DAGs by avoiding unnecessary computations or operations.
from airflow.exceptions import AirflowSkipException
def conditionally_skip_task():
if some_condition:
raise AirflowSkipException
In above code, when the task is run, the condition will be evaluated. If it is met, the task will raise an exception that will effectively skip the task, setting its state to “Skipped” in the Airflow UI.
Looking at the same case use case as above, this exception would be useful if you want to skip certain tasks when the data comes back empty.
Task rescheduling
Airflow allows tasks to be rescheduled rather than retried from scratch, which is useful for long-running or resource-intensive tasks. This is done by the Reschedule
exception from airflow.models
.
Consider a data processing task that takes several hours to complete. If the task fails midway due to issues like network timeouts, rescheduling allows the task to continue from where it left off, saving both time and compute resources.
Here’s an example:
from airflow.models import Reschedule
def long_running_task():
if not task_complete:
raise Reschedule
So when Reschedule
is raised, the task instance will be rescheduled to run again at the next available time slot, maintaining its state.
This feature enables more efficient resource utilization and can significantly improve the reliability of long-running tasks in your DAGs.
Callbacks and Alerts
Using on_failure_callback
and on_retry_callback
Airflow provides callback mechanisms that allow you to define custom behavior when a task fails or is retried. These callbacks can be used to send alerts, clean up resources, or perform any other custom logic
from airflow.operators.python import PythonOperator
def alert_on_failure(context):
pass
task = PythonOperator(
task_id='my_task'
python_callable=my_python_function
on_failure_callback=alert_on_failure
)
When the task fails, the function specified in on_failure_callback
will be executed.
These callbacks are particularly useful for alerting. For instance, you could use
on_failure_callback
to send a Slack message, log the failure details, or even trigger another task that performs some sort of cleanup or rollback. This is demonstrated next
Add Slack webhooks and operators for notifications
Airflow provides built-in support for Slack notifications via the SlackAPIPostOperator
. This lets you send custom messages to a Slack channel, which can be useful for alerting your team when a task fails or meets certain conditions.
Let's say you have a data pipeline with multiple tasks, and it's crucial to know immediately if any task fails. You can add the
slack_alert
task to your DAG and use Airflow'strigger_rule
to execute it whenever a task fails, ensuring instant notification.
from airflow.providers.slack.operators.slack_api import SlackAPIPostOperator
slack_alert = SlackAPIPostOperator(
task_id = 'slack_alert',
text = 'Task failed!'
token = 'your_token',
channel = '#your-channel'
)
This task can be added to your DAG and set to execute when a preceding task fails, succeeds, or meets some other condition. When triggered, the SlackAPIPostOperator
will post the specified message to the given Slack channel.
Setting up alerts
Airflow also has in-built support to set up email alerts that can be configured using email_on_failure
and email_on_retry
parameters. These parameters allow you to automatically send email alerts when tasks in your DAG fail or are retried.
Consider a data pipeline where task failures could have a significant impact. By setting
email_on_failure
toTrue
, you can ensure that the right team members are immediately notified and can take quick action to resolve the issue.
default_args = {
'email_on_failure': True,
'email_on_retry': False,
'email': ['your-email@example.com']
}
You can specify multiple recipients in the email list.
Dependency Management
External trigger dependency with sensors
Airflow’s sensor tasks are designed to wait for certain criteria to be met before proceeding to execute subsequent tasks. One common use case is to use the ExternalTaskSensor
to wait for the completion of tasks in another DAG.
Imagine two separate DAGs: 1) data ingestion and 2) data processing. The processing DAG should only run after the ingestion DAG has successfully ingested the day's data. You can use
ExternalTaskSensor
in the processing DAG to wait for the completion of the ingestion task in the ingestion DAG.
from airflow.sensors.external_task import ExternalTaskSensor
wait_for_other_task = ExternalTaskSensor(
task_id='wait_for_other_task',
external_dag_id='other_dag_id',
external_task_id='other_task_id',
)
The ExternalTaskSensor
will keep polling for the status of the specified external task in the specified external DAG. Once that task is complete, the sensor task will be marked as complete, allowing downstream tasks to proceed.
Setting up Service Level Agreements (SLAs)
Service Level Agreements (SLAs) in Airflow allow you to specify the maximum amount of time a task should take to complete. This can be particularly useful to ensure your tasks meet operational requirements.
Suppose you have a task that aggregates daily sales data and it's important for this data to be available within the first few hours of the new day for reporting. By setting an SLA, you can make sure that any delays in this task are flagged to allow for prompt remedial action.
my_task = PythonOperator(
task_id='my_task',
python_callable=my_python_function,
sla=timedelta(hours=2),
)
When this task is executed, Airflow will monitor its duration. If the task takes more than 2 hours to complete, it will violate the SLA. Airflow provides ways to alert or take other actions when SLA violations occur, often configured via the email
and sla_miss_callback
parameters.
That’s all for now! These techniques are best practices that can help you build robust data pipelines. Feel free to share any additional techniques or insights you might have—I'd love to hear from you!