Push-based ingestion lets you run data extraction on your own infrastructure while still landing data in Definite’s DuckLake. This pattern is ideal for organizations with strict data residency requirements or databases that aren’t accessible from the public internet.

When to Use This Pattern

  • Data Residency: your compliance requirements mandate that raw data stays within your network.
  • Private Networks: your database is behind a firewall or in a private VPC with no public access.
  • Custom Extraction: you need custom transformation logic before data leaves your environment.
  • Existing CDC Pipeline: you already have Debezium, Fivetran, or another CDC tool running internally.

If your database is publicly accessible and you don’t have data residency requirements, consider using Definite’s built-in connectors instead. They’re fully managed and require no infrastructure on your end.

Architecture Overview

With push-based ingestion, you run the data extraction within your own infrastructure. Only the extracted data is sent to Definite via HTTPS.

  1. Your Infrastructure: your database (Postgres, MySQL, etc.) lives in your VPC or private network.
  2. Your Extractor: you run an extractor (Lambda, ECS, EC2, or any compute) that queries your database and formats the data.
  3. HTTPS to Definite: your extractor sends the data via HTTPS POST to api.definite.app/v2/stream. Only outbound traffic is needed; no inbound connections are required (a minimal example is sketched below).
  4. DuckLake: data lands in DuckLake (Iceberg tables on GCS) and is immediately queryable.

Key benefits:
  • Raw data never leaves your network
  • You control the extraction schedule and logic
  • Only outbound HTTPS traffic required (no inbound connections)
  • Works with any database or data source
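
The push in step 3 can be as small as a single HTTPS POST. The sketch below uses httpx and the same payload shape as the full example later in this guide (a data array plus a config block naming the destination table); the sample row and table name are only illustrative.

import os

import httpx

# One-off push of a single batch to the Stream API.
# The payload shape (data + config) matches the full example below.
response = httpx.post(
    "https://api.definite.app/v2/stream",
    json={
        "data": [
            {"id": 1, "email": "ada@example.com", "updated_at": "2024-01-01T00:00:00Z"},
        ],
        "config": {"table": "bronze.customers", "mode": "append"},
    },
    headers={"Authorization": f"Bearer {os.environ['DEFINITE_API_KEY']}"},
    timeout=30.0,
)
response.raise_for_status()
print(response.json())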

Example: Postgres Incremental Sync

This example shows a Python script that incrementally syncs data from Postgres to Definite. It uses a watermark column (like updated_at) to only sync changed rows.

Full Implementation

#!/usr/bin/env python3
"""
Incremental sync from Postgres to Definite.
Extracts rows modified since last sync and pushes to Stream API.
"""

import json
import os
from datetime import datetime, timezone
from pathlib import Path

import httpx
import psycopg

# Configuration
DEFINITE_API_URL = "https://api.definite.app/v2/stream"
DEFINITE_API_KEY = os.environ["DEFINITE_API_KEY"]
POSTGRES_DSN = os.environ["POSTGRES_DSN"]
STATE_FILE = Path("/tmp/sync_state.json")
BATCH_SIZE = 1000

# Tables to sync: (source_table, destination_table, watermark_column)
TABLES = [
    ("public.customers", "bronze.customers", "updated_at"),
    ("public.orders", "bronze.orders", "updated_at"),
    ("public.products", "bronze.products", "updated_at"),
]


def load_state() -> dict:
    """Load sync state from disk"""
    if STATE_FILE.exists():
        return json.loads(STATE_FILE.read_text())
    return {}


def save_state(state: dict) -> None:
    """Save sync state to disk"""
    STATE_FILE.write_text(json.dumps(state, default=str))


def push_to_definite(table: str, rows: list[dict]) -> dict:
    """Push batch to Definite Stream API"""
    response = httpx.post(
        DEFINITE_API_URL,
        json={
            "data": rows,
            "config": {
                "table": table,
                "mode": "append",
                "tags": {"source": "postgres-sync"},
            },
        },
        headers={"Authorization": f"Bearer {DEFINITE_API_KEY}"},
        timeout=60.0,
    )
    response.raise_for_status()
    return response.json()


def sync_table(
    source: str,
    dest: str,
    watermark_col: str,
    last_sync: datetime | None,
) -> datetime | None:
    """Sync a single table, returning the new watermark"""

    # Build query
    if last_sync:
        query = f"""
            SELECT * FROM {source}
            WHERE {watermark_col} > %s
            ORDER BY {watermark_col}
        """
        params = (last_sync,)
    else:
        # Initial full sync
        query = f"SELECT * FROM {source} ORDER BY {watermark_col}"
        params = ()

    max_watermark = last_sync
    total_rows = 0

    with psycopg.connect(POSTGRES_DSN) as conn:
        with conn.cursor() as cur:
            cur.execute(query, params)
            columns = [desc.name for desc in cur.description]

            batch = []
            for record in cur:
                row = dict(zip(columns, record))

                # Track max watermark
                row_watermark = row.get(watermark_col)
                if row_watermark and (not max_watermark or row_watermark > max_watermark):
                    max_watermark = row_watermark

                # Convert datetime to ISO format for JSON
                for key, value in row.items():
                    if isinstance(value, datetime):
                        row[key] = value.isoformat()

                batch.append(row)

                # Push batch when full
                if len(batch) >= BATCH_SIZE:
                    result = push_to_definite(dest, batch)
                    total_rows += result["successful_rows"]
                    print(f"  Pushed {result['successful_rows']} rows to {dest}")
                    batch = []

            # Push remaining rows
            if batch:
                result = push_to_definite(dest, batch)
                total_rows += result["successful_rows"]
                print(f"  Pushed {result['successful_rows']} rows to {dest}")

    print(f"  Total: {total_rows} rows synced")
    return max_watermark


def main():
    """Run incremental sync for all configured tables"""
    state = load_state()

    for source, dest, watermark_col in TABLES:
        print(f"Syncing {source} -> {dest}")

        last_sync = state.get(source)
        if last_sync:
            last_sync = datetime.fromisoformat(last_sync)

        new_watermark = sync_table(source, dest, watermark_col, last_sync)

        if new_watermark:
            state[source] = new_watermark.isoformat()

    save_state(state)
    print("Sync complete!")


if __name__ == "__main__":
    main()

Environment Variables

export DEFINITE_API_KEY="your-api-key"
export POSTGRES_DSN="postgresql://user:pass@localhost:5432/mydb"

Handling Deletes and Updates

The Stream API uses append-only semantics. To handle updates and deletes from your source database, include CDC metadata in your records:

Include CDC Operation Type

# When extracting, add CDC metadata
row["_cdc_op"] = "u"  # c=create, u=update, d=delete
row["_cdc_ts"] = datetime.now(timezone.utc).isoformat()

Create a View for Current State

In DuckLake, create a view that shows only the current state of each record:
CREATE VIEW silver.customers AS
SELECT * FROM (
    SELECT
        *,
        ROW_NUMBER() OVER (
            PARTITION BY id
            ORDER BY _cdc_ts DESC
        ) as _row_num
    FROM bronze.customers
    WHERE _cdc_op != 'd'
)
WHERE _row_num = 1;
This pattern gives you:
  • Full history in the bronze layer
  • Current state in the silver layer
  • Ability to query point-in-time snapshots

Deployment Options

AWS Lambda (Scheduled)

Run your sync on a schedule using EventBridge:
# handler.py
from sync import main  # assumes the incremental sync script above is saved as sync.py

def lambda_handler(event, context):
    main()
    return {"statusCode": 200, "body": "Sync complete"}

# serverless.yml
functions:
  sync:
    handler: handler.lambda_handler
    timeout: 900
    events:
      - schedule: rate(5 minutes)
    environment:
      DEFINITE_API_KEY: ${ssm:/definite/api-key}
      POSTGRES_DSN: ${ssm:/postgres/dsn}

ECS Fargate (Scheduled Task)

{
  "family": "postgres-sync",
  "containerDefinitions": [{
    "name": "sync",
    "image": "your-registry/postgres-sync:latest",
    "environment": [
      {"name": "DEFINITE_API_KEY", "value": "your-key"},
      {"name": "POSTGRES_DSN", "value": "your-dsn"}
    ]
  }]
}
Schedule with EventBridge or CloudWatch Events.
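
As one possible wiring, the schedule rule and ECS target can be created with boto3. This is a minimal sketch: the region, account, cluster, task definition, subnet, and role ARNs are placeholders, and it assumes the task definition and an EventBridge-assumable IAM role already exist.

import boto3

events = boto3.client("events")

# Run the sync task every 5 minutes (all ARNs below are placeholders).
events.put_rule(Name="postgres-sync", ScheduleExpression="rate(5 minutes)")

events.put_targets(
    Rule="postgres-sync",
    Targets=[{
        "Id": "postgres-sync-task",
        "Arn": "arn:aws:ecs:us-east-1:123456789012:cluster/your-cluster",
        "RoleArn": "arn:aws:iam::123456789012:role/your-eventbridge-ecs-role",
        "EcsParameters": {
            "TaskDefinitionArn": "arn:aws:ecs:us-east-1:123456789012:task-definition/postgres-sync",
            "TaskCount": 1,
            "LaunchType": "FARGATE",
            "NetworkConfiguration": {
                "awsvpcConfiguration": {
                    "Subnets": ["subnet-xxxxxxxx"],
                    "AssignPublicIp": "ENABLED",
                },
            },
        },
    }],
)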

EC2 / On-Premise (Cron)

# /etc/cron.d/postgres-sync
*/5 * * * * ubuntu /opt/sync/run.sh >> /var/log/sync.log 2>&1

Error Handling

Implement retry logic with exponential backoff for transient failures:
import time
from httpx import HTTPStatusError

def push_with_retry(table: str, rows: list[dict], max_retries: int = 3) -> dict:
    """Push with exponential backoff retry"""
    for attempt in range(max_retries):
        try:
            return push_to_definite(table, rows)
        except HTTPStatusError as e:
            status = e.response.status_code
            if status == 429 or status >= 500:
                # Transient error (rate limit or server error) - retry
                if attempt == max_retries - 1:
                    raise
                wait = 2 ** attempt
                print(f"Retry {attempt + 1}/{max_retries} in {wait}s...")
                time.sleep(wait)
            else:
                # Client error - don't retry
                raise

    raise RuntimeError(f"Failed after {max_retries} retries")

Dead Letter Queue

For production deployments, consider sending failed batches to a dead letter queue:
import boto3

sqs = boto3.client("sqs")
DLQ_URL = os.environ.get("DLQ_URL")

def push_with_dlq(table: str, rows: list[dict]) -> dict:
    try:
        return push_with_retry(table, rows)
    except Exception as e:
        if DLQ_URL:
            sqs.send_message(
                QueueUrl=DLQ_URL,
                MessageBody=json.dumps({
                    "table": table,
                    "rows": rows,
                    "error": str(e),
                })
            )
        raise
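
Once the underlying issue is fixed, the queue can be drained and the batches re-pushed. A minimal replay loop, building on the snippet above and assuming the same message body shape, might look like this:

def replay_dlq() -> None:
    """Re-push failed batches from the dead letter queue."""
    while True:
        resp = sqs.receive_message(
            QueueUrl=DLQ_URL,
            MaxNumberOfMessages=10,
            WaitTimeSeconds=5,
        )
        messages = resp.get("Messages", [])
        if not messages:
            break
        for msg in messages:
            body = json.loads(msg["Body"])
            push_with_retry(body["table"], body["rows"])
            # Delete only after a successful re-push.
            sqs.delete_message(QueueUrl=DLQ_URL, ReceiptHandle=msg["ReceiptHandle"])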

Monitoring

Track these metrics to ensure your sync is healthy:
Metric              Description
rows_extracted      Rows read from source database
rows_pushed         Rows sent to Definite
push_latency_ms     Time to push each batch
sync_lag_seconds    Time since last successful sync
errors_total        Failed push attempts
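
How you emit these depends on your stack. As a minimal sketch, a small helper can publish counters to CloudWatch; the PostgresSync namespace is a placeholder, and the metric names mirror the table above.

import boto3

cloudwatch = boto3.client("cloudwatch")

def emit_metric(name: str, value: float, unit: str = "Count") -> None:
    """Publish one sync metric to CloudWatch."""
    cloudwatch.put_metric_data(
        Namespace="PostgresSync",
        MetricData=[{"MetricName": name, "Value": value, "Unit": unit}],
    )

# Example usage inside the sync loop:
# emit_metric("rows_pushed", result["successful_rows"])
# emit_metric("push_latency_ms", elapsed_ms, unit="Milliseconds")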

Health Check Endpoint

If running as a service, expose a health endpoint:
from fastapi import FastAPI

app = FastAPI()

@app.get("/health")
def health():
    return {
        "status": "healthy",
        "last_sync": load_state(),
        "tables": len(TABLES),
    }

Alternative: Debezium + Kafka

For high-volume, real-time CDC, you can use Debezium with a Kafka HTTP Sink connector:
Postgres → Debezium → Kafka → HTTP Sink → Definite Stream API
This approach provides:
  • True CDC from the Postgres WAL
  • At-least-once delivery backed by Kafka’s durable, replayable log
  • Lower latency than batch polling
See the Debezium documentation for setup instructions.