Cross-DAG dependencies are one of those things that seems fine until your pipelines get complicated. You have an ingestion DAG that loads raw data, a transformation DAG that cleans it, and a reporting DAG that emails the results.
At some point, someone hard-codes the schedule: ingest at 6am, transform at 7am, reports at 8am. It works. Until the ingest takes two hours one morning and the reporting DAG fires on yesterday’s data and nobody notices until the standup meeting.
The slightly less bad version of this is ExternalTaskSensor. You write something like:
wait_for_ingest = ExternalTaskSensor(
    task_id='wait_for_ingest',
    external_dag_id='ingest_customers',
    external_task_id='load_to_staging',
    execution_delta=timedelta(hours=1),
    timeout=3600,
)
And now you’ve traded one fragile assumption (timing) for a different one (exact execution alignment). The sensor pokes the metadata DB every 60 seconds, has to match execution dates perfectly, and will happily sit there and time out if the upstream DAG ran even slightly differently than expected.
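To see why the alignment assumption is fragile, here's a toy sketch in plain Python (a simplified model, not Airflow internals): the sensor looks for the upstream run at exactly its own logical date minus `execution_delta`, so any drift between the two schedules makes the lookup miss entirely.

```python
from datetime import datetime, timedelta

# Logical dates of runs the upstream DAG actually produced.
upstream_runs = {datetime(2024, 1, 1, 6, 0)}

def sensor_finds_upstream(logical_date, execution_delta):
    """Mimics the sensor's matching rule: it checks for an upstream run
    at exactly logical_date - execution_delta, nothing fuzzier."""
    return (logical_date - execution_delta) in upstream_runs

# Downstream scheduled at 7am with a 1-hour delta: exact match, works.
assert sensor_finds_upstream(datetime(2024, 1, 1, 7, 0), timedelta(hours=1))

# Someone moves the downstream schedule to 7:30 but forgets the delta:
# the sensor now looks for a 6:30 upstream run that never existed,
# and would poll until it times out.
assert not sensor_finds_upstream(datetime(2024, 1, 1, 7, 30), timedelta(hours=1))
```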
There’s a better model, and it’s been in Airflow since 2.4.
Data-Aware Scheduling
The idea is simple: instead of coupling DAGs by time or by explicit references to other DAGs, you couple them by the data they produce and consume.
In Airflow 2.4+, a Dataset is just a URI: a logical identifier for a piece of data. It can point to an S3 path, a database table, a file location, whatever makes sense for your pipeline. Airflow doesn't actually read or write to that location. The URI is just a label that lets Airflow know "this task produced something" and "this DAG should run when that something is ready."
Two concepts:
- Producers — tasks that emit a dataset when they complete
- Consumers — DAGs that schedule themselves to run when a dataset is updated
That’s the whole model. No sensors, no time alignment, no TriggerDagRunOperator.
It’s the same shift you see going from imperative to declarative code. With time-based schedules and sensors, you’re scripting the execution order: run this at 7am, wait for that task ID, poll every 60 seconds. With datasets, you declare what each DAG consumes and what it produces. Airflow works out when things run.
Setting Up a Producer
On the producing side, you declare the dataset and add it to the task’s outlets:
from airflow import Dataset
from airflow.decorators import dag, task
from datetime import datetime

customers_dataset = Dataset("s3://your-bucket/staging/customers/")

@dag(schedule="0 6 * * *", start_date=datetime(2024, 1, 1), catchup=False)
def ingest_customers():
    @task(outlets=[customers_dataset])
    def load_to_staging():
        # your actual load logic here
        pass

    load_to_staging()

ingest_customers()
When load_to_staging completes successfully, Airflow marks customers_dataset as updated.
That’s the signal. If the task fails, the dataset doesn’t get marked and downstream DAGs don’t run.
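The success-only mechanics can be sketched in plain Python (a toy model for illustration, not Airflow's implementation): an event is recorded for each outlet only when the producing task returns without raising.

```python
dataset_events = []  # stand-in for the events Airflow records in its metadata DB

def run_task(fn, outlets):
    """Toy producer: record one dataset event per outlet, but only on success."""
    try:
        fn()
    except Exception:
        return "failed"  # task failed: no events, so no downstream triggers
    dataset_events.extend(outlets)
    return "success"

def good_load():
    pass  # pretend we wrote to staging here

def bad_load():
    raise RuntimeError("source API was down")

run_task(good_load, ["s3://your-bucket/staging/customers/"])
run_task(bad_load, ["s3://your-bucket/staging/orders/"])

# Only the successful task produced an event.
assert dataset_events == ["s3://your-bucket/staging/customers/"]
```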
Setting Up a Consumer
On the consuming side, you swap out the schedule parameter:
from airflow import Dataset
from airflow.decorators import dag, task
from datetime import datetime

customers_dataset = Dataset("s3://your-bucket/staging/customers/")

@dag(schedule=[customers_dataset], start_date=datetime(2024, 1, 1), catchup=False)
def transform_customers():
    @task
    def run_transformations():
        # dbt run, custom transforms, whatever
        pass

    run_transformations()

transform_customers()
That’s it. transform_customers will trigger automatically every time ingest_customers successfully updates the dataset.
No sensors. No execution date math. No polling.
Chaining It Further
The pattern composes naturally. Your transform DAG can itself produce a dataset, which then triggers the reporting DAG:
from airflow import Dataset
from airflow.decorators import dag, task
from datetime import datetime

customers_dataset = Dataset("s3://your-bucket/staging/customers/")
customers_transformed = Dataset("s3://your-bucket/warehouse/customers/")

@dag(schedule=[customers_dataset], start_date=datetime(2024, 1, 1), catchup=False)
def transform_customers():
    @task(outlets=[customers_transformed])
    def run_transformations():
        pass

    run_transformations()

transform_customers()

@dag(schedule=[customers_transformed], start_date=datetime(2024, 1, 1), catchup=False)
def customer_report():
    @task
    def send_report():
        pass

    send_report()

customer_report()
Ingest completes, transform triggers, report triggers. Each DAG only knows about its own inputs and outputs. The orchestration is implicit in the data.
Multiple Datasets: AND Logic
If a DAG needs to wait for multiple datasets before running, you can pass a list. Airflow treats this as AND logic, all listed datasets must be updated before the consumer triggers:
customers_ready = Dataset("s3://your-bucket/staging/customers/")
orders_ready = Dataset("s3://your-bucket/staging/orders/")

@dag(
    schedule=[customers_ready, orders_ready],
    start_date=datetime(2024, 1, 1),
    catchup=False,
)
def build_customer_order_model():
    ...
This DAG triggers only once both upstream ingest jobs have updated their datasets since its own last run, then the slate is wiped and it starts waiting again. You get fan-in behavior without writing a single sensor.
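The trigger rule is easy to model in plain Python (again, a sketch of the semantics, not the scheduler's actual code): Airflow tracks which of a consumer's datasets have been updated since the consumer last ran, and fires it once the set is complete.

```python
class ToyConsumer:
    """Toy model of a dataset-scheduled DAG with AND semantics."""

    def __init__(self, needs):
        self.needs = set(needs)
        self.pending = set()  # datasets updated since our last run
        self.run_count = 0

    def on_dataset_update(self, uri):
        if uri in self.needs:
            self.pending.add(uri)
        if self.pending == self.needs:  # all inputs ready: trigger a run
            self.run_count += 1
            self.pending.clear()        # start waiting for the next round

model = ToyConsumer([
    "s3://your-bucket/staging/customers/",
    "s3://your-bucket/staging/orders/",
])

model.on_dataset_update("s3://your-bucket/staging/customers/")
assert model.run_count == 0  # still waiting on orders

model.on_dataset_update("s3://your-bucket/staging/orders/")
assert model.run_count == 1  # both updated: one run fires

model.on_dataset_update("s3://your-bucket/staging/customers/")
assert model.run_count == 1  # slate wiped: waiting on orders again
```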
What You Get in the UI
The Airflow web UI has a dedicated Datasets view that shows you all registered datasets, which DAGs produce each one, which DAGs consume each one, and a history of when each was last updated.
This is actually more useful than it sounds. When something breaks, you can open the dataset view and immediately see which producer DAG stalled and which consumer DAGs are waiting on it. The dependency graph that used to live in someone’s head or in a wiki page nobody updates is now visible in the scheduler.
The Trade-offs
This model isn’t for every situation. A few things worth knowing before you go all-in:
No time-based backfill. If you need to reprocess data for a historical date range, dataset-scheduled DAGs don’t help. They only trigger in response to real dataset update events. For backfills you still need time-based schedules, or you trigger manually.
No OR logic before 2.9. With a plain list, it's AND: all listed datasets must update before the consumer runs. Airflow 2.9 added conditional dataset expressions (combining datasets with | and &), but on older versions, if you need "run when any of these datasets update," you'll need to restructure or stick with sensors for that case.
Dataset state is per-event, not persistent. If the scheduler restarts mid-cycle, dataset state can behave unexpectedly in older Airflow versions. This improved in 2.9, but it’s worth knowing if you’re on an older version.
The URI is just a label. Airflow doesn’t validate that the URI actually points to anything. If your producer task marks the dataset as updated without actually writing data (because of a bug or early exit), the consumer still triggers. The dataset mechanism handles scheduling, not data quality.
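Since task success is the only thing gating the event, one cheap mitigation is to make the producer fail loudly when the write didn't actually land, so the dataset never gets marked. A hedged sketch of that pattern, where the write and count callables stand in for whatever your storage layer provides:

```python
def validated_load(write_fn, count_fn, min_rows=1):
    """Run the load, then refuse to succeed (and thus refuse to signal
    the dataset) unless the write is actually visible downstream."""
    write_fn()
    rows = count_fn()
    if rows < min_rows:
        raise ValueError(f"load wrote {rows} rows; refusing to signal dataset")
    return rows

# Toy storage standing in for an S3 prefix or warehouse table.
storage = {"rows": 0}

def broken_write():
    pass  # bug: returns early without writing anything

def working_write():
    storage["rows"] = 500

try:
    validated_load(broken_write, lambda: storage["rows"])
    raised = False
except ValueError:
    raised = True
assert raised  # the task fails, so the dataset is never marked updated

assert validated_load(working_write, lambda: storage["rows"]) == 500
```

Wrapping the check inside the producing task keeps the contract honest: "dataset updated" now means "data verifiably present," not just "function returned."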
Wrapping Up
ExternalTaskSensor works, but it’s the duct tape solution.
You’re coupling DAGs by schedule alignment and then building sensors to detect when that alignment held.
It’s two layers of fragility stacked on top of each other.
Dataset scheduling flips the model. Your DAGs describe what data they need and what data they produce. Airflow figures out the rest. The dependency graph is visible, the trigger logic is automatic, and you stop worrying about whether the upstream DAG finishes before this one starts.
If you're already building pipelines with clear data hand-offs (staging tables, S3 paths, intermediate datasets), you're halfway there. Add a Dataset to those tasks and let Airflow handle the scheduling.