> ## Documentation Index
> Fetch the complete documentation index at: https://docs.definite.app/llms.txt
> Use this file to discover all available pages before exploring further.

# Push-Based Data Ingestion

> Run your own data extractor for sensitive or private network deployments

Push-based ingestion lets you run data extraction on your own infrastructure while still landing data in Definite's DuckLake. This pattern is ideal for organizations with strict data residency requirements or databases that aren't accessible from the public internet.

## When to Use This Pattern

<CardGroup cols={2}>
  <Card title="Data Residency" icon="shield">
    Your compliance requirements mandate that raw data stays within your network
  </Card>

  <Card title="Private Networks" icon="lock">
    Your database is behind a firewall or in a private VPC with no public access
  </Card>

  <Card title="Custom Extraction" icon="code">
    You need custom transformation logic before data leaves your environment
  </Card>

  <Card title="Existing CDC Pipeline" icon="arrows-rotate">
    You already have Debezium, Fivetran, or another CDC tool running internally
  </Card>
</CardGroup>

<Info>
  If your database is publicly accessible and you don't have data residency requirements, consider using Definite's [built-in connectors](/extractors) instead. They're fully managed and require no infrastructure on your end.
</Info>

***

## Architecture Overview

With push-based ingestion, you run the data extraction within your own infrastructure. Only the extracted data is sent to Definite via HTTPS.

<Steps>
  <Step title="Your Infrastructure" icon="building">
    Your **Database** (Postgres, MySQL, etc.) lives in your VPC or private network.
  </Step>

  <Step title="Your Extractor" icon="code">
    You run an **Extractor** (Lambda, ECS, EC2, or any compute) that queries your database and formats the data.
  </Step>

  <Step title="HTTPS to Definite" icon="arrow-right">
    Your extractor sends data via **HTTPS POST** to `api.definite.app/v2/stream`. Only outbound traffic—no inbound connections required.
  </Step>

  <Step title="DuckLake" icon="database">
    Data lands in **DuckLake** (Iceberg tables on GCS) and is immediately queryable.
  </Step>
</Steps>

**Key benefits:**

* Raw data never leaves your network
* You control the extraction schedule and logic
* Only outbound HTTPS traffic required (no inbound connections)
* Works with any database or data source

***

## Example: Postgres Incremental Sync

This example shows a Python script that incrementally syncs data from Postgres to Definite. It uses a watermark column (like `updated_at`) to only sync changed rows.

### Full Implementation

```python theme={null}
#!/usr/bin/env python3
"""
Incremental sync from Postgres to Definite.
Extracts rows modified since last sync and pushes to Stream API.
"""

import json
import os
from datetime import datetime, timezone
from pathlib import Path

import httpx
import psycopg

# Configuration
DEFINITE_API_URL = "https://api.definite.app/v2/stream"
DEFINITE_API_KEY = os.environ["DEFINITE_API_KEY"]
POSTGRES_DSN = os.environ["POSTGRES_DSN"]
STATE_FILE = Path("/tmp/sync_state.json")
BATCH_SIZE = 1000

# Tables to sync: (source_table, destination_table, watermark_column)
TABLES = [
    ("public.customers", "bronze.customers", "updated_at"),
    ("public.orders", "bronze.orders", "updated_at"),
    ("public.products", "bronze.products", "updated_at"),
]


def load_state() -> dict:
    """Load sync state from disk"""
    if STATE_FILE.exists():
        return json.loads(STATE_FILE.read_text())
    return {}


def save_state(state: dict) -> None:
    """Save sync state to disk"""
    STATE_FILE.write_text(json.dumps(state, default=str))


def push_to_definite(table: str, rows: list[dict]) -> dict:
    """Push batch to Definite Stream API"""
    response = httpx.post(
        DEFINITE_API_URL,
        json={
            "data": rows,
            "config": {
                "table": table,
                "mode": "append",
                "tags": {"source": "postgres-sync"},
            },
        },
        headers={"Authorization": f"Bearer {DEFINITE_API_KEY}"},
        timeout=60.0,
    )
    response.raise_for_status()
    return response.json()


def sync_table(
    source: str,
    dest: str,
    watermark_col: str,
    last_sync: datetime | None,
) -> datetime | None:
    """Sync a single table, returning the new watermark"""

    # Build query
    if last_sync:
        query = f"""
            SELECT * FROM {source}
            WHERE {watermark_col} > %s
            ORDER BY {watermark_col}
        """
        params = (last_sync,)
    else:
        # Initial full sync
        query = f"SELECT * FROM {source} ORDER BY {watermark_col}"
        params = ()

    max_watermark = last_sync
    total_rows = 0

    with psycopg.connect(POSTGRES_DSN) as conn:
        with conn.cursor() as cur:
            cur.execute(query, params)
            columns = [desc.name for desc in cur.description]

            batch = []
            for record in cur:
                row = dict(zip(columns, record))

                # Track max watermark
                row_watermark = row.get(watermark_col)
                if row_watermark and (not max_watermark or row_watermark > max_watermark):
                    max_watermark = row_watermark

                # Convert datetime to ISO format for JSON
                for key, value in row.items():
                    if isinstance(value, datetime):
                        row[key] = value.isoformat()

                batch.append(row)

                # Push batch when full
                if len(batch) >= BATCH_SIZE:
                    result = push_to_definite(dest, batch)
                    total_rows += result["successful_rows"]
                    print(f"  Pushed {result['successful_rows']} rows to {dest}")
                    batch = []

            # Push remaining rows
            if batch:
                result = push_to_definite(dest, batch)
                total_rows += result["successful_rows"]
                print(f"  Pushed {result['successful_rows']} rows to {dest}")

    print(f"  Total: {total_rows} rows synced")
    return max_watermark


def main():
    """Run incremental sync for all configured tables"""
    state = load_state()

    for source, dest, watermark_col in TABLES:
        print(f"Syncing {source} -> {dest}")

        last_sync = state.get(source)
        if last_sync:
            last_sync = datetime.fromisoformat(last_sync)

        new_watermark = sync_table(source, dest, watermark_col, last_sync)

        if new_watermark:
            state[source] = new_watermark.isoformat()

    save_state(state)
    print("Sync complete!")


if __name__ == "__main__":
    main()
```

### Environment Variables

```bash theme={null}
export DEFINITE_API_KEY="your-api-key"
export POSTGRES_DSN="postgresql://user:pass@localhost:5432/mydb"
```

***

## Handling Deletes and Updates

The Stream API uses append-only semantics. To handle updates and deletes from your source database, include CDC metadata in your records:

### Include CDC Operation Type

```python theme={null}
# When extracting, add CDC metadata
row["_cdc_op"] = "u"  # c=create, u=update, d=delete
row["_cdc_ts"] = datetime.now(timezone.utc).isoformat()
```

### Create a View for Current State

In DuckLake, create a view that shows only the current state of each record:

```sql theme={null}
CREATE VIEW silver.customers AS
SELECT * FROM (
    SELECT
        *,
        ROW_NUMBER() OVER (
            PARTITION BY id
            ORDER BY _cdc_ts DESC
        ) as _row_num
    FROM bronze.customers
    WHERE _cdc_op != 'd'
)
WHERE _row_num = 1;
```

This pattern gives you:

* Full history in the `bronze` layer
* Current state in the `silver` layer
* Ability to query point-in-time snapshots

***

## Deployment Options

### AWS Lambda (Scheduled)

Run your sync on a schedule using EventBridge:

```python theme={null}
# handler.py
def lambda_handler(event, context):
    main()
    return {"statusCode": 200, "body": "Sync complete"}
```

```yaml theme={null}
# serverless.yml
functions:
  sync:
    handler: handler.lambda_handler
    timeout: 900
    events:
      - schedule: rate(5 minutes)
    environment:
      DEFINITE_API_KEY: ${ssm:/definite/api-key}
      POSTGRES_DSN: ${ssm:/postgres/dsn}
```

### ECS Fargate (Scheduled Task)

```json theme={null}
{
  "family": "postgres-sync",
  "containerDefinitions": [{
    "name": "sync",
    "image": "your-registry/postgres-sync:latest",
    "environment": [
      {"name": "DEFINITE_API_KEY", "value": "your-key"},
      {"name": "POSTGRES_DSN", "value": "your-dsn"}
    ]
  }]
}
```

Schedule with EventBridge or CloudWatch Events.

### EC2 / On-Premise (Cron)

```bash theme={null}
# /etc/cron.d/postgres-sync
*/5 * * * * ubuntu /opt/sync/run.sh >> /var/log/sync.log 2>&1
```

***

## Error Handling

Implement retry logic with exponential backoff for transient failures:

```python theme={null}
import time
from httpx import HTTPStatusError

def push_with_retry(table: str, rows: list[dict], max_retries: int = 3) -> dict:
    """Push with exponential backoff retry"""
    for attempt in range(max_retries):
        try:
            return push_to_definite(table, rows)
        except HTTPStatusError as e:
            if e.response.status_code >= 500 or e.response.status_code == 429:
                # Transient error - retry
                wait = 2 ** attempt
                print(f"Retry {attempt + 1}/{max_retries} in {wait}s...")
                time.sleep(wait)
            else:
                # Client error - don't retry
                raise

    raise Exception(f"Failed after {max_retries} retries")
```

### Dead Letter Queue

For production deployments, consider sending failed batches to a dead letter queue:

```python theme={null}
import boto3

sqs = boto3.client("sqs")
DLQ_URL = os.environ.get("DLQ_URL")

def push_with_dlq(table: str, rows: list[dict]) -> dict:
    try:
        return push_with_retry(table, rows)
    except Exception as e:
        if DLQ_URL:
            sqs.send_message(
                QueueUrl=DLQ_URL,
                MessageBody=json.dumps({
                    "table": table,
                    "rows": rows,
                    "error": str(e),
                })
            )
        raise
```

***

## Monitoring

Track these metrics to ensure your sync is healthy:

| Metric             | Description                     |
| ------------------ | ------------------------------- |
| `rows_extracted`   | Rows read from source database  |
| `rows_pushed`      | Rows sent to Definite           |
| `push_latency_ms`  | Time to push each batch         |
| `sync_lag_seconds` | Time since last successful sync |
| `errors_total`     | Failed push attempts            |

### Health Check Endpoint

If running as a service, expose a health endpoint:

```python theme={null}
from fastapi import FastAPI

app = FastAPI()

@app.get("/health")
def health():
    return {
        "status": "healthy",
        "last_sync": load_state(),
        "tables": len(TABLES),
    }
```

***

## Alternative: Debezium + Kafka

For high-volume, real-time CDC, you can use Debezium with a Kafka HTTP Sink connector:

```
Postgres → Debezium → Kafka → HTTP Sink → Definite Stream API
```

This approach provides:

* True CDC from the Postgres WAL
* Exactly-once delivery semantics
* Lower latency than batch polling

See the [Debezium documentation](https://debezium.io/) for setup instructions.

***

## Related

* [Stream API Reference](/definite-api/stream) - Full API documentation
* [Extractors](/extractors) - Definite's built-in managed connectors
* [Webhooks](/definite-api/webhooks) - Trigger blocks from external events
