Push-based ingestion lets you run data extraction on your own infrastructure while still landing data in Definite’s DuckLake. This pattern is ideal for organizations with strict data residency requirements or databases that aren’t accessible from the public internet.
When to Use This Pattern
- Data Residency: Your compliance requirements mandate that raw data stays within your network.
- Private Networks: Your database is behind a firewall or in a private VPC with no public access.
- Custom Extraction: You need custom transformation logic before data leaves your environment.
- Existing CDC Pipeline: You already have Debezium, Fivetran, or another CDC tool running internally.
If your database is publicly accessible and you don’t have data residency requirements, consider using Definite’s built-in connectors instead. They’re fully managed and require no infrastructure on your end.
Architecture Overview
With push-based ingestion, you run the data extraction within your own infrastructure. Only the extracted data is sent to Definite via HTTPS.
1. Your Infrastructure: Your database (Postgres, MySQL, etc.) lives in your VPC or private network.
2. Your Extractor: You run an extractor (Lambda, ECS, EC2, or any compute) that queries your database and formats the data.
3. HTTPS to Definite: Your extractor sends data via HTTPS POST to api.definite.app/v2/stream. Only outbound traffic is needed; no inbound connections are required.
4. DuckLake: Data lands in DuckLake (Iceberg tables on GCS) and is immediately queryable.
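To make step 3 concrete, here is a stdlib-only sketch of a single push. It assumes the request shape used by `push_to_definite` in the full implementation below (a JSON body with `data` and `config`, plus bearer-token auth); `make_stream_request` and the `bronze.events` table are hypothetical names for illustration. The request is built but not sent, so you can inspect exactly what leaves your network.

```python
import json
import urllib.request

STREAM_URL = "https://api.definite.app/v2/stream"


def make_stream_request(api_key: str, table: str, rows: list[dict]) -> urllib.request.Request:
    """Build (but don't send) one outbound HTTPS POST to the Stream API."""
    body = {
        "data": rows,  # one JSON object per row
        "config": {"table": table, "mode": "append"},
    }
    return urllib.request.Request(
        STREAM_URL,
        data=json.dumps(body).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


# Hypothetical table and row, for illustration only
req = make_stream_request("your-api-key", "bronze.events", [{"id": 1, "name": "signup"}])
print(req.get_method(), req.full_url)
```

Sending it is a single outbound call such as `urllib.request.urlopen(req, timeout=60)`; nothing ever needs to connect into your network.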
Key benefits:
- Your database is never exposed; only the rows your extractor selects leave your network
- You control the extraction schedule and logic
- Only outbound HTTPS traffic is required (no inbound connections)
- Works with any database or data source
Example: Postgres Incremental Sync
This example shows a Python script that incrementally syncs data from Postgres to Definite. It uses a watermark column (such as updated_at) so that each run syncs only the rows changed since the previous run.
Full Implementation
```python
#!/usr/bin/env python3
"""
Incremental sync from Postgres to Definite.
Extracts rows modified since the last sync and pushes them to the Stream API.
"""
import json
import os
from datetime import date, datetime
from decimal import Decimal
from pathlib import Path
from uuid import UUID

import httpx
import psycopg

# Configuration
DEFINITE_API_URL = "https://api.definite.app/v2/stream"
DEFINITE_API_KEY = os.environ["DEFINITE_API_KEY"]
POSTGRES_DSN = os.environ["POSTGRES_DSN"]
STATE_FILE = Path("/tmp/sync_state.json")
BATCH_SIZE = 1000

# Tables to sync: (source_table, destination_table, watermark_column)
TABLES = [
    ("public.customers", "bronze.customers", "updated_at"),
    ("public.orders", "bronze.orders", "updated_at"),
    ("public.products", "bronze.products", "updated_at"),
]


def load_state() -> dict:
    """Load sync state (last watermark per source table) from disk."""
    if STATE_FILE.exists():
        return json.loads(STATE_FILE.read_text())
    return {}


def save_state(state: dict) -> None:
    """Save sync state to disk."""
    STATE_FILE.write_text(json.dumps(state, default=str))


def to_json_value(value):
    """Convert Postgres types that JSON can't encode natively."""
    if isinstance(value, (datetime, date)):
        return value.isoformat()
    if isinstance(value, (Decimal, UUID)):
        return str(value)
    return value


def push_to_definite(table: str, rows: list[dict]) -> dict:
    """Push a batch to the Definite Stream API."""
    response = httpx.post(
        DEFINITE_API_URL,
        json={
            "data": rows,
            "config": {
                "table": table,
                "mode": "append",
                "tags": {"source": "postgres-sync"},
            },
        },
        headers={"Authorization": f"Bearer {DEFINITE_API_KEY}"},
        timeout=60.0,
    )
    response.raise_for_status()
    return response.json()


def sync_table(
    source: str,
    dest: str,
    watermark_col: str,
    last_sync: datetime | None,
) -> datetime | None:
    """Sync a single table, returning the new watermark."""
    # Table and column names come from the trusted TABLES config above
    if last_sync:
        query = f"""
            SELECT * FROM {source}
            WHERE {watermark_col} > %s
            ORDER BY {watermark_col}
        """
        params = (last_sync,)
    else:
        # Initial full sync
        query = f"SELECT * FROM {source} ORDER BY {watermark_col}"
        params = ()

    max_watermark = last_sync
    total_rows = 0

    with psycopg.connect(POSTGRES_DSN) as conn:
        with conn.cursor() as cur:
            cur.execute(query, params)
            columns = [desc.name for desc in cur.description]
            batch = []

            for record in cur:
                row = dict(zip(columns, record))

                # Track the highest watermark (before values become strings)
                row_watermark = row.get(watermark_col)
                if row_watermark and (not max_watermark or row_watermark > max_watermark):
                    max_watermark = row_watermark

                # Make every value JSON-serializable (datetime, Decimal, UUID, ...)
                batch.append({key: to_json_value(value) for key, value in row.items()})

                # Push batch when full
                if len(batch) >= BATCH_SIZE:
                    result = push_to_definite(dest, batch)
                    total_rows += result["successful_rows"]
                    print(f"  Pushed {result['successful_rows']} rows to {dest}")
                    batch = []

            # Push remaining rows
            if batch:
                result = push_to_definite(dest, batch)
                total_rows += result["successful_rows"]
                print(f"  Pushed {result['successful_rows']} rows to {dest}")

    print(f"  Total: {total_rows} rows synced")
    return max_watermark


def main():
    """Run incremental sync for all configured tables."""
    state = load_state()
    for source, dest, watermark_col in TABLES:
        print(f"Syncing {source} -> {dest}")
        last_sync = state.get(source)
        if last_sync:
            last_sync = datetime.fromisoformat(last_sync)
        new_watermark = sync_table(source, dest, watermark_col, last_sync)
        if new_watermark:
            state[source] = new_watermark.isoformat()
            # Save after each table so a later failure doesn't lose this progress
            save_state(state)
    print("Sync complete!")


if __name__ == "__main__":
    main()
```
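`save_state` overwrites the state file in place, so a crash or kill in the middle of the write can leave truncated JSON behind, and the next `load_state` call then fails. A common remedy, sketched here under the assumption that the state file lives on a local POSIX filesystem, is to write to a temporary file in the same directory and atomically rename it over the old one. `save_state_atomic` is a hypothetical drop-in replacement, not part of the script above.

```python
import json
import os
import tempfile
from pathlib import Path


def save_state_atomic(state: dict, path: Path) -> None:
    """Write state to a temp file next to `path`, then atomically swap it in."""
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, prefix=path.name, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(state, f, default=str)
            f.flush()
            os.fsync(f.fileno())  # make sure the bytes are on disk before renaming
        # os.replace is atomic on POSIX when source and target share a filesystem
        os.replace(tmp_name, path)
    except BaseException:
        os.unlink(tmp_name)  # don't leave partial temp files behind
        raise
```

Readers see either the old state or the new one, never a half-written file.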
Environment Variables
```bash
export DEFINITE_API_KEY="your-api-key"
export POSTGRES_DSN="postgresql://user:pass@localhost:5432/mydb"
```
Handling Deletes and Updates
The Stream API uses append-only semantics. To handle updates and deletes from your source database, include CDC metadata in your records:
Include CDC Operation Type
```python
from datetime import datetime, timezone

# When extracting, add CDC metadata to each row dict
row["_cdc_op"] = "u"  # c=create, u=update, d=delete
row["_cdc_ts"] = datetime.now(timezone.utc).isoformat()
```
Create a View for Current State
In DuckLake, create a view that shows only the current state of each record:
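```sql
CREATE VIEW silver.customers AS
SELECT * FROM (
    SELECT
        *,
        ROW_NUMBER() OVER (
            PARTITION BY id
            ORDER BY _cdc_ts DESC
        ) AS _row_num
    FROM bronze.customers
)
-- Filter deletes only after ranking; filtering them earlier would surface
-- the last pre-delete version of a deleted record
WHERE _row_num = 1
  AND _cdc_op != 'd';
```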
This pattern gives you:
- Full history in the bronze layer
- Current state in the silver layer
- Point-in-time snapshots, by adding a `_cdc_ts <= <snapshot time>` filter inside the subquery before ranking
Deployment Options
AWS Lambda (Scheduled)
Run your sync on a schedule using EventBridge:
```python
# handler.py
from sync import main  # assumes the sync script above is saved as sync.py


def lambda_handler(event, context):
    main()
    return {"statusCode": 200, "body": "Sync complete"}
```
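```yaml
# serverless.yml
functions:
  sync:
    handler: handler.lambda_handler
    timeout: 900
    events:
      - schedule: rate(5 minutes)
    environment:
      DEFINITE_API_KEY: ${ssm:/definite/api-key}
      POSTGRES_DSN: ${ssm:/postgres/dsn}
```

Lambda's /tmp storage is ephemeral and not shared between execution environments, so a STATE_FILE under /tmp can disappear between runs and trigger a full resync; keep sync state in durable storage such as S3 or SSM Parameter Store instead. If the database is in a private VPC, the function must also be attached to that VPC, with a NAT gateway route for outbound HTTPS to Definite.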
ECS Fargate (Scheduled Task)
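```json
{
  "family": "postgres-sync",
  "requiresCompatibilities": ["FARGATE"],
  "networkMode": "awsvpc",
  "cpu": "256",
  "memory": "512",
  "executionRoleArn": "your-execution-role-arn",
  "containerDefinitions": [{
    "name": "sync",
    "image": "your-registry/postgres-sync:latest",
    "secrets": [
      { "name": "DEFINITE_API_KEY", "valueFrom": "your-ssm-parameter-arn" },
      { "name": "POSTGRES_DSN", "valueFrom": "your-ssm-parameter-arn" }
    ]
  }]
}
```

Fargate tasks require the awsvpc network mode and task-level cpu and memory. Using `secrets` with `valueFrom` keeps credentials out of the task definition; the execution role needs permission to read the referenced parameters.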
Schedule the task with an EventBridge rule (EventBridge was formerly called CloudWatch Events).
EC2 / On-Premise (Cron)
```
# /etc/cron.d/postgres-sync
*/5 * * * * ubuntu /opt/sync/run.sh >> /var/log/sync.log 2>&1
```
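Cron starts a new run every five minutes whether or not the previous one has finished, and two overlapping runs would read the same state file and push the same rows twice. One way to guard against this is a non-blocking exclusive `flock`, sketched below for POSIX systems. `single_instance` and the lock path are hypothetical.

```python
import fcntl
import os
from contextlib import contextmanager


@contextmanager
def single_instance(lock_path: str = "/tmp/postgres-sync.lock"):
    """Hold an exclusive lock for the duration of the block (POSIX only).

    Raises BlockingIOError immediately if another process holds the lock.
    """
    fd = os.open(lock_path, os.O_CREAT | os.O_RDWR, 0o644)
    try:
        # LOCK_NB: fail right away instead of queueing behind the running sync
        fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
        yield
    finally:
        os.close(fd)  # closing the descriptor releases the lock
```

In the script's entry point, wrap the call as `with single_instance(): main()` and catch `BlockingIOError` to log and skip the run. The kernel releases the lock automatically if the process dies, so a crashed run never leaves a stale lock.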
Error Handling
Implement retry logic with exponential backoff for transient failures:
```python
import time

import httpx

# push_to_definite comes from the sync script above


def push_with_retry(table: str, rows: list[dict], max_retries: int = 3) -> dict:
    """Push with exponential backoff; retries 429s, 5xx responses, and network errors."""
    last_error: Exception | None = None
    for attempt in range(max_retries):
        try:
            return push_to_definite(table, rows)
        except httpx.HTTPStatusError as e:
            status = e.response.status_code
            if status != 429 and status < 500:
                # Client error (bad payload, bad API key): don't retry
                raise
            last_error = e
        except httpx.TransportError as e:
            # Timeouts, connection resets, DNS failures: usually transient
            last_error = e
        if attempt < max_retries - 1:
            wait = 2 ** attempt
            print(f"Attempt {attempt + 1}/{max_retries} failed; retrying in {wait}s...")
            time.sleep(wait)
    raise RuntimeError(f"Failed after {max_retries} attempts") from last_error
```
Dead Letter Queue
For production deployments, consider sending failed batches to a dead letter queue:
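```python
import json
import os

import boto3

sqs = boto3.client("sqs")
DLQ_URL = os.environ.get("DLQ_URL")


def push_with_dlq(table: str, rows: list[dict]) -> dict:
    """Push a batch; if retries are exhausted, park it in SQS for later replay."""
    try:
        return push_with_retry(table, rows)
    except Exception as e:
        if DLQ_URL:
            # SQS caps message size; very large batches may need to go to S3 instead
            sqs.send_message(
                QueueUrl=DLQ_URL,
                MessageBody=json.dumps(
                    {"table": table, "rows": rows, "error": str(e)},
                    default=str,  # tolerate values such as Decimal or datetime
                ),
            )
        raise
```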
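Parked batches still need to be replayed once the underlying problem is fixed. This sketch assumes the message body written by `push_with_dlq` (`table`, `rows`, `error`) and uses the boto3 SQS client's `receive_message` and `delete_message` calls. `replay_dlq` is hypothetical; it takes the client and a push function (for example `push_with_retry`) as parameters so it can be exercised without AWS. Each message is deleted only after its push succeeds, so a failed replay leaves the message on the queue.

```python
import json
from typing import Callable


def replay_dlq(
    sqs,
    queue_url: str,
    push: Callable[[str, list[dict]], dict],
    max_messages: int = 10,  # SQS returns at most 10 messages per receive
) -> int:
    """Re-push parked batches; returns how many messages were replayed."""
    resp = sqs.receive_message(
        QueueUrl=queue_url,
        MaxNumberOfMessages=max_messages,
        WaitTimeSeconds=1,
    )
    replayed = 0
    for msg in resp.get("Messages", []):
        body = json.loads(msg["Body"])
        push(body["table"], body["rows"])  # raises on failure; message stays queued
        sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=msg["ReceiptHandle"])
        replayed += 1
    return replayed
```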
Monitoring
Track these metrics to ensure your sync is healthy:
| Metric | Description |
| --- | --- |
| `rows_extracted` | Rows read from the source database |
| `rows_pushed` | Rows sent to Definite |
| `push_latency_ms` | Time to push each batch |
| `sync_lag_seconds` | Time since the last successful sync |
| `errors_total` | Failed push attempts |
Health Check Endpoint
If running as a service, expose a health endpoint:
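```python
from fastapi import FastAPI

from sync import TABLES, load_state  # assumes the sync script is saved as sync.py

app = FastAPI()


@app.get("/health")
def health():
    return {
        "status": "healthy",
        "last_sync": load_state(),  # per-table watermarks, not run times
        "tables": len(TABLES),
    }
```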
Alternative: Debezium + Kafka
For high-volume, real-time CDC, you can use Debezium with a Kafka HTTP Sink connector:
Postgres → Debezium → Kafka → HTTP Sink → Definite Stream API
This approach provides:
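- True CDC from the Postgres WAL (write-ahead log), including hard deletes
- At-least-once delivery through the HTTP sink, so deduplicate downstream (for example with a current-state view)
- Lower latency than batch polling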
See the Debezium documentation for setup instructions.