Declarative Assets & Lineage

Overview

Sifflet comes with a large number of built-in integrations spanning your entire data pipeline stack, from ingestion to consumption. Built-in integrations automatically collect metadata and lineage information and make it available in the Data Catalog.

In some cases, though, Sifflet cannot retrieve asset metadata and lineage information: for instance, the built-in integration doesn't exist yet, Sifflet cannot connect to your closed environment, or Sifflet doesn't have enough access to retrieve and compute the lineage.

For these cases, you can programmatically declare data pipeline assets and lineage to ensure an end-to-end observability experience. For instance, you can:

  • Reflect data sources such as CRMs (e.g. Salesforce, Pipedrive, etc.), ERPs (e.g. SAP, etc.), marketing automation tools (e.g. HubSpot, Marketo, etc.), etc.
  • Create assets from your preferred BI tools (e.g. Metabase, MicroStrategy, etc.)
  • Surface key API calls and custom scripts
  • Display machine learning models
  • Catalog and reflect orchestrators on lineage
  • Show custom data applications
  • Link assets Sifflet cannot compute lineage for

Getting Started

You can declare assets and their lineage using the declarative assets & lineage framework.

If you are looking to add orchestrators on top of your transformation assets, you can use a dedicated set of API endpoints.

Declarative Assets & Lineage Framework

To programmatically declare assets and their lineage, you need to leverage the declarative assets & lineage framework. This framework relies on two API endpoints that allow you to declare three main object types.

Assets

Assets are the smallest entity you can currently declare. They correspond to data-related components such as a dashboard, a table, or a machine learning model. Declared assets show up like regular assets in your Data Catalog.

Sources

Sources are a logical way to group your assets. For instance, you can have one source corresponding to your staging assets and another corresponding to your production assets. Declared sources show up alongside Sifflet sources. Declaring sources is optional, as Sifflet can create them automatically from asset URIs.

Workspaces

Workspaces are the highest-level entity you can declare; they contain declared sources and assets. Workspaces are isolated from each other, which lets you manage a collection of declared sources and assets without interfering with collections owned by other teams in other workspaces.

Prerequisites

See API prerequisites

Manage Declarative Assets & Lineage

Declare Assets & Lineage

To declare assets & lineage, you need to leverage the POST /v1/assets/sync endpoint.

You can declare:

  • Assets only, using the assets array of objects
  • Lineage links only, using the lineages array of objects
  • Both assets and lineage links
    • Using the assets array of objects that contains a lineages object
    • Using the assets array of objects and the lineages array of objects
    • Using a mix of the two above options
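As an illustration, here is a minimal Python sketch of a payload combining the assets array and the lineages array in a single call. The field names inside the objects (uri, name, description, from, to) are assumptions for illustration only; refer to the API reference for the authoritative schema.

```python
TENANT = "my_tenant"  # change me: your Sifflet tenant
TOKEN = "my_token"    # change me: API access token (see Prerequisites)


def build_sync_payload():
    """Build a hypothetical payload for POST /v1/assets/sync declaring one
    asset and one lineage link. Field names are illustrative assumptions;
    check the Sifflet API reference for the exact schema."""
    return {
        "assets": [
            {
                # The technology prefix of the URI ("metabase" here) is an
                # example; it drives the icon shown in the Sifflet application.
                "uri": "metabase://prod/dashboards/revenue_overview",
                "name": "Revenue overview",
                "description": "Weekly revenue dashboard",
            }
        ],
        "lineages": [
            {
                # Hypothetical link from an upstream table to the dashboard.
                "from": "snowflake://prod/analytics/fct_revenue",
                "to": "metabase://prod/dashboards/revenue_overview",
            }
        ],
    }


# Pushing the declaration (requires the `requests` package, network access,
# and a valid token):
# import json, requests
# response = requests.post(
#     f"https://{TENANT}api.siffletdata.com/api/v1/assets/sync",
#     headers={"Authorization": f"Bearer {TOKEN}",
#              "Content-Type": "application/json"},
#     data=json.dumps(build_sync_payload()),
# )
```

The same payload shape covers the assets-only and lineage-only cases: simply omit the array you don't need.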

Sifflet ships with a set of technology icons: if you use the appropriate technology name when defining your assets' URIs, your declarative assets show up with the proper technology icon in the Sifflet application. The list of expected technology names for URIs is available here.

Declare Sources

Declaring sources via the sources array of objects is optional. Declaring a source is useful if you want to attach specific metadata to it (e.g. a name, a description, etc.). If no source is declared, Sifflet automatically assigns declared assets to sources based on the declared assets' URIs.
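As a sketch, a declared source could be one entry in the optional sources array of the sync payload. The field names below (uri, name, description) are assumptions for illustration, not the authoritative schema.

```python
def build_source(uri, name, description=""):
    """Hypothetical entry for the optional `sources` array of
    POST /v1/assets/sync; field names are illustrative assumptions."""
    return {"uri": uri, "name": name, "description": description}


# Two example sources, one per environment; assets whose URIs fall under
# these prefixes would be grouped into the matching source.
sources = [
    build_source("snowflake://prod", "Production warehouse",
                 "Curated, production-grade assets"),
    build_source("snowflake://staging", "Staging warehouse"),
]
```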

Modify Declarative Assets & Lineage

Once assets, lineage links, and sources are created in a workspace, they become read-only in the User Interface (UI). Modifications and deletions can only be performed through the declarative assets & lineage framework. This will evolve in the future to allow you to manage your declarative assets & lineage both programmatically and from the UI.

To modify declarative assets and sources, just update your JSON payload and push the new version to the POST /v1/assets/sync endpoint.

Delete Declarative Assets & Lineage

To delete declarative assets & lineage, you need to leverage the DELETE /v1/assets/{name} endpoint.
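As a sketch (assuming the same tenant URL pattern used by the scripts further down this page), deleting a declared asset is a single DELETE call:

```python
TENANT = "my_tenant"  # change me: your Sifflet tenant
TOKEN = "my_token"    # change me: API access token


def delete_asset_url(asset_name):
    """Build the DELETE /v1/assets/{name} URL for a declared asset,
    following the tenant URL pattern used elsewhere on this page."""
    return f"https://{TENANT}api.siffletdata.com/api/v1/assets/{asset_name}"


# The actual call (requires the `requests` package and a valid token):
# import requests
# requests.delete(delete_asset_url("my_declared_asset"),
#                 headers={"Authorization": f"Bearer {TOKEN}"})
```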

How to create/remove an orchestrator node on your lineage

One common use case is getting visibility into which Airflow DAG triggers which dbt models. This allows you to:

  • Get a detailed view of your data pipeline: orchestrator + transformation + data assets
  • Accelerate root-cause investigation when a data anomaly alert fires

As of today, you can link any orchestrator information to your dbt models in your lineage. If you need to link other information, do not hesitate to reach out. This page is constantly updated with our feature coverage.

| Declarative lineage scope | dbt model | Other transformation node |
|---------------------------|-----------|---------------------------|
| Airflow DAG               | ✅        | ❌                        |
| Other                     | ✅        | ❌                        |

Two steps are required to programmatically declare an orchestrator node:

  1. Generate an Access Token
  2. Add/Remove your nodes and links

1. Generate an Access Token

You can find more information on how to generate an Access Token here

2. Add/Remove your nodes and links

You can find the API reference here.


POST /v1/lineages/_create
{
    "linkType": "pipeline",
    "automationPlatform": {
        "type": "airflow",
        "metadata": {
            "pipelineName": "orchestrator_object_name"
        }
    },
    "job": {
        "type": "dbt",
        "metadata": {
            "project": "your_dbt_project",
            "model": "your_dbt_model",
            "target": "your_dbt_target"
        }
    }
}


POST /v1/lineages/_remove
{
    "linkType": "pipeline",
    "automationPlatform": {
        "type": "airflow",
        "metadata": {
            "pipelineName": "orchestrator_object_name"
        }
    },
    "job": {
        "type": "dbt",
        "metadata": {
            "project": "your_dbt_project",
            "model": "your_dbt_model",
            "target": "your_dbt_target"
        }
    }
}

Parameters:

  • linkType: pipeline
  • automationPlatform

    • type: airflow or other (for instance, a Prefect Flow)
    • metadata/pipelineName: the name of your orchestrator node. It will appear on Sifflet's lineage with this name.
  • job

    • type: dbt
    • metadata
      • project: your dbt project
      • target: your dbt target
      • model (optional): the dbt model triggered by your orchestrator.
        If no model is specified, Sifflet will link the node to all models within the specified dbt project and target.

Scripts and Examples

Please find below examples in Python and Bash to declare orchestrator nodes and link them to dbt models.

Python

Please find below an example of how to link your Airflow DAG to your dbt models.

Requirements: pip install requests

import requests
import json

tenant = "tenant_name" # change me
token = "your_token" # change with token created at step 1

url = f"https://{tenant}api.siffletdata.com/api/v1/lineages/_create"

payload = json.dumps({
  "linkType": "pipeline",
  "automationPlatform": {
    "type": "airflow",
    "metadata": {
      "pipelineName": "dag_1" # change me
    }
  },
  "job": {
    "type": "dbt",
    "metadata": {
      "project": "my_dbt_project", # change me
      "model": "my_dbt_model", # change me
      "target": "my_dbt_target" # change me
    }
  }
})
headers = {
  'Authorization': f'Bearer {token}',
  'Content-Type': 'application/json'
}

response = requests.request("POST", url, headers=headers, data=payload)

if response.status_code != 204:
    print("Error using the API")
    print(response.text)
else:
    print("Successfully pushed lineage")

Bash

Please find below an example of how to link your Prefect Flow to your dbt models.

TENANT="my_tenant" # change to your Sifflet deployment
TOKEN="my_token" # change with token created at step 1

BASE_URL="https://${TENANT}api.siffletdata.com"

curl --location "$BASE_URL/api/v1/lineages/_create" \
--header "Authorization: Bearer $TOKEN" \
--header "Content-Type: application/json" \
--data '{
    "linkType": "pipeline",
    "automationPlatform": {
        "type": "other",
        "metadata": {
            "pipelineName": "name_CHANGE_ME"
        }
    },
    "job": {
        "type": "dbt",
        "metadata": {
            "project": "my_dbt_project_CHANGE_ME",
            "model": "my_dbt_model_CHANGE_ME",
            "target": "my_dbt_target_CHANGE_ME"
        }
    }
}'

Examples

Example in the case of Airflow with dbt

Example in the case of any other Orchestrator with dbt