Please find below several use cases where Sifflet's flow stopper can be useful in Airflow:

  • a first simple example with dbt
  • a second example where the integration prevents data quality issues from propagating through the pipelines and impacting the dashboards

First example: simple dbt pipeline

A simple example where the data quality check is done in parallel with the dbt tests.
You can find more information about SiffletRunRuleOperator in the Airflow custom operators documentation.

[Figure: graph of the DAG described in the caption below]

Example of a DAG with a task "sifflet_rule" calling SiffletRunRuleOperator

from datetime import datetime

from airflow.decorators import dag
from airflow.models.baseoperator import chain
from airflow.operators.bash import BashOperator
from sifflet.operators.ingest.dbt import SiffletDbtIngestOperator
from sifflet.operators.quality.rule import SiffletRunRuleOperator

# Placeholders to replace before use:
# - DBT_PROFILE_DIR is passed to dbt's --profiles-dir option.
# - DBT_PROJ_DIR is the directory the dbt commands are run from (via `cd`).
DBT_PROFILE_DIR = "<DBT_PROFILE_DIR>"
DBT_PROJ_DIR = "<DBT_PROJ_DIR>"


# Daily DAG: run the dbt models, then run the dbt tests and the Sifflet rules
# side by side, both directly downstream of the dbt run.
# NOTE: documented with comments on purpose. Airflow's @dag decorator uses the
# function docstring as the DAG's doc_md, so adding one would change the DAG.
@dag(
    schedule_interval="@daily",
    start_date=datetime(2022, 1, 1),
    catchup=False,
    tags=["sifflet_monitored"],
)
def simple_dag_sifflet_run_rule():
    # Shell commands executed by the two dbt tasks.
    run_command = f"cd {DBT_PROJ_DIR} && dbt run --profiles-dir {DBT_PROFILE_DIR} --target dev"
    test_command = f"cd {DBT_PROJ_DIR} && dbt test --profiles-dir {DBT_PROFILE_DIR}"

    # Identifiers of the Sifflet rules triggered by the "sifflet_rule" task.
    monitored_rule_ids = [
        "3e2e2687-cd20-11ec-b38b-06bb20181849",
        "3e19eb3e-cd20-11ec-b38b-06bb20181849",
        "3e1a86f1-cd20-11ec-b38b-06bb20181849",
        "3e2e1fc3-cd20-11ec-b38b-06bb20181849",
    ]

    build_models = BashOperator(task_id="dbt_run", bash_command=run_command)

    quality_gate = SiffletRunRuleOperator(
        task_id="sifflet_rule",
        rule_ids=monitored_rule_ids,
    )

    # trigger_rule="all_done": the tests start once the upstream task has
    # finished, whatever its outcome.
    test_models = BashOperator(
        task_id="dbt_test",
        trigger_rule="all_done",
        bash_command=test_command,
    )

    # Fan out: both checks depend only on the dbt run.
    build_models >> [quality_gate, test_models]


# Call the decorated function so that the resulting DAG object is bound to a
# module-level name.
example_dag_sifflet = simple_dag_sifflet_run_rule()

Second example: SiffletRunRuleOperator with SiffletDbtIngestOperator

The example below is another typical use case where using Sifflet's flow stopper prevents bad-quality data from propagating through the pipelines.
Here, there are two objectives:

  • check the data quality at each step of the transformation
  • send the dbt artifacts to leverage Sifflet's integration with dbt Core
    To this end, the additional tasks call SiffletRunRuleOperator and SiffletDbtIngestOperator. You can find more information on these custom operators in the Airflow custom operators documentation.
[Figure: graph of the DAG described in the caption below]

Example of a DAG with tasks checking the data quality at each step (staging, prod)

Some additional remarks:

  • As seen above, each Sifflet rule task is run in parallel with the dbt tests
  • If certain data quality rules are deemed not critical, you can set the parameter error_on_rule_fail to false so that a failing rule does not interrupt the pipeline
  • The task sifflet_dbt_ingest (calling the operator SiffletDbtIngestOperator) sends the dbt artifacts to Sifflet. It is placed at the very end of the DAG, in accordance with our recommendation (see the dbt Core page for more details on how to integrate dbt Core)
from datetime import datetime
from typing import List

from airflow.decorators import dag
from airflow.models.baseoperator import chain
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
from sifflet.operators.ingest.dbt import SiffletDbtIngestOperator
from sifflet.operators.quality.rule import SiffletRunRuleOperator

# Placeholders to replace before use:
# - DBT_PROFILE_DIR is passed to dbt's --profiles-dir option.
# - DBT_PROJ_DIR is the directory the dbt commands are run from (via `cd`); it
#   is also given to SiffletDbtIngestOperator as its input_folder.
DBT_PROFILE_DIR = "<DBT_PROFILE_DIR>"
DBT_PROJ_DIR = "<DBT_PROJ_DIR>"


def branch_dbt_sifflet(model_name: str, rule_ids: List[str]):
    """Create the run / test / Sifflet-rule tasks for one dbt model selection.

    Three tasks are created: a ``dbt run`` task, a ``dbt test`` task and a
    SiffletRunRuleOperator task. The test and rule tasks are both set directly
    downstream of the run task.

    Args:
        model_name: Value passed to dbt's ``--models`` option. Leading and
            trailing ``+`` characters are removed when building the task ids.
        rule_ids: Sifflet rule identifiers handed to SiffletRunRuleOperator.

    Returns:
        A pair ``(run_task, (test_task, rule_task))`` so that callers can wire
        the branch into a larger DAG.
    """
    # Suffix shared by the three task ids.
    task_suffix = model_name.strip("+")
    # Options common to the `dbt run` and `dbt test` invocations.
    dbt_options = f"--models {model_name} --profiles-dir {DBT_PROFILE_DIR} --target dev"

    run_model = BashOperator(
        task_id=f"dbt_run_{task_suffix}",
        bash_command=f"cd {DBT_PROJ_DIR} && dbt run {dbt_options}",
    )
    test_model = BashOperator(
        task_id=f"dbt_test_{task_suffix}",
        bash_command=f"cd {DBT_PROJ_DIR} && dbt test {dbt_options}",
    )
    quality_gate = SiffletRunRuleOperator(
        task_id=f"sifflet_run_rule_{task_suffix}",
        rule_ids=rule_ids,
    )

    # Both checks depend only on the model run.
    parallel_checks = (test_model, quality_gate)
    run_model.set_downstream(parallel_checks)
    return run_model, parallel_checks


# Daily DAG with two dbt branches (stg_customers, then customers). Each branch
# is followed by its dbt tests and Sifflet rules; the DAG ends with docs
# generation, a full dbt test run and the ingestion of dbt artifacts in Sifflet.
# NOTE: documented with comments on purpose. Airflow's @dag decorator uses the
# function docstring as the DAG's doc_md, so adding one would change the DAG.
@dag(
    schedule_interval="@daily",
    start_date=datetime(2022, 1, 1),
    catchup=False,
    tags=["sifflet_monitored"],
)
def advanced_dag_sifflet_simpler():
    # Pieces shared by the closing dbt commands.
    dbt_in_project = f"cd {DBT_PROJ_DIR} && dbt"
    profiles_flag = f"--profiles-dir {DBT_PROFILE_DIR}"

    # Closing steps. trigger_rule="all_done" makes them start once their
    # upstream tasks have finished, whatever their outcome.
    docs_generate = BashOperator(
        task_id="dbt_docs_generate",
        trigger_rule="all_done",
        bash_command=f"{dbt_in_project} docs generate {profiles_flag}",
    )
    test_everything = BashOperator(
        task_id="dbt_test_all",
        trigger_rule="all_done",
        bash_command=f"{dbt_in_project} test {profiles_flag}",
    )

    # Sends the dbt artifacts found in the project folder to Sifflet.
    ingest_artifacts = SiffletDbtIngestOperator(
        task_id="sifflet_dbt_ingest",
        input_folder=DBT_PROJ_DIR,
        target="dev",
        project_name="dbt_test_project_snowflake",
    )

    # The same two rules are checked after each branch; every branch receives
    # its own list object.
    shared_rule_ids = (
        "3e2e2687-cd20-11ec-b38b-06bb20181849",
        "3e19eb3e-cd20-11ec-b38b-06bb20181849",
    )
    _, stg_checks = branch_dbt_sifflet("+stg_customers", rule_ids=list(shared_rule_ids))
    customers_run, customers_checks = branch_dbt_sifflet("customers", rule_ids=list(shared_rule_ids))

    # The customers branch starts after the staging checks.
    chain(stg_checks, customers_run)
    # After the customers checks: docs -> full test run -> Sifflet ingestion.
    chain(customers_checks, docs_generate, test_everything, ingest_artifacts)


# Call the decorated function so that the resulting DAG object is bound to a
# module-level name.
# NOTE(review): this rebinds advanced_dag_sifflet_simpler from the factory
# function to its return value, so the factory is no longer reachable by name.
advanced_dag_sifflet_simpler = advanced_dag_sifflet_simpler()