> ## Documentation Index
> Fetch the complete documentation index at: https://docs.definite.app/llms.txt
> Use this file to discover all available pages before exploring further.

# Stream API

> Ingest JSON data directly into DuckLake via HTTPS

The Stream API lets you push JSON data directly into DuckLake tables. Send data via HTTPS, and Definite writes it to your data lake with automatic schema inference and partitioning. Both append and merge (upsert) ingestion modes are supported.

## How it works

<Steps>
  <Step title="Send a POST">
    POST JSON or NDJSON data to the Stream API endpoint with your target table.
  </Step>

  <Step title="Authenticate">
    Include your API key in the `Authorization` header.
  </Step>

  <Step title="Data lands in DuckLake">
    Definite writes your data to an Iceberg table with automatic schema handling.
  </Step>

  <Step title="Query immediately">
    Your data is available for querying right away.
  </Step>
</Steps>

***

## Endpoint

**POST** `https://api.definite.app/v2/stream`

***

## Authentication

Include your API key in the Authorization header:

```
Authorization: Bearer YOUR_API_KEY
```

Your API key can be found in the bottom left user menu of the Definite app.

***

## Request Body

### Append

```json theme={null}
{
  "data": [
    {"id": 1, "name": "Alice", "email": "alice@example.com"},
    {"id": 2, "name": "Bob", "email": "bob@example.com"}
  ],
  "config": {
    "table": "bronze.customers",
    "mode": "append",
    "wait": false,
    "tags": {
      "source": "my-app"
    }
  }
}
```

### Merge (upsert)

```json theme={null}
{
  "data": [
    {"id": 1, "name": "Alice", "email": "alice@new.com"},
    {"id": 2, "name": "Bob",   "email": "bob@new.com"}
  ],
  "config": {
    "table": "bronze.customers",
    "mode": "merge",
    "primary_key": ["id"]
  }
}
```

### Fields

| Field                | Type            | Required                     | Description                                                                                     |
| -------------------- | --------------- | ---------------------------- | ----------------------------------------------------------------------------------------------- |
| `data`               | object or array | Yes                          | Single record or array of records to ingest                                                     |
| `config.table`       | string          | Yes                          | Target table in `schema.table` format (e.g., `bronze.events`)                                   |
| `config.mode`        | string          | No                           | Ingestion mode. `append` (default) or `merge`. `merge` requires `primary_key`.                  |
| `config.primary_key` | array of string | Required when `mode="merge"` | List of exactly **one** column name to key the upsert on. Composite keys are not yet supported. |
| `config.wait`        | boolean         | No                           | Wait for commit and return snapshot ID. Default: `false`                                        |
| `config.tags`        | object          | No                           | Optional metadata tags for tracing                                                              |

***

## Ingestion modes

### `append`

Inserts every row into the target table as-is. This is the default.

### `merge`

A classic upsert/merge keyed on `primary_key`. Existing rows whose PK matches an incoming row are replaced; new rows are inserted. Use this when your source has a stable unique key.

**Constraints:**

* `primary_key` must be a list of exactly one column (v2 limitation; composite keys are planned).
* Every incoming row must include the PK column with a non-null value.
* Within-batch duplicate PK values cause the **whole batch** to be rejected — dedupe upstream before sending.

```bash theme={null}
curl -X POST "https://api.definite.app/v2/stream" \
  -H "Authorization: Bearer YOUR_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "data": [
      {"id": 1, "user_id": "u_42", "amount": 21.50},
      {"id": 2, "user_id": "u_77", "amount":  9.00}
    ],
    "config": {
      "table": "bronze.events",
      "mode": "merge",
      "primary_key": ["id"]
    }
  }'
```

***

## Response

```json theme={null}
{
  "success": true,
  "request_id": "req_abc123def456",
  "stream_id": "st_xyz789ghi012",
  "table": "bronze.customers",
  "accepted": 2,
  "successful_rows": 2,
  "rejected_rows": 0,
  "partitions": ["2024-01-15"],
  "snapshot_id": null,
  "warnings": [],
  "errors": []
}
```

### Response Fields

| Field             | Description                                    |
| ----------------- | ---------------------------------------------- |
| `success`         | Whether the ingestion was successful           |
| `request_id`      | Unique identifier for this request             |
| `stream_id`       | Unique identifier for this stream              |
| `table`           | Fully qualified table name                     |
| `accepted`        | Number of rows parsed and accepted             |
| `successful_rows` | Number of rows successfully written            |
| `rejected_rows`   | Number of rows rejected due to validation      |
| `partitions`      | Human-friendly partition summary               |
| `snapshot_id`     | Iceberg snapshot ID (present when `wait=true`) |
| `warnings`        | Warning messages                               |
| `errors`          | Error messages                                 |

***

## Table format

`config.table` accepts two forms:

* `"namespace.table"` (e.g., `"bronze.events"`)
* `"LAKE.namespace.table"` (e.g., `"LAKE.bronze.events"`)

The target namespace and table must already exist. The endpoint **does not** create schemas or tables — if either is missing, the request returns `404`.

***

## Type coercion

Incoming values are cast to each target column's declared type at write time, so the wire format is forgiving:

* **Mixed types in one column** — an `int` and a `string` in the same `VARCHAR` column both insert cleanly as strings.
* **Boolean caveat** — Python `True` / `False` becomes `TRUE` / `FALSE` in a `BOOLEAN` column, but lowercase `'true'` / `'false'` in a `VARCHAR` column. Send a string literal if case matters.

***

## Limits

| Parameter            | Limit  | Description                           |
| -------------------- | ------ | ------------------------------------- |
| Max payload size     | 10 MB  | Maximum request body size             |
| Max rows per request | 50,000 | Maximum number of records per request |
| Max field size       | 1 MB   | Maximum size of any individual field  |
| Max nested depth     | 10     | Maximum JSON nesting depth            |

***

## Content Types

The Stream API accepts:

* **JSON** (`application/json`) - Single object or array of objects
* **NDJSON** (`application/x-ndjson`) - Newline-delimited JSON

### Compression

You can compress your payload to reduce transfer time:

```
Content-Encoding: gzip
```

or

```
Content-Encoding: zstd
```

***

## Examples

### Python

```python theme={null}
import httpx

API_KEY = "YOUR_API_KEY"
API_URL = "https://api.definite.app/v2/stream"

def push_to_definite(table: str, rows: list[dict]) -> dict:
    """Push data to Definite Stream API"""

    payload = {
        "data": rows,
        "config": {
            "table": table,
            "mode": "append",
        }
    }

    response = httpx.post(
        API_URL,
        json=payload,
        headers={"Authorization": f"Bearer {API_KEY}"},
        timeout=30.0
    )
    response.raise_for_status()
    return response.json()

# Example usage
result = push_to_definite(
    table="bronze.events",
    rows=[
        {"event_type": "page_view", "user_id": "123", "timestamp": "2024-01-15T10:30:00Z"},
        {"event_type": "click", "user_id": "123", "timestamp": "2024-01-15T10:31:00Z"},
    ]
)

print(f"Ingested {result['successful_rows']} rows")
```

### Python merge (upsert)

```python theme={null}
import httpx

API_KEY = "YOUR_API_KEY"
API_URL = "https://api.definite.app/v2/stream"

def upsert_to_definite(table: str, rows: list[dict], primary_key: str) -> dict:
    """Upsert rows into Definite keyed on a single column"""

    payload = {
        "data": rows,
        "config": {
            "table": table,
            "mode": "merge",
            "primary_key": [primary_key],
        }
    }

    response = httpx.post(
        API_URL,
        json=payload,
        headers={"Authorization": f"Bearer {API_KEY}"},
        timeout=30.0
    )
    response.raise_for_status()
    return response.json()

# Existing rows with matching `id` are replaced; new rows are inserted.
result = upsert_to_definite(
    table="bronze.customers",
    rows=[
        {"id": 1, "name": "Alice", "email": "alice@new.com"},
        {"id": 2, "name": "Bob",   "email": "bob@new.com"},
    ],
    primary_key="id",
)
```

### Python with compression

```python theme={null}
import gzip
import json
import httpx

def push_compressed(table: str, rows: list[dict]) -> dict:
    """Push gzip-compressed data to Definite"""

    payload = json.dumps({
        "data": rows,
        "config": {"table": table, "mode": "append"}
    }).encode("utf-8")

    compressed = gzip.compress(payload)

    response = httpx.post(
        "https://api.definite.app/v2/stream",
        content=compressed,
        headers={
            "Authorization": f"Bearer {API_KEY}",
            "Content-Type": "application/json",
            "Content-Encoding": "gzip",
        },
        timeout=30.0
    )
    response.raise_for_status()
    return response.json()
```

### cURL

```bash theme={null}
curl -X POST "https://api.definite.app/v2/stream" \
  -H "Authorization: Bearer YOUR_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "data": [
      {"event_type": "signup", "user_id": "456"},
      {"event_type": "purchase", "user_id": "456", "amount": 99.99}
    ],
    "config": {
      "table": "bronze.events",
      "mode": "append"
    }
  }'
```

### cURL with NDJSON

```bash theme={null}
echo '{"event_type": "page_view", "user_id": "123"}
{"event_type": "click", "user_id": "123"}
{"event_type": "purchase", "user_id": "123"}' | \
curl -X POST "https://api.definite.app/v2/stream" \
  -H "Authorization: Bearer YOUR_API_KEY" \
  -H "Content-Type: application/x-ndjson" \
  -d @-
```

***

## Error Handling

### HTTP Status Codes

| Status | Meaning                                                      |
| ------ | ------------------------------------------------------------ |
| `200`  | Success - data ingested                                      |
| `400`  | Bad request - invalid JSON or schema                         |
| `401`  | Unauthorized - invalid or missing API key                    |
| `404`  | Table not found - namespace or table does not exist          |
| `413`  | Payload too large - exceeds 10MB limit                       |
| `422`  | Invalid request - e.g., `mode="merge"` without `primary_key` |
| `429`  | Rate limited - too many requests                             |
| `500`  | Server error - retry with backoff                            |

### Retry Strategy

For transient errors (429, 5xx), implement exponential backoff:

```python theme={null}
import httpx
from httpx_retry import RetryTransport

transport = RetryTransport(
    retries=3,
    backoff_factor=0.5,
    status_forcelist=[429, 502, 503, 504],
)

client = httpx.Client(transport=transport, timeout=30.0)
```

***

## Best Practices

1. **Batch your data** - Send multiple records per request (up to 50,000) rather than one at a time
2. **Use compression** - For large payloads, enable gzip compression to reduce transfer time
3. **Handle partial failures** - Check `rejected_rows` in the response; some rows may fail validation
4. **Use `mode="merge"` for upserts** - When your source has a stable unique key, use `mode="merge"` with `primary_key` instead of appending duplicates and deduping downstream
5. **Use appropriate tables** - Organize data into logical tables (e.g., `bronze.events`, `bronze.users`)

***

## Related

* [Push-Based Data Ingestion](/definite-api/push-based-ingestion) - Run your own extractor for sensitive deployments
* [Webhooks](/definite-api/webhooks) - Trigger Definite blocks from external events
