
# Pipelines and Automations

> Build multi-step data pipelines with dependencies, schedule them on a cron, and send results to Slack or email.

<Note>
  Users need the **Admin** [role](/workspace) to create and edit pipeline Docs. Analysts have read-only access.
</Note>

A pipeline Doc is a special Doc type designed for **automated, multi-step data workflows**. Instead of an interactive dashboard, a pipeline executes datasets in order, optionally on a schedule, and delivers results to destinations like Slack or email.

## What is a Pipeline Doc?

Set `kind: pipeline` in your Doc YAML to create a pipeline. Unlike a standard dashboard, pipelines:

* Execute datasets in dependency order (a DAG)
* Don't require a layout (it's auto-generated if omitted)
* Are designed to run unattended on a schedule or in response to events

```yaml theme={null}
version: 1
schemaVersion: "2025-01"
kind: pipeline
metadata:
  name: Daily Sales Report
  description: Pulls order data, enriches it, sends to Slack

datasets:
  # ... your datasets here

automations:
  # ... your triggers and destinations here
```

## Dataset Dependencies (dependsOn)

Use the `dependsOn` array to declare that a dataset must wait for other datasets to finish before it runs. Definite uses these declarations to build a DAG (directed acyclic graph) and execute datasets in the correct order.

```yaml theme={null}
datasets:
  raw_orders:
    engine: sql
    sql: |
      SELECT id, customer_id, amount, created_at
      FROM orders
      WHERE created_at >= current_date - interval '30 days'

  customer_lookup:
    engine: cube
    cube: Customers
    measures: [count]
    dimensions: [id, name, region]

  enriched_report:
    engine: python
    dependsOn: [raw_orders, customer_lookup]
    inputs:
      - ref: raw_orders
        as: orders_df
      - ref: customer_lookup
        as: customers_df
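    code: |
      import pandas as pd

      # customer_lookup exposes the customer key as `id`, not `customer_id`,
      # so join on the differing column names (assumes columns match the dimension names)
      result = orders_df.merge(
          customers_df, left_on='customer_id', right_on='id', suffixes=('', '_customer')
      )
      # created_at is a timestamp, so group by calendar day for a daily total
      result['order_date'] = pd.to_datetime(result['created_at']).dt.date
      result['daily_total'] = result.groupby('order_date')['amount'].transform('sum')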
    timeoutMs: 60000
```

In this example, `enriched_report` waits for both `raw_orders` and `customer_lookup` to complete before running.

<Tip>
  You can chain as many steps as you need. Each dataset can depend on one or more upstream datasets, forming a multi-step pipeline.
</Tip>
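To see how `dependsOn` declarations translate into an execution order, here is a minimal sketch using Python's standard-library `graphlib`. It illustrates topological ordering in general and is not Definite's scheduler; the `depends_on` mapping and the `execution_order` helper are hypothetical names that mirror the example above.

```python
from graphlib import TopologicalSorter

# Dataset IDs mapped to their dependsOn lists, mirroring the example above.
depends_on = {
    "raw_orders": [],
    "customer_lookup": [],
    "enriched_report": ["raw_orders", "customer_lookup"],
}


def execution_order(graph: dict[str, list[str]]) -> list[str]:
    """Return dataset IDs so that every dataset follows its dependencies."""
    # static_order() raises graphlib.CycleError if the graph contains a cycle.
    return list(TopologicalSorter(graph).static_order())


order = execution_order(depends_on)
print(order)  # enriched_report always comes last
```

The two upstream datasets have no dependency on each other, so their relative order is not significant. A cycle (for example, two datasets that depend on each other) has no valid order, which is why the graph must be acyclic.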

## Automations

The top-level `automations` array defines when your pipeline runs and where results go. Each automation has a **trigger** and an optional **destination**.

<Warning>
  Always use the plural key `automations` (not `automation`). The singular form is not supported and will cause your automations to be silently removed.
</Warning>

```yaml theme={null}
automations:
  - trigger:
      type: schedule
      config:
        type: cron
        cron: "0 9 * * 1-5"
    destination:
      type: slack
      channelId: "C1234567890"
      integrationId: "your-slack-integration-id"
      format: csv
      skipIfEmpty: true
```
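Because a singular `automation` key is dropped silently, it can help to check a Doc before saving it. The sketch below assumes the YAML has already been parsed into a plain dictionary (for example with a YAML library of your choice); `check_automations_key` is a hypothetical helper, not part of Definite.

```python
def check_automations_key(doc: dict) -> list[str]:
    """Return human-readable problems with the top-level automations key."""
    problems = []
    if "automation" in doc:
        problems.append("found singular 'automation'; rename it to 'automations'")

    automations = doc.get("automations", [])
    if not isinstance(automations, list):
        problems.append("'automations' must be a list of trigger/destination entries")
        return problems

    for index, entry in enumerate(automations):
        # Each automation needs a trigger; the destination is optional.
        if not isinstance(entry, dict) or "trigger" not in entry:
            problems.append(f"automations[{index}] has no trigger")
    return problems
```

An empty list means the check found nothing to fix; it does not validate trigger or destination contents.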

### Schedule Triggers

Schedule triggers run your pipeline on a cron. Cron expressions use the standard 5-field format (no seconds).

```yaml theme={null}
automations:
  - trigger:
      type: schedule
      config:
        type: cron
        cron: "0 9 * * *"  # Every day at 9 AM UTC
```

<Note>
  The minimum interval is **5 minutes**. Cron expressions that resolve to a shorter interval will be rejected.
</Note>
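As a rough pre-check for the 5-minute minimum, the sketch below expands only the minute field of a 5-field expression and measures the smallest gap between matched minutes. It is an approximation under stated assumptions: it treats every hour as matching (so the result is a lower bound capped at 60), it handles only numeric minute syntax, and it does not reproduce Definite's actual validation. All function names are hypothetical.

```python
MIN_INTERVAL_MINUTES = 5  # minimum interval stated in the note above


def expand_minute_field(field: str) -> list[int]:
    """Expand a cron minute field into the sorted minutes (0-59) it matches."""
    minutes: set[int] = set()
    for part in field.split(","):
        base, _, step_text = part.partition("/")
        step = int(step_text) if step_text else 1
        if base == "*":
            start, end = 0, 59
        elif "-" in base:
            low, high = base.split("-")
            start, end = int(low), int(high)
        else:
            start = int(base)
            # "5/10" means "from minute 5 through 59 in steps of 10".
            end = 59 if step_text else start
        if step < 1 or not 0 <= start <= end <= 59:
            raise ValueError(f"invalid minute field: {field!r}")
        minutes.update(range(start, end + 1, step))
    return sorted(minutes)


def min_gap_minutes(cron: str) -> int:
    """Smallest gap between matched minutes, assuming every hour matches."""
    fields = cron.split()
    if len(fields) != 5:
        raise ValueError(f"expected 5 fields, got {len(fields)}")
    minutes = expand_minute_field(fields[0])
    gaps = [later - earlier for earlier, later in zip(minutes, minutes[1:])]
    gaps.append(minutes[0] + 60 - minutes[-1])  # wrap into the next hour
    return min(gaps)


def meets_minimum_interval(cron: str) -> bool:
    """True when the smallest minute gap is at least MIN_INTERVAL_MINUTES."""
    return min_gap_minutes(cron) >= MIN_INTERVAL_MINUTES
```

Note that a step that does not divide 60 evenly produces a shorter gap at the hour boundary: `*/7` matches minute 56 and then minute 0, four minutes apart.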

#### Common Cron Examples

| Schedule                 | Cron Expression | Description                       |
| ------------------------ | --------------- | --------------------------------- |
| Every hour               | `0 * * * *`     | Top of every hour                 |
| Every day at 9 AM UTC    | `0 9 * * *`     | Once daily                        |
| Weekdays at 9 AM UTC     | `0 9 * * 1-5`   | Monday through Friday             |
| Every Monday at 8 AM UTC | `0 8 * * 1`     | Weekly on Monday                  |
| First of every month     | `0 0 1 * *`     | Midnight UTC on the 1st           |
| Every 15 minutes         | `*/15 * * * *`  | At :00, :15, :30, and :45         |
| Every 6 hours            | `0 */6 * * *`   | At 00:00, 06:00, 12:00, 18:00 UTC |

### Event Triggers

Event triggers fire in response to something happening in your workspace.

#### on\_sync\_end

Runs the pipeline when a data source sync completes. Useful for refreshing reports right after new data lands.

```yaml theme={null}
automations:
  - trigger:
      type: event
      config:
        type: on_sync_end
        srcIntegrationId: "your-integration-id"
    destination:
      type: email
      address: analytics-team@company.com
      format: pdf
```

#### on\_job\_end

Runs the pipeline when another automation job finishes. Use this to chain pipelines together.

```yaml theme={null}
automations:
  - trigger:
      type: event
      config:
        type: on_job_end
        jobId: "upstream-job-id"
```

#### webhook

Runs the pipeline when an external system sends a webhook.

```yaml theme={null}
automations:
  - trigger:
      type: event
      config:
        type: webhook
```

### Destinations

Destinations control where pipeline results are delivered. If you omit the destination, the pipeline executes but doesn't send output anywhere (useful for pipelines that write data back to your warehouse).

#### Slack

Post results to a Slack channel. Requires a connected Slack integration.

```yaml theme={null}
destination:
  type: slack
  channelId: "C1234567890"
  integrationId: "your-slack-integration-id"
  format: csv
  skipIfEmpty: true
```

#### Email

Send results as an email attachment.

```yaml theme={null}
destination:
  type: email
  address: team@company.com
  format: pdf
  skipIfEmpty: true
```

#### Google Sheets

Push results directly to a Google Sheet.

```yaml theme={null}
destination:
  type: sheets
  spreadsheet_id: "your-spreadsheet-id"
  sheet: "Sheet1"
  header: true
  overwrite_sheet: true
  integration_id: "your-gsheets-integration-id"
```

#### Google Cloud Storage

Write results as a file to a GCS bucket.

```yaml theme={null}
destination:
  type: gcs
  bucket: "gs://your-bucket"
  blob_name: "reports/daily-report.parquet"
  format: parquet
  integration_id: "your-gcs-integration-id"
```

#### Webhook

POST results to an HTTP endpoint.

```yaml theme={null}
destination:
  type: webhook
  url: "https://your-api.com/ingest"
```
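On the receiving side, the endpoint only needs to accept a POST. The sketch below is a minimal standard-library receiver for local testing. The request body shape is an assumption (a JSON array of row objects, matching the `json` format description); this page does not specify the actual payload, and `decode_rows`, `IngestHandler`, and `serve` are hypothetical names.

```python
import json
from http.server import BaseHTTPRequestHandler, HTTPServer


def decode_rows(body: bytes) -> list[dict]:
    """Parse a request body, ASSUMING it is a JSON array of row objects."""
    payload = json.loads(body.decode("utf-8"))
    if not isinstance(payload, list):
        raise ValueError("expected a JSON array of objects")
    return payload


class IngestHandler(BaseHTTPRequestHandler):
    def do_POST(self) -> None:
        length = int(self.headers.get("Content-Length", 0))
        try:
            rows = decode_rows(self.rfile.read(length))
        except ValueError:  # also covers JSON and UTF-8 decoding errors
            self.send_response(400)
            self.end_headers()
            return
        print(f"received {len(rows)} rows")
        self.send_response(204)  # accepted, no response body
        self.end_headers()


def serve(port: int = 8000) -> None:
    """Block forever, accepting POSTs on the given local port."""
    HTTPServer(("127.0.0.1", port), IngestHandler).serve_forever()
```

Calling `serve()` starts the listener on `127.0.0.1:8000`. A production endpoint would also need authentication and HTTPS, which this sketch leaves out.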

#### None (execute only)

Explicitly run without sending output. Equivalent to omitting `destination`.

```yaml theme={null}
destination:
  type: none
```

#### Destination Options

| Option        | Type    | Description                                     |
| ------------- | ------- | ----------------------------------------------- |
| `format`      | string  | Output format for the results                   |
| `skipIfEmpty` | boolean | Don't send if the final dataset returns no rows |

**Available formats:**

| Format    | Description                              |
| --------- | ---------------------------------------- |
| `auto`    | Automatically choose based on data shape |
| `csv`     | Comma-separated values                   |
| `json`    | JSON array of objects                    |
| `png`     | Chart image (for visualization tiles)    |
| `pdf`     | PDF document                             |
| `parquet` | Apache Parquet (columnar, compressed)    |
| `txt`     | Plain text                               |

## Python Datasets in Pipelines

Python datasets are powerful building blocks for pipelines. They can transform data from upstream datasets, call external APIs, or run complex logic.

### Inline Code vs. Shell Command

You can run Python in two ways:

**Inline code** (most common): write Python directly in the YAML.

```yaml theme={null}
datasets:
  summary:
    engine: python
    dependsOn: [raw_data]
    inputs:
      - ref: raw_data
        as: df
    code: |
      result = df.groupby('region').agg({
        'revenue': 'sum',
        'orders': 'count'
      }).reset_index()
    timeoutMs: 60000
```
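Inline code can be tried locally with pandas before it goes into the YAML. The sketch below runs the same aggregation as the `summary` dataset against a small, made-up DataFrame that stands in for `raw_data`; the sample values are hypothetical.

```python
import pandas as pd

# Hypothetical sample standing in for the upstream raw_data dataset.
df = pd.DataFrame(
    {
        "region": ["EMEA", "EMEA", "APAC"],
        "revenue": [100.0, 50.0, 70.0],
        "orders": [1, 1, 1],
    }
)

# Same body as the inline code: total revenue and row count per region.
result = df.groupby("region").agg({
    "revenue": "sum",
    "orders": "count",
}).reset_index()

print(result)
```

Here `'orders': 'count'` counts the non-null rows in each group rather than summing the `orders` values, so EMEA reports 2. Use `'sum'` instead if the column already holds per-row order counts.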

**Shell command**: run a script file instead.

```yaml theme={null}
datasets:
  processed:
    engine: python
    command: "uv run transform.py"
    inputs: [raw_data]
    timeoutMs: 120000
```

### Inputs

The `inputs` array passes upstream dataset results into your Python code as DataFrames. You can use shorthand (just the dataset ID) or explicit binding:

```yaml theme={null}
# Shorthand: variable name matches the dataset ID
inputs: [raw_data, customer_lookup]

# Explicit: bind to a custom variable name
inputs:
  - ref: raw_data
    as: orders_df
  - ref: customer_lookup
    as: customers_df
```
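The two forms differ only in the variable name your code sees. As an illustration (not Definite's implementation), the hypothetical `bind_inputs` helper below resolves both forms into a mapping of variable name to upstream result; plain strings stand in for the DataFrames.

```python
def bind_inputs(inputs: list, results: dict) -> dict:
    """Map each input entry to a variable name and its upstream result."""
    namespace = {}
    for entry in inputs:
        if isinstance(entry, str):
            # Shorthand: the variable name is the dataset ID itself.
            ref, name = entry, entry
        else:
            # Explicit: `ref` names the dataset, `as` names the variable.
            ref, name = entry["ref"], entry.get("as", entry["ref"])
        namespace[name] = results[ref]
    return namespace


# Placeholder values stand in for the upstream DataFrames.
results = {"raw_data": "orders frame", "customer_lookup": "customers frame"}

shorthand = bind_inputs(["raw_data", "customer_lookup"], results)
explicit = bind_inputs([{"ref": "raw_data", "as": "orders_df"}], results)
```

With shorthand, the code refers to `raw_data` and `customer_lookup`; with explicit binding, the same data is available as `orders_df`.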

### fiEnvId (Persistent Sandbox)

Set `fiEnvId` to run your code in a persistent Fi sandbox. This is useful when your script needs state or packages from a previous session.

```yaml theme={null}
datasets:
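  analysis:
    engine: python
    fiEnvId: "fi-thread-uuid"
    dependsOn: [raw_data]
    inputs:
      - ref: raw_data
        as: df
    code: |
      # df is the upstream raw_data result; the output is assigned to result
      result = df.describe().reset_index()
    timeoutMs: 300000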
```

<Tip>
  Use `_THREAD_ID` as the `fiEnvId` value to reuse the current Fi thread's sandbox, keeping packages and state across runs.
</Tip>

### timeoutMs

Set a dataset's execution timeout in milliseconds with `timeoutMs`. Pipelines that process large datasets or call external APIs often need more than the default.

```yaml theme={null}
timeoutMs: 300000  # 5 minutes
```

<Warning>
  For pipelines, set `timeoutMs` to at least **300000** (5 minutes). Python datasets that hit the timeout will fail silently, and the pipeline will stop.
</Warning>
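Since a timed-out step fails without a visible error, it can be worth scanning a pipeline for Python datasets with short timeouts. This sketch assumes the `datasets` mapping has been parsed into a dictionary; `short_timeouts` is a hypothetical helper, and it treats a missing `timeoutMs` as too short because the default value is not stated here.

```python
RECOMMENDED_TIMEOUT_MS = 300_000  # 5 minutes, per the warning above


def short_timeouts(datasets: dict) -> list[str]:
    """Return IDs of Python datasets with a missing or short timeoutMs."""
    flagged = []
    for dataset_id, spec in datasets.items():
        if spec.get("engine") != "python":
            continue  # only Python datasets are checked in this sketch
        if spec.get("timeoutMs", 0) < RECOMMENDED_TIMEOUT_MS:
            flagged.append(dataset_id)
    return flagged
```

The result is a list of candidates to review, not a list of errors.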

## Custom Alert Workflows

For cases where built-in destinations aren't flexible enough (custom message formatting, conditional logic, multi-channel routing), you can write a Python dataset that sends alerts directly using the [Definite SDK](/custom-functions/definite-sdk).

```yaml theme={null}
datasets:
  check_anomalies:
    engine: sql
    sql: |
      SELECT metric, value, threshold
      FROM kpi_monitor
      WHERE value > threshold

  send_alerts:
    engine: python
    dependsOn: [check_anomalies]
    inputs:
      - ref: check_anomalies
        as: anomalies
    code: |
      from definite_sdk import DefiniteClient
      client = DefiniteClient()

      for _, row in anomalies.iterrows():
          client.message_client.send(
              channel="alerts",
              text=f"Alert: {row['metric']} is {row['value']} (threshold: {row['threshold']})"
          )
    timeoutMs: 60000
```
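Conditional logic and multi-channel routing are easier to test when kept separate from the send call. The sketch below groups formatted messages by channel using plain dictionaries (for example, rows from `anomalies.to_dict("records")`). The channel names and the "twice the threshold" rule are hypothetical, chosen only to illustrate routing.

```python
def build_alerts(rows: list[dict]) -> dict[str, list[str]]:
    """Group formatted alert messages by target channel."""
    alerts: dict[str, list[str]] = {}
    for row in rows:
        # Hypothetical rule: values at or above 2x the threshold are critical.
        is_critical = row["value"] >= 2 * row["threshold"]
        channel = "alerts-critical" if is_critical else "alerts"
        text = (
            f"Alert: {row['metric']} is {row['value']} "
            f"(threshold: {row['threshold']})"
        )
        alerts.setdefault(channel, []).append(text)
    return alerts


rows = [
    {"metric": "latency", "value": 250, "threshold": 100},
    {"metric": "errors", "value": 12, "threshold": 10},
]
alerts = build_alerts(rows)
```

Each channel's messages can then be sent in a loop with whichever client call your workspace uses.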

See [Custom Functions](/custom-functions) for more on the Definite SDK and Python environments.

## Quick Reference: Complete Pipeline Example

```yaml theme={null}
version: 1
schemaVersion: "2025-01"
kind: pipeline
metadata:
  name: Weekly Revenue Report
  description: Aggregates revenue by region and sends to Slack every Monday

datasets:
  revenue_by_region:
    engine: sql
    sql: |
      SELECT region, SUM(amount) as total_revenue, COUNT(*) as order_count
      FROM orders
      WHERE created_at >= current_date - interval '7 days'
      GROUP BY region
      ORDER BY total_revenue DESC

  summary:
    engine: python
    dependsOn: [revenue_by_region]
    inputs:
      - ref: revenue_by_region
        as: df
    code: |
      total = df['total_revenue'].sum()
      df['pct_of_total'] = (df['total_revenue'] / total * 100).round(1)
      result = df
    timeoutMs: 30000

automations:
  - trigger:
      type: schedule
      config:
        type: cron
        cron: "0 9 * * 1"  # Every Monday at 9 AM UTC
    destination:
      type: slack
      channelId: "C1234567890"
      integrationId: "slack-integration-id"
      format: csv
      skipIfEmpty: true
```

## Next Steps

<CardGroup cols={2}>
  <Card title="Custom Functions" icon="code" href="/custom-functions">
    Write Python scripts with the Definite SDK
  </Card>

  <Card title="Tiles" icon="grid-2" href="/analyze-build/docs/tiles">
    Learn about tile types and visualization options
  </Card>
</CardGroup>
