Declarative lineage
By integrating Sifflet with your data stack, you automatically get a global view of your data lineage, from ingestion to consumption.
In some cases, Sifflet cannot connect to your closed environment or doesn't have enough access to retrieve and compute the lineage. For these cases, you can add edges to your lineage programmatically.
Current scope
Feature | API |
---|---|
Create/remove* an edge | |
- between datasets | ✅ |
- between dataset fields | ✅ |
- between other types (dashboards, etc.) | ❌ (coming soon) |
Create/remove an object | |
- Create/remove a dataset | ❌ (coming soon) |
- Create/remove an orchestrator node linked to a model | ✅ |
*: As of today, you can only remove manually created edges, not automatically computed ones
How to create/remove an edge on your lineage
Two steps are required to programmatically declare an edge:
- Generate an Access Token
- Create/Remove your edges using Sifflet's API
1. Generate an Access Token
You can find more information on how to generate an Access Token here
2. Create/Remove your edge
You can find the API reference here.
a. At dataset level
POST /api/v1/lineage/_create-edge
{
  "downstreamNode": {
    "type": "dataset",
    "datasourceId": "datasource_UUID",
    "objectName": String
  },
  "upstreamNode": {
    "type": "dataset",
    "datasourceId": "datasource_UUID",
    "objectName": String
  }
}
POST /api/v1/lineage/_remove-edge
{
  "downstreamNode": {
    "type": "dataset",
    "datasourceId": "datasource_UUID",
    "objectName": String
  },
  "upstreamNode": {
    "type": "dataset",
    "datasourceId": "datasource_UUID",
    "objectName": String
  }
}
Parameters:
- `datasourceId`: the ID of the datasource your object belongs to (see below for how to find it)
- `objectName`: the name of your dataset
b. At dataset field level
POST /api/v1/lineage/_create-edge
{
  "downstreamNode": {
    "type": "dataset_field",
    "datasourceId": "datasource_UUID",
    "datasetName": String,
    "objectName": String
  },
  "upstreamNode": {
    "type": "dataset_field",
    "datasourceId": "datasource_UUID",
    "datasetName": String,
    "objectName": String
  }
}
POST /api/v1/lineage/_remove-edge
{
  "downstreamNode": {
    "type": "dataset_field",
    "datasourceId": "datasource_UUID",
    "datasetName": String,
    "objectName": String
  },
  "upstreamNode": {
    "type": "dataset_field",
    "datasourceId": "datasource_UUID",
    "datasetName": String,
    "objectName": String
  }
}
Parameters:
- `datasourceId`: the ID of the datasource your object belongs to (see below for how to find it)
- `datasetName`: the name of the dataset your field belongs to
- `objectName`: the name of your field
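The field-level request body can be built the same way as the dataset-level one. Below is a minimal Python sketch that assembles a `dataset_field` payload; the datasource IDs, dataset names, and field names are placeholders to replace with your own values.

```python
import json

def field_edge_payload(upstream, downstream):
    """Build the request body for a dataset_field edge.

    `upstream` and `downstream` are (datasourceId, datasetName, fieldName)
    tuples; all values used below are placeholders.
    """
    def node(datasource_id, dataset_name, field_name):
        return {
            "type": "dataset_field",
            "datasourceId": datasource_id,
            "datasetName": dataset_name,
            "objectName": field_name,
        }

    return {
        "downstreamNode": node(*downstream),
        "upstreamNode": node(*upstream),
    }

payload = field_edge_payload(
    ("datasource_UUID", "orders_raw", "order_id"),    # change me
    ("datasource_UUID", "orders_clean", "order_id"),  # change me
)
print(json.dumps(payload, indent=2))
```

Send the resulting payload to the `_create-edge` endpoint with the same headers as the dataset-level scripts shown in the Scripts and Examples section.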
How to find the datasourceId of your object
You have several options to find the datasourceId on Sifflet:
- On the asset page, you can see the source ID of the source the asset belongs to. Click the button next to it to copy it directly.
- On the source page itself, the source ID appears in the URL, which looks similar to https://<name>.siffletdata.com/integrations/sources/source/<datasourceId>/overview
3. Scripts and Examples
Below are examples in Python and Bash for declaring edges on your lineage.
Python
Requirements: pip install requests
import requests
import json

tenant = "tenant_name"  # change me
token = "your_token"  # change with the token created at step 1

url = f"https://{tenant}api.siffletdata.com/api/v1/lineages/_create-edge"

payload = json.dumps({
    "downstreamNode": {
        "type": "dataset",
        "datasourceId": "datasource_UUID",  # change me
        "objectName": "dataset_name",  # change me
    },
    "upstreamNode": {
        "type": "dataset",
        "datasourceId": "datasource_UUID",  # change me
        "objectName": "dataset_name",  # change me
    },
})
headers = {
    "Authorization": f"Bearer {token}",
    "Content-Type": "application/json",
}

response = requests.post(url, headers=headers, data=payload)
if response.status_code != 204:
    print("Error using the API")
    print(response.text)
else:
    print("Successfully pushed lineage")
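To undo a manually created edge, the reference above documents a `_remove-edge` endpoint that takes the same request body. A minimal sketch using only the standard library; the URL shape follows the create script above, so adjust it if your deployment differs:

```python
import json
import urllib.request

BASE = "https://{tenant}api.siffletdata.com/api/v1/lineages"

def edge_url(tenant: str, action: str) -> str:
    # action is "_create-edge" or "_remove-edge"; same path as the
    # create script above, with only the action swapped.
    return f"{BASE.format(tenant=tenant)}/{action}"

def remove_edge(tenant: str, token: str, payload: dict) -> bool:
    """POST the payload to _remove-edge. Remember that only manually
    created edges can be removed, not automatically computed ones."""
    req = urllib.request.Request(
        edge_url(tenant, "_remove-edge"),
        data=json.dumps(payload).encode(),
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:  # raises on HTTP errors
        return resp.status == 204
```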
Bash
TENANT="my_tenant" # change to your Sifflet deployment
TOKEN="my_token"   # change with the token created at step 1
BASE_URL="https://${TENANT}api.siffletdata.com"

# Replace datasource_UUID and dataset_name with your own values.
curl --location "$BASE_URL/api/v1/lineages/_create-edge" \
  --header "Authorization: Bearer $TOKEN" \
  --header "Content-Type: application/json" \
  --data '{
    "downstreamNode": {
      "type": "dataset",
      "datasourceId": "datasource_UUID",
      "objectName": "dataset_name"
    },
    "upstreamNode": {
      "type": "dataset",
      "datasourceId": "datasource_UUID",
      "objectName": "dataset_name"
    }
  }'
How to create/remove an orchestrator node on your lineage
One common use case is getting visibility into which Airflow DAG triggers which dbt models. This allows you to:
- have a detailed view of your data pipeline: orchestrator + transformation + data assets
- accelerate root-cause investigation when a data anomaly alert fires

As of today, you can link any orchestrator information to your dbt models in your lineage. If you need to link other information, do not hesitate to reach out. This page is constantly updated with our feature coverage.
Declarative lineage scope | dbt model | Other transformation node |
---|---|---|
Airflow DAG | ✅ | ❌ |
Other | ✅ | ❌ |
Two steps are required to programmatically declare an orchestrator node:
- Generate an Access Token
- Add/Remove your nodes and links
1. Generate an Access Token
You can find more information on how to generate an Access Token here
2. Add/Remove your node
You can find the API reference here.
POST /lineage/_create
{
  "linkType": "pipeline",
  "automationPlatform": {
    "type": "airflow",
    "metadata": {
      "pipelineName": "orchestrator_object_name"
    }
  },
  "job": {
    "type": "dbt",
    "metadata": {
      "project": project,
      "model": model,
      "target": target
    }
  }
}
POST /lineage/_remove
{
  "linkType": "pipeline",
  "automationPlatform": {
    "type": "airflow",
    "metadata": {
      "pipelineName": "orchestrator_object_name"
    }
  },
  "job": {
    "type": "dbt",
    "metadata": {
      "project": project,
      "model": model,
      "target": target
    }
  }
}
Parameters:
- `linkType`: `pipeline`
- `automationPlatform`
  - `type`: `airflow` or `other` (for instance, a Prefect Flow)
  - `metadata` / `pipelineName`: the name of your orchestrator node. It will appear on Sifflet's lineage with this name.
- `job`
  - `type`: `dbt`
  - `metadata`
    - `project`: your dbt project
    - `target`: your dbt target
    - `model` (optional): the dbt model triggered by your orchestrator

If no dbt model is specified, Sifflet will link the node to all models within the specified dbt project and target.
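The parameters above can be assembled into a request body like so. This is a sketch with placeholder names; it handles the optional `model` key by omitting it when not given, so the node is linked to every model in the project/target as described above.

```python
def pipeline_link_payload(pipeline_name, project, target,
                          model=None, platform="airflow"):
    """Build the body for /lineage/_create (or _remove) linking an
    orchestrator node to dbt. `model` is optional: when omitted, Sifflet
    links the node to all models in the given project and target."""
    job_meta = {"project": project, "target": target}
    if model is not None:
        job_meta["model"] = model
    return {
        "linkType": "pipeline",
        "automationPlatform": {
            "type": platform,  # "airflow" or "other"
            "metadata": {"pipelineName": pipeline_name},
        },
        "job": {"type": "dbt", "metadata": job_meta},
    }

# Placeholder values -- replace with your own DAG and dbt names.
payload = pipeline_link_payload("dag_1", "my_dbt_project", "prod")
```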
3. Scripts and Examples
Below are examples in Python and Bash for declaring orchestrator nodes and linking them to dbt models.
Python
Please find below an example of how to link your Airflow DAG to your dbt models.
Requirements: pip install requests
import requests
import json

tenant = "tenant_name"  # change me
token = "your_token"  # change with the token created at step 1

url = f"https://{tenant}api.siffletdata.com/api/v1/lineages/_create"

payload = json.dumps({
    "linkType": "pipeline",
    "automationPlatform": {
        "type": "airflow",
        "metadata": {
            "pipelineName": "dag_1"  # change me
        }
    },
    "job": {
        "type": "dbt",
        "metadata": {
            "project": "my_dbt_project",  # change me
            "model": "my_dbt_model",  # change me
            "target": "my_dbt_target"  # change me
        }
    }
})
headers = {
    "Authorization": f"Bearer {token}",
    "Content-Type": "application/json",
}

response = requests.post(url, headers=headers, data=payload)
if response.status_code != 204:
    print("Error using the API")
    print(response.text)
else:
    print("Successfully pushed lineage")
Bash
Please find below an example of how to link your Prefect Flow to your dbt models.
TENANT="my_tenant" # change to your Sifflet deployment
TOKEN="my_token"   # change with the token created at step 1
BASE_URL="https://${TENANT}api.siffletdata.com"

curl --location "$BASE_URL/api/v1/lineages/_create" \
  --header "Authorization: Bearer $TOKEN" \
  --header "Content-Type: application/json" \
  --data '{
    "linkType": "pipeline",
    "automationPlatform": {
      "type": "other",
      "metadata": {
        "pipelineName": "name_CHANGE_ME"
      }
    },
    "job": {
      "type": "dbt",
      "metadata": {
        "project": "my_dbt_project_CHANGE_ME",
        "model": "my_dbt_model_CHANGE_ME",
        "target": "my_dbt_target_CHANGE_ME"
      }
    }
  }'