Users need the Admin role to create and edit pipeline Docs. Analysts have read-only access.
A pipeline Doc is a special Doc type designed for automated, multi-step data workflows. Instead of rendering an interactive dashboard, a pipeline executes its datasets in dependency order, optionally on a schedule, and delivers the results to destinations like Slack or email.

What is a Pipeline Doc?

Set kind: pipeline in your Doc YAML to create a pipeline. Unlike a standard dashboard, pipelines:
  • Execute datasets in dependency order (a DAG)
  • Don’t require a layout (it’s auto-generated if omitted)
  • Are designed to run unattended on a schedule or in response to events
A minimal pipeline Doc looks like this:
version: 1
schemaVersion: "2025-01"
kind: pipeline
metadata:
  name: Daily Sales Report
  description: Pulls order data, enriches it, sends to Slack

datasets:
  # ... your datasets here

automations:
  # ... your triggers and destinations here

Dataset Dependencies (dependsOn)

Use the dependsOn array to declare that a dataset must wait for other datasets to finish before it runs. Definite uses these declarations to build a DAG (directed acyclic graph) and execute datasets in the correct order.
datasets:
  raw_orders:
    engine: sql
    sql: |
      SELECT id, customer_id, amount, created_at
      FROM orders
      WHERE created_at >= current_date - interval '30 days'

  customer_lookup:
    engine: cube
    cube: Customers
    measures: [count]
    dimensions: [id, name, region]

  enriched_report:
    engine: python
    dependsOn: [raw_orders, customer_lookup]
    inputs:
      - ref: raw_orders
        as: orders_df
      - ref: customer_lookup
        as: customers_df
    code: |
      # join orders to customers (orders.customer_id matches the customer id dimension)
      result = orders_df.merge(customers_df, left_on='customer_id', right_on='id')
      result['daily_total'] = result.groupby('created_at')['amount'].transform('sum')
    timeoutMs: 60000
In this example, enriched_report waits for both raw_orders and customer_lookup to complete before running.
You can chain as many steps as you need. Each dataset can depend on one or more upstream datasets, forming a multi-step pipeline.
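For instance, here is a sketch of a three-step chain where each dataset depends on the one before it; the dataset names, table, and columns are illustrative, not from a real schema:
datasets:
  extract_events:
    engine: sql
    sql: |
      SELECT id, event_type, created_at
      FROM events
      WHERE created_at >= current_date - interval '1 day'

  clean_events:
    engine: python
    dependsOn: [extract_events]
    inputs:
      - ref: extract_events
        as: df
    code: |
      # drop duplicate event rows before summarizing
      result = df.drop_duplicates(subset='id')
    timeoutMs: 60000

  daily_summary:
    engine: python
    dependsOn: [clean_events]
    inputs:
      - ref: clean_events
        as: df
    code: |
      # count events per type for the day
      result = df.groupby('event_type').size().reset_index(name='event_count')
    timeoutMs: 60000
Here extract_events runs first, clean_events waits for it, and daily_summary waits for clean_events, forming a linear three-node DAG.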

Automations

The top-level automations array defines when your pipeline runs and where results go. Each automation has a trigger and an optional destination.
Always use the plural key automations (not automation). The singular form is not supported and will cause your automations to be silently removed.
automations:
  - trigger:
      type: schedule
      config:
        type: cron
        cron: "0 9 * * 1-5"
    destination:
      type: slack
      channelId: "C1234567890"
      integrationId: "your-slack-integration-id"
      format: csv
      skipIfEmpty: true

Schedule Triggers

Schedule triggers run your pipeline on a cron schedule. Cron expressions use the standard 5-field format (minute, hour, day of month, month, day of week); there is no seconds field.
automations:
  - trigger:
      type: schedule
      config:
        type: cron
        cron: "0 9 * * *"  # Every day at 9 AM UTC
The minimum interval is 5 minutes. Cron expressions that resolve to a shorter interval will be rejected.
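As a sketch of where that cutoff falls:
cron: "*/5 * * * *"    # every 5 minutes: the shortest interval the scheduler accepts
# cron: "*/1 * * * *"  # every minute: below the 5-minute minimum, so it would be rejected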

Common Cron Examples

| Schedule | Cron Expression | Description |
| --- | --- | --- |
| Every hour | 0 * * * * | Top of every hour |
| Every day at 9 AM UTC | 0 9 * * * | Once daily |
| Weekdays at 9 AM UTC | 0 9 * * 1-5 | Monday through Friday |
| Every Monday at 8 AM UTC | 0 8 * * 1 | Weekly on Monday |
| First of every month | 0 0 1 * * | Midnight on the 1st |
| Every 15 minutes | */15 * * * * | Four times per hour |
| Every 6 hours | 0 */6 * * * | At 00:00, 06:00, 12:00, 18:00 |

Event Triggers

Event triggers fire in response to something happening in your workspace.

on_sync_end

Runs the pipeline when a data source sync completes. Useful for refreshing reports right after new data lands.
automations:
  - trigger:
      type: event
      config:
        type: on_sync_end
        srcIntegrationId: "your-integration-id"
    destination:
      type: email
      address: you@example.com
      format: pdf

on_job_end

Runs the pipeline when another automation job finishes. Use this to chain pipelines together.
automations:
  - trigger:
      type: event
      config:
        type: on_job_end
        jobId: "upstream-job-id"

webhook

Runs the pipeline when an external system sends a webhook.
automations:
  - trigger:
      type: event
      config:
        type: webhook

Destinations

Destinations control where pipeline results are delivered. If you omit the destination, the pipeline executes but doesn’t send output anywhere (useful for pipelines that write data back to your warehouse).
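For example, a schedule trigger with no destination key just runs the pipeline without delivering anything:
automations:
  - trigger:
      type: schedule
      config:
        type: cron
        cron: "0 6 * * *"  # runs daily; results are not sent anywhere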

Slack

Post results to a Slack channel. Requires a connected Slack integration.
destination:
  type: slack
  channelId: "C1234567890"
  integrationId: "your-slack-integration-id"
  format: csv
  skipIfEmpty: true

Email

Send results as an email attachment.
destination:
  type: email
  address: you@example.com
  format: pdf
  skipIfEmpty: true

Google Sheets

Push results directly to a Google Sheet.
destination:
  type: sheets
  spreadsheet_id: "your-spreadsheet-id"
  sheet: "Sheet1"
  header: true
  overwrite_sheet: true
  integration_id: "your-gsheets-integration-id"

Google Cloud Storage

Write results as a file to a GCS bucket.
destination:
  type: gcs
  bucket: "gs://your-bucket"
  blob_name: "reports/daily-report.parquet"
  format: parquet
  integration_id: "your-gcs-integration-id"

Webhook

POST results to an HTTP endpoint.
destination:
  type: webhook
  url: "https://your-api.com/ingest"

None (execute only)

Explicitly run without sending output. Equivalent to omitting destination.
destination:
  type: none

Destination Options

| Option | Type | Description |
| --- | --- | --- |
| format | string | Output format for the results |
| skipIfEmpty | boolean | Don’t send if the final dataset returns no rows |

Available formats:

| Format | Description |
| --- | --- |
| auto | Automatically choose based on data shape |
| csv | Comma-separated values |
| json | JSON array of objects |
| png | Chart image (for visualization tiles) |
| pdf | PDF document |
| parquet | Apache Parquet (columnar, compressed) |
| txt | Plain text |

Python Datasets in Pipelines

Python datasets are powerful building blocks for pipelines. They can transform data from upstream datasets, call external APIs, or run complex logic.
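For instance, here is a sketch of a Python dataset that pulls data from an external API; the URL, the response shape, and the availability of the requests package in the pipeline's Python environment are assumptions for illustration:
datasets:
  exchange_rates:
    engine: python
    code: |
      # Hypothetical API endpoint and response shape; assumes the requests
      # package is available in the pipeline's Python environment.
      import requests
      import pandas as pd

      resp = requests.get("https://api.example.com/rates", timeout=30)
      resp.raise_for_status()
      result = pd.DataFrame(resp.json()["rates"])
    timeoutMs: 120000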

Inline Code vs. Shell Command

You can run Python in two ways.
Inline code (most common): write Python directly in the YAML.
datasets:
  summary:
    engine: python
    dependsOn: [raw_data]
    inputs:
      - ref: raw_data
        as: df
    code: |
      result = df.groupby('region').agg({
        'revenue': 'sum',
        'orders': 'count'
      }).reset_index()
    timeoutMs: 60000
Shell command: run a script file instead.
datasets:
  processed:
    engine: python
    command: "uv run transform.py"
    inputs: [raw_data]
    timeoutMs: 120000

Inputs

The inputs array passes upstream dataset results into your Python code as DataFrames. You can use shorthand (just the dataset ID) or explicit binding:
# Shorthand: variable name matches the dataset ID
inputs: [raw_data, customer_lookup]

# Explicit: bind to a custom variable name
inputs:
  - ref: raw_data
    as: orders_df
  - ref: customer_lookup
    as: customers_df

fiEnvId (Persistent Sandbox)

Set fiEnvId to run your code in a persistent Fi sandbox. This is useful when your script needs state or packages from a previous session.
datasets:
  analysis:
    engine: python
    fiEnvId: "fi-thread-uuid"
    code: |
      import pandas as pd
      summary = df.describe()
    timeoutMs: 300000
Use _THREAD_ID as the fiEnvId value to reuse the current Fi thread’s sandbox, keeping packages and state across runs.
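For example:
fiEnvId: "_THREAD_ID"  # reuse the current Fi thread's sandbox between runs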

timeoutMs

Set the execution timeout in milliseconds. The default is often too low for pipelines that process large datasets or call external APIs.
timeoutMs: 300000  # 5 minutes
For pipelines, set timeoutMs to at least 300000 (5 minutes). Python datasets that hit the timeout will fail silently, and the pipeline will stop.

Custom Alert Workflows

For cases where built-in destinations aren’t flexible enough (custom message formatting, conditional logic, multi-channel routing), you can write a Python dataset that sends alerts directly using the Definite SDK.
datasets:
  check_anomalies:
    engine: sql
    sql: |
      SELECT metric, value, threshold
      FROM kpi_monitor
      WHERE value > threshold

  send_alerts:
    engine: python
    dependsOn: [check_anomalies]
    inputs:
      - ref: check_anomalies
        as: anomalies
    code: |
      from definite_sdk import DefiniteClient
      client = DefiniteClient()

      for _, row in anomalies.iterrows():
          client.message_client.send(
              channel="alerts",
              text=f"Alert: {row['metric']} is {row['value']} (threshold: {row['threshold']})"
          )
    timeoutMs: 60000
See Custom Functions for more on the Definite SDK and Python environments.

Quick Reference: Complete Pipeline Example

version: 1
schemaVersion: "2025-01"
kind: pipeline
metadata:
  name: Weekly Revenue Report
  description: Aggregates revenue by region and sends to Slack every Monday

datasets:
  revenue_by_region:
    engine: sql
    sql: |
      SELECT region, SUM(amount) as total_revenue, COUNT(*) as order_count
      FROM orders
      WHERE created_at >= current_date - interval '7 days'
      GROUP BY region
      ORDER BY total_revenue DESC

  summary:
    engine: python
    dependsOn: [revenue_by_region]
    inputs:
      - ref: revenue_by_region
        as: df
    code: |
      total = df['total_revenue'].sum()
      df['pct_of_total'] = (df['total_revenue'] / total * 100).round(1)
      result = df
    timeoutMs: 30000

automations:
  - trigger:
      type: schedule
      config:
        type: cron
        cron: "0 9 * * 1"  # Every Monday at 9 AM UTC
    destination:
      type: slack
      channelId: "C1234567890"
      integrationId: "slack-integration-id"
      format: csv
      skipIfEmpty: true

Next Steps