Examples
Please find below several use cases where Sifflet's flow stopper can be useful in Airflow:
- a first simple example with dbt
- a second example where the integration helps prevent data quality issues from propagating through the pipelines and impacting the dashboards
First example: simple dbt pipeline
A simple example where the data quality check is done in parallel with the dbt tests.
You can find more information about SiffletRunRuleOperator on the Airflow custom operators page.
from datetime import datetime
from airflow.decorators import dag
from airflow.models.baseoperator import chain
from airflow.operators.bash import BashOperator
from sifflet_provider.operators.ingest.dbt import SiffletDbtIngestOperator
from sifflet_provider.operators.quality.rule import SiffletRunRuleOperator
# Placeholder value: replace with the directory handed to dbt's --profiles-dir option.
DBT_PROFILE_DIR = "<DBT_PROFILE_DIR>"
# Placeholder value: replace with the dbt project directory; the bash commands
# below `cd` into it before invoking dbt.
DBT_PROJ_DIR = "<DBT_PROJ_DIR>"
# Daily DAG: run the dbt models, then fan out to the Sifflet rule check and
# the dbt tests, which are both direct downstream tasks of the dbt run.
@dag(
    schedule_interval="@daily",
    start_date=datetime(2022, 1, 1),
    catchup=False,
    tags=["sifflet_monitored"],
)
def simple_dag_sifflet_run_rule():
    # Shell commands are assembled up front so the operator calls stay short.
    run_command = f"cd {DBT_PROJ_DIR} && dbt run --profiles-dir {DBT_PROFILE_DIR} --target dev"
    test_command = f"cd {DBT_PROJ_DIR} && dbt test --profiles-dir {DBT_PROFILE_DIR}"

    # Ids of the Sifflet rules executed by the quality-check task (sample values).
    monitored_rule_ids = [
        "3e2e2687-cd20-11ec-b38b-06bb20181849",
        "3e19eb3e-cd20-11ec-b38b-06bb20181849",
        "3e1a86f1-cd20-11ec-b38b-06bb20181849",
        "3e2e1fc3-cd20-11ec-b38b-06bb20181849",
    ]

    build_models = BashOperator(task_id="dbt_run", bash_command=run_command)

    quality_check = SiffletRunRuleOperator(
        task_id="sifflet_rule",
        rule_ids=monitored_rule_ids,
    )

    # trigger_rule="all_done": the tests are scheduled once the upstream task
    # has finished, whatever its final state.
    model_tests = BashOperator(
        task_id="dbt_test",
        trigger_rule="all_done",
        bash_command=test_command,
    )

    # dbt_run -> {sifflet_rule, dbt_test}
    chain(build_models, [quality_check, model_tests])


example_dag_sifflet = simple_dag_sifflet_run_rule()
Second example: SiffletRunRuleOperator with SiffletDbtIngestOperator
The example below is another typical use case where using Sifflet's flow stopper prevents bad-quality data from propagating through the pipelines.
Here, there are two objectives:
- check the data quality at each step of the transformation
- send the dbt artifacts to leverage Sifflet's integration with dbt Core
For this purpose, the additional tasks are calling SiffletRunRuleOperator and SiffletDbtIngestOperator. You can find more information on these custom operators on the Airflow custom operators page.
Some additional remarks:
- As seen above, each task is run in parallel with the dbt tests
- If certain data quality rules are deemed not critical, you can set the parameter error_on_rule_fail to false in order not to interrupt the pipeline
- The task sifflet_dbt_ingest (calling the operator SiffletDbtIngestOperator) sends the dbt artifacts to Sifflet. It is placed at the very end of the DAG, in accordance with our recommendation (see the dbt Core page for more details on how to integrate dbt Core)
from datetime import datetime
from typing import List
from airflow.decorators import dag
from airflow.models.baseoperator import chain
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
from sifflet_provider.operators.ingest.dbt import SiffletDbtIngestOperator
from sifflet_provider.operators.quality.rule import SiffletRunRuleOperator
# Placeholder value: replace with the directory handed to dbt's --profiles-dir option.
DBT_PROFILE_DIR = "<DBT_PROFILE_DIR>"
# Placeholder value: replace with the dbt project directory; the bash commands
# `cd` into it, and it is also passed as input_folder to SiffletDbtIngestOperator.
DBT_PROJ_DIR = "<DBT_PROJ_DIR>"
def branch_dbt_sifflet(model_name: str, rule_ids: List[str]):
    """Create the run / test / Sifflet-rule tasks for one dbt model selector.

    Args:
        model_name: Value passed to dbt's ``--models`` option. Leading and
            trailing ``+`` characters are removed when building the task ids.
        rule_ids: Sifflet rule ids forwarded to ``SiffletRunRuleOperator``.

    Returns:
        A pair ``(run_task, (test_task, rule_task))``. The test and rule tasks
        are already set as downstream tasks of the run task.
    """
    task_suffix = model_name.strip('+')
    # Options shared by the `dbt run` and `dbt test` invocations.
    shared_options = f"--models {model_name} --profiles-dir {DBT_PROFILE_DIR} --target dev"

    run_task = BashOperator(
        task_id=f"dbt_run_{task_suffix}",
        bash_command=f"cd {DBT_PROJ_DIR} && dbt run {shared_options}",
    )
    test_task = BashOperator(
        task_id=f"dbt_test_{task_suffix}",
        bash_command=f"cd {DBT_PROJ_DIR} && dbt test {shared_options}",
    )
    rule_task = SiffletRunRuleOperator(
        task_id=f"sifflet_run_rule_{task_suffix}",
        rule_ids=rule_ids,
    )

    # Both checks hang off the run task as siblings.
    checks = (test_task, rule_task)
    run_task >> checks
    return run_task, checks
# Daily DAG: build and check `+stg_customers`, then `customers`, then generate
# the dbt docs, run the full dbt test suite and finally send the dbt artifacts
# to Sifflet.
@dag(
    schedule_interval="@daily",
    start_date=datetime(2022, 1, 1),
    catchup=False,
    tags=["sifflet_monitored"],
)
def advanced_dag_sifflet_simpler():
    # --- closing tasks (wired at the end of the DAG below) ---
    docs_task = BashOperator(
        task_id="dbt_docs_generate",
        trigger_rule="all_done",
        bash_command=f"cd {DBT_PROJ_DIR} && dbt docs generate --profiles-dir {DBT_PROFILE_DIR}",
    )
    full_test_task = BashOperator(
        task_id="dbt_test_all",
        trigger_rule="all_done",
        bash_command=f"cd {DBT_PROJ_DIR} && dbt test --profiles-dir {DBT_PROFILE_DIR}",
    )
    ingest_task = SiffletDbtIngestOperator(
        task_id="sifflet_dbt_ingest",
        input_folder=DBT_PROJ_DIR,
        target="dev",
        project_name="dbt_test_project_snowflake",
    )

    # --- one run/test/rule branch per model selector ---
    stg_run, stg_checks = branch_dbt_sifflet(
        "+stg_customers",
        rule_ids=[
            "3e2e2687-cd20-11ec-b38b-06bb20181849",
            "3e19eb3e-cd20-11ec-b38b-06bb20181849",
        ],
    )
    customers_run, customers_checks = branch_dbt_sifflet(
        "customers",
        rule_ids=[
            "3e2e2687-cd20-11ec-b38b-06bb20181849",
            "3e19eb3e-cd20-11ec-b38b-06bb20181849",
        ],
    )

    # The staging checks gate the `customers` run.
    chain(stg_checks, customers_run)
    # customers checks -> docs generate -> full dbt test -> Sifflet ingest.
    chain(customers_checks, docs_task, full_test_task, ingest_task)


advanced_dag_sifflet_simpler = advanced_dag_sifflet_simpler()
Updated about 1 year ago