The Stream API lets you push JSON data directly into DuckLake tables. Send data via HTTPS, and Definite writes it to your data lake with automatic schema inference and partitioning.

How it works

1. Send a POST - POST JSON or NDJSON data to the Stream API endpoint with your target table.
2. Authenticate - Include your API key in the Authorization header.
3. Data lands in DuckLake - Definite writes your data to an Iceberg table with automatic schema handling.
4. Query immediately - Your data is available for querying right away.

Endpoint

POST https://api.definite.app/v2/stream

Authentication

Include your API key in the Authorization header:
Authorization: Bearer YOUR_API_KEY
Your API key can be found in the bottom left user menu of the Definite app.

Request Body

{
  "data": [
    {"id": 1, "name": "Alice", "email": "[email protected]"},
    {"id": 2, "name": "Bob", "email": "[email protected]"}
  ],
  "config": {
    "table": "bronze.customers",
    "mode": "append",
    "wait": false,
    "tags": {
      "source": "my-app"
    }
  }
}

Fields

Field         Type             Required  Description
data          object or array  Yes       Single record or array of records to ingest
config.table  string           Yes       Target table in schema.table format (e.g., bronze.events)
config.mode   string           No        Ingestion mode. Only append is supported. Default: append
config.wait   boolean          No        Wait for commit and return snapshot ID. Default: false
config.tags   object           No        Optional metadata tags for tracing

Response

{
  "success": true,
  "request_id": "req_abc123def456",
  "stream_id": "st_xyz789ghi012",
  "table": "bronze.customers",
  "accepted": 2,
  "successful_rows": 2,
  "rejected_rows": 0,
  "partitions": ["2024-01-15"],
  "snapshot_id": null,
  "warnings": [],
  "errors": []
}

Response Fields

Field            Description
success          Whether the ingestion was successful
request_id       Unique identifier for this request
stream_id        Unique identifier for this stream
table            Fully qualified table name
accepted         Number of rows parsed and accepted
successful_rows  Number of rows successfully written
rejected_rows    Number of rows rejected due to validation
partitions       Human-friendly partition summary
snapshot_id      Iceberg snapshot ID (present when wait=true)
warnings         Warning messages
errors           Error messages
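
To get snapshot_id back, set config.wait to true; the request then waits for the commit before responding. A minimal sketch of that round trip, using the same placeholder key and endpoint as the Examples section below:

import httpx

# Sketch: wait=true makes the response include the committed snapshot ID.
payload = {
    "data": [{"id": 1, "name": "Alice", "email": "[email protected]"}],
    "config": {"table": "bronze.customers", "mode": "append", "wait": True},
}

response = httpx.post(
    "https://api.definite.app/v2/stream",
    json=payload,
    headers={"Authorization": "Bearer YOUR_API_KEY"},
    timeout=60.0,  # generous timeout, since wait=true waits for the commit
)
response.raise_for_status()
print(response.json()["snapshot_id"])  # null unless wait=true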

Limits

Parameter             Limit    Description
Max payload size      10 MB    Maximum request body size
Max rows per request  50,000   Maximum number of records per request
Max field size        1 MB     Maximum size of any individual field
Max nested depth      10       Maximum JSON nesting depth
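
If a batch could exceed these limits, split it on the client before sending. A rough sketch that keeps each request under the 50,000-row cap, reusing the push_to_definite helper defined in the Examples section below (lower the chunk size if individual records are large enough to approach the 10 MB payload limit):

def push_in_chunks(table: str, rows: list[dict], chunk_size: int = 50_000) -> None:
    """Send rows in chunks that stay under the per-request row limit."""
    for start in range(0, len(rows), chunk_size):
        chunk = rows[start:start + chunk_size]
        result = push_to_definite(table, chunk)  # helper from the Examples section
        print(f"rows {start}-{start + len(chunk) - 1}: {result['successful_rows']} written")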

Content Types

The Stream API accepts:
  • JSON (application/json) - Single object or array of objects
  • NDJSON (application/x-ndjson) - Newline-delimited JSON
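
For NDJSON, serialize one record per line and send the raw body. A minimal sketch mirroring the cURL NDJSON example further down, using httpx:

import json
import httpx

records = [
    {"event_type": "page_view", "user_id": "123"},
    {"event_type": "click", "user_id": "123"},
]
# One JSON object per line, newline-delimited.
ndjson_body = "\n".join(json.dumps(record) for record in records)

response = httpx.post(
    "https://api.definite.app/v2/stream",
    content=ndjson_body,
    headers={
        "Authorization": "Bearer YOUR_API_KEY",
        "Content-Type": "application/x-ndjson",
    },
    timeout=30.0,
)
response.raise_for_status()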

Compression

You can compress your payload to reduce transfer time:
Content-Encoding: gzip
or
Content-Encoding: zstd

Examples

Python

import httpx

API_KEY = "YOUR_API_KEY"
API_URL = "https://api.definite.app/v2/stream"

def push_to_definite(table: str, rows: list[dict]) -> dict:
    """Push data to Definite Stream API"""

    payload = {
        "data": rows,
        "config": {
            "table": table,
            "mode": "append",
        }
    }

    response = httpx.post(
        API_URL,
        json=payload,
        headers={"Authorization": f"Bearer {API_KEY}"},
        timeout=30.0
    )
    response.raise_for_status()
    return response.json()

# Example usage
result = push_to_definite(
    table="bronze.events",
    rows=[
        {"event_type": "page_view", "user_id": "123", "timestamp": "2024-01-15T10:30:00Z"},
        {"event_type": "click", "user_id": "123", "timestamp": "2024-01-15T10:31:00Z"},
    ]
)

print(f"Ingested {result['successful_rows']} rows")

Python with compression

import gzip
import json
import httpx

def push_compressed(table: str, rows: list[dict]) -> dict:
    """Push gzip-compressed data to Definite"""

    payload = json.dumps({
        "data": rows,
        "config": {"table": table, "mode": "append"}
    }).encode("utf-8")

    compressed = gzip.compress(payload)

    response = httpx.post(
        "https://api.definite.app/v2/stream",
        content=compressed,
        headers={
            "Authorization": f"Bearer {API_KEY}",
            "Content-Type": "application/json",
            "Content-Encoding": "gzip",
        },
        timeout=30.0
    )
    response.raise_for_status()
    return response.json()
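
A zstd variant is the same apart from the compressor and the Content-Encoding header. This sketch assumes the third-party zstandard package (pip install zstandard) and the API_KEY constant from the first Python example:

import json
import httpx
import zstandard

def push_zstd(table: str, rows: list[dict]) -> dict:
    """Push zstd-compressed data to Definite (sketch; requires the zstandard package)."""

    payload = json.dumps({
        "data": rows,
        "config": {"table": table, "mode": "append"}
    }).encode("utf-8")

    compressed = zstandard.ZstdCompressor().compress(payload)

    response = httpx.post(
        "https://api.definite.app/v2/stream",
        content=compressed,
        headers={
            "Authorization": f"Bearer {API_KEY}",
            "Content-Type": "application/json",
            "Content-Encoding": "zstd",
        },
        timeout=30.0
    )
    response.raise_for_status()
    return response.json()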

cURL

curl -X POST "https://api.definite.app/v2/stream" \
  -H "Authorization: Bearer YOUR_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "data": [
      {"event_type": "signup", "user_id": "456"},
      {"event_type": "purchase", "user_id": "456", "amount": 99.99}
    ],
    "config": {
      "table": "bronze.events",
      "mode": "append"
    }
  }'

cURL with NDJSON

echo '{"event_type": "page_view", "user_id": "123"}
{"event_type": "click", "user_id": "123"}
{"event_type": "purchase", "user_id": "123"}' | \
curl -X POST "https://api.definite.app/v2/stream" \
  -H "Authorization: Bearer YOUR_API_KEY" \
  -H "Content-Type: application/x-ndjson" \
  --data-binary @-

Error Handling

HTTP Status Codes

Status  Meaning
200     Success - data ingested
400     Bad request - invalid JSON or schema
401     Unauthorized - invalid or missing API key
413     Payload too large - exceeds the 10 MB limit
429     Rate limited - too many requests
500     Server error - retry with backoff

Retry Strategy

For transient errors (429, 5xx), implement exponential backoff:
import time
import httpx

def post_with_retry(payload: dict, max_retries: int = 3) -> dict:
    """POST with exponential backoff on 429 and transient 5xx responses."""
    for attempt in range(max_retries + 1):
        response = httpx.post(
            API_URL,
            json=payload,
            headers={"Authorization": f"Bearer {API_KEY}"},
            timeout=30.0,
        )
        if response.status_code in (429, 502, 503, 504) and attempt < max_retries:
            time.sleep(0.5 * 2 ** attempt)  # 0.5s, 1s, 2s, ...
            continue
        response.raise_for_status()
        return response.json()

Best Practices

  1. Batch your data - Send multiple records per request (up to 50,000) rather than one at a time
  2. Use compression - For large payloads, enable gzip compression to reduce transfer time
  3. Handle partial failures - Check rejected_rows in the response; some rows may fail validation (see the sketch after this list)
  4. Include idempotency keys - Add a unique ID field to your records for deduplication
  5. Use appropriate tables - Organize data into logical tables (e.g., bronze.events, bronze.users)
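
A sketch of the partial-failure check from point 3, assuming the push_to_definite helper from the Examples section:

result = push_to_definite(
    table="bronze.events",
    rows=[{"event_type": "page_view", "user_id": "123"}],
)

# A 200 response can still contain rejected rows; check the counters.
if result["rejected_rows"] > 0:
    print(f"{result['rejected_rows']} rows failed validation")
    for message in result["errors"]:
        print(f"error: {message}")
for message in result["warnings"]:
    print(f"warning: {message}")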