# AsyncBase — full LLM documentation
Single-file concatenation of every doc under /docs/llm/. Topological order: concepts → operations → SDKs.


<!-- ──── overview.md ──── -->

# AsyncBase overview

AsyncBase is a managed queue SaaS. One HTTP/JSON API, four SDKs, Bearer auth.

## Mental model

- **Queue**: a named stream of messages. Created on first enqueue. No schema.
- **Message**: `{ id, payload, enqueued_at, attempt, ack_deadline }`.
  Payload is arbitrary JSON ≤ 256 KB.
- **Consumer group**: a named cursor into a queue. Different groups get
  independent copies (like Kafka). Same group → load-balanced across
  consumers in it.
- **Consumer**: a process inside a group. Identified by an opaque string.
- **DLQ**: Dead Letter Queue. A message lands here after N nacks (default 3).
  Redrive moves it back to the stream.

## Semantics in one sentence each

- **Delivery**: at-least-once. Always handle idempotency in your consumer.
- **Ordering**: enqueue order is per-queue FIFO, but concurrent delivery
  across consumers can reorder processing. Strict per-key FIFO requires `fifo_group`.
- **Retries**: exponential backoff `2^attempt × 1000 ms`, default 3 attempts,
  then DLQ.
- **Visibility timeout**: 30 s default. Call `heartbeat` to extend.
- **Idempotency**: `Idempotency-Key` header on `send` dedups for 24 h.

## Base URL

`https://api.asyncbase.dev`

## Auth

`Authorization: Bearer sk_live_...` or `sk_test_...`. See [auth.md](./auth.md).

## Rate limits (per tenant)

| Plan | req/s | queues | msgs/mo |
|------|-------|--------|---------|
| free | 10 | 3 | 10 000 |
| pro | 100 | 50 | 1 000 000 |
| scale | 2 000 | ∞ | 50 000 000 |

429 response carries `Retry-After: 1`. See [rate-limits.md](./rate-limits.md).


<!-- ──── auth.md ──── -->

# Authentication

Bearer auth. No cookies, no sessions.

## API key format

- Live: `sk_live_[A-Za-z0-9_-]+`
- Test: `sk_test_[A-Za-z0-9_-]+`

`sk_test_*` keys hit the same infra but count against a separate plan counter,
so you can exercise the API without eating production quota.

## Request header

```
Authorization: Bearer sk_live_...
```

## Failure codes

| HTTP | code | Meaning |
|------|------|---------|
| 401 | `AUTH_INVALID` | header missing, wrong format, or key not in DB |
| 401 | `AUTH_KEY_EXPIRED` | key was revoked or rotated past the 1 h grace |
| 503 | `INTERNAL_ERROR` | Postgres outage with no stale cache entry |

## Rotation

```
POST /v1/api-keys/rotate
  body: { "name"?: string, "revoke_immediately"?: bool }

→ 201 {
  new_key: "sk_live_...",          # shown ONCE
  new_key_prefix: "sk_live_XXXX",
  old_key_prefix: "sk_live_YYYY",
  revoked_old: false,
  grace_period_seconds: 3600
}
```

Old key stays valid 1 h unless `revoke_immediately: true`, which also
publishes an instant cache invalidation to every API instance (< 1 s).
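A minimal rotation helper can be sketched as below. `rotate_key` and the injected `post` transport are illustrative names, not part of any SDK; in real code `post` would wrap something like `requests.post(...).json()` with the Bearer header attached.

```python
API = "https://api.asyncbase.dev"

def rotate_key(post, revoke_immediately=False):
    """Rotate the API key and return (new_key, grace_period_seconds).

    `post` is any callable (url, body) -> parsed-JSON dict. The new key is
    shown exactly once, so persist it before doing anything else.
    """
    body = {"revoke_immediately": revoke_immediately}
    resp = post(f"{API}/v1/api-keys/rotate", body)
    return resp["new_key"], resp["grace_period_seconds"]

# Fake transport standing in for the real endpoint:
def fake_post(url, body):
    assert url.endswith("/v1/api-keys/rotate")
    return {"new_key": "sk_live_NEW", "grace_period_seconds": 3600}

new_key, grace = rotate_key(fake_post)
assert new_key == "sk_live_NEW" and grace == 3600
```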

## Performance notes

Every `sk_*` hit is cached in an in-process LRU (TTL 5 min, stale-while-error
60 min). If Postgres drops out, auth falls back to the stale entry rather
than 503'ing active users.


<!-- ──── errors.md ──── -->

# Errors

## Envelope (every non-2xx response)

```json
{
  "error": {
    "code": "CONSTANT_CASE_CODE",
    "message": "Human-readable one-liner",
    "docs": "https://asyncbase.dev/docs/errors/CONSTANT_CASE_CODE"
  }
}
```

Status code matches HTTP semantics. Client libraries raise a typed
`AsyncBaseError` exposing `code`, `message`, `docs`, HTTP status, and
the `X-Request-Id` header.

## Every code

| HTTP | code | When |
|------|------|------|
| 400 | `QUEUE_NAME_INVALID` | queue name fails `^[a-zA-Z0-9_-]{1,128}$`; body validation; delay + dedupe_id combined; dedupe_id without fifo_group |
| 400 | `MESSAGE_NOT_FOUND` | `msg_id` param fails `^msg_[a-zA-Z0-9]+$` |
| 401 | `AUTH_INVALID` | missing/wrong Bearer, or key not in DB |
| 401 | `AUTH_KEY_EXPIRED` | key revoked or rotated past 1 h grace |
| 403 | `QUEUE_LIMIT_REACHED` | plan queue count exceeded (Free=3, Pro=50) |
| 403 | `PLAN_LIMIT_REACHED` | monthly message count exceeded |
| 404 | `QUEUE_NOT_FOUND` | unknown route, or queue has no messages |
| 409 | `AUTH_INVALID` | Supabase user conflict (bootstrap only) |
| 413 | `PAYLOAD_TOO_LARGE` | JSON payload > 256 KB |
| 429 | `RATE_LIMITED` | per-tenant req/s exceeded; carries `Retry-After: 1` + `X-RateLimit-*` |
| 500 | `INTERNAL_ERROR` | unexpected server issue |
| 503 | `INTERNAL_ERROR` | Postgres unreachable AND no stale cache |
| 503 | `NOT_CONFIGURED` | server missing required config (bootstrap secret, etc.) |

## Recommended client behavior

| Code | Retry? | Backoff |
|------|--------|---------|
| 429 | yes | honor `Retry-After`, else 1 s + jitter |
| 503 | yes | 1 s, 2 s, 4 s (cap 30 s) |
| 500 | once | 1 s |
| 4xx (others) | no | raise to caller |
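The table above collapses into a small policy function. This is a sketch, not an SDK helper; `retry_plan` is an illustrative name, and `attempt` counts how many times the request has already failed.

```python
import random

def retry_plan(status, attempt, retry_after=None):
    """Return seconds to sleep before retrying, or None to raise.

    Mirrors the table: 429 honors Retry-After (else 1 s plus jitter),
    503 backs off 1 s / 2 s / 4 s capped at 30 s, 500 retries once,
    and all other 4xx are surfaced to the caller.
    """
    if status == 429:
        base = retry_after if retry_after is not None else 1.0
        return base + random.random() * 0.1        # up to 100 ms jitter
    if status == 503:
        return min(2 ** (attempt - 1), 30)          # 1, 2, 4, ... capped
    if status == 500:
        return 1.0 if attempt == 1 else None        # retry once
    return None                                     # other 4xx: raise

assert retry_plan(503, 1) == 1
assert retry_plan(503, 3) == 4
assert retry_plan(400, 1) is None
```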

## SDK error helpers

- Node: `err instanceof AsyncBaseError && err.code === "RATE_LIMITED"`
- Python: `except AsyncBaseError as e: if e.code == "RATE_LIMITED": ...`
- PHP: `catch (AsyncBaseError $e) if ($e->errorCode === 'RATE_LIMITED')` (PHP
  `Exception::$code` is int-only, so we use `$errorCode`)
- Go: `asyncbase.IsCode(err, "RATE_LIMITED")`


<!-- ──── rate-limits.md ──── -->

# Rate limits

Per-tenant request-per-second limits, reset every wall-clock second.

| Plan | req/s | Burst | Behavior |
|------|-------|-------|----------|
| free | 10 | none | 11th req in same second → 429 |
| pro | 100 | none | same |
| scale | 2 000 | none | same |

## Headers (on every 2xx + 429 under `/v1/*`)

| Header | Value |
|--------|-------|
| `X-RateLimit-Limit` | current plan's req/s |
| `X-RateLimit-Remaining` | left this second |
| `X-RateLimit-Reset` | unix second when next window starts |
| `Retry-After` | `1` (on 429 only) |

## Failure envelope

```json
{
  "error": {
    "code": "RATE_LIMITED",
    "message": "Exceeded 100 req/s for plan 'pro'. Try again in 1s.",
    "docs": "https://asyncbase.dev/docs/errors/RATE_LIMITED"
  }
}
```

## Client behavior

- Honor `Retry-After`. All SDKs do this automatically with jitter.
- Don't loop on 429 — you'll just eat more quota. Sleep `Retry-After + random(0..100ms)` before retry.
- If you're hitting the limit steadily, upgrade plan or batch enqueues via
  `fifo_group` pipelines.

## Scope

- Counted per tenant, NOT per API key. Rotating keys doesn't reset the counter.
- Counted globally across all API instances (shared Redis `INCR`), so the
  limit can't be dodged by spreading requests across instances.
- `/health` and `/docs` endpoints are NOT rate-limited.


<!-- ──── enqueue.md ──── -->

# Enqueue

## HTTP

```
POST /v1/queues/{name}/messages
Authorization: Bearer sk_...
Content-Type: application/json
Idempotency-Key: <uuid or your-key>       # optional; dedups 24 h
```

## Request body

```json
{
  "payload": {  /* arbitrary JSON, ≤ 256 KB */  },
  "delay": "30s",                // optional; "5s" | "10m" | "2h" | "7d"
  "retries": 5,                  // optional; default 3
  "ttl": "7d",                   // optional; drop if not consumed by then
  "fifo_group": "user-42",       // optional; per-group ordering
  "dedupe_id": "order-9001"      // optional; requires fifo_group; 5 min window
}
```

## Responses

### 201 Created — new message

```json
{ "id": "msg_abc123", "enqueued_at": "2026-04-18T00:00:00Z" }
```

### 201 Created — delayed message

```json
{
  "id": "msg_abc123",
  "enqueued_at": "2026-04-18T00:00:00Z",
  "deliver_at": "2026-04-18T00:00:30Z"
}
```

### 200 OK — idempotency replay

Header: `Idempotency-Replay: true`
Body: same as 201 of the original request.

### 200 OK — FIFO dedup hit

```json
{
  "id": "msg_abc123",
  "enqueued_at": "2026-04-18T00:00:00Z",
  "deduped": true
}
```

### 400 / 413 / 429 — see [errors.md](./errors.md)

## Response headers on every 2xx/429

| Header | Value |
|--------|-------|
| `X-Request-Id` | `req_xxx` — copy into support tickets |
| `X-RateLimit-Limit` | per-plan req/s |
| `X-RateLimit-Remaining` | drop this second |
| `X-RateLimit-Reset` | unix second the window resets |

## Canonical examples

### Fire-and-forget

```bash
curl -sfX POST https://api.asyncbase.dev/v1/queues/emails/messages \
  -H "Authorization: Bearer sk_live_..." \
  -H "Content-Type: application/json" \
  -d '{"payload":{"to":"a@b.com","subject":"hi"}}'
```

### Delayed

```bash
curl ... -d '{"payload":{...},"delay":"30s"}'
```

### Idempotent (retry-safe)

```bash
curl ... -H "Idempotency-Key: order-9001-attempt" -d '{...}'
```

### FIFO with SQS-style dedup

```bash
curl ... -d '{"payload":{...},"fifo_group":"user-42","dedupe_id":"event-xyz"}'
```

## Interactions you should know

- `delay` + `dedupe_id` together → 400. Pick one.
- `dedupe_id` without `fifo_group` → 400.
- Payload > 256 KB → 413 `PAYLOAD_TOO_LARGE`.
- Same `Idempotency-Key` within 24 h → replay with 200.
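These rules can be pre-checked client-side before spending a request. A sketch, with `validate_enqueue` as an illustrative name:

```python
import json

MAX_PAYLOAD = 256 * 1024   # 256 KB

def validate_enqueue(payload, delay=None, fifo_group=None, dedupe_id=None):
    """Mirror the 400/413 rules above so bad requests fail locally."""
    if dedupe_id is not None and fifo_group is None:
        raise ValueError("dedupe_id requires fifo_group")
    if dedupe_id is not None and delay is not None:
        raise ValueError("delay and dedupe_id cannot be combined")
    if len(json.dumps(payload).encode()) > MAX_PAYLOAD:
        raise ValueError("payload exceeds 256 KB")  # server: 413 PAYLOAD_TOO_LARGE

validate_enqueue({"to": "a@b.com"}, fifo_group="user-42", dedupe_id="order-9001")
try:
    validate_enqueue({}, delay="30s", dedupe_id="x", fifo_group="g")
except ValueError as e:
    assert "combined" in str(e)
```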


<!-- ──── consume.md ──── -->

# Consume

Every consumer belongs to a **group**. Same group → load-balanced. Different
groups → independent copies of every message (like Kafka).

## Mandatory boot order (IMPORTANT)

Consumer groups are created with the Redis `$` semantic ("messages after now").
So consumers MUST prime the group **before** producers start sending, or the
group will skip existing in-flight messages.

Correct boot order:

1. Consumer process starts + issues its first `pull_messages` (creates group).
2. Producer starts enqueuing.

In tests: call `pull_messages` once with `wait_seconds: 0` right after
creating the client, before any `send_message`.
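The `$` semantic can be illustrated with a toy in-memory model. Purely illustrative; `QueueModel` is not part of any SDK:

```python
class QueueModel:
    """Toy model of the Redis-stream `$` group semantic: a group created
    at position `$` only sees messages enqueued AFTER its creation."""
    def __init__(self):
        self.messages = []
        self.cursors = {}            # group -> index into self.messages

    def pull(self, group, limit=10):
        # First pull creates the group at the current end of stream ($).
        cur = self.cursors.setdefault(group, len(self.messages))
        batch = self.messages[cur:cur + limit]
        self.cursors[group] = cur + len(batch)
        return batch

    def send(self, payload):
        self.messages.append(payload)

q = QueueModel()
q.pull("workers", limit=0)               # prime: group now exists at $
q.send({"n": 1})
assert q.pull("workers") == [{"n": 1}]   # primed group sees the message

q2 = QueueModel()
q2.send({"n": 1})                        # producer ran first...
assert q2.pull("late") == []             # ...so the late group skips it
```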

## Pull

```
GET /v1/queues/{name}/messages
    ?group=workers
    &consumer=c-abcd                  # optional; default: random
    &limit=10                         # 1..100
    &wait=5                           # long-poll seconds, 0..20
    &visibility_seconds=30            # default 30
Authorization: Bearer sk_...
```

### 200 OK

```json
{
  "messages": [
    {
      "id": "msg_abc",
      "payload": { "...": "..." },
      "enqueued_at": "2026-04-18T00:00:00Z",
      "attempt": 1,
      "ack_deadline": "2026-04-18T00:00:30Z"
    }
  ]
}
```

Empty pull = `{ "messages": [] }` after wait timeout (not an error).

## Ack (success)

```
POST /v1/queues/{name}/messages/{id}/ack?group=workers
Authorization: Bearer sk_...
```

→ `200 { "id": "msg_abc", "acked_at": "..." }`. Idempotent.

## Nack (failure — retry or DLQ)

```
POST /v1/queues/{name}/messages/{id}/nack?group=workers&attempt=<current attempt>
Authorization: Bearer sk_...
```

→ 200 with one of:

```json
// retry scheduled
{ "id":"msg_abc", "nacked_at":"...", "retry_at":"...", "next_attempt": 2, "backoff_ms": 2000 }

// moved to DLQ
{ "id":"msg_abc", "nacked_at":"...", "moved_to_dlq": true }
```

Backoff math: `2^attempt × 1000 ms` (2 s, 4 s, 8 s, ...). See [retries.md](./retries.md).

## Heartbeat (extend visibility for long handlers)

```
POST /v1/queues/{name}/messages/{id}/heartbeat?group=workers&consumer=c-abcd
Authorization: Bearer sk_...
```

Call every `visibility_seconds - 5` while the handler runs.

## Canonical loop (pseudocode)

```
while not shutdown:
    batch = pull(queue, group, limit=10, wait=5, visibility=60)
    for msg in batch:
        try:
            handle(msg.payload)
            ack(queue, msg.id, group)
        except Error:
            nack(queue, msg.id, group, attempt=msg.attempt)
```

## Pitfalls

- Ack BEFORE the handler finishes → on crash, the work is lost.
- Nack with wrong `attempt` → server may double-DLQ. Always pass `msg.attempt`.
- Two processes using the same `consumer` string in the same group → they
  steal each other's messages via XAUTOCLAIM. Use unique consumer IDs.
- Handler > `visibility_seconds` without heartbeat → message gets reclaimed
  to another consumer. You'll see the same id twice.


<!-- ──── retries.md ──── -->

# Retries + DLQ

## Backoff formula

```
backoff_ms = 2^attempt × 1000
attempt=1 → 2000 ms
attempt=2 → 4000 ms
attempt=3 → 8000 ms
attempt=4 → 16 000 ms
attempt=5 → 32 000 ms
...
```

After `retries` (default 3) consecutive nacks, the message is moved to the
queue's Dead Letter Queue.
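A quick sanity check of the formula (helper names are illustrative, not SDK functions):

```python
def backoff_ms(attempt):
    """Backoff before redelivery: 2^attempt x 1000 ms."""
    return (2 ** attempt) * 1000

def retry_schedule(retries=3):
    """Delay applied after each nack; after `retries` nacks the message
    moves to the DLQ."""
    return [backoff_ms(a) for a in range(1, retries + 1)]

assert retry_schedule() == [2000, 4000, 8000]
assert retry_schedule(5) == [2000, 4000, 8000, 16000, 32000]
```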

## Nack request

```
POST /v1/queues/{name}/messages/{id}/nack?group=workers&attempt=<N>
```

`attempt` MUST be the value from `msg.attempt` you just handled. Passing it
wrong can double-DLQ the message.

## Responses

```json
// retry scheduled
{
  "id": "msg_abc",
  "nacked_at": "2026-04-18T00:00:00Z",
  "retry_at": "2026-04-18T00:00:02Z",
  "next_attempt": 2,
  "backoff_ms": 2000
}

// retries exhausted → DLQ
{
  "id": "msg_abc",
  "nacked_at": "2026-04-18T00:00:00Z",
  "moved_to_dlq": true
}
```

## Inspect DLQ

```
GET /v1/queues/{name}/dlq?limit=50
```

Returns:
```json
{
  "queue": "emails",
  "total": 12,
  "returned": 12,
  "messages": [
    {
      "msg_id": "msg_abc",
      "payload": "{...}",
      "original_enqueued_at": "...",
      "delivery_count": 3,
      "moved_to_dlq_at": "...",
      "group": "workers",
      "reason": "max_retries_exceeded"
    }
  ]
}
```

## Redrive

```
POST /v1/queues/{name}/dlq/redrive
body: { "batch": 100 }
```

Moves up to `batch` messages (default 100, max 1000) from the DLQ back onto the
main stream for re-delivery. Safe to call repeatedly: each call pulls the NEXT
batch off the DLQ, not the same one, so loop until `remaining` reaches 0.

```json
{ "redriven": 12, "message_ids": ["msg_abc", "..."], "remaining": 0 }
```
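A drain loop built on this behavior might look like the sketch below; `drain_dlq` and the fake transport are illustrative names, with `redrive` standing in for a wrapper around `POST /v1/queues/{name}/dlq/redrive`.

```python
def drain_dlq(redrive, batch=100):
    """Repeatedly redrive until the DLQ reports no remaining messages.

    `redrive` is any callable (batch) -> response dict. Each call moves
    the NEXT batch, so looping on `remaining` drains the whole DLQ.
    """
    total = 0
    while True:
        r = redrive(batch)
        total += r["redriven"]
        if r["remaining"] == 0:
            return total

# Fake endpoint: 250 stuck messages, redriven 100 at a time.
state = {"left": 250}
def fake_redrive(batch):
    n = min(batch, state["left"])
    state["left"] -= n
    return {"redriven": n, "remaining": state["left"]}

assert drain_dlq(fake_redrive) == 250
```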

## Alerts

Register a webhook that fires when DLQ size crosses a threshold:

```
POST /v1/alerts
{
  "type": "dlq_threshold",
  "queue_name": "emails",        // or null for tenant-wide
  "threshold": 100,
  "webhook_url": "https://hooks.slack.com/...",
  "webhook_kind": "slack"        // or "generic"
}
```

Debounced 1 hour per rule. Worker sweep evaluates every 60 s.

## Override per message

```ts
await q.send("emails", payload, { retries: 10 })  // per-msg override
```


<!-- ──── fifo.md ──── -->

# FIFO + dedup

Two independent features, usually paired.

## `fifo_group`: per-key ordering

Messages with the same `fifo_group` string are delivered strictly in the
order they were enqueued. Messages across different groups run concurrently.

```json
POST /v1/queues/emails/messages
{ "payload": {...}, "fifo_group": "user-42" }
```

All messages for `user-42` serialize. `user-43` messages are independent.

## `dedupe_id`: SQS-style dedup

Within a `fifo_group`, a second send with the same `dedupe_id` within 5 min
is rejected (replies 200 with `deduped: true`, no new message enqueued).

```json
POST /v1/queues/emails/messages
{ "payload": {...}, "fifo_group": "user-42", "dedupe_id": "order-9001" }
```

Window: 5 minutes per (tenant, queue, fifo_group, dedupe_id).

## Constraints

- `dedupe_id` requires `fifo_group`. Otherwise 400.
- `dedupe_id` cannot be combined with `delay`. Otherwise 400.
- If you only need retry-safety (not ordering), prefer `Idempotency-Key`
  header — 24 h window, no fifo_group needed.

## Idempotency-Key vs dedupe_id

| Feature | Idempotency-Key | dedupe_id |
|---------|-----------------|-----------|
| Window | 24 h | 5 min |
| Needs fifo_group | no | yes |
| Works with delay | yes | no |
| Semantics | SDK retry safety | SQS-style producer-side dedup |

Pick Idempotency-Key for "my network flaked, safe to retry". Pick dedupe_id
when you have a natural business key (order id, event id) that you want
serialized + dedup'd together.


<!-- ──── delay.md ──── -->

# Delayed delivery

Send a message now, deliver it later.

```json
POST /v1/queues/{name}/messages
{ "payload": {...}, "delay": "30s" }
```

## Duration syntax

`^\d+[smhd]$` — one of:

| Suffix | Meaning |
|--------|---------|
| `s` | seconds |
| `m` | minutes |
| `h` | hours |
| `d` | days |

Examples: `"5s"`, `"10m"`, `"2h"`, `"7d"`. Anything else → 400 `QUEUE_NAME_INVALID`.
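If you need to validate or convert durations locally, the grammar is easy to mirror client-side (illustrative helper, not an SDK function):

```python
import re

UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600, "d": 86400}

def parse_duration(s):
    r"""Parse the "5s" / "10m" / "2h" / "7d" syntax into seconds.
    Mirrors the server's ^\d+[smhd]$ grammar; anything else is a 400."""
    m = re.fullmatch(r"(\d+)([smhd])", s)
    if not m:
        raise ValueError(f"invalid duration: {s!r}")
    return int(m.group(1)) * UNIT_SECONDS[m.group(2)]

assert parse_duration("30s") == 30
assert parse_duration("10m") == 600
assert parse_duration("7d") == 604_800
```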

## Response

```json
{
  "id": "msg_abc",
  "enqueued_at": "2026-04-18T00:00:00Z",
  "deliver_at": "2026-04-18T00:00:30Z"
}
```

`deliver_at` is present only when `delay` was set.

## Behavior

- Delayed messages go to a per-tenant ZSET, NOT the stream. Consumers can't
  see them.
- A worker tick every 1 s migrates due messages from the ZSET to the stream.
- Precision: ~1 s. Don't rely on it for sub-second scheduling.
- Cancelling is not supported. Workaround: use `fifo_group` + `dedupe_id`
  so a later send dedups the earlier one.

## Constraints

- `delay` + `dedupe_id` → 400. Choose one.
- Max `delay`: 7 days. Longer → 400.
- TTL eviction applies to delayed messages too. If the TTL elapses before
  `deliver_at`, the message is dropped without ever being delivered, so keep
  `ttl` longer than `delay`.


<!-- ──── tail.md ──── -->

# Live tail (Server-Sent Events)

Observe every new enqueue on a queue in real time — without consuming.

## Request

```
GET /v1/queues/{name}/tail
Accept: text/event-stream
Authorization: Bearer sk_...
```

## Stream frames

```
event: open
data: { "queue": "emails", "started_at": "2026-04-18T00:00:00Z" }

event: message
data: { "id": "msg_abc", "stream_id": "1776499375642-0",
        "payload": {...}, "enqueued_at": "ISO",
        "fifo_group": "user-42" }

event: error
data: { "code": "INTERNAL_ERROR", "message": "..." }

: keepalive     ← every 15 s to defeat proxy idle timeouts
```

## Semantics

- Starts from `$` (Redis): only new enqueues AFTER the connection opens.
- Does NOT consume. Messages still flow to your consumer groups normally.
- One Redis connection per tail. Don't open dozens concurrently.

## Client example (browser / Node / Deno)

```js
// EventSource can't set headers directly — use a polyfill or proxy with the
// Authorization header attached server-side. Our dashboard does this via
// a Next.js route handler that adds Bearer before proxying.
const es = new EventSource(
  "https://api.asyncbase.dev/v1/queues/emails/tail",
)
es.addEventListener("message", (e) => {
  const frame = JSON.parse(e.data)
  console.log("new message:", frame.id, frame.payload)
})
```

## Use cases

- Dashboard live view of queue throughput.
- Dev-mode debugging (tail + curl enqueue).
- Cross-team observability (give read-only key to a product manager, they
  subscribe without impacting consumers).


<!-- ──── sdk/node.md ──── -->

# SDK: Node / TypeScript

```bash
npm i @asyncbase/sdk          # or bun add / pnpm add
```

## Construct

```ts
import { Queue, AsyncBaseError } from "@asyncbase/sdk"

const q = new Queue(process.env.ASYNCBASE_KEY!, {
  baseUrl: "https://api.asyncbase.dev",   // optional
  fetch: globalThis.fetch,                 // optional override
})
```

One `Queue` per process. Thread-safe (just `fetch` under the hood).

## Send

```ts
const r = await q.send("emails", { to: "a@b.com", subject: "hi" }, {
  delay: "30s",
  idempotencyKey: "order-9001",
  fifoGroup: "user-42",
  dedupeId: "event-xyz",
  retries: 5,
  ttl: "7d",
})
// → { id: "msg_abc", enqueued_at: "ISO", deliver_at?: "ISO", deduped?: true }
```

## Pull + ack / nack

```ts
const msgs = await q.pull("emails", {
  group: "workers",
  consumer: "c-hostname-pid",   // optional, default random
  batchSize: 10,                 // default 10, max 100
  waitSeconds: 5,                // long-poll, max 20
  visibilitySeconds: 60,         // default 30
})

for (const msg of msgs) {
  try {
    await handle(msg.payload)
    await msg.ack()
  } catch {
    await msg.nack()   // exp-backoff, auto-attempt based on msg.attempt
  }
}
```

## Streaming consume

```ts
const abort = new AbortController()
process.on("SIGTERM", () => abort.abort())

for await (const msg of q.consume("emails", {
  group: "workers",
  visibilitySeconds: 60,
  signal: abort.signal,
})) {
  try { await handle(msg.payload); await msg.ack() }
  catch { await msg.nack() }
}
```

## Heartbeat (long handlers)

```ts
const msg = (await q.pull("emails", { group: "workers", batchSize: 1 }))[0]
if (!msg) return
const beat = setInterval(() => msg.heartbeat().catch(() => {}), 25_000)
try { await longWork(msg.payload); await msg.ack() }
finally { clearInterval(beat) }
```

## Error handling

```ts
import { AsyncBaseError } from "@asyncbase/sdk"
try { await q.send(...) }
catch (err) {
  if (err instanceof AsyncBaseError && err.code === "RATE_LIMITED") {
    await new Promise(r => setTimeout(r, 1000))
  } else throw err
}
```

## TypeScript types exported

```ts
import type {
  SendOptions, PullOptions, ConsumeOptions,
  ConsumedMessage, NackResponse, HeartbeatResponse,
  Message, EnqueueRequest, EnqueueResponse,
  DequeueResponse, AckResponse,
  ErrorCode, ErrorEnvelope,
} from "@asyncbase/sdk"
```

## Test pattern (bun test / vitest)

```ts
import { Queue } from "@asyncbase/sdk"

test("send", async () => {
  const calls: Request[] = []
  const q = new Queue("sk_test_fake", {
    baseUrl: "http://fake",
    fetch: async (url, init) => {
      calls.push(new Request(url as string, init))
      return new Response(JSON.stringify({ id: "msg_1", enqueued_at: "x" }), { status: 201 })
    },
  })
  const r = await q.send("emails", { x: 1 })
  expect(r.id).toBe("msg_1")
  expect(calls[0].headers.get("authorization")).toBe("Bearer sk_test_fake")
})
```


<!-- ──── sdk/python.md ──── -->

# SDK: Python

```bash
pip install asyncbase          # or uv add / poetry add
```

Requires Python 3.10+. Sync (`Queue`) and async (`AsyncQueue`) clients share
the same surface; both use `httpx` under the hood.

## Construct

```python
from asyncbase import Queue, AsyncQueue, AsyncBaseError
import os

q = Queue(os.environ["ASYNCBASE_KEY"])            # sync
aq = AsyncQueue(os.environ["ASYNCBASE_KEY"])       # async

# Context manager closes the underlying httpx client:
with Queue(os.environ["ASYNCBASE_KEY"]) as q:
    ...
async with AsyncQueue(os.environ["ASYNCBASE_KEY"]) as aq:
    ...
```

## Send

```python
r = q.send(
    "emails",
    {"to": "a@b.com", "subject": "hi"},
    delay="30s",
    idempotency_key="order-9001",
    fifo_group="user-42",
    dedupe_id="event-xyz",
    retries=5,
    ttl="7d",
)
# r.id, r.enqueued_at, r.deliver_at, r.deduped
```

## Pull / ack / nack (sync)

```python
msgs = q.pull(
    "emails",
    group="workers",
    consumer="c-hostname-pid",
    limit=10,
    wait_seconds=5,
    visibility_seconds=60,
)
for m in msgs:
    try:
        handle(m.payload)
        m.ack()
    except Exception:
        m.nack()
```

## Streaming consume (async — recommended for servers)

```python
import asyncio
from asyncbase import AsyncQueue

async def main():
    aq = AsyncQueue(os.environ["ASYNCBASE_KEY"])
    async for msg in aq.consume("emails", group="workers", visibility_seconds=60):
        try:
            await handle(msg.payload)
            await msg.ack()
        except Exception:
            await msg.nack()

asyncio.run(main())
```

## Heartbeat

```python
import threading

def beat(msg, stop):
    while not stop.wait(25):
        try: msg.heartbeat()
        except AsyncBaseError: break

for m in q.pull("emails", group="workers", limit=1):
    stop = threading.Event()          # one event per message
    t = threading.Thread(target=beat, args=(m, stop), daemon=True)
    t.start()
    try:
        long_work(m.payload)
        m.ack()
    finally:
        stop.set()
```

## Error handling

```python
from asyncbase import AsyncBaseError

try:
    q.send("emails", {...})
except AsyncBaseError as e:
    if e.code == "RATE_LIMITED":
        time.sleep(1)
    else:
        raise
```

`AsyncBaseError` attributes: `.code`, `.message`, `.status`, `.docs`.

## Test pattern (pytest + respx)

```python
import respx, httpx, pytest
from asyncbase import Queue

@respx.mock
def test_send():
    respx.post("https://api.asyncbase.dev/v1/queues/emails/messages").respond(
        201, json={"id": "msg_1", "enqueued_at": "2026-04-18T00:00:00Z"},
    )
    with Queue("sk_test_fake") as q:
        r = q.send("emails", {"x": 1}, idempotency_key="k-1")
    assert r.id == "msg_1"
```


<!-- ──── sdk/php.md ──── -->

# SDK: PHP

```bash
composer require asyncbase/asyncbase
```

Requires PHP 8.1+. Uses Guzzle 7 for HTTP.

## Construct

```php
use AsyncBase\Queue;

$q = new Queue(
    getenv('ASYNCBASE_KEY'),
    'https://api.asyncbase.dev',  // optional
    30.0,                          // timeout seconds
    // 4th arg: ?GuzzleHttp\HandlerStack for test mocking
);
```

## Laravel binding

```php
// app/Providers/AppServiceProvider.php
use AsyncBase\Queue;

public function register(): void
{
    $this->app->singleton(Queue::class, fn () =>
        new Queue(env('ASYNCBASE_KEY'), env('ASYNCBASE_BASE_URL') ?: null)
    );
}
```

Then inject: `public function __construct(private readonly Queue $queue) {}`.

## Send

```php
$r = $q->send(
    'emails',
    ['to' => 'a@b.com', 'subject' => 'hi'],
    idempotencyKey: 'order-9001',
    delay: '30s',
    retries: 5,
    ttl: '7d',
    fifoGroup: 'user-42',
    dedupeId: 'event-xyz',
);
// $r->id, $r->enqueuedAt, $r->deliverAt, $r->deduped
```

## Pull / ack / nack

```php
$msgs = $q->pull(
    'emails',
    group: 'workers',
    consumer: 'c-hostname-pid',
    limit: 10,
    waitSeconds: 5,
    visibilitySeconds: 60,
);
foreach ($msgs as $msg) {
    try {
        handle($msg->payload);
        $msg->ack();
    } catch (\Throwable $e) {
        $msg->nack();
    }
}
```

## Streaming consume

```php
foreach ($q->consume('emails', group: 'workers', visibilitySeconds: 60) as $msg) {
    try {
        handle($msg->payload);
        $msg->ack();
    } catch (\Throwable $e) {
        $msg->nack();
    }
}
```

Run under supervisord or systemd so the process restarts on crash.

## Heartbeat

Call `$msg->heartbeat()` every `visibilitySeconds - 5` while the handler
runs. In Laravel Artisan commands you can `pcntl_signal(SIGALRM, fn() =>
$msg->heartbeat())` with `pcntl_alarm(25)`.

## Error handling

```php
use AsyncBase\AsyncBaseError;

try {
    $q->send('emails', [...]);
} catch (AsyncBaseError $e) {
    // NOTE: PHP's built-in Exception uses $code for an int, so AsyncBase
    // exposes the API code as $errorCode.
    if ($e->errorCode === 'RATE_LIMITED') {
        sleep(1);
    } else {
        throw $e;
    }
}
```

`AsyncBaseError`: `->errorCode`, `->getMessage()`, `->status`, `->docs`.

## Test pattern (phpunit + Guzzle MockHandler)

```php
use GuzzleHttp\Handler\MockHandler;
use GuzzleHttp\HandlerStack;
use GuzzleHttp\Psr7\Response;
use AsyncBase\Queue;

$mock = new MockHandler([
    new Response(201, [], json_encode(['id' => 'msg_1', 'enqueued_at' => 'x'])),
]);
$stack = HandlerStack::create($mock);
$q = new Queue('sk_test_fake', null, 30.0, $stack);

$r = $q->send('emails', ['x' => 1]);
$this->assertSame('msg_1', $r->id);
```


<!-- ──── sdk/go.md ──── -->

# SDK: Go

```bash
go get github.com/asyncbase/asyncbase-go
```

Requires Go 1.22+. Zero deps beyond stdlib.

## Construct

```go
import "github.com/asyncbase/asyncbase-go/asyncbase"

c := asyncbase.New(os.Getenv("ASYNCBASE_KEY"))
// or, with options:
c = asyncbase.New(
    os.Getenv("ASYNCBASE_KEY"),
    asyncbase.WithBaseURL("https://api.asyncbase.dev"),
    asyncbase.WithHTTPClient(&http.Client{Timeout: 10 * time.Second}),
)
```

One `*Client` per process. Safe for concurrent use.

## Send

```go
r, err := c.Send(ctx, "emails", map[string]any{
    "to": "a@b.com", "subject": "hi",
}, &asyncbase.SendOptions{
    Delay:          "30s",
    IdempotencyKey: "order-9001",
    FifoGroup:      "user-42",
    DedupeID:       "event-xyz",
    Retries:        5,
    TTL:            "7d",
})
// r.ID, r.EnqueuedAt, r.DeliverAt, r.Deduped
```

## Pull / ack / nack

```go
msgs, err := c.Pull(ctx, "emails", asyncbase.PullOptions{
    Group:             "workers",
    Consumer:          "c-hostname-pid", // optional; auto-generated if empty
    BatchSize:         10,
    WaitSeconds:       5,
    VisibilitySeconds: 60,
})
for _, m := range msgs {
    if err := handle(m.Payload); err != nil {
        m.Nack(ctx, nil)
    } else {
        m.Ack(ctx)
    }
}
```

## Streaming consume (channels)

```go
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer cancel()

msgs, errs := c.Consume(ctx, "emails",
    asyncbase.PullOptions{Group: "workers", VisibilitySeconds: 60},
    time.Second, // idle poll interval
)

for {
    select {
    case <-ctx.Done():
        return
    case m, ok := <-msgs:
        if !ok { return }
        if err := handle(m.Payload); err != nil {
            m.Nack(ctx, nil)
        } else {
            m.Ack(ctx)
        }
    case err := <-errs:
        log.Printf("consume error: %v", err)
        return
    }
}
```

## Heartbeat

```go
go func() {
    t := time.NewTicker(25 * time.Second)
    defer t.Stop()
    for {
        select {
        case <-ctx.Done(): return
        case <-t.C:
            if _, err := m.Heartbeat(ctx); err != nil { return }
        }
    }
}()
```

## Error handling

```go
_, err := c.Send(ctx, "emails", payload, nil)
if asyncbase.IsCode(err, "RATE_LIMITED") {
    time.Sleep(time.Second)
} else if err != nil {
    return err
}
```

`*asyncbase.Error`: `.Code`, `.Message`, `.Docs`, `.Status`, `.RequestID`.

## Test pattern (httptest)

```go
func TestSend(t *testing.T) {
    srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        _ = json.NewEncoder(w).Encode(map[string]any{"id": "msg_1", "enqueued_at": "x"})
    }))
    defer srv.Close()

    c := asyncbase.New("sk_test_fake", asyncbase.WithBaseURL(srv.URL))
    r, err := c.Send(context.Background(), "emails", map[string]any{"x": 1}, nil)
    if err != nil { t.Fatal(err) }
    if r.ID != "msg_1" { t.Fatalf("got %s", r.ID) }
}
```


<!-- ──── mcp.md ──── -->

# MCP server — `@asyncbase/mcp`

An MCP (Model Context Protocol) server that exposes AsyncBase queue
primitives as tools any MCP-aware client can call. Drop it into Claude
Desktop, Cursor, Zed, Windsurf, or your own agent.

## Install

Nothing to install explicitly — the server runs via `npx`. You just need
Node 18+ on the machine running the MCP client.

Requires an API key: `sk_live_...` or `sk_test_...` (see [auth.md](./auth.md)).
Create one at https://app.asyncbase.dev/settings/api-keys.

## Claude Desktop config

Edit `~/Library/Application Support/Claude/claude_desktop_config.json`
(macOS) or `%APPDATA%\Claude\claude_desktop_config.json` (Windows):

```json
{
  "mcpServers": {
    "asyncbase": {
      "command": "npx",
      "args": ["-y", "@asyncbase/mcp"],
      "env": {
        "ASYNCBASE_API_KEY": "sk_live_..."
      }
    }
  }
}
```

Restart Claude Desktop. The asyncbase tools will appear in the hammer menu.

## Cursor config

Cursor → Settings → MCP → Add new MCP server. Paste:

```json
{
  "mcpServers": {
    "asyncbase": {
      "command": "npx",
      "args": ["-y", "@asyncbase/mcp"],
      "env": { "ASYNCBASE_API_KEY": "sk_live_..." }
    }
  }
}
```

## Claude Code (CLI) config

```bash
claude mcp add asyncbase --env ASYNCBASE_API_KEY=sk_live_... \
  -- npx -y @asyncbase/mcp
```

Or edit `~/.claude.json` directly.

## Zed / Windsurf / other

Any client that speaks MCP stdio with the 2024-11-05 protocol works.
Point the client at `npx -y @asyncbase/mcp` with the env var.

## Pointing at a non-default API

For staging or self-hosted:

```json
"env": {
  "ASYNCBASE_API_KEY": "sk_test_...",
  "ASYNCBASE_API_URL": "https://staging.asyncbase.dev"
}
```

## Exposed tools (7)

| Tool | Purpose |
|------|---------|
| `send_message` | Enqueue a JSON payload. Supports delay, FIFO, dedup, TTL. |
| `pull_messages` | Long-poll N messages from a consumer group. |
| `ack_message` | Mark a message as processed. |
| `nack_message` | Schedule retry or move to DLQ. |
| `list_dlq` | Inspect Dead Letter Queue entries. |
| `redrive_dlq` | Move DLQ entries back to the stream. |
| `get_docs` | Fetch AsyncBase docs. **Call before generating client code.** |

The full JSON Schema for each tool is returned by `tools/list` on the MCP
protocol. Every tool returns the raw AsyncBase API response as JSON text
content so models can parse + summarize.

## `get_docs` is the secret sauce

When an AI writes code that calls AsyncBase, it should call `get_docs`
first to fetch the canonical API surface for the topic it's about to
touch. Topics match `docs/llm/*.md`:

```json
{ "tool": "get_docs", "args": { "topic": "enqueue" } }
{ "tool": "get_docs", "args": { "topic": "sdk/python" } }
{ "tool": "get_docs", "args": { "topic": "index" } }   // list topics
{ "tool": "get_docs", "args": { "topic": "all" } }     // full dump
```

This keeps generated code in sync with the real API even as the server
evolves past the model's training cutoff.

## Smoke test

After setup, ask Claude / Cursor something like:

> "Send a test message to queue 'smoke' with payload {hello: 'mcp'}, then
> list the DLQ for 'smoke'."

You should see a successful `send_message` followed by `list_dlq` in the
tool-use trace. If the model can't find the tools, restart the client.
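
In trace form, a passing smoke test looks roughly like this — argument names are a sketch; the authoritative schemas come from `tools/list`:

```json
{ "tool": "send_message", "args": { "queue": "smoke", "payload": { "hello": "mcp" } } }
{ "tool": "list_dlq",     "args": { "queue": "smoke" } }   // expect an empty list
```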

## Troubleshooting

| Symptom | Fix |
|---------|-----|
| "ASYNCBASE_API_KEY env var is required" | Add it to the `env` block in the config, not shell env |
| `AUTH_INVALID` on every call | Key was rotated or revoked — issue a fresh one at the dashboard |
| `RATE_LIMITED` during a back-and-forth | Your plan's req/s limit. Pro: 100/s, Free: 10/s. Upgrade or space requests |
| Tools not showing up | Restart the MCP client. Verify JSON config has no trailing commas |
| `npx` fetches fail on first run | Pin a version: `"args": ["-y", "@asyncbase/mcp@0.0.1"]` |


<!-- ──── claude-skill.md ──── -->

# Claude Code skill — `asyncbase`

A Claude Code (and any Claude.ai project) skill that scaffolds AsyncBase
into an existing codebase. When activated, Claude detects the project's
stack (Node / Python / PHP / Go + framework), installs the right SDK,
wires env vars, generates a send helper and a consume loop in the
idiomatic style for that framework, and verifies with a probe.

Pair it with the [MCP server](./mcp.md) so Claude can actually call
`get_docs` and the queue tools during scaffolding, not just write code
that might be wrong.

## Install

### Claude Code (CLI — recommended)

```bash
# Installs user-wide under ~/.claude/skills (available to every project):
mkdir -p ~/.claude/skills/asyncbase
curl -L https://api.asyncbase.dev/skill/asyncbase.tar | tar -xf - -C ~/.claude/skills/asyncbase
```

Or clone from GitHub:

```bash
git clone --depth 1 https://github.com/asyncbase/asyncbase.git /tmp/ab
cp -r /tmp/ab/packages/claude-skill/asyncbase ~/.claude/skills/asyncbase
```

Verify:

```bash
ls ~/.claude/skills/asyncbase/SKILL.md
```

### Claude.ai (project skill)

Skills live per-project on claude.ai. Upload the folder as a project file:

1. Open your Claude.ai project → Settings → Skills (or Files).
2. Upload `packages/claude-skill/asyncbase/` (SKILL.md + references/).
3. Chat in that project — Claude will auto-detect when the skill applies.

## Use

In Claude Code / Cursor / Claude.ai:

> "Add AsyncBase to this project — a queue for sending emails async."

Claude reads `SKILL.md`, detects your stack, and:

1. Fetches canonical docs via `get_docs` (if the MCP server is also installed).
2. Installs the SDK via your package manager.
3. Creates a send helper (e.g. `lib/queue.ts`).
4. Creates a consumer appropriate to the framework:
   - NestJS: `Injectable` with `OnApplicationBootstrap`
   - FastAPI: lifespan-managed async task
   - Laravel: Artisan command
   - Next.js / plain Node: separate `consumer.ts` entry
   - Go: goroutine + context cancel
5. Adds `ASYNCBASE_API_KEY` to `.env.local` / `.env` and `.env.example`.
6. Prints the exact command to run the consumer.

Templates live in `~/.claude/skills/asyncbase/references/` — Claude reads
them before editing your files so the generated code matches the
framework's conventions.
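
As a rough idea of the output, the step-3 send helper for a plain Node/TypeScript project might look like the sketch below. This is hypothetical: the endpoint path and response shape aren't specified in this section, and the real skill generates code from the live docs via `get_docs`. Auth, base URL, and the `Idempotency-Key` header match the documented API surface.

```typescript
// lib/queue.ts — hypothetical sketch of a generated send helper.

const BASE_URL = process.env.ASYNCBASE_API_URL ?? "https://api.asyncbase.dev";

interface SendRequest {
  url: string;
  headers: Record<string, string>;
  body: string;
}

// Pure request builder: unit-testable without touching the network.
export function buildSendRequest(
  queue: string,
  payload: unknown,
  apiKey: string,
  idempotencyKey?: string,
): SendRequest {
  const headers: Record<string, string> = {
    Authorization: `Bearer ${apiKey}`,
    "Content-Type": "application/json",
  };
  if (idempotencyKey) headers["Idempotency-Key"] = idempotencyKey; // dedups sends for 24 h
  return {
    // NOTE: the path below is an assumption, not the documented route.
    url: `${BASE_URL}/v1/queues/${encodeURIComponent(queue)}/messages`,
    headers,
    body: JSON.stringify({ payload }),
  };
}

export async function sendMessage(queue: string, payload: unknown, idempotencyKey?: string) {
  const req = buildSendRequest(queue, payload, process.env.ASYNCBASE_API_KEY!, idempotencyKey);
  const res = await fetch(req.url, { method: "POST", headers: req.headers, body: req.body });
  if (!res.ok) throw new Error(`send failed: ${res.status}`);
  return res.json();
}
```

Splitting the pure request builder from the `fetch` call keeps the helper testable without network access, which is also how the skill's probe step can verify wiring before any real enqueue.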

## Updating

Skills don't auto-update. Pull when the SDK has a new release:

```bash
cd ~/.claude/skills/asyncbase
git pull  # if you cloned
# or re-run the tar fetch from "Install" above
```

Check the version embedded in SKILL.md to know what you're running.

## Remove

```bash
rm -rf ~/.claude/skills/asyncbase
```

## Troubleshooting

| Symptom | Fix |
|---------|-----|
| Skill doesn't activate | Ensure `SKILL.md` has valid YAML frontmatter with `name: asyncbase` |
| Claude skips `get_docs` | The MCP server needs to be installed separately — see [mcp.md](./mcp.md) |
| Wrong language detected | Tell Claude explicitly: "use the asyncbase skill for the Python/FastAPI stack" |
| Generated imports are stale | Make sure the MCP server is installed so the skill can fetch live docs |
