Declarative lineage

By integrating Sifflet with your data stack, you automatically get a global view of your data lineage, from ingestion to consumption.
In some cases, Sifflet cannot connect to your closed environment or doesn't have enough access to retrieve and compute the lineage. For these cases, you can add edges to your lineage programmatically.

Current scope

Feature | API

  • Create/remove* an edge
    • between datasets: ✅
    • between dataset fields: ✅
    • between other types (dashboards, etc.): ❌ (coming soon)
  • Create/remove an object
    • Create/remove a dataset: ❌ (coming soon)
    • Create/remove an orchestrator node linked to a model: ✅

*: As of today, you can only remove manually created edges, not automatically computed ones.

How to create/remove an edge on your lineage

Declaring an edge programmatically takes two steps:

  1. Generate an Access Token
  2. Create/Remove your edges using Sifflet's API

1. Generate an Access Token

You can find more information on how to generate an Access Token here

2. Create/Remove your edge

You can find the API reference here.

a. At dataset level

POST /api/v1/lineage/_create-edge
{
    "downstreamNode": {
        "type": "dataset",
        "datasourceId": "datasource_UUID",
        "objectName": "dataset_name"
    },
    "upstreamNode": {
        "type": "dataset",
        "datasourceId": "datasource_UUID",
        "objectName": "dataset_name"
    }
}

POST /api/v1/lineage/_remove-edge
{
    "downstreamNode": {
        "type": "dataset",
        "datasourceId": "datasource_UUID",
        "objectName": "dataset_name"
    },
    "upstreamNode": {
        "type": "dataset",
        "datasourceId": "datasource_UUID",
        "objectName": "dataset_name"
    }
}

Parameters:

  • datasourceId: the ID of the datasource your object belongs to (see the note below on how to find it)

  • objectName: the name of your dataset

b. At dataset field level

POST /api/v1/lineage/_create-edge
{
    "downstreamNode": {
        "type": "dataset_field",
        "datasourceId": "datasource_UUID",
        "datasetName": "dataset_name",
        "objectName": "field_name"
    },
    "upstreamNode": {
        "type": "dataset_field",
        "datasourceId": "datasource_UUID",
        "datasetName": "dataset_name",
        "objectName": "field_name"
    }
}

POST /api/v1/lineage/_remove-edge
{
    "downstreamNode": {
        "type": "dataset_field",
        "datasourceId": "datasource_UUID",
        "datasetName": "dataset_name",
        "objectName": "field_name"
    },
    "upstreamNode": {
        "type": "dataset_field",
        "datasourceId": "datasource_UUID",
        "datasetName": "dataset_name",
        "objectName": "field_name"
    }
}

Parameters:

  • datasourceId: the ID of the datasource your object belongs to (see the note below on how to find it)

  • datasetName: the name of the dataset your field belongs to

  • objectName: the name of your field
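As a sketch, the field-level body can be built with a small helper (the UUIDs, dataset names, and field names below are hypothetical placeholders); POST the resulting body to the create-edge endpoint with a Bearer token, as in the scripts in section 3:

```python
import json

def field_edge_payload(downstream, upstream):
    """Build a field-level edge body from (datasourceId, datasetName, fieldName) tuples."""
    def node(datasource_id, dataset_name, field_name):
        return {
            "type": "dataset_field",
            "datasourceId": datasource_id,
            "datasetName": dataset_name,
            "objectName": field_name,
        }
    return {"downstreamNode": node(*downstream), "upstreamNode": node(*upstream)}

# Hypothetical IDs and names, for illustration only
body = json.dumps(field_edge_payload(
    ("11111111-1111-1111-1111-111111111111", "orders", "customer_id"),
    ("22222222-2222-2222-2222-222222222222", "raw_orders", "customer_id"),
))
print(body)
```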

📘

How to find the datasourceId of your object

You have several options to find the datasourceId on Sifflet:

  1. On the asset page, you can see the source ID of the source the asset belongs to. Click the button next to it to copy it.
  2. On the source page itself, you can find the source ID in the URL. It should look similar to https://<name>.siffletdata.com/integrations/sources/source/<datasourceId>/overview
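Since the source-page URL always follows that pattern, you can also pull the datasourceId out of a copied link programmatically. A small sketch (the tenant name and UUID below are made up):

```python
import re
from typing import Optional

def datasource_id_from_url(url: str) -> Optional[str]:
    """Extract the datasourceId (a UUID) from a Sifflet source-page URL."""
    match = re.search(r"/integrations/sources/source/([0-9a-fA-F-]{36})", url)
    return match.group(1) if match else None

url = ("https://acme.siffletdata.com/integrations/sources/source/"
       "123e4567-e89b-12d3-a456-426614174000/overview")
print(datasource_id_from_url(url))  # 123e4567-e89b-12d3-a456-426614174000
```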

3. Scripts and Examples

Below are examples in Python and Bash for declaring edges in your lineage.

Python

Requirements: pip install requests

import requests
import json

tenant = "tenant_name"  # change me
token = "your_token"    # change with the token created at step 1

url = f"https://{tenant}api.siffletdata.com/api/v1/lineages/_create-edge"

payload = json.dumps({
    "downstreamNode": {
        "type": "dataset",
        "datasourceId": "datasource_UUID",  # change me
        "objectName": "dataset_name"        # change me
    },
    "upstreamNode": {
        "type": "dataset",
        "datasourceId": "datasource_UUID",  # change me
        "objectName": "dataset_name"        # change me
    }
})
headers = {
  'Authorization': f'Bearer {token}',
  'Content-Type': 'application/json'
}

response = requests.request("POST", url, headers=headers, data=payload)

if response.status_code != 204:
    print("Error using the API")
    print(response.text)
else:
    print("Successfully pushed lineage")
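Removing a manually created edge is symmetric: POST the same body shape to the `_remove-edge` endpoint. A minimal sketch reusing the placeholders above (only manually created edges can be removed):

```python
import json

tenant = "tenant_name"  # change me

# Same body shape as the create call, sent to _remove-edge instead
remove_url = f"https://{tenant}api.siffletdata.com/api/v1/lineages/_remove-edge"

payload = json.dumps({
    "downstreamNode": {
        "type": "dataset",
        "datasourceId": "datasource_UUID",    # change me
        "objectName": "dataset_name"          # change me
    },
    "upstreamNode": {
        "type": "dataset",
        "datasourceId": "datasource_UUID",    # change me
        "objectName": "upstream_dataset_name" # change me
    }
})
# Then send it exactly as in the create script:
# requests.post(remove_url, headers=headers, data=payload)
```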

Bash

TENANT="my_tenant" # change to your Sifflet deployment
TOKEN="my_token" # change with token created at step 1

BASE_URL="https://${TENANT}api.siffletdata.com"

curl --location "$BASE_URL/api/v1/lineages/_create-edge" \
--header "Authorization: Bearer $TOKEN" \
--header "Content-Type: application/json" \
--data '{
    "downstreamNode": {
        "type": "dataset",
        "datasourceId": "datasource_UUID",
        "objectName": "dataset_name"
    },
    "upstreamNode": {
        "type": "dataset",
        "datasourceId": "datasource_UUID",
        "objectName": "dataset_name"
    }
}'

Examples

Example of manual edges at both dataset and dataset field level


How to create/remove an orchestrator node on your lineage

One common case is to get visibility on which Airflow DAG triggers which dbt models. This allows you to:

  • have a detailed view of your data pipeline: orchestrator + transformation + data assets
  • In case of a data anomaly alert, accelerate the investigation time of the root cause issue

As of today, you can link in your lineage any orchestrator information to your dbt models. If you need to link other information, do not hesitate to reach out. This page is constantly updated with our feature coverage.

Declarative lineage scope | dbt model | Other transformation node

  • Airflow DAG: dbt model ✅ | other transformation node ❌
  • Other orchestrator: dbt model ✅ | other transformation node ❌

Declaring an orchestrator node programmatically takes two steps:

  1. Generate an Access Token
  2. Add/Remove your nodes and links

1. Generate an Access Token

You can find more information on how to generate an Access Token here

2. Add/Remove your node

You can find the API reference here.


POST /lineage/_create
{
    "linkType": "pipeline",
    "automationPlatform": {
        "type": "airflow",
        "metadata": {
            "pipelineName": "orchestrator_object_name"
        }
    },
    "job": {
        "type": "dbt",
        "metadata": {
            "project": "dbt_project_name",
            "model": "dbt_model_name",
            "target": "dbt_target_name"
        }
    }
}


POST /lineage/_remove
{
    "linkType": "pipeline",
    "automationPlatform": {
        "type": "airflow",
        "metadata": {
            "pipelineName": "orchestrator_object_name"
        }
    },
    "job": {
        "type": "dbt",
        "metadata": {
            "project": "dbt_project_name",
            "model": "dbt_model_name",
            "target": "dbt_target_name"
        }
    }
}

Parameters:

  • linkType: pipeline

  • automationPlatform
    • type: airflow or other (for instance, a Prefect flow)
    • metadata/pipelineName: the name of your orchestrator node. It will appear in Sifflet's lineage under this name.

  • job
    • type: dbt
    • metadata
      • project: your dbt project
      • target: your dbt target
      • model (optional): the dbt model triggered by your orchestrator.
        If no model is specified, Sifflet links the node to all models within the specified dbt project and target.
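For example, a project-wide link looks like this when `model` is left out (a sketch; the DAG, project, and target names are placeholders):

```python
import json

# Omitting "model" links the orchestrator node to every model
# in the given dbt project and target.
body = json.dumps({
    "linkType": "pipeline",
    "automationPlatform": {
        "type": "airflow",
        "metadata": {"pipelineName": "daily_refresh_dag"}  # change me
    },
    "job": {
        "type": "dbt",
        "metadata": {
            "project": "my_dbt_project",  # change me
            "target": "prod"              # change me
            # no "model" key on purpose
        }
    }
})
print(body)
```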

3. Scripts and Examples

Below are examples in Python and Bash for declaring orchestrator nodes and linking them to dbt models.

Python

Please find below an example of how to link your Airflow DAG to your dbt models.

Requirements: pip install requests

import requests
import json

tenant = "tenant_name" # change me
token = "your_token" # change with token created at step 1

url = f"https://{tenant}api.siffletdata.com/api/v1/lineages/_create"

payload = json.dumps({
  "linkType": "pipeline",
  "automationPlatform": {
    "type": "airflow",
    "metadata": {
      "pipelineName": "dag_1" # change me
    }
  },
  "job": {
    "type": "dbt",
    "metadata": {
      "project": "my_dbt_project", # change me
      "model": "my_dbt_model", # change me
      "target": "my_dbt_target" # change me
    }
  }
})
headers = {
  'Authorization': f'Bearer {token}',
  'Content-Type': 'application/json'
}

response = requests.request("POST", url, headers=headers, data=payload)

if response.status_code != 204:
    print("Error using the API")
    print(response.text)
else:
    print("Successfully pushed lineage")
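Unlinking is symmetric: POST the same body to the `_remove` endpoint (following the URL pattern of the create script above). A sketch reusing the same placeholders:

```python
import json

tenant = "tenant_name"  # change me
remove_url = f"https://{tenant}api.siffletdata.com/api/v1/lineages/_remove"

payload = json.dumps({
    "linkType": "pipeline",
    "automationPlatform": {
        "type": "airflow",
        "metadata": {"pipelineName": "dag_1"}  # change me
    },
    "job": {
        "type": "dbt",
        "metadata": {
            "project": "my_dbt_project",  # change me
            "model": "my_dbt_model",      # change me
            "target": "my_dbt_target"     # change me
        }
    }
})
# Send exactly as in the create script:
# requests.post(remove_url, headers=headers, data=payload)
```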

Bash

Please find below an example of how to link your Prefect Flow to your dbt models.

TENANT="my_tenant" # change to your Sifflet deployment
TOKEN="my_token" # change with token created at step 1

BASE_URL="https://${TENANT}api.siffletdata.com"

curl --location "$BASE_URL/api/v1/lineages/_create" \
--header "Authorization: Bearer $TOKEN" \
--header "Content-Type: application/json" \
--data '{
	"linkType": "pipeline",
    "automationPlatform": {
        "type": "other",
        "metadata": {
            "pipelineName": "name_CHANGE_ME"
        }
    },
    "job": {
        "type": "dbt",
        "metadata": {
            "project": "my_dbt_project_CHANGE_ME",
            "model": "my_dbt_model_CHANGE_ME",
            "target": "my_dbt_target_CHANGE_ME"
        }
    }
}'

Examples

Example in the case of Airflow with dbt


Example in the case of any other Orchestrator with dbt
