The Stream API lets you push JSON data directly into DuckLake tables. Send data via HTTPS, and Definite writes it to your data lake with automatic schema inference and partitioning. Both append and merge (upsert) ingestion modes are supported.

How it works

  1. Send a POST - POST JSON or NDJSON data to the Stream API endpoint with your target table.
  2. Authenticate - Include your API key in the Authorization header.
  3. Data lands in DuckLake - Definite writes your data to an Iceberg table with automatic schema handling.
  4. Query immediately - Your data is available for querying right away.

Endpoint

POST https://api.definite.app/v2/stream

Authentication

Include your API key in the Authorization header:
Authorization: Bearer YOUR_API_KEY
You can find your API key in the user menu at the bottom left of the Definite app.

Request Body

Append

{
  "data": [
    {"id": 1, "name": "Alice", "email": "alice@example.com"},
    {"id": 2, "name": "Bob", "email": "bob@example.com"}
  ],
  "config": {
    "table": "bronze.customers",
    "mode": "append",
    "wait": false,
    "tags": {
      "source": "my-app"
    }
  }
}

Merge (upsert)

{
  "data": [
    {"id": 1, "name": "Alice", "email": "alice@new.com"},
    {"id": 2, "name": "Bob",   "email": "bob@new.com"}
  ],
  "config": {
    "table": "bronze.customers",
    "mode": "merge",
    "primary_key": ["id"]
  }
}

Fields

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| data | object or array | Yes | Single record or array of records to ingest |
| config.table | string | Yes | Target table in schema.table format (e.g., bronze.events) |
| config.mode | string | No | Ingestion mode. append (default) or merge. merge requires primary_key. |
| config.primary_key | array of strings | Required when mode="merge" | List of exactly one column name to key the upsert on. Composite keys are not yet supported. |
| config.wait | boolean | No | Wait for commit and return snapshot ID. Default: false |
| config.tags | object | No | Optional metadata tags for tracing |
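
The fields above can be assembled programmatically. The following is a minimal sketch, not part of the Definite API: `build_stream_payload` is a hypothetical helper name, and the only rules it enforces are the ones stated in the table (a known mode, and a single-column primary_key when mode is merge).

```python
from typing import Any, Optional


def build_stream_payload(
    rows: list[dict[str, Any]],
    table: str,
    mode: str = "append",
    primary_key: Optional[str] = None,
    wait: bool = False,
    tags: Optional[dict[str, str]] = None,
) -> dict[str, Any]:
    """Assemble a request body from the documented fields (hypothetical helper)."""
    if mode not in ("append", "merge"):
        raise ValueError(f"unsupported mode: {mode!r}")
    if mode == "merge" and not primary_key:
        raise ValueError('mode="merge" requires primary_key')

    config: dict[str, Any] = {"table": table, "mode": mode, "wait": wait}
    if mode == "merge":
        # The documented shape is a list containing exactly one column name.
        config["primary_key"] = [primary_key]
    if tags:
        config["tags"] = tags
    return {"data": rows, "config": config}


payload = build_stream_payload(
    rows=[{"id": 1, "name": "Alice"}],
    table="bronze.customers",
    mode="merge",
    primary_key="id",
)
```

Failing locally on a missing primary_key avoids a round trip that would otherwise end in a 422 response.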

Ingestion modes

append

Inserts every row into the target table as-is. This is the default.

merge

A classic upsert keyed on primary_key. Existing rows whose primary key matches an incoming row are replaced; new rows are inserted. Use this when your source has a stable unique key.

Constraints:
  • primary_key must be a list of exactly one column (v2 limitation; composite keys are planned).
  • Every incoming row must include the primary key column with a non-null value.
  • Duplicate primary key values within a batch cause the whole batch to be rejected, so dedupe upstream before sending.

Example merge request:
curl -X POST "https://api.definite.app/v2/stream" \
  -H "Authorization: Bearer YOUR_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "data": [
      {"id": 1, "user_id": "u_42", "amount": 21.50},
      {"id": 2, "user_id": "u_77", "amount":  9.00}
    ],
    "config": {
      "table": "bronze.events",
      "mode": "merge",
      "primary_key": ["id"]
    }
  }'
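
Because one bad row rejects the whole merge batch, it can help to check the constraints client-side before sending. This is a sketch under assumptions: `check_merge_batch` and `dedupe_keep_last` are hypothetical helpers, key values are assumed to be hashable scalars, and "keep the last row per key" is only one possible dedupe policy.

```python
from typing import Any


def check_merge_batch(rows: list[dict[str, Any]], primary_key: str) -> list[str]:
    """Return a description of every row that violates the merge constraints."""
    problems: list[str] = []
    seen: set[Any] = set()
    for index, row in enumerate(rows):
        value = row.get(primary_key)
        if value is None:
            problems.append(f"row {index}: missing or null {primary_key!r}")
        elif value in seen:
            problems.append(f"row {index}: duplicate {primary_key!r} value {value!r}")
        else:
            seen.add(value)
    return problems


def dedupe_keep_last(rows: list[dict[str, Any]], primary_key: str) -> list[dict[str, Any]]:
    """Keep only the last row seen for each key value (one possible policy)."""
    latest: dict[Any, dict[str, Any]] = {}
    for row in rows:
        latest[row[primary_key]] = row
    return list(latest.values())


rows = [
    {"id": 1, "amount": 21.50},
    {"id": 2, "amount": 9.00},
    {"id": 1, "amount": 30.00},
]
print(check_merge_batch(rows, "id"))
print(dedupe_keep_last(rows, "id"))
```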

Response

{
  "success": true,
  "request_id": "req_abc123def456",
  "stream_id": "st_xyz789ghi012",
  "table": "bronze.customers",
  "accepted": 2,
  "successful_rows": 2,
  "rejected_rows": 0,
  "partitions": ["2024-01-15"],
  "snapshot_id": null,
  "warnings": [],
  "errors": []
}

Response Fields

| Field | Description |
| --- | --- |
| success | Whether the ingestion was successful |
| request_id | Unique identifier for this request |
| stream_id | Unique identifier for this stream |
| table | Fully qualified table name |
| accepted | Number of rows parsed and accepted |
| successful_rows | Number of rows successfully written |
| rejected_rows | Number of rows rejected due to validation |
| partitions | Human-friendly partition summary |
| snapshot_id | Iceberg snapshot ID (present when wait=true) |
| warnings | Warning messages |
| errors | Error messages |
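
A 200 response can still report rejected rows, so callers may want to inspect the body rather than rely on the status code alone. The sketch below uses only the response fields listed above; `check_stream_response` is a hypothetical helper, and raising on any rejected row is a policy choice, not documented API behavior.

```python
from typing import Any


def check_stream_response(result: dict[str, Any]) -> None:
    """Raise if the response reports a failure or any rejected rows."""
    request_id = result.get("request_id")
    if not result.get("success", False):
        raise RuntimeError(
            f"ingestion failed (request_id={request_id}): {result.get('errors', [])}"
        )
    rejected = result.get("rejected_rows", 0)
    if rejected:
        raise RuntimeError(
            f"{rejected} rows rejected (request_id={request_id}): "
            f"{result.get('errors', [])}"
        )
    # Warnings do not fail the request, but they are worth surfacing in logs.
    for warning in result.get("warnings", []):
        print(f"warning (request_id={request_id}): {warning}")
```

Including request_id in the message makes it easier to correlate a failure with a specific request later.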

Table format

config.table accepts two forms:
  • "namespace.table" (e.g., "bronze.events")
  • "LAKE.namespace.table" (e.g., "LAKE.bronze.events")
The target namespace and table must already exist. The endpoint does not create schemas or tables — if either is missing, the request returns 404.
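
The two accepted forms can be validated before a request is sent. This is an illustrative sketch: `parse_table_name` is a hypothetical helper, and it checks only the number of dot-separated parts, not whether the namespace or table exists.

```python
from typing import Optional


def parse_table_name(table: str) -> tuple[Optional[str], str, str]:
    """Split config.table into (lake, namespace, table); lake is None for the two-part form."""
    parts = table.split(".")
    if any(not part for part in parts):
        raise ValueError(f"empty component in table name: {table!r}")
    if len(parts) == 2:
        return None, parts[0], parts[1]
    if len(parts) == 3:
        return parts[0], parts[1], parts[2]
    raise ValueError(
        f"expected 'namespace.table' or 'LAKE.namespace.table', got {table!r}"
    )


print(parse_table_name("bronze.events"))
print(parse_table_name("LAKE.bronze.events"))
```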

Type coercion

Incoming values are cast to each target column’s declared type at write time, so the wire format is forgiving:
  • Mixed types in one column - an int and a string in the same VARCHAR column both insert cleanly as strings.
  • Boolean caveat - a Python True / False (sent as JSON true / false) is stored as TRUE / FALSE in a BOOLEAN column, but as the lowercase string 'true' / 'false' in a VARCHAR column. Send a string literal if case matters.
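
To illustrate the boolean caveat: Python serializes True as the lowercase JSON literal true, which is presumably the form that ends up in a VARCHAR column. The sketch below converts booleans in chosen columns to explicit strings before sending; `bools_to_strings` is a hypothetical helper, and the capitalized "True" / "False" spelling is only an example of a case-sensitive convention.

```python
import json
from typing import Any


def bools_to_strings(row: dict[str, Any], columns: set[str]) -> dict[str, Any]:
    """Replace booleans in the named columns with explicit string literals."""
    return {
        key: (str(value) if key in columns and isinstance(value, bool) else value)
        for key, value in row.items()
    }


row = {"id": 1, "active": True}
print(json.dumps(row))                                  # {"id": 1, "active": true}
print(json.dumps(bools_to_strings(row, {"active"})))    # {"id": 1, "active": "True"}
```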

Limits

| Parameter | Limit | Description |
| --- | --- | --- |
| Max payload size | 10 MB | Maximum request body size |
| Max rows per request | 50,000 | Maximum number of records per request |
| Max field size | 1 MB | Maximum size of any individual field |
| Max nested depth | 10 | Maximum JSON nesting depth |
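
Large datasets need to be split so each request stays within these limits. The following is a rough sketch, not an official client: `iter_batches` is a hypothetical helper, the 9,000,000-byte budget is an assumed safety margin below 10 MB to leave room for the config envelope, and sizes are estimated from the uncompressed JSON encoding of each row.

```python
import json
from typing import Any, Iterable, Iterator

MAX_ROWS = 50_000        # documented row limit per request
MAX_BYTES = 9_000_000    # assumed budget, deliberately below the 10 MB payload limit


def iter_batches(
    rows: Iterable[dict[str, Any]],
    max_rows: int = MAX_ROWS,
    max_bytes: int = MAX_BYTES,
) -> Iterator[list[dict[str, Any]]]:
    """Yield lists of rows that fit within both the row and byte budgets."""
    batch: list[dict[str, Any]] = []
    batch_bytes = 0
    for row in rows:
        # +2 approximates the ", " separator between array elements.
        row_bytes = len(json.dumps(row).encode("utf-8")) + 2
        if row_bytes > max_bytes:
            raise ValueError("a single row exceeds the byte budget")
        if batch and (len(batch) >= max_rows or batch_bytes + row_bytes > max_bytes):
            yield batch
            batch, batch_bytes = [], 0
        batch.append(row)
        batch_bytes += row_bytes
    if batch:
        yield batch


for batch in iter_batches([{"i": i} for i in range(5)], max_rows=2):
    print(len(batch), "rows")
```

Each yielded batch can then be passed to a function such as push_to_definite from the Examples section.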

Content Types

The Stream API accepts:
  • JSON (application/json) - Single object or array of objects
  • NDJSON (application/x-ndjson) - Newline-delimited JSON
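
For reference, an NDJSON body is one JSON object per line. The sketch below only builds the request body; `to_ndjson` is a hypothetical helper, and this page does not show how the target table is specified for NDJSON requests, so that part is left out.

```python
import json
from typing import Any


def to_ndjson(rows: list[dict[str, Any]]) -> bytes:
    """Serialize rows as newline-delimited JSON, one compact object per line."""
    lines = (json.dumps(row, separators=(",", ":")) for row in rows)
    return "\n".join(lines).encode("utf-8")


body = to_ndjson([
    {"event_type": "page_view", "user_id": "123"},
    {"event_type": "click", "user_id": "123"},
])
print(body.decode("utf-8"))
```

json.dumps escapes newlines inside string values, so each record is guaranteed to stay on a single line.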

Compression

You can compress your payload to reduce transfer time:
Content-Encoding: gzip
or
Content-Encoding: zstd

Examples

Python

import httpx

API_KEY = "YOUR_API_KEY"
API_URL = "https://api.definite.app/v2/stream"

def push_to_definite(table: str, rows: list[dict]) -> dict:
    """Push data to Definite Stream API"""

    payload = {
        "data": rows,
        "config": {
            "table": table,
            "mode": "append",
        }
    }

    response = httpx.post(
        API_URL,
        json=payload,
        headers={"Authorization": f"Bearer {API_KEY}"},
        timeout=30.0
    )
    response.raise_for_status()
    return response.json()

# Example usage
result = push_to_definite(
    table="bronze.events",
    rows=[
        {"event_type": "page_view", "user_id": "123", "timestamp": "2024-01-15T10:30:00Z"},
        {"event_type": "click", "user_id": "123", "timestamp": "2024-01-15T10:31:00Z"},
    ]
)

print(f"Ingested {result['successful_rows']} rows")

Python merge (upsert)

import httpx

API_KEY = "YOUR_API_KEY"
API_URL = "https://api.definite.app/v2/stream"

def upsert_to_definite(table: str, rows: list[dict], primary_key: str) -> dict:
    """Upsert rows into Definite keyed on a single column"""

    payload = {
        "data": rows,
        "config": {
            "table": table,
            "mode": "merge",
            "primary_key": [primary_key],
        }
    }

    response = httpx.post(
        API_URL,
        json=payload,
        headers={"Authorization": f"Bearer {API_KEY}"},
        timeout=30.0
    )
    response.raise_for_status()
    return response.json()

# Existing rows with matching `id` are replaced; new rows are inserted.
result = upsert_to_definite(
    table="bronze.customers",
    rows=[
        {"id": 1, "name": "Alice", "email": "alice@new.com"},
        {"id": 2, "name": "Bob",   "email": "bob@new.com"},
    ],
    primary_key="id",
)

Python with compression

import gzip
import json
import httpx

API_KEY = "YOUR_API_KEY"

def push_compressed(table: str, rows: list[dict]) -> dict:
    """Push gzip-compressed data to Definite"""

    payload = json.dumps({
        "data": rows,
        "config": {"table": table, "mode": "append"}
    }).encode("utf-8")

    compressed = gzip.compress(payload)

    response = httpx.post(
        "https://api.definite.app/v2/stream",
        content=compressed,
        headers={
            "Authorization": f"Bearer {API_KEY}",
            "Content-Type": "application/json",
            "Content-Encoding": "gzip",
        },
        timeout=30.0
    )
    response.raise_for_status()
    return response.json()

cURL

curl -X POST "https://api.definite.app/v2/stream" \
  -H "Authorization: Bearer YOUR_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "data": [
      {"event_type": "signup", "user_id": "456"},
      {"event_type": "purchase", "user_id": "456", "amount": 99.99}
    ],
    "config": {
      "table": "bronze.events",
      "mode": "append"
    }
  }'

cURL with NDJSON

echo '{"event_type": "page_view", "user_id": "123"}
{"event_type": "click", "user_id": "123"}
{"event_type": "purchase", "user_id": "123"}' | \
curl -X POST "https://api.definite.app/v2/stream" \
  -H "Authorization: Bearer YOUR_API_KEY" \
  -H "Content-Type: application/x-ndjson" \
  --data-binary @-

Error Handling

HTTP Status Codes

| Status | Meaning |
| --- | --- |
| 200 | Success - data ingested |
| 400 | Bad request - invalid JSON or schema |
| 401 | Unauthorized - invalid or missing API key |
| 404 | Table not found - namespace or table does not exist |
| 413 | Payload too large - exceeds 10 MB limit |
| 422 | Invalid request - e.g., mode="merge" without primary_key |
| 429 | Rate limited - too many requests |
| 500 | Server error - retry with backoff |

Retry Strategy

For transient errors (429, 5xx), implement exponential backoff:
import httpx
from httpx_retry import RetryTransport

# Retry up to 3 times with exponential backoff on the transient statuses (429 and 5xx).
transport = RetryTransport(
    retries=3,
    backoff_factor=0.5,
    status_forcelist=[429, 500, 502, 503, 504],
)

client = httpx.Client(transport=transport, timeout=30.0)
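
If you would rather not depend on a retry transport, the same idea can be sketched with the standard library alone. Everything here is illustrative: `backoff_delays` and `send_with_backoff` are hypothetical helpers, `send` is any callable you supply that performs the request and returns a (status, body) pair, and the delay values and jitter range are assumptions rather than recommendations from Definite.

```python
import random
import time
from typing import Any, Callable

RETRYABLE_STATUSES = {429, 500, 502, 503, 504}


def backoff_delays(retries: int = 3, base: float = 0.5, cap: float = 30.0) -> list[float]:
    """Exponential delays: base, 2*base, 4*base, ... capped at `cap` seconds."""
    return [min(cap, base * (2 ** attempt)) for attempt in range(retries)]


def send_with_backoff(
    send: Callable[[], tuple[int, Any]],
    retries: int = 3,
    base: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> tuple[int, Any]:
    """Call send() until it returns a non-retryable status or retries run out."""
    status, body = send()
    for delay in backoff_delays(retries, base):
        if status not in RETRYABLE_STATUSES:
            break
        # Random jitter spreads out retries from clients that failed together.
        sleep(delay + random.uniform(0, delay / 2))
        status, body = send()
    return status, body


# Simulated server: two transient failures, then success.
responses = iter([(503, None), (429, None), (200, {"success": True})])
print(send_with_backoff(lambda: next(responses), sleep=lambda seconds: None))
```

One caution: retrying an append request after an ambiguous failure may insert the same rows twice, whereas a merge request keyed on primary_key is safe to repeat.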

Best Practices

  1. Batch your data - Send multiple records per request (up to 50,000) rather than one at a time
  2. Use compression - For large payloads, enable gzip compression to reduce transfer time
  3. Handle partial failures - Check rejected_rows in the response; some rows may fail validation
  4. Use mode="merge" for upserts - When your source has a stable unique key, use mode="merge" with primary_key instead of appending duplicates and deduping downstream
  5. Use appropriate tables - Organize data into logical tables (e.g., bronze.events, bronze.users)