Recently, I’ve been trying to coordinate two Airflow DAGs so that one of them runs on its own hourly schedule, but only if the other DAG, which runs daily, has succeeded.
In today’s tutorial I will walk you through the use case and demonstrate three different ways to achieve the desired behaviour: two using the ExternalTaskSensor and one using a customised approach with the PythonOperator.
Now let’s get started with our use case, which involves two Airflow DAGs.
The first DAG, my_daily_dag, runs every day at 5AM UTC.
from datetime import datetime, timedelta

from airflow.models import DAG
from airflow.operators.dummy import DummyOperator

with DAG(
    catchup=False,
    dag_id='my_daily_dag',
    start_date=datetime(2023, 7, 26),
    default_args={
        'owner': 'airflow',
        'retries': 1,
        'retry_delay': timedelta(minutes=2),
    },
    schedule_interval='0 5 * * *',  # At 5AM UTC every day
    max_active_runs=1,
) as dag:
    # Placeholder task standing in for the real daily workload
    DummyOperator(task_id='dummy_task')
The second DAG, my_hourly_dag, runs every hour on the hour, between 6AM and 8PM UTC.
from datetime import datetime, timedelta

from airflow.models import DAG
from airflow.operators.dummy import DummyOperator

with DAG(
    catchup=False,
    dag_id='my_hourly_dag',
    start_date=datetime(2023, 7, 26),
    default_args={
        'owner': 'airflow',
        'retries': 1,
        'retry_delay': timedelta(minutes=2),
    },
    schedule_interval='0 6-20 * * *',  # At minute 0 of every hour from 6AM to 8PM UTC (inclusive)
    max_active_runs=1,
) as dag:
    # Placeholder task standing in for the real hourly workload
    DummyOperator(task_id='dummy_task')
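To make the two schedules concrete, here is a small sketch that lists the wall-clock times at which each cron expression fires on a given day. It is a deliberately simplified, stdlib-only helper: expand_hour_field and fire_times_on are hypothetical names (not Airflow APIs), and the code assumes the 'M H * * *' shape used by both DAGs, without support for step values such as */2.

```python
from datetime import datetime, timedelta


def expand_hour_field(field: str) -> list[int]:
    """Expand a cron hour field made of '*', 'N' or 'A-B' parts (comma-separated).

    Simplified sketch: step values such as '*/2' are not supported.
    """
    hours: set[int] = set()
    for part in field.split(","):
        if part == "*":
            hours.update(range(24))
        elif "-" in part:
            start, end = (int(x) for x in part.split("-"))
            hours.update(range(start, end + 1))  # cron ranges include both ends
        else:
            hours.add(int(part))
    return sorted(hours)


def fire_times_on(day: datetime, cron: str) -> list[datetime]:
    """Return the times on `day` matched by a 'M H * * *' cron expression."""
    minute, hour, dom, month, dow = cron.split()
    if (dom, month, dow) != ("*", "*", "*") or not minute.isdigit():
        raise ValueError("this sketch only handles 'M H * * *' expressions")
    midnight = day.replace(hour=0, minute=0, second=0, microsecond=0)
    return [
        midnight + timedelta(hours=h, minutes=int(minute))
        for h in expand_hour_field(hour)
    ]


day = datetime(2023, 7, 26)
daily = fire_times_on(day, "0 5 * * *")      # schedule of my_daily_dag
hourly = fire_times_on(day, "0 6-20 * * *")  # schedule of my_hourly_dag

print([t.strftime("%H:%M") for t in daily])  # ['05:00']
print(len(hourly), hourly[0].strftime("%H:%M"), hourly[-1].strftime("%H:%M"))  # 15 06:00 20:00
```

Because cron ranges are inclusive, my_hourly_dag fires 15 times a day, the last time at 20:00. Keep in mind that Airflow labels each scheduled run with the start of its data interval, so these are trigger times rather than the runs' logical dates.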
In our use case, we would like my_hourly_dag to run only if my_daily_dag has run successfully on the current date. If not, my_hourly_dag should be skipped. It is important to mention that we don’t want to trigger my_hourly_dag as soon as my_daily_dag succeeds. That would be achievable with TriggerDagRun…
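The condition itself can be pinned down as a plain function, independent of any Airflow operator. The sketch below is a minimal, Airflow-free illustration under stated assumptions: DailyRun and should_run_hourly are hypothetical names, "the current date" is taken to mean the current UTC calendar date, and in a real deployment the run records would come from Airflow's metadata database rather than a hand-built list. It is not one of the approaches demonstrated in this tutorial; it only spells out the rule they need to enforce.

```python
from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass(frozen=True)
class DailyRun:
    """Hypothetical, simplified stand-in for a my_daily_dag run record."""
    end_date: datetime  # when the run finished; must be timezone-aware
    state: str          # e.g. 'success', 'failed', 'running'


def should_run_hourly(now: datetime, daily_runs: list[DailyRun]) -> bool:
    """True if a my_daily_dag run succeeded earlier on the same UTC date as `now`."""
    if now.tzinfo is None:
        raise ValueError("`now` must be timezone-aware")
    today = now.astimezone(timezone.utc).date()
    return any(
        run.state == "success"
        and run.end_date.astimezone(timezone.utc).date() == today
        and run.end_date <= now  # the success must already have happened
        for run in daily_runs
    )


utc = timezone.utc
runs = [DailyRun(datetime(2023, 7, 26, 5, 3, tzinfo=utc), "success")]
print(should_run_hourly(datetime(2023, 7, 26, 7, tzinfo=utc), runs))  # True
print(should_run_hourly(datetime(2023, 7, 27, 7, tzinfo=utc), runs))  # False: yesterday's success
```

Tying the success to the current UTC date is what stops yesterday's run from counting, so if my_daily_dag fails or has not finished yet, every hourly run that day evaluates to False and would be skipped.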