Get Started

Fleets

Run the same agent across hundreds of inputs in parallel. Fleets handle concurrency limits, cost caps, failure policies, and streaming results as items complete.

Fleets require primitivesEnabled: true. Concurrency is capped at the lower of your requested value and your provider's maxConcurrent limit.

Dispatching a fleet

Call session.fleets.dispatch() with an agent definition, an array of inputs, and your concurrency and cost constraints.

fleet.ts
import { Theazo } from 'theazo'

const theazo = new Theazo({ apiKey: 'th_live_...' })

const session = await theazo.sessions.forUser('user_123')

const fleet = await session.fleets.dispatch({
  agent: 'content-writer',
  inputs: [
    { topic: 'AI infrastructure', tone: 'technical' },
    { topic: 'Founder fundraising', tone: 'narrative' },
    { topic: 'YC application tips', tone: 'direct' },
    // ... up to thousands of items
  ],
  concurrency: 10,
  timeout: 300,            // per-item timeout in seconds
  maxCost: { amount: 5000, currency: 'usd' },
  failurePolicy: 'continue',  // 'continue' | 'stop-on-first' | 'pause'
})

console.log(fleet.id)      // 'fleet_abc123'
console.log(fleet.status)  // 'running'

Failure policies

continue

Failed items are recorded but the fleet keeps running. Completed items are still available. Use this when partial results are acceptable.

stop-on-first

Any failure immediately cancels all pending and in-progress items. Use this when all-or-nothing semantics matter.

pause

Stops processing on failure and preserves all completed items. The fleet enters a paused state and can be manually resumed after investigation. Use this when you want to inspect failures before deciding whether to continue.

Fleet status

Poll fleet.status() to get a summary of progress.

const status = await fleet.status()

console.log(status.total)       // 500
console.log(status.completed)   // 234
console.log(status.failed)      // 3
console.log(status.running)     // 10
console.log(status.pending)     // 253
console.log(status.cost)        // { amount: 1870, currency: 'usd' }
console.log(status.status)      // 'running' | 'completed' | 'failed' | 'cancelled'

Reading results

Paginated results

Fetch completed results in pages. You can filter by status to get only failed items for retry logic.

// First page of completed results
const page = await fleet.results({
  status: 'completed',
  limit: 50,
})

console.log(page.items)        // array of FleetItem results
console.log(page.nextCursor)   // pass to next call, null if done

// Fetch next page
const nextPage = await fleet.results({
  status: 'completed',
  limit: 50,
  cursor: page.nextCursor,
})
// FleetItem shape:
// {
//   index:    0,
//   input:    { topic: 'AI infrastructure', tone: 'technical' },
//   output:   '...',
//   status:   'completed',
//   cost:     { amount: 12, currency: 'usd' },
//   duration: 18,   // seconds
//   error:    null,
// }

Streaming results

Use fleet.stream() to process results as they complete rather than polling. This is an async iterable — each yielded item is a completed (or failed) fleet item.

stream.ts
const fleet = await session.fleets.dispatch({
  agent: 'classifier',
  inputs: emails,   // array of 500 email objects
  concurrency: 20,
  maxCost: { amount: 10000, currency: 'usd' },
  failurePolicy: 'continue',
})

let totalCost = 0

for await (const item of fleet.stream()) {
  if (item.status === 'completed') {
    console.log(`[${item.index}] ${item.output.category} — ${item.duration}s`)
    totalCost += item.cost.amount
  } else {
    console.error(`[${item.index}] Failed: ${item.error}`)
  }
}

console.log('Total cost (cents):', totalCost)

SSE transport

Under the hood, fleet.stream() uses Server-Sent Events (SSE) via the GET /v1/fleets/:id/stream endpoint. You can consume SSE directly if you need lower-level control — for example, from a browser or a non-TypeScript client.

sse-events.ts
// The SSE endpoint emits four event types:

// 1. item.completed — fired each time an item finishes successfully
// data: { index: 0, input: {...}, output: '...', status: 'completed',
//         cost: { amount: 12, currency: 'usd' }, duration: 18, error: null }

// 2. fleet.progress — periodic summary of fleet progress
// data: { total: 500, completed: 234, failed: 3, running: 10, pending: 253,
//         cost: { amount: 1870, currency: 'usd' } }

// 3. fleet.completed — all items are done (terminal)
// data: { total: 500, completed: 497, failed: 3,
//         cost: { amount: 3241, currency: 'usd' } }

// 4. fleet.failed — fleet-level failure (terminal)
// data: { error: 'Cost cap exceeded', failedAt: 312 }

Consuming the SSE endpoint directly with EventSource:

sse-client.ts
const url = 'https://api.theazo.com/v1/fleets/fleet_abc123/stream'
const source = new EventSource(url, {
  headers: { 'Authorization': 'Bearer th_live_...' },
})

source.addEventListener('item.completed', (e) => {
  const item = JSON.parse(e.data)
  console.log(`[${item.index}] done — ${item.duration}s`)
})

source.addEventListener('fleet.progress', (e) => {
  const progress = JSON.parse(e.data)
  console.log(`${progress.completed}/${progress.total} complete`)
})

source.addEventListener('fleet.completed', (e) => {
  const summary = JSON.parse(e.data)
  console.log('Fleet finished:', summary)
  source.close()
})

source.addEventListener('fleet.failed', (e) => {
  const failure = JSON.parse(e.data)
  console.error('Fleet failed:', failure.error)
  source.close()
})
SSE auto-reconnects on disconnect. The server tracks progress via Last-Event-ID — on reconnect, only events after the last received ID are sent. No items are missed and no duplicates are delivered.

Cancelling a fleet

Cancelling stops all pending items immediately. In-flight items run to completion. Results collected so far remain available via fleet.results().

await fleet.cancel()

const status = await fleet.status()
console.log(status.status)  // 'cancelled'

Example: 500-item batch

batch-classify.ts
import { Theazo } from 'theazo'

const theazo = new Theazo({ apiKey: 'th_live_...' })
const session = await theazo.sessions.forUser('user_789', {
  limits: {
    maxCost: { amount: 20000, currency: 'usd', period: 'day' },
  },
})

// Build 500 inputs from your database
const tickets = await db.supportTickets.findMany({ limit: 500 })
const inputs = tickets.map(t => ({
  id: t.id,
  subject: t.subject,
  body: t.body,
}))

// Dispatch with concurrency of 10 and a $50 cost cap
const fleet = await session.fleets.dispatch({
  agent: 'ticket-classifier',
  inputs,
  concurrency: 10,
  timeout: 60,
  maxCost: { amount: 5000, currency: 'usd' },
  failurePolicy: 'continue',
})

console.log(`Fleet ${fleet.id} dispatched with ${inputs.length} items`)

// Stream results and write to DB as they arrive
const results: Record<string, string> = {}

for await (const item of fleet.stream()) {
  const ticket = tickets[item.index]

  if (item.status === 'completed') {
    results[ticket.id] = item.output.category
    await db.supportTickets.update({
      where: { id: ticket.id },
      data:  { category: item.output.category, classifiedAt: new Date() },
    })
  } else {
    console.error(`Ticket ${ticket.id} failed: ${item.error}`)
  }
}

const final = await fleet.status()
console.log('Completed:', final.completed, '/', final.total)
console.log('Failed:',    final.failed)
console.log('Total cost:', final.cost)  // { amount: 3241, currency: 'usd' }

API reference

session.fleets.dispatch(config)Promise<Fleet>Dispatch a fleet. Starts agents immediately up to concurrency limit.
fleet.status()Promise<FleetStatus>Summary: total, completed, failed, running, pending, cost, overall status.
fleet.results({ status, limit, cursor })Promise<FleetResultPage>Paginated item results. Filter by status: completed | failed | pending.
fleet.stream()AsyncIterable<FleetItem>Yields items as they complete. Ends when all items are done or fleet is cancelled.
fleet.cancel()Promise<void>Stop pending items. In-flight items run to completion.
session.fleets.list()Promise<Fleet[]>List all fleets for this session, newest first.
session.fleets.get(fleetId)Promise<Fleet>Fetch a fleet by ID.
Was this page helpful?
Ask anything...⌘I