Download DAGs¶
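These DAGs fetch the external data sources that the model needs as forcing inputs. Most data sources have two parallel download DAGs: a LUCIA DAG that downloads to Lucia and a LOCAL DAG that downloads to the marines server (for the GCP backup). The exceptions documented below are CMEMS observations and NIHWM river data (Lucia only) and CAMS (LOCAL only, then pushed to Lucia).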
ECMWF IFS atmospheric forcing¶
The model uses three ECMWF IFS (Integrated Forecasting System) files per day:
| File type | Filename pattern | Content |
|---|---|---|
| Analysis (an) | {yesterday}-ECMWF---AM0100-MEDATL-b{today}_an-fv13.00.nc | Yesterday's analysis fields at today's time stamp |
| Analysis 00 (an00) | {today}-ECMWF---AM0100-MEDATL-b{today}_an00-fv13.00.nc | Today's analysis at 00:00 |
| Forecast (fc) | {today}_{today+10}-ECMWF---AM0100-MEDATL-b{today}_fc00-fv13.00.nc | 10-day forecast from today |
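The three filename patterns can be expanded for a given day with a small helper. This is only a sketch: the table does not state how {today} is formatted, so the `%Y%m%d` default below is an assumption, and `ifs_filenames` is a hypothetical name that does not appear in the DAG files.

```python
from datetime import date, timedelta

PREFIX = "ECMWF---AM0100-MEDATL"
SUFFIX = "fv13.00.nc"


def ifs_filenames(today: date, fmt: str = "%Y%m%d") -> dict:
    """Expand the an / an00 / fc patterns from the table for one day.

    The date format is an assumption; pass another `fmt` if the real
    files use a different one.
    """
    t = today.strftime(fmt)
    yesterday = (today - timedelta(days=1)).strftime(fmt)
    t_plus_10 = (today + timedelta(days=10)).strftime(fmt)
    return {
        "an": f"{yesterday}-{PREFIX}-b{t}_an-{SUFFIX}",
        "an00": f"{t}-{PREFIX}-b{t}_an00-{SUFFIX}",
        "fc": f"{t}_{t_plus_10}-{PREFIX}-b{t}_fc00-{SUFFIX}",
    }
```

Calling `ifs_filenames(date(2025, 6, 17))` returns one name per file type, keyed by `an`, `an00` and `fc`.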
download_ifs_an (Lucia)¶
| Property | Value |
|---|---|
| Schedule | 00 08 * * * (08:00 UTC) |
| Retries | 56 x 15 min = up to 14 hours |
| File | LUCIA/download_ifs_an.py |
Triggers the get_an.sh script on the Lucia frontal node. That script is responsible for fetching the IFS analysis file to Lucia's GPFS filesystem.
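from airflow.providers.ssh.operators.ssh import SSHOperator

# Run get_an.sh on the frontal node through the gateway connection.
download_ifs_an = SSHOperator(
    task_id='download_ifs_an',
    ssh_conn_id='lucia_gateway_luc',
    conn_timeout=5*60,  # seconds
    cmd_timeout=5*60,   # seconds
    command='ssh frontal "/gpfs/.../data/IFS/get_an.sh >> get_an.log 2>&1"',
)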
The SSH chain is: Docker worker -> lucia_gateway_luc gateway -> ssh frontal. Logs from get_an.sh are appended to get_an.log on Lucia.
download_ifs_an00 (Lucia)¶
| Property | Value |
|---|---|
| Schedule | 01 08 * * * (08:01 UTC) |
| Retries | 56 x 15 min = up to 14 hours |
| File | LUCIA/download_ifs_an00.py |
Scheduled one minute after download_ifs_an so that the two SSH sessions do not start at the same time. Runs get_an00.sh on Lucia.
download_ifs_fc (Lucia)¶
| Property | Value |
|---|---|
| Schedule | 02 08 * * * (08:02 UTC) |
| Retries | 56 x 15 min = up to 14 hours |
| cmd_timeout | 2 hours 2 minutes |
| File | LUCIA/download_ifs_fc.py |
Runs get_fc.sh on Lucia. The forecast file is much larger than the analysis files and transfers more slowly, so cmd_timeout is raised to about 2 hours (2 hours 2 minutes in the table above), compared with 5 minutes for download_ifs_an.
download_local_ifs_analysis (LOCAL)¶
| Property | Value |
|---|---|
| Schedule | 30 07 * * * (07:30 UTC) |
| Retries | 3 x 5 min |
| Source | sftp_CMCC (primary SFTP at CMCC) |
| Destination | /opt/airflow/marines_data/data/IFS/Analysis/ |
| Backup | seamod SFTP server |
| File | LOCAL/download_IFS_an.py |
The DAG has four tasks:
- generate_paths: computes the date-specific filename and paths, pushes them to XCom.
- wait_for_file (SFTPSensor): polls sftp_CMCC every 15 minutes for up to 12 hours until the remote file appears. The file path is read from the XCom left by generate_paths.
- download_file: retrieves the file from sftp_CMCC to local disk. Skips if the file is already present (safe to re-run).
- backup_file: uploads the downloaded file to the seamod SFTP server for redundancy.
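The polling behaviour of wait_for_file (one check every 15 minutes, giving up after 12 hours) can be illustrated without Airflow. The sketch below is not the SFTPSensor implementation; `wait_for_file_sketch` and its `exists` callable are hypothetical stand-ins for the remote file check, and the clock and sleep functions are injectable so the loop can be exercised without actually waiting.

```python
import time
from typing import Callable


def wait_for_file_sketch(
    exists: Callable[[], bool],
    poke_interval: float = 15 * 60,   # seconds between checks
    timeout: float = 12 * 60 * 60,    # give up after this many seconds
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> bool:
    """Return True once exists() is true, False if the timeout elapses first."""
    deadline = clock() + timeout
    while True:
        if exists():
            return True
        if clock() >= deadline:
            return False
        sleep(poke_interval)
```

In the DAG a False result would correspond to the sensor timing out, after which the task-level retries (3 x 5 min) apply.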
The 07:30 UTC start is 30 minutes before the equivalent Lucia DAG (08:00 UTC) because the local download includes an additional SFTPSensor wait that the Lucia shell script handles internally.
download_local_ifs_an00 (LOCAL)¶
| Property | Value |
|---|---|
| Schedule | 31 07 * * * (07:31 UTC) |
| Retries | 3 x 10 min |
| Source | sftp_CMCC |
| Destination | /opt/airflow/marines_data/data/IFS/Analysis/ |
| Backup | seamod |
| File | LOCAL/download_IFS_an00.py |
Same four-task pattern as download_local_ifs_analysis, but for the an00 file. Runs one minute later to avoid concurrent SFTP sessions.
download_local_ifs_fc (LOCAL)¶
| Property | Value |
|---|---|
| Schedule | 32 07 * * * (07:32 UTC) |
| Retries | 20 x 15 min |
| Source | sftp_CMCC with automatic fallback to sftp_CMCC_backup (AWS) |
| Destination | /opt/airflow/marines_data/data/IFS/Forecast/ |
| Backup | seamod |
| File | LOCAL/download_IFS_fc.py |
| Execution timeout | 1 hour per attempt |
Unlike the analysis files, the forecast file has a two-server fallback baked into the download_with_fallback Python function:
- Try to download from sftp_CMCC (primary). Use a .tmp extension during download, then os.rename() to the final path atomically.
- If that fails for any reason, switch automatically to sftp_CMCC_backup (an AWS SFTP mirror) and retry from there.
- If both fail, raise an AirflowException to trigger the task retry.
This means a single sftp_CMCC outage does not require human intervention; the DAG heals itself.
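The fallback steps above can be sketched as follows. This is an illustration of the pattern, not the code in LOCAL/download_IFS_fc.py: the fetchers are hypothetical callables standing in for the two SFTP connections, and a plain RuntimeError stands in for AirflowException so the sketch runs without Airflow installed.

```python
import os
from typing import Callable, Sequence


def download_with_fallback_sketch(
    fetchers: Sequence[Callable[[str], None]], final_path: str
) -> str:
    """Try each fetcher in order, writing to a .tmp file first.

    A fetcher receives the temporary path and must write the file there.
    The file is renamed to final_path only after a fetcher succeeds, so a
    partial download never appears under the final name.
    """
    tmp_path = final_path + ".tmp"
    errors = []
    for fetch in fetchers:
        try:
            fetch(tmp_path)
            os.rename(tmp_path, final_path)
            return final_path
        except Exception as exc:  # "fails for any reason"
            errors.append(exc)
            if os.path.exists(tmp_path):
                os.remove(tmp_path)  # discard the partial download
    # Stand-in for AirflowException: failing the task triggers a retry.
    raise RuntimeError(f"all sources failed: {errors}")
```

Passing the primary fetcher first and the backup second reproduces the order described above; the temporary file is cleaned up between attempts so the backup starts from a clean state.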
No SFTPSensor for the forecast
Unlike the analysis DAGs, download_local_ifs_fc does not use a dedicated SFTPSensor. Instead it checks file existence directly inside download_with_fallback and relies on the task retry mechanism. The 20 retries at 15 minutes each give a 5-hour window.
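The retry windows quoted on this page are simply the number of retries multiplied by the retry delay. A minimal sketch of that arithmetic, with `retry_window` as a hypothetical helper name, is shown below; it counts only the waiting time between attempts and ignores how long each attempt itself runs.

```python
from datetime import timedelta


def retry_window(retries: int, delay_minutes: int) -> timedelta:
    """Total time spent waiting between attempts (attempt duration excluded)."""
    return timedelta(minutes=retries * delay_minutes)


# Values taken from the tables on this page.
forecast_local = retry_window(20, 15)   # download_local_ifs_fc
lucia_ifs = retry_window(56, 15)        # download_ifs_an / an00 / fc
```

Here `forecast_local` equals `timedelta(hours=5)` and `lucia_ifs` equals `timedelta(hours=14)`, matching the 5-hour and 14-hour windows stated in the text.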
MAR spectral forecast¶
MAR provides a mesoscale atmospheric/spectral forecast used as surface forcing for the model. The file naming convention uses the format y{YYYY}m{MM}d{DD}:
Spectral_y2025m06d17.nc = forecast valid for 2025-06-17
Both MAR DAGs have a time-based fallback: if the forecast for day T+9 has not appeared by 14:00 UTC (15:00 Brussels winter time), the DAG copies the T+8 file as a placeholder for T+9 and continues.
download_MAR (Lucia)¶
| Property | Value |
|---|---|
| Schedule | 25 09 * * * (09:25 UTC) |
| Retries | 45 x 10 min |
| Source | climato.be (Université de Liège), port 22 |
| Destination | /gpfs/.../data/MAR/oper/ on Lucia |
| File | LUCIA/download_MAR.py |
The get_script_based_on_time() Python function generates different bash commands depending on the current time in the Europe/Paris timezone:
- Before 14:00 UTC: attempt scp from climato.be to fetch the Spectral file for T+9 days.
- After 14:00 UTC: if the T+9 file is missing, copy T+8 as a placeholder. Send an email alert to luc.vandenbulcke@uliege.be.
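The naming convention and the time-based decision can be sketched in a few lines. This is not get_script_based_on_time() itself, which generates bash commands; `mar_filename` and `mar_action` are hypothetical names, and the cutoff is compared against a UTC timestamp as in the bullets above.

```python
from datetime import date, datetime, timedelta, timezone


def mar_filename(valid_day: date) -> str:
    """Spectral_y{YYYY}m{MM}d{DD}.nc for the day the forecast is valid."""
    return f"Spectral_y{valid_day:%Y}m{valid_day:%m}d{valid_day:%d}.nc"


def mar_action(now_utc: datetime, t9_present: bool, cutoff_hour: int = 14) -> str:
    """Decide what to do for the T+9 file.

    Before the cutoff: keep trying to fetch. From the cutoff on: fall back
    to copying T+8 as a placeholder if T+9 is still missing.
    """
    if now_utc.hour < cutoff_hour:
        return "fetch"
    return "keep" if t9_present else "copy_placeholder"


today = date(2025, 6, 8)
t9_name = mar_filename(today + timedelta(days=9))  # file expected for T+9
```

With `today` set to 2025-06-08, `t9_name` is the example filename given earlier, `Spectral_y2025m06d17.nc`.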
download_MAR_local (LOCAL)¶
| Property | Value |
|---|---|
| Schedule | 25 09 * * * (09:25 UTC) |
| Retries | 45 x 10 min |
| Source | climato.be at 139.165.67.19:10000 via SSH + SCP |
| Destination | /opt/airflow/marines_data/backupPU/forcing/MAR/ |
| File | LOCAL/download_MAR.py |
The DAG's two tasks, the download and delete_2daysold, are independent (no >> between them). delete_2daysold removes the MAR file from 2 days ago to keep disk usage bounded.
# Uses SSH key from host home directory (mounted into container)
ssh -i /opt/airflow/users_home/luc/.ssh/id_rsa \
-p 10000 lvdbulcke@139.165.67.19 \
"test -f .../Spectral_${today9_str2}.nc"
scp -P 10000 -o IdentityFile=... \
lvdbulcke@139.165.67.19:.../Spectral*.nc \
/opt/airflow/marines_data/backupPU/forcing/MAR/
Same time-based fallback logic as the Lucia version.
CMEMS observations¶
download_cmems_obs (Lucia)¶
| Property | Value |
|---|---|
| Schedule | 00 06,14 * * * (06:00 and 14:00 UTC, twice daily) |
| Retries | 45 x 10 min |
| Source | Copernicus Marine Service (exact datasets not visible in the DAG file) |
| Destination | /gpfs/.../data/Obs/ on Lucia |
| File | LUCIA/download_obs.py |
Runs nohup /gpfs/.../data/Obs/get.sh on the Lucia frontal node.
EFAS river discharge¶
The European Flood Awareness System (EFAS) provides daily river discharge forecasts. The archive is on ECMWF's FTP server at aux.ecmwf.int, user efas_cmbs.
The file format is: efas_dis_aifs_cmbs_{YYYYMMDD}00_member_00.nc
download_efas (Lucia)¶
| Property | Value |
|---|---|
| Schedule | 10 09 * * * (09:10 UTC) |
| Retries | 80 x 10 min |
| Source | FTP aux.ecmwf.int (via get.sh on Lucia) |
| Destination | /gpfs/.../data/Rivers/EFAS/NRT/ |
| File | LUCIA/download_efas.py |
download_efas_local_preprocess (LOCAL)¶
| Property | Value |
|---|---|
| Schedule | 10 09 * * * (09:10 UTC) |
| Retries | 80 x 10 min |
| Source | FTP aux.ecmwf.int (wget from inside Docker) |
| Destination | /opt/airflow/marines_data/data/RIVERS/EFAS/NRT/ |
| File | LOCAL/efas_and_rnf2025.py |
The download step is a BashOperator that:
- Uses wget with FTP credentials to download the daily tarball cmbs{DATE}00.tar.
- Extracts only the *aifs* member file from the tarball.
- Moves the tarball to a tar/ subdirectory for archiving.
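The extraction and archiving steps are done in bash in the DAG; the Python sketch below only illustrates the same logic and leaves out the wget download. `extract_aifs_member` is a hypothetical name, and flattening any leading directories inside the tarball is an assumption about the desired layout.

```python
import fnmatch
import os
import shutil
import tarfile


def extract_aifs_member(tarball: str, dest_dir: str) -> list:
    """Extract only members matching *aifs*, then move the tarball to tar/."""
    os.makedirs(dest_dir, exist_ok=True)
    extracted = []
    with tarfile.open(tarball) as tf:
        for member in tf.getmembers():
            name = os.path.basename(member.name)
            if member.isfile() and fnmatch.fnmatch(name, "*aifs*"):
                out_path = os.path.join(dest_dir, name)
                # Copy the member's bytes out by hand (flattened layout).
                with tf.extractfile(member) as src, open(out_path, "wb") as dst:
                    shutil.copyfileobj(src, dst)
                extracted.append(out_path)
    # Archive the tarball itself in a tar/ subdirectory.
    archive_dir = os.path.join(dest_dir, "tar")
    os.makedirs(archive_dir, exist_ok=True)
    shutil.move(tarball, os.path.join(archive_dir, os.path.basename(tarball)))
    return extracted
```

The function returns the paths of the extracted files, so a caller can fail the task when the list is empty.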
Then rnf_v2025 runs the Julia preprocessing script:
julia rnf_oper_v2025.jl \
--year {{ get_datepart("%Y") }} \
--month {{ get_datepart("%m") }} \
--day {{ get_datepart("%d") }} \
--days 10
This script converts the EFAS discharge NetCDF into the river runoff forcing format expected by the GCP model, covering 10 days from today.
CAMS atmospheric deposition¶
The Copernicus Atmosphere Monitoring Service (CAMS) provides atmospheric composition forecasts. We use nitrogen deposition fields as atmospheric forcing for the biogeochemical model.
CDS not reachable from Lucia
The CAMS Climate Data Store (CDS) API is blocked from the Lucia network. All CAMS data is downloaded to the marines server by the LOCAL DAG and then pushed to Lucia over SSH. The disabled file LUCIA/download_CAMS.py__cds_not_reachable_from_Lucia documents this constraint.
download_CAMS (LOCAL)¶
| Property | Value |
|---|---|
| Schedule | 30 09 * * * (09:30 UTC) |
| Retries | 87 x 10 min (up to midnight) |
| Source | CAMS Global Atmospheric Composition Forecasts via CDS API |
| Intermediate file | /opt/airflow/marines_data/data/CAMS/tempo.zip |
| Output files | cams_y{Y}m{M}d{D}.nc (today through T+4) |
| Mean forecast | mean_forecast.nc (average of last 5 days) |
| File | LOCAL/download_CAMS.py |
download_cams ─────────────────────────────────────┐
│
release_at_1230 >> extend_cams >> copy_to_lucia ─┘
>> copy_to_docker
The download_cams and release_at_1230 tasks run in parallel from the DAG start at 09:30 UTC. They are not connected by a dependency. The intent is:
- download_cams fetches the data and writes per-day NC files. On normal days this finishes within an hour.
- release_at_1230 simply waits until the wall clock reaches 12:30 UTC, ensuring that even if the CDS API is delayed, we wait for the freshest available data.
- Once 12:30 UTC is reached, extend_cams averages the last 5 daily files into mean_forecast.nc, which serves as a fallback for days T+5 through T+9.
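How long release_at_1230 has to wait depends only on the current UTC time. The sketch below shows that computation; `seconds_until_release` is a hypothetical name, and how the real task waits (sensor, sleep, or otherwise) is not shown in this page.

```python
from datetime import datetime, time, timezone


def seconds_until_release(now_utc: datetime, release: time = time(12, 30)) -> float:
    """Seconds left until the release time on the same UTC day (0 if passed).

    now_utc must be timezone-aware and expressed in UTC.
    """
    release_dt = datetime.combine(now_utc.date(), release, tzinfo=timezone.utc)
    return max(0.0, (release_dt - now_utc).total_seconds())


# From the 09:30 UTC DAG start the wait is three hours.
wait = seconds_until_release(datetime(2025, 6, 17, 9, 30, tzinfo=timezone.utc))
```

Clamping at zero means a late manual re-run, for example at 15:00 UTC, proceeds immediately instead of waiting for the next day.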
The CDS request downloads 120-hour atmospheric deposition forecasts for:
| CDS variable | Short name | NEMO variable |
|---|---|---|
| Dry deposition of coarse mode nitrate aerosol | aerddpnic | NOS (nitrogen oxidized) |
| Dry deposition of fine mode nitrate aerosol | aerddpnif | NOS |
| Wet deposition of coarse/fine mode nitrate (convective + large-scale) | aerwdcnic, aerwdlnic, aerwdcnif, aerwdlnif | NOS |
| Dry deposition of ammonium aerosol | aerddpam | NHS (ammonium) |
| Wet deposition of ammonium aerosol (convective + large-scale) | aerwdcam, aerwdlam | NHS |
Units are converted from kg/m2/s to mmol/m2/s during processing. The spatial domain is [48.75N, 25.5E, 39N, 43.5E] (Black Sea region).
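The unit conversion is a division by molar mass: 1 kg is 1000 g, dividing by the molar mass in g/mol gives mol, and 1 mol is 1000 mmol, so the overall factor is 1e6 / M. The sketch below shows the arithmetic only. Which molar mass the processing script actually uses (the full ion or nitrogen alone) is not stated here, so the constants are assumptions for illustration.

```python
# Approximate molar masses in g/mol (assumed: full ions, not nitrogen alone).
NO3_G_PER_MOL = 62.0049
NH4_G_PER_MOL = 18.0385


def kg_to_mmol(flux_kg_m2_s: float, molar_mass_g_per_mol: float) -> float:
    """Convert a deposition flux from kg/m2/s to mmol/m2/s.

    kg -> g (x 1000) -> mol (/ M) -> mmol (x 1000), i.e. x 1e6 / M.
    """
    return flux_kg_m2_s * 1.0e6 / molar_mass_g_per_mol


# 62.0049e-6 kg of nitrate is 1 mmol under the assumption above.
one_mmol = kg_to_mmol(62.0049e-6, NO3_G_PER_MOL)
```

Keeping the molar mass as a parameter makes it easy to swap in the value the real script uses once it is confirmed.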
After creating the per-day files, the download_cams task validates that:
- All 5 expected files exist on disk.
- Neither NOS nor NHS contains NaN values or values above 1e6 mmol/m2/s.
If validation fails, all generated files are deleted and the task raises a ValueError to trigger the Airflow retry. This prevents silently writing corrupt data to the forcing directory.
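The validate-then-clean-up behaviour can be sketched as below. This is an illustration under assumptions: `validate_cams` is a hypothetical name, and `load` is a hypothetical callable that returns the NOS and NHS values of a file as flat lists of floats, standing in for whatever NetCDF reader the task uses.

```python
import math
import os
from typing import Callable, Dict, List, Sequence

MAX_VALID = 1e6  # mmol/m2/s, threshold quoted in the text


def validate_cams(
    paths: Sequence[str], load: Callable[[str], Dict[str, List[float]]]
) -> None:
    """Raise ValueError and delete all files if any check fails."""
    try:
        for path in paths:
            if not os.path.exists(path):
                raise ValueError(f"missing file: {path}")
            for var, values in load(path).items():
                for v in values:
                    if math.isnan(v) or v > MAX_VALID:
                        raise ValueError(f"bad value in {var} of {path}: {v}")
    except ValueError:
        # Remove every generated file so no corrupt data is left behind.
        for path in paths:
            if os.path.exists(path):
                os.remove(path)
        raise
```

Re-raising after the cleanup is what lets the scheduler see the failure and schedule the next retry.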
copy_to_lucia uploads files cams_y{Y}m{M}d{D}.nc for days T+0 through T+9 to the Lucia gateway home directory, then SCPs them to the Lucia frontal node.
For days beyond T+4 where no per-day file exists yet, mean_forecast.nc is used as a substitute.
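The substitution rule can be written as a small planning function: for each target day, use the per-day file if it exists, otherwise mean_forecast.nc. This is a sketch only; `cams_name` and `cams_upload_plan` are hypothetical names, and zero-padded month and day in the filename are an assumption, since the pattern cams_y{Y}m{M}d{D}.nc does not specify padding.

```python
from datetime import date, timedelta
from typing import List, Set, Tuple


def cams_name(day: date) -> str:
    """Per-day filename; zero padding is assumed."""
    return f"cams_y{day:%Y}m{day:%m}d{day:%d}.nc"


def cams_upload_plan(today: date, available: Set[str]) -> List[Tuple[str, str]]:
    """Return (target name, source file) pairs for T+0 through T+9."""
    plan = []
    for offset in range(10):
        target = cams_name(today + timedelta(days=offset))
        source = target if target in available else "mean_forecast.nc"
        plan.append((target, source))
    return plan
```

With per-day files available for T+0 through T+4, the last five entries of the plan all take mean_forecast.nc as their source.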
copy_to_docker copies the same files to /opt/airflow/marines_data/backupPU/forcing/CAMS/ for the GCP model. It deletes all previous CAMS files in that directory except yesterday's file and two special static files (bsfs_cams_bilin.nc, deposition.nc).
The CDS API key is read from /opt/airflow/marines_data/cdsapirc.ads (not from Airflow Variables or Connections).
NIHWM river data¶
download_nihwm (Lucia)¶
| Property | Value |
|---|---|
| Schedule | 10 16 * * * (16:10 UTC) |
| Retries | 45 x 10 min |
| Destination | /gpfs/.../data/Rivers/nihwm/ |
| File | LUCIA/download_nihwm.py |
Runs nohup /gpfs/.../data/Rivers/nihwm/get.sh on Lucia. The verification check looks for HF_{today}*.csv.
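A check of this kind amounts to a glob on the destination directory. The sketch below is an assumption about its shape, not the code in the DAG: `nihwm_file_present` is a hypothetical name, and the date string is passed in as-is because the format of {today} is not documented here.

```python
import glob
import os


def nihwm_file_present(directory: str, today_str: str) -> bool:
    """True if at least one HF_{today}*.csv file exists in the directory."""
    pattern = os.path.join(directory, f"HF_{today_str}*.csv")
    return bool(glob.glob(pattern))
```

Returning a boolean lets the caller decide whether a missing file should fail the task and trigger a retry.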