Flow stopper

As your data moves through your pipelines, checking its quality at every step should be a given. Better still, you should be able to catch an issue before the data moves further down the pipeline.
For this purpose, Sifflet's flow stopper can be leveraged.
It can be implemented in a few steps:

  1. Define the quality rules for the data to be monitored
  2. Adapt your data pipeline: your orchestrator can trigger these rules after each transformation to check how reliable the data is. If an anomaly is detected, the pipeline can be paused, which stops the bad data from propagating.

You can find some examples of how to use Sifflet's flow stopper in Airflow here
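
The idea behind the two steps can be sketched in plain Python, independently of any orchestrator. This is only an illustration of the pattern, not Sifflet's implementation: the `Step` type, the `FlowStopped` exception, and the `run_pipeline` function are hypothetical names, and each `check` callable stands in for running your quality rules on the output of a transformation.

```python
from typing import Callable, Iterable, List, Tuple

# A step is a (name, transform, check) triple. `transform` moves the data one
# stage forward; `check` stands in for the quality rules run on its output.
Step = Tuple[str, Callable[[list], list], Callable[[list], bool]]


class FlowStopped(Exception):
    """Raised when a quality check fails and the pipeline must halt."""


def run_pipeline(data: list, steps: Iterable[Step]) -> list:
    for name, transform, check in steps:
        data = transform(data)
        if not check(data):
            # Stop here so the bad data never reaches the next step.
            raise FlowStopped(f"quality check failed after step '{name}'")
    return data


# Hypothetical steps: drop missing values, then double each value.
steps: List[Step] = [
    ("clean", lambda rows: [r for r in rows if r is not None], lambda rows: len(rows) > 0),
    ("double", lambda rows: [r * 2 for r in rows], lambda rows: all(r >= 0 for r in rows)),
]

result = run_pipeline([1, None, 3], steps)
```

With the input above, both checks pass and the pipeline runs to completion. An input such as `[None]` would be emptied by the first step, fail its check, and raise `FlowStopped` before the second step runs.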

1- Define the quality rules

The full list of currently available data quality rules can be found here.
Once your rules are defined, the next part shows how to configure your data pipeline accordingly.

2- Configure your data pipeline

There are several options to insert Sifflet's flow stopper in your orchestrator:

  • If you use Airflow as your orchestrator, you can leverage the SiffletRunRuleOperator operator
  • For any other orchestrator, you can directly use Sifflet's CLI

Airflow SiffletRunRuleOperator operator

Airflow connection setup

In order to allow your Airflow instance to connect to Sifflet, the following steps are required:

  • add the following package to your requirements.txt: airflow-provider-sifflet
  • add the Sifflet connection in Airflow. You can find more details in Airflow's docs:
    • In the Airflow user interface, you can configure a Connection for Sifflet in Admin -> Connections -> Add a new record.
    • In the "Add Connection" screen, fill in the following parameters:
      • Connection Id: sifflet_default
      • Connection Type: Sifflet
      • Sifflet Tenant: <your_tenant_name>
      • Sifflet Token: <your_sifflet_access_token>

<your_tenant_name>: if you access Sifflet at https://abcdef.siffletdata.com, then your tenant is abcdef
<your_sifflet_access_token>: you can find more information on how to generate it here
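
If you prefer to derive the tenant name from your Sifflet URL in a script, a minimal sketch could look like the following. The helper name `tenant_from_url` is hypothetical, and the code simply assumes the tenant is the first label of the host name, as in the example above.

```python
from urllib.parse import urlparse


def tenant_from_url(url: str) -> str:
    """Return the first label of the host name, assumed to be the tenant."""
    host = urlparse(url).hostname or ""
    return host.split(".")[0]


tenant = tenant_from_url("https://abcdef.siffletdata.com")
```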

You will then be able to use Sifflet's custom operator in your DAGs.

Add Sifflet connection in Airflow

SiffletRunRuleOperator operator parameters

The operator takes the following parameters:

  • task_id: unique identifier of the task
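  • rule_ids: list of one or more rule ids. For more details on how to find the rule_ids, please refer to this page
  • error_on_rule_fail (optional): defaults to True. If a rule fails, the task fails as well. You can set the parameter to False for less critical rules.

# Import path assumed from the airflow-provider-sifflet package; check your installed version
from sifflet_provider.operators.rule import SiffletRunRuleOperator

# Runs both rules; the task fails if a rule fails (error_on_rule_fail defaults to True)
sifflet_rule = SiffletRunRuleOperator(
  task_id="sifflet_rule",
  rule_ids=["<rule_id_1>", "<rule_id_2>"],
)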

Using the CLI

For any other orchestrator, you can directly use Sifflet's CLI to trigger rules programmatically.
To set up Sifflet's CLI, you can refer to this page.
You can then run the command sifflet rules run to trigger and test rules.
For more details, please refer to the CLI reference.

# Run one or many rules
sifflet rules run --id <rule_id>
sifflet rules run --id <rule_id_1> --id <rule_id_2> --id <rule_id_3>
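
If your orchestrator runs Python tasks, the same command can be wrapped in a small helper. The sketch below is not part of Sifflet's tooling: `build_run_command` and `run_rules` are hypothetical names, and the code assumes that the CLI is installed and configured and that it exits with a non-zero status when a rule fails, which you should verify against the CLI reference.

```python
import subprocess
from typing import List, Sequence


def build_run_command(rule_ids: Sequence[str]) -> List[str]:
    """Build `sifflet rules run --id <id> [--id <id> ...]` as an argument list."""
    if not rule_ids:
        raise ValueError("at least one rule id is required")
    command = ["sifflet", "rules", "run"]
    for rule_id in rule_ids:
        command += ["--id", rule_id]
    return command


def run_rules(rule_ids: Sequence[str]) -> None:
    """Run the rules and stop the calling pipeline if the command fails."""
    # Assumption: the CLI exits with a non-zero status when a rule fails.
    # check=True turns a non-zero exit status into CalledProcessError.
    subprocess.run(build_run_command(rule_ids), check=True)


command = build_run_command(["<rule_id_1>", "<rule_id_2>"])
```

Calling `run_rules([...])` between two pipeline steps would then raise an exception when the command fails, which most orchestrators treat as a failed task.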

Finding the rule id(s)

There are several ways to find the id of your rule:

  • using the CLI command sifflet rules list
  • on Sifflet's UI, from the URL of your rule's page. For instance, if your URL is https://<tenant_name>.siffletdata.com/monitoring/rule/e77ed92f-f248-418a-a4c6-a3de1e3aacf7/overview?name=%20Completeness%20Rule, then the rule_id would be e77ed92f-f248-418a-a4c6-a3de1e3aacf7
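
To pull the id out of such a URL in a script, a small helper along these lines could be used. The function name `rule_id_from_url` is hypothetical, and the pattern only assumes the URL layout shown in the example above, where the id directly follows /monitoring/rule/.

```python
import re
from typing import Optional

# Matches a UUID that directly follows "/monitoring/rule/" in the URL.
RULE_ID_PATTERN = re.compile(
    r"/monitoring/rule/([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})",
    re.IGNORECASE,
)


def rule_id_from_url(url: str) -> Optional[str]:
    """Return the rule id found in the URL, or None if there is none."""
    match = RULE_ID_PATTERN.search(url)
    return match.group(1) if match else None


url = (
    "https://<tenant_name>.siffletdata.com/monitoring/rule/"
    "e77ed92f-f248-418a-a4c6-a3de1e3aacf7/overview?name=%20Completeness%20Rule"
)
rule_id = rule_id_from_url(url)
```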
