DAG Reference

This section documents every DAG in production. DAGs are organized by function. All schedule_interval values are UTC cron expressions.
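As a reading aid, the sketch below converts a timeline label such as 08h15 into the matching daily cron expression. The helper name `label_to_cron` is hypothetical, and the result is only what a daily schedule at that UTC time would look like; the actual schedule_interval of each DAG is defined in its own file and is not reproduced here.

```python
import re


def label_to_cron(label: str) -> str:
    """Convert an 'HHhMM' label (UTC) into a daily five-field cron expression.

    Hypothetical helper for illustration; it is not part of the DAG code.
    """
    match = re.fullmatch(r"(\d{1,2})h(\d{2})", label)
    if match is None:
        raise ValueError(f"not an HHhMM label: {label!r}")
    hour, minute = int(match.group(1)), int(match.group(2))
    if hour > 23 or minute > 59:
        raise ValueError(f"time out of range: {label!r}")
    # Cron field order: minute hour day-of-month month day-of-week
    return f"{minute} {hour} * * *"


if __name__ == "__main__":
    for label in ("06h00", "08h15", "12h30"):
        print(label, "->", label_to_cron(label))
```

Cron puts the minute field first, so 08h15 reads as `15 8 * * *`, not `8 15 * * *`.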


Where DAG files live

dags/
├── LUCIA/          # Lucia-side DAGs (using SSHOperator)
├── LOCAL/          # Docker-side DAGs (run inside the worker container)
├── GoogleCloud/    # GCP backup run DAG
└── check_MDS.py    # count_files_MDS (monitoring DAG, at root level)

Full pipeline timeline

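The diagram below maps every DAG onto the UTC day. Solid arrows are dependencies enforced by Airflow: file polling through SSHOperator or BashOperator retries, Airflow Variable reads, or a PythonSensor. Dashed arrows are pure data dependencies, where one DAG writes files that another reads later and no Airflow mechanism enforces the ordering. The dashed TimeSensor arrow between the two download_CAMS nodes is the one exception: it marks a hold inside a single DAG rather than a dependency between DAGs.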

Color key: blue = Lucia pipeline, green = local pipeline, amber = GCP backup, purple = monitoring.

The download_CAMS DAG appears twice because it contains an internal TimeSensor that holds execution until 12h30. After the hold, the DAG extends the forecast and copies the files to Lucia and to the GCP backup working directory.
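The hold can be pictured as a simple clock comparison. The sketch below is not the DAG's code and not Airflow's implementation; it only illustrates the kind of check a time-based sensor repeats until it succeeds. The names `HOLD_UNTIL` and `hold_released` are hypothetical, and the function assumes it receives a timezone-aware datetime.

```python
from datetime import datetime, time, timezone

# Assumed hold time, taken from the "12h30" label in the timeline (UTC).
HOLD_UNTIL = time(12, 30)


def hold_released(now: datetime, target: time = HOLD_UNTIL) -> bool:
    """Return True once the UTC wall-clock time has reached `target`.

    `now` must be timezone-aware; it is converted to UTC before comparing.
    """
    if now.tzinfo is None:
        raise ValueError("now must be timezone-aware")
    return now.astimezone(timezone.utc).time() >= target


if __name__ == "__main__":
    print(hold_released(datetime.now(timezone.utc)))
```

In Airflow itself, the same idea is usually expressed by giving a TimeSensor a `target_time` of `datetime.time(12, 30)`; whether download_CAMS is configured exactly this way is an assumption.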

graph TD
    subgraph t06["06h00"]
        OBS1["download_cmems_obs (run 1)"]
    end

    subgraph t0730["07h30"]
        LIAN["download_local_ifs_analysis"]
        LIAN00["download_local_ifs_an00"]
        LIFC["download_local_ifs_fc"]
    end

    subgraph t0800["08h00"]
        IAN["download_ifs_an"]
        IAN00["download_ifs_an00"]
        IFC["download_ifs_fc"]
    end

    subgraph t0815["08h15"]
        PIFS["ifs_process"]
        LPIFS["ifs_process_local"]
    end

    subgraph t09["09h10 to 09h30"]
        EFAS["download_efas"]
        LEFAS["download_efas_local_preprocess"]
        MAR["download_MAR"]
        LMAR["download_MAR_local"]
        GCP["GoogleCloud_backup_run (manual)"]
        CAMS_DL["download_CAMS (download)"]
    end

    subgraph t1030["10h30"]
        RUN["model_lucia_run"]
    end

    subgraph t1230["12h30"]
        CAMS_EXT["download_CAMS (extend + copy)"]
    end

    subgraph t1400["14h00"]
        POST["model_lucia_postprocess"]
        OBS2["download_cmems_obs (run 2)"]
    end

    subgraph t1545["15h45"]
        COMP["compress_results"]
    end

    subgraph t1600["16h00 to 16h10"]
        MDS["count_files_MDS"]
        NIHWM["download_nihwm"]
        LNIHWM["download_nihwm_local"]
    end

    IAN --> PIFS
    IAN00 --> PIFS
    IFC --> PIFS

    PIFS -->|"polls ecmwf file on Lucia"| RUN
    MAR -->|"polls MAR file on Lucia"| RUN
    RUN -->|"Variable: submitted_job"| POST
    POST -->|"PythonSensor: ocean.output"| COMP

    LIAN --> LPIFS
    LIAN00 --> LPIFS
    LIFC --> LPIFS

    CAMS_DL -.->|"TimeSensor: hold until 12h30"| CAMS_EXT
    CAMS_EXT -.->|"copy to backupPU/forcing/CAMS"| GCP

    LPIFS -.->|"IFS forcings"| GCP
    LMAR -.->|"MAR forcing"| GCP
    LEFAS -.->|"EFAS forcing"| GCP

    classDef lucia fill:#dbeafe,stroke:#2563eb,color:#1e3a8a
    classDef local fill:#dcfce7,stroke:#16a34a,color:#14532d
    classDef gcp fill:#fef3c7,stroke:#d97706,color:#78350f
    classDef monitor fill:#f3e8ff,stroke:#9333ea,color:#581c87

    class OBS1,OBS2,IAN,IAN00,IFC,PIFS,MAR,EFAS,RUN,POST,NIHWM lucia
    class LIAN,LIAN00,LIFC,LPIFS,LMAR,LEFAS,CAMS_DL,CAMS_EXT,COMP,LNIHWM local
    class GCP gcp
    class MDS monitor
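
Several solid arrows in the diagram stand for "wait until a file exists" checks. The sketch below shows a minimal poke-style callable of the kind that could be passed to a PythonSensor as its `python_callable`. The function name `output_ready`, the `min_bytes` threshold, and the example path are assumptions for illustration; the production DAGs may test different conditions, and the Lucia-side checks run over SSH instead of on the local filesystem.

```python
from pathlib import Path


def output_ready(path: str, min_bytes: int = 1) -> bool:
    """Poke-style check: True when `path` is a file of at least `min_bytes`.

    A sensor would call this repeatedly and succeed on the first True.
    """
    candidate = Path(path)
    return candidate.is_file() and candidate.stat().st_size >= min_bytes


if __name__ == "__main__":
    # Hypothetical location; the real path of ocean.output is not given here.
    print(output_ready("/tmp/ocean.output"))
```

Checking the size as well as existence guards against releasing a downstream task on an empty file that a writer has only just created.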