Users need the Admin role to create and edit pipeline Docs. Analysts have read-only access.
A pipeline Doc is a special Doc type designed for automated, multi-step data workflows. Instead of an interactive dashboard, a pipeline executes datasets in order, optionally on a schedule, and delivers results to destinations like Slack or email.
What is a Pipeline Doc?
Set kind: pipeline in your Doc YAML to create a pipeline. Unlike a standard dashboard, pipelines:
- Execute datasets in dependency order (a DAG)
- Don’t require a layout (it’s auto-generated if omitted)
- Are designed to run unattended on a schedule or in response to events
```yaml
version: 1
schemaVersion: "2025-01"
kind: pipeline
metadata:
  name: Daily Sales Report
  description: Pulls order data, enriches it, sends to Slack
datasets:
  # ... your datasets here
automations:
  # ... your triggers and destinations here
```
Dataset Dependencies (dependsOn)
Use the dependsOn array to declare that a dataset must wait for other datasets to finish before it runs. Definite uses these declarations to build a DAG (directed acyclic graph) and execute datasets in the correct order.
```yaml
datasets:
  raw_orders:
    engine: sql
    sql: |
      SELECT id, customer_id, amount, created_at
      FROM orders
      WHERE created_at >= current_date - interval '30 days'
  customer_lookup:
    engine: cube
    cube: Customers
    measures: [count]
    dimensions: [id, name, region]
  enriched_report:
    engine: python
    dependsOn: [raw_orders, customer_lookup]
    inputs:
      - ref: raw_orders
        as: orders_df
      - ref: customer_lookup
        as: customers_df
    code: |
      result = orders_df.merge(customers_df, on='customer_id')
      result['daily_total'] = result.groupby('created_at')['amount'].transform('sum')
    timeoutMs: 60000
```
In this example, enriched_report waits for both raw_orders and customer_lookup to complete before running.
You can chain as many steps as you need. Each dataset can depend on one or more upstream datasets, forming a multi-step pipeline.
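For instance, a hypothetical three-step chain (the events table, its columns, and the dataset IDs below are illustrative placeholders, not a real schema) could extract, clean, and then summarize data, with each step declaring its upstream dependency:

```yaml
datasets:
  extract_events:
    engine: sql
    sql: |
      SELECT id, event_type, created_at
      FROM events
      WHERE created_at >= current_date - interval '7 days'
  clean_events:
    engine: python
    dependsOn: [extract_events]
    inputs:
      - ref: extract_events
        as: df
    code: |
      # drop rows with a missing event type before aggregating
      result = df.dropna(subset=['event_type'])
  summarize_events:
    engine: python
    dependsOn: [clean_events]
    inputs:
      - ref: clean_events
        as: df
    code: |
      # count events per type for the final report
      result = df.groupby('event_type').size().reset_index(name='event_count')
```

Definite runs extract_events first, then clean_events, then summarize_events.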
Automations
The top-level automations array defines when your pipeline runs and where results go. Each automation has a trigger and an optional destination.
Always use the plural key automations (not automation). The singular form is not supported and will cause your automations to be silently removed.
```yaml
automations:
  - trigger:
      type: schedule
      config:
        type: cron
        cron: "0 9 * * 1-5"
    destination:
      type: slack
      channelId: "C1234567890"
      integrationId: "your-slack-integration-id"
      format: csv
      skipIfEmpty: true
```
Schedule Triggers
Schedule triggers run your pipeline on a cron schedule. Cron expressions use the standard 5-field format (no seconds).
```yaml
automations:
  - trigger:
      type: schedule
      config:
        type: cron
        cron: "0 9 * * *"  # Every day at 9 AM UTC
```
The minimum interval is 5 minutes. Cron expressions that resolve to a shorter interval will be rejected.
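For example (a sketch of the boundary, not an exhaustive rule set):

```yaml
cron: "*/15 * * * *"   # accepted: 15-minute interval
# cron: "*/2 * * * *"  # rejected: resolves to a 2-minute interval, below the 5-minute minimum
```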
Common Cron Examples
| Schedule | Cron Expression | Description |
|---|---|---|
| Every hour | 0 * * * * | Top of every hour |
| Every day at 9 AM UTC | 0 9 * * * | Once daily |
| Weekdays at 9 AM UTC | 0 9 * * 1-5 | Monday through Friday |
| Every Monday at 8 AM UTC | 0 8 * * 1 | Weekly on Monday |
| First of every month | 0 0 1 * * | Midnight on the 1st |
| Every 15 minutes | */15 * * * * | Four times per hour |
| Every 6 hours | 0 */6 * * * | At 00:00, 06:00, 12:00, 18:00 |
Event Triggers
Event triggers fire in response to something happening in your workspace.
on_sync_end
Runs the pipeline when a data source sync completes. Useful for refreshing reports right after new data lands.
```yaml
automations:
  - trigger:
      type: event
      config:
        type: on_sync_end
        srcIntegrationId: "your-integration-id"
    destination:
      type: email
      address: "[email protected]"
      format: pdf
```
on_job_end
Runs the pipeline when another automation job finishes. Use this to chain pipelines together.
```yaml
automations:
  - trigger:
      type: event
      config:
        type: on_job_end
        jobId: "upstream-job-id"
```
webhook
Runs the pipeline when an external system sends a webhook.
```yaml
automations:
  - trigger:
      type: event
      config:
        type: webhook
```
Destinations
Destinations control where pipeline results are delivered. If you omit the destination, the pipeline executes but doesn’t send output anywhere (useful for pipelines that write data back to your warehouse).
Slack
Post results to a Slack channel. Requires a connected Slack integration.
```yaml
destination:
  type: slack
  channelId: "C1234567890"
  integrationId: "your-slack-integration-id"
  format: csv
  skipIfEmpty: true
```
Email
Send results as an email attachment.
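A minimal sketch, reusing the address and format keys from the on_sync_end example above (the address is a placeholder):

```yaml
destination:
  type: email
  address: "[email protected]"
  format: pdf
```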
Google Sheets
Push results directly to a Google Sheet.
```yaml
destination:
  type: sheets
  spreadsheet_id: "your-spreadsheet-id"
  sheet: "Sheet1"
  header: true
  overwrite_sheet: true
  integration_id: "your-gsheets-integration-id"
```
Google Cloud Storage
Write results as a file to a GCS bucket.
```yaml
destination:
  type: gcs
  bucket: "gs://your-bucket"
  blob_name: "reports/daily-report.parquet"
  format: parquet
  integration_id: "your-gcs-integration-id"
```
Webhook
POST results to an HTTP endpoint.
```yaml
destination:
  type: webhook
  url: "https://your-api.com/ingest"
```
None (execute only)
Explicitly run without sending output. Equivalent to omitting destination.
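A minimal sketch, assuming an execute-only pipeline is expressed by defining the trigger and leaving the destination key out entirely (as in the on_job_end example above):

```yaml
automations:
  - trigger:
      type: schedule
      config:
        type: cron
        cron: "0 6 * * *"
    # no destination: the datasets run (e.g., writing back to your warehouse) but nothing is delivered
```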
Destination Options
| Option | Type | Description |
|---|---|---|
| format | string | Output format for the results |
| skipIfEmpty | boolean | Don’t send if the final dataset returns no rows |
Available formats:
| Format | Description |
|---|---|
| auto | Automatically choose based on data shape |
| csv | Comma-separated values |
| json | JSON array of objects |
| png | Chart image (for visualization tiles) |
| pdf | PDF document |
| parquet | Apache Parquet (columnar, compressed) |
| txt | Plain text |
Python Datasets in Pipelines
Python datasets are powerful building blocks for pipelines. They can transform data from upstream datasets, call external APIs, or run complex logic.
Inline Code vs. Shell Command
You can run Python in two ways:
Inline code (most common): write Python directly in the YAML.
```yaml
datasets:
  summary:
    engine: python
    dependsOn: [raw_data]
    inputs:
      - ref: raw_data
        as: df
    code: |
      result = df.groupby('region').agg({
          'revenue': 'sum',
          'orders': 'count'
      }).reset_index()
    timeoutMs: 60000
```
Shell command: run a script file instead.
```yaml
datasets:
  processed:
    engine: python
    command: "uv run transform.py"
    inputs: [raw_data]
    timeoutMs: 120000
```
The inputs array passes upstream dataset results into your Python code as DataFrames. You can use shorthand (just the dataset ID) or explicit binding:
```yaml
# Shorthand: variable name matches the dataset ID
inputs: [raw_data, customer_lookup]

# Explicit: bind to a custom variable name
inputs:
  - ref: raw_data
    as: orders_df
  - ref: customer_lookup
    as: customers_df
```
fiEnvId (Persistent Sandbox)
Set fiEnvId to run your code in a persistent Fi sandbox. This is useful when your script needs state or packages from a previous session.
```yaml
datasets:
  analysis:
    engine: python
    fiEnvId: "fi-thread-uuid"
    code: |
      import pandas as pd
      summary = df.describe()
    timeoutMs: 300000
```
Use _THREAD_ID as the fiEnvId value to reuse the current Fi thread’s sandbox, keeping packages and state across runs.
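For example (a sketch; the dataset name is illustrative, and df is assumed to already exist in the sandbox from a previous run):

```yaml
datasets:
  followup_analysis:
    engine: python
    fiEnvId: "_THREAD_ID"  # reuse the current Fi thread's sandbox
    code: |
      # packages installed and variables defined in earlier runs remain available
      summary = df.describe()
    timeoutMs: 300000
```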
timeoutMs
Set execution timeout in milliseconds. The default is often too low for pipelines that process large datasets or call external APIs.
```yaml
timeoutMs: 300000  # 5 minutes
```
For pipelines, set timeoutMs to at least 300000 (5 minutes). Python datasets that hit the timeout will fail silently, and the pipeline will stop.
Custom Alert Workflows
For cases where built-in destinations aren’t flexible enough (custom message formatting, conditional logic, multi-channel routing), you can write a Python dataset that sends alerts directly using the Definite SDK.
```yaml
datasets:
  check_anomalies:
    engine: sql
    sql: |
      SELECT metric, value, threshold
      FROM kpi_monitor
      WHERE value > threshold
  send_alerts:
    engine: python
    dependsOn: [check_anomalies]
    inputs:
      - ref: check_anomalies
        as: anomalies
    code: |
      from definite_sdk import DefiniteClient

      client = DefiniteClient()
      for _, row in anomalies.iterrows():
          client.message_client.send(
              channel="alerts",
              text=f"Alert: {row['metric']} is {row['value']} (threshold: {row['threshold']})"
          )
    timeoutMs: 60000
```
See Custom Functions for more on the Definite SDK and Python environments.
Quick Reference: Complete Pipeline Example
```yaml
version: 1
schemaVersion: "2025-01"
kind: pipeline
metadata:
  name: Weekly Revenue Report
  description: Aggregates revenue by region and sends to Slack every Monday
datasets:
  revenue_by_region:
    engine: sql
    sql: |
      SELECT region, SUM(amount) as total_revenue, COUNT(*) as order_count
      FROM orders
      WHERE created_at >= current_date - interval '7 days'
      GROUP BY region
      ORDER BY total_revenue DESC
  summary:
    engine: python
    dependsOn: [revenue_by_region]
    inputs:
      - ref: revenue_by_region
        as: df
    code: |
      total = df['total_revenue'].sum()
      df['pct_of_total'] = (df['total_revenue'] / total * 100).round(1)
      result = df
    timeoutMs: 30000
automations:
  - trigger:
      type: schedule
      config:
        type: cron
        cron: "0 9 * * 1"  # Every Monday at 9 AM UTC
    destination:
      type: slack
      channelId: "C1234567890"
      integrationId: "slack-integration-id"
      format: csv
      skipIfEmpty: true
```
Next Steps