Linking Airflow DAGs to Data Assets

Connecting your Airflow DAGs to the data assets they manage is a crucial step for achieving end-to-end data observability. This link enriches your data lineage, providing a clear picture of how your assets are created and updated. It also accelerates incident resolution by immediately showing which pipelines might be responsible for a data quality issue.

This guide explains how to use query tags to automatically link your Airflow DAGs to assets in Sifflet.

📘

Feature availability

Linking DAGs to data assets is currently available for BigQuery and Snowflake. If you need support for another data platform, reach out to your Sifflet customer success team.

How It Works ⚙️

During source refreshes, Sifflet scans the query history from your sources (Snowflake or BigQuery). When a query modifies an asset (e.g., via CREATE TABLE, INSERT, or MERGE), Sifflet inspects the query's metadata for a specific tag.

If a query contains the tag airflow_dag_id:<YOUR_DAG_ID>, Sifflet automatically creates a relationship between the asset being modified and the specified Airflow DAG in the Sifflet catalog.
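
For example, here is what a tagged Snowflake session could look like in the query history; the database, table, and column names are illustrative:

-- Tag the session so every subsequent query carries the Sifflet tag
ALTER SESSION SET QUERY_TAG = 'airflow_dag_id:daily_sales_aggregation';

-- This statement now appears in the query history with the tag attached,
-- so Sifflet links the modified asset to the daily_sales_aggregation DAG
CREATE OR REPLACE TABLE analytics.daily_sales_summary AS
SELECT order_date, SUM(amount) AS total_sales
FROM raw.orders
GROUP BY order_date;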

How to Link Your DAGs

Follow these steps to configure your Airflow DAGs to send the necessary metadata to Sifflet.

1. Identify the Target DAG and Queries

First, identify which queries within your DAG are responsible for creating or updating the data assets you want to link. This is typically any SQL statement that modifies a table, such as an INSERT, UPDATE, MERGE, or CREATE TABLE AS SELECT (CTAS).

2. Add the Sifflet Query Tag

For each of those queries, you need to add a metadata tag. The required format is:

airflow_dag_id:<DAG_ID>

It's critical that <DAG_ID> is an exact match for the dag_id of your DAG in Airflow.

Most Airflow operators that execute SQL have a parameter for setting query metadata. For example, in Snowflake you can use the QUERY_TAG session parameter, while in BigQuery you can set job labels.

Example: SnowflakeOperator

If you're using Airflow's SnowflakeOperator, you can pass the tag via the session_parameters argument, which sets the QUERY_TAG session parameter on the connection before your main SQL runs.

from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from airflow.models.dag import DAG
import datetime

with DAG(
    dag_id='daily_sales_aggregation',
    start_date=datetime.datetime(2025, 1, 1),
    schedule_interval='@daily',
    catchup=False,
) as dag:
    aggregate_sales_data = SnowflakeOperator(
        task_id='run_daily_aggregation',
        sql='sql/aggregate_sales.sql',
        snowflake_conn_id='your_snowflake_conn',

        # 👇 This is where you add the Sifflet tag!
        session_parameters={
            "QUERY_TAG": "airflow_dag_id:daily_sales_aggregation"
        },
    )
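
Example: BigQueryInsertJobOperator

If your DAG runs on BigQuery, the equivalent mechanism is job labels. The sketch below assumes Sifflet reads the tag as an airflow_dag_id label key paired with the DAG ID as its value, since BigQuery label values can't contain colons; the query, dataset, and connection ID are illustrative, so confirm the exact label format with your Sifflet team.

from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.models.dag import DAG
import datetime

with DAG(
    dag_id='daily_sales_aggregation',
    start_date=datetime.datetime(2025, 1, 1),
    schedule_interval='@daily',
    catchup=False,
) as dag:
    aggregate_sales_data = BigQueryInsertJobOperator(
        task_id='run_daily_aggregation',
        configuration={
            "query": {
                "query": (
                    "CREATE OR REPLACE TABLE analytics.daily_sales_summary AS "
                    "SELECT order_date, SUM(amount) AS total_sales "
                    "FROM raw.orders GROUP BY order_date"
                ),
                "useLegacySql": False,
            },
            # Labels sit at the top level of the job configuration;
            # the key/value split is an assumption about how Sifflet parses them
            "labels": {"airflow_dag_id": "daily_sales_aggregation"},
        },
        gcp_conn_id='your_gcp_conn',
    )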

3. Run the DAG

Trigger your DAG in Airflow. Once the tagged query successfully executes against your data warehouse, Sifflet will process it during its next scan of the query history.
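
For example, you can trigger a run manually from the Airflow CLI (using the DAG from the example above):

airflow dags trigger daily_sales_aggregation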

4. Verify the Link in Sifflet

After both sources (Airflow and your data platform) are refreshed, you can verify the connection in the Sifflet UI:

  • Asset Page: Navigate to the asset that your query updated (e.g., the daily_sales_summary table). You'll find a new field called "Orchestrator" in the overview panel linking to the daily_sales_aggregation DAG.

  • Lineage: Open the lineage graph for the asset. You will now see the Airflow DAG appearing on top of the asset node, clearly showing that it generates or updates this asset.

    The Airflow DAG in the lineage graph

    📘

    Airflow DAG status

    On the Airflow node in the lineage graph, the DAG status appears next to the Airflow logo.


Troubleshooting and Best Practices

  • Exact Match Required: Ensure the <DAG_ID> in the tag is identical to your Airflow dag_id. Typos are the most common reason for links not appearing.
  • DAG Not Appearing?: If the link doesn't show up, please check the following:
    • Has the DAG run at least once after you added the tag?
    • Did the task containing the tagged query run successfully?
    • Was Sifflet able to ingest the latest query history from your data platform source?
  • Dynamic DAGs and Tasks: If you're using dynamically generated DAGs or tasks, make sure your code passes the actual parent dag_id into the query tag rather than a hard-coded string (see the sketch after this list).
  • Permissions: The service account Sifflet uses to connect to your data platform must have sufficient permissions to read the query or job history.
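
For the dynamic case, a minimal sketch of a DAG factory that reuses the same dag_id variable in the tag, so the two can never drift apart; the DAG names and SQL paths are illustrative:

from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from airflow.models.dag import DAG
import datetime

def build_aggregation_dag(dag_id: str) -> DAG:
    with DAG(
        dag_id=dag_id,
        start_date=datetime.datetime(2025, 1, 1),
        schedule_interval='@daily',
        catchup=False,
    ) as dag:
        SnowflakeOperator(
            task_id='run_aggregation',
            sql=f'sql/{dag_id}.sql',
            snowflake_conn_id='your_snowflake_conn',
            # Reuse the dag_id variable so the tag always matches the DAG
            session_parameters={"QUERY_TAG": f"airflow_dag_id:{dag_id}"},
        )
    return dag

# One DAG per dataset; each query is tagged with its own dag_id
for dataset in ("sales", "orders"):
    globals()[f"{dataset}_daily_aggregation"] = build_aggregation_dag(
        f"{dataset}_daily_aggregation"
    )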