Airflow Core Concepts¶
This page explains the Airflow core blocks that appear throughout our DAG code, with concrete examples from our own pipeline. The complete documentation can be found on Apache's Airflow official docs.
DAG¶
A DAG (Directed Acyclic Graph) is a Python file that describes a set of tasks and the dependencies between them. "Acyclic" means there are no loops: the graph always flows forward. Every DAG has a schedule_interval (a cron expression) and a start_date.
dag = DAG(
'download_ifs_an', # unique ID used in the UI and CLI
default_args=default_args,
start_date=datetime(2025, 8, 2),
schedule_interval='00 08 * * *', # every day at 08:00 UTC
max_active_runs=1, # prevent overlapping runs
catchup=False, # do not backfill missed runs
)
catchup=False is very important; without it, if Airflow is restarted after a weekend, it would try to run every missed daily DAG instance, flooding Lucia with model submissions. All our DAGs set catchup=False.
max_active_runs=1 ensures that if a run from the previous day is still retrying (waiting for a file), the new day's run does not start until the old one finishes or is cleared.
All our DAGs start paused (AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION=true). A new DAG file must be explicitly enabled in the UI before it starts running.
Operators¶
An operator is a single task in a DAG. It defines what happens when the task executes.
PythonOperator¶
Runs a Python callable. The function receives **context which gives access to the task instance (context['ti']), the logical date (context['ds']), and so on.
# From download_local_ifs_analysis (LOCAL/download_IFS_an.py)
download_task = PythonOperator(
task_id='download_file',
python_callable=download_file, # regular Python function
dag=dag,
)
Our Python tasks handle file downloads (download_file in the IFS DAGs), NetCDF compression (compress_files in compress_results), CAMS processing (download_cams), and job number bookkeeping (store_results_globally in model_lucia_run).
BashOperator¶
Runs a shell command inside the Airflow worker container. We use it for tasks that are already implemented as shell scripts, or where constructing a multi-line bash script is more readable than Python.
# From download_MAR_local (LOCAL/download_MAR.py)
download_MAR = BashOperator(
task_id='download_MAR',
dag=dag,
bash_command="""
today9_str2="{{ get_date(9,2) }}"
ssh -i /opt/airflow/users_home/luc/.ssh/id_rsa \
lvdbulcke@139.165.67.19 \
"test -f /srv10_tmp5/.../Spectral_${today9_str2}.nc"
""",
)
With SSHOperator's, these two blocks are at the core of our orchestration of the Lucia HPC cluster. They are our "controller" tasks that submit jobs, wait for them to finish, download the results, ...
SSHOperator¶
Executes a command on a remote host over SSH. The target host is specified by an Airflow Connection ID. This is the primary mechanism for controlling the Lucia HPC cluster.
# From download_ifs_an (LUCIA/download_ifs_an.py)
download_ifs_an = SSHOperator(
conn_timeout=5*60, # seconds to establish the SSH connection
cmd_timeout=5*60, # seconds to wait for the command to finish
task_id='download_ifs_an',
ssh_conn_id='lucia_gateway_luc', # Airflow Connection ID
command='ssh frontal "/gpfs/.../data/IFS/get_an.sh >> get_an.log 2>&1"',
dag=dag,
)
The lucia_gateway_luc Connection stores the SSH credentials (host, user, private key). The SSHOperator opens a session to the gateway node and runs ssh frontal ... inside it, a two-hop pattern because Lucia's compute frontal node is behind a gateway.
Sensors¶
A sensor is a special operator that waits for a condition to become true. It polls at a configurable interval and blocks the rest of the DAG until the condition is met or the timeout is reached.
SFTPSensor¶
Polls an SFTP server until a file at a given path exists.
# From download_local_ifs_analysis (LOCAL/download_IFS_an.py)
wait_for_file = SFTPSensor(
task_id='wait_for_file',
sftp_conn_id=SFTP_CONN_ID,
path="{{ task_instance.xcom_pull(task_ids='generate_paths')['remote_path'] }}",
timeout=60 * 60 * 12, # give up after 12 hours
poke_interval=60 * 15, # check every 15 minutes
dag=dag,
)
PythonSensor¶
Like PythonOperator, but the callable returns True (proceed) or False (keep waiting).
# From compress_results (LOCAL/compress_results.py)
def check_files_arrived(**context):
ocean_output = os.path.join(BASE_PATH, get_date(0), 'ocean.output')
return os.path.exists(ocean_output)
file_sensor = PythonSensor(
task_id='check_files_arrived',
python_callable=check_files_arrived,
timeout=4*60*60, # give up after 4 hours
poke_interval=300, # check every 5 minutes
dag=dag,
)
The compress_results DAG uses this to wait for ocean.output to appear in the NRT_V2025 output directory, confirming that Lucia has finished the model run and model_lucia_postprocess has successfully transferred all files.
TimeSensor¶
Waits until the wall clock reaches a target time. Not polling an external resource; just blocking until a specific time of day.
# From download_CAMS (LOCAL/download_CAMS.py)
release_at_1230 = TimeSensor(
task_id='release_at_1230',
target_time=time(12, 30), # 12:30 UTC
poke_interval=60,
mode='reschedule',
dag=dag,
)
This is used in download_CAMS to ensure the forecast-extension step does not run until 12:30 UTC, giving the CAMS API time to publish that day's data before we finalize and distribute the forcing files.
mode='reschedule' means the sensor releases its worker slot between polls. Without this, the worker slot would be held idle for hours. The airflow-triggerer service handles the scheduling of the next poke while the worker is free.
XCom¶
XCom (cross-communication) lets tasks pass small pieces of data to downstream tasks within the same DAG run. Values are stored in the Postgres metadata database.
# Task 1: compute paths and return them
def get_file_paths(**context):
return {'filename': '...', 'remote_path': '...', 'local_path': '...'}
generate_paths = PythonOperator(task_id='generate_paths', ...)
# Task 2: read the paths that Task 1 stored
def download_file(**context):
paths = context['task_instance'].xcom_pull(task_ids='generate_paths')
local_path = paths['local_path']
A PythonOperator's return value is automatically pushed to XCom under the key return_value. xcom_pull(task_ids='...') retrieves it.
XCom is also used across DAGs in one case: model_lucia_run stores the Slurm job number in an Airflow Variable (not an XCom) because model_lucia_postprocess runs as a separate DAG:
# model_lucia_run
Variable.set("submitted_job", results['job_submitted'])
# model_lucia_postprocess
submitted_job_var = Variable.get("submitted_job")
XCom is for small data only
XCom values are stored in the Postgres metadata database. Keep them small: file paths, job IDs, status strings, counts. Never push a NetCDF array or a large DataFrame through XCom.
Variables¶
Variables are global key-value pairs stored in the Postgres metadata database. They persist across DAG runs and container restarts. We use them for:
- Credentials that do not fit the Connection model (Copernicus username/password)
- State passing between separate DAGs (the Slurm job number from
model_lucia_runtomodel_lucia_postprocess) - Transient state flags (the
after_17_alert_sent_<run_id>flag inmodel_lucia_postprocess)
Connections¶
Connections store the host, port, login, and encrypted password for external systems. The password is encrypted with the Fernet key from .env.
The key pattern throughout our DAGs is that conn_id strings are hardcoded in the DAG code, but the actual credentials (host, private key, password) live only in the Connections store in the database. Moving the stack to a new host only requires re-entering Connections, not editing DAG files.
Task dependencies¶
Dependencies are expressed with the >> and << operators:
# Linear chain
generate_paths >> wait_for_file >> download_task >> backup_task
# Fanout: both tasks start after prepare
[check_an, check_an00, check_fc] >> prepare_ifs
# Fanin: cleanup starts after both branches finish
model_postproc >> [check_model_postproc, copy_to_marines]
check_model_postproc >> cleanup_jobnr_Variable
Tasks with no explicit dependency on each other run in parallel, subject to the executor's slot limits.
Retry logic¶
Our DAGs use retries aggressively. Rather than building a dedicated long-running sensor for each data source, many DAGs simply retry the entire task every N minutes, up to a count that covers the expected wait window.