Linking Airflow DAGs to Data Assets
Connecting your Airflow DAGs to the data assets they manage is a crucial step for achieving end-to-end data observability. This link enriches your data lineage, providing a clear picture of how your assets are created and updated. It also accelerates incident resolution by immediately showing which pipelines might be responsible for a data quality issue.
This guide explains how to use query tags to automatically link your Airflow DAGs to assets in Sifflet.
Feature availability
Linking DAGs to data assets is currently available for BigQuery and Snowflake. If you need support for another data platform, reach out to your Sifflet customer success team.
How It Works ⚙️
During source refreshes, Sifflet scans the query history from your sources (Snowflake or BigQuery). When a query modifies an asset (e.g., via `CREATE TABLE`, `INSERT`, or `MERGE`), Sifflet inspects the query's metadata for a specific tag.
If a query contains the tag `airflow_dag_id:<YOUR_DAG_ID>`, Sifflet automatically creates a relationship between the modified asset and the specified Airflow DAG in the Sifflet catalog.
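For example, here's a minimal sketch of a tagged Snowflake session using the Snowflake Python connector outside of Airflow (account, credentials, and table names are placeholders):

```python
import os
import snowflake.connector

# Every statement on this connection is recorded in Snowflake's query
# history with this QUERY_TAG, which is the metadata Sifflet inspects.
conn = snowflake.connector.connect(
    account="your_account",   # placeholder
    user="your_user",         # placeholder
    password=os.environ["SNOWFLAKE_PASSWORD"],
    database="your_db",
    schema="your_schema",
    session_parameters={"QUERY_TAG": "airflow_dag_id:daily_sales_aggregation"},
)
# Placeholder DML: any asset this statement modifies would be linked
# to the daily_sales_aggregation DAG during Sifflet's next scan.
conn.cursor().execute("INSERT INTO daily_sales_summary SELECT * FROM raw_sales")
conn.close()
```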
How to Link Your DAGs
Follow these steps to configure your Airflow DAGs to send the necessary metadata to Sifflet.
1. Identify the Target DAG and Queries
First, identify which queries within your DAG are responsible for creating or updating the data assets you want to link. This is typically any SQL statement that modifies a table, such as an `INSERT`, `UPDATE`, `MERGE`, or `CREATE TABLE AS SELECT` (CTAS).
2. Add the Sifflet Query Tag
For each of those queries, you need to add a metadata tag. The required format is:

`airflow_dag_id:<DAG_ID>`

It's critical that `<DAG_ID>` is an exact match for the `dag_id` of your DAG in Airflow.
Most Airflow operators that execute SQL have a parameter for setting query metadata. For example, in Snowflake you can use the `query_tag` session parameter, while in BigQuery you might set job labels.
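For BigQuery, a hypothetical sketch using the `BigQueryInsertJobOperator` inside your DAG might look like this. Note that BigQuery label values can't contain colons, so the tag is split into a label key/value pair here; this format is an assumption, so confirm the expected label format with your Sifflet team.

```python
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

# Hypothetical sketch: connection ID, SQL, and label format are assumptions.
aggregate_sales_data = BigQueryInsertJobOperator(
    task_id="run_daily_aggregation",
    gcp_conn_id="your_gcp_conn",
    configuration={
        "query": {
            "query": "INSERT INTO sales.daily_sales_summary SELECT * FROM sales.raw_sales",
            "useLegacySql": False,
        },
        # Job labels are attached to the query job's metadata in BigQuery.
        "labels": {"airflow_dag_id": "daily_sales_aggregation"},
    },
)
```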
Example: SnowflakeOperator
If you're using Airflow's `SnowflakeOperator`, you can pass the tag via the `session_parameters` argument, which sets Snowflake's `QUERY_TAG` session parameter before your main SQL runs.
```python
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from airflow.models.dag import DAG
import datetime

with DAG(
    dag_id='daily_sales_aggregation',
    start_date=datetime.datetime(2025, 1, 1),
    schedule_interval='@daily',
    catchup=False,
) as dag:
    aggregate_sales_data = SnowflakeOperator(
        task_id='run_daily_aggregation',
        sql='sql/aggregate_sales.sql',
        snowflake_conn_id='your_snowflake_conn',
        # 👇 This is where you add the Sifflet tag!
        session_parameters={
            "QUERY_TAG": "airflow_dag_id:daily_sales_aggregation"
        },
    )
```
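If your Snowflake provider version doesn't expose `session_parameters`, an alternative sketch is to set the tag in SQL at the start of the task. The operator runs the listed statements on a single connection, so the session-level tag applies to the statements that follow it (the DML below is a placeholder; this would replace the operator in the DAG above):

```python
aggregate_sales_data = SnowflakeOperator(
    task_id='run_daily_aggregation',
    snowflake_conn_id='your_snowflake_conn',
    sql=[
        # Tag the session first; it applies to every later statement
        # on this connection, including the placeholder INSERT below.
        "ALTER SESSION SET QUERY_TAG = 'airflow_dag_id:daily_sales_aggregation'",
        "INSERT INTO daily_sales_summary SELECT * FROM raw_sales",
    ],
)
```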
3. Run the DAG
Trigger your DAG in Airflow. Once the tagged query successfully executes against your data warehouse, Sifflet will process it during its next scan of the query history.
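Optionally, before waiting on Sifflet's next scan, you can sanity-check that the tag landed in the warehouse's query history. Here's a Snowflake-flavored sketch using the `INFORMATION_SCHEMA.QUERY_HISTORY` table function (connection details are placeholders; visibility depends on your role's privileges):

```python
import os
import snowflake.connector

conn = snowflake.connector.connect(
    account="your_account",
    user="your_user",
    password=os.environ["SNOWFLAKE_PASSWORD"],
    database="your_db",  # QUERY_HISTORY() requires a database context
)
# List recent queries that carried the Sifflet tag.
rows = conn.cursor().execute(
    """
    SELECT query_id, query_text, query_tag
    FROM TABLE(INFORMATION_SCHEMA.QUERY_HISTORY())
    WHERE query_tag = 'airflow_dag_id:daily_sales_aggregation'
    ORDER BY start_time DESC
    LIMIT 10
    """
).fetchall()
for query_id, query_text, query_tag in rows:
    print(query_id, query_tag)
conn.close()
```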
4. Verify the Link in Sifflet
After both sources (Airflow and your data platform) are refreshed, you can verify the connection in the Sifflet UI:
- Asset Page: Navigate to the asset that your query updated (e.g., the `daily_sales_summary` table). You'll find a new "Orchestrator" field in the overview panel linking to the `daily_sales_aggregation` DAG.
- Lineage: Open the lineage graph for the asset. The Airflow DAG now appears on top of the asset node, clearly showing that it generates or updates this asset.
The Airflow DAG in the lineage graph
Airflow DAG status
Within the Airflow node in the lineage graph, the DAG status appears next to the Airflow logo.
Troubleshooting and Best Practices
- Exact Match Required: Ensure the `<DAG_ID>` in the tag is identical to your Airflow `dag_id`. Typos are the most common reason for links not appearing.
- DAG Not Appearing?: If the link doesn't show up, check the following:
  - Has the DAG run at least once after you added the tag?
  - Did the task containing the tagged query run successfully?
  - Was Sifflet able to ingest the latest query history from your data platform source?
- Dynamic Task IDs: If you're using dynamically generated DAGs or tasks, make sure your code correctly passes the parent `dag_id` into the query tag (see the sketch after this list).
- Permissions: The service account Sifflet uses to connect to your data platform must have sufficient permissions to read the query or job history.
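For dynamically generated DAGs, a simple way to guarantee the exact match is to derive the tag from the `dag_id` variable rather than hard-coding it. A minimal sketch (the factory function is illustrative):

```python
import datetime
from airflow.models.dag import DAG
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator

def build_sales_dag(dag_id: str) -> DAG:
    with DAG(
        dag_id=dag_id,
        start_date=datetime.datetime(2025, 1, 1),
        schedule_interval='@daily',
        catchup=False,
    ) as dag:
        SnowflakeOperator(
            task_id='run_daily_aggregation',
            sql='sql/aggregate_sales.sql',
            snowflake_conn_id='your_snowflake_conn',
            # Built from dag_id, so the tag always matches what Airflow reports.
            session_parameters={"QUERY_TAG": f"airflow_dag_id:{dag_id}"},
        )
    return dag

dag = build_sales_dag('daily_sales_aggregation')
```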