Get Started

Fleets

Run the same agent across hundreds of inputs in parallel. Fleets handle concurrency limits, cost caps, failure policies, and streaming results as items complete.

Fleets require primitivesEnabled: true. Concurrency is capped at the lower of your requested value and your provider's maxConcurrent limit.

Dispatching a fleet

Call session.fleets.dispatch() with an agent definition, an array of inputs, and your concurrency and cost constraints.

fleet.ts
import { Theazo } from class="cb-str">'theazo'

const theazo = new Theazo({ apiKey: class="cb-str">'th_live_...' })

const session = await theazo.sessions.forUser(class="cb-str">'user_123')

const fleet = await session.fleets.dispatch({
  agent: class="cb-str">'content-writer',
  inputs: [
    { topic: class="cb-str">'AI infrastructure', tone: class="cb-str">'technical' },
    { topic: class="cb-str">'Founder fundraising', tone: class="cb-str">'narrative' },
    { topic: class="cb-str">'YC application tips', tone: class="cb-str">'direct' },
    class="cb-cmt">// ... up to thousands of items
  ],
  concurrency: class="cb-num">10,
  timeout: class="cb-num">300,            class="cb-cmt">// per-item timeout in seconds
  maxCost: { amount: class="cb-num">5000, currency: class="cb-str">'usd' },
  failurePolicy: class="cb-str">'continue',  class="cb-cmt">// 'continue' | 'stop-on-first'
})

console.log(fleet.id)      class="cb-cmt">// 'fleet_abc123'
console.log(fleet.status)  class="cb-cmt">// 'running'

Failure policies

continue

Failed items are recorded but the fleet keeps running. Completed items are still available. Use this when partial results are acceptable.

stop-on-first

Any failure immediately cancels all pending and in-progress items. Use this when all-or-nothing semantics matter.

Fleet status

Poll fleet.status() to get a summary of progress.

const status = await fleet.status()

console.log(status.total)       class="cb-cmt">// 500
console.log(status.completed)   class="cb-cmt">// 234
console.log(status.failed)      class="cb-cmt">// 3
console.log(status.running)     class="cb-cmt">// 10
console.log(status.pending)     class="cb-cmt">// 253
console.log(status.cost)        class="cb-cmt">// { amount: 1870, currency: 'usd' }
console.log(status.status)      class="cb-cmt">// 'running' | 'completed' | 'failed' | 'cancelled'

Reading results

Paginated results

Fetch completed results in pages. You can filter by status to get only failed items for retry logic.

// First page of completed results
const page = await fleet.results({
  status: class="cb-str">'completed',
  limit: class="cb-num">50,
})

console.log(page.items)        class="cb-cmt">// array of FleetItem results
console.log(page.nextCursor)   class="cb-cmt">// pass to next call, null if done

// Fetch next page
const nextPage = await fleet.results({
  status: class="cb-str">'completed',
  limit: class="cb-num">50,
  cursor: page.nextCursor,
})

// FleetItem shape:
// {
//   index:    0,
//   input:    { topic: 'AI infrastructure', tone: 'technical' },
//   output:   '...',
//   status:   'completed',
//   cost:     { amount: 12, currency: 'usd' },
//   duration: 18,   // seconds
//   error:    null,
// }

Streaming results

Use fleet.stream() to process results as they complete rather than polling. This is an async iterable — each yielded item is a completed (or failed) fleet item.

stream.ts
const fleet = await session.fleets.dispatch({
  agent: class="cb-str">'classifier',
  inputs: emails,   class="cb-cmt">// array of 500 email objects
  concurrency: class="cb-num">20,
  maxCost: { amount: class="cb-num">10000, currency: class="cb-str">'usd' },
  failurePolicy: class="cb-str">'continue',
})

let totalCost = class="cb-num">0

for await (const item of fleet.stream()) {
  if (item.status === class="cb-str">'completed') {
    console.log(class="cb-str">`[${item.index}] ${item.output.category} — ${item.duration}s`)
    totalCost += item.cost.amount
  } else {
    console.error(class="cb-str">`[${item.index}] Failed: ${item.error}`)
  }
}

console.log(class="cb-str">'Total cost (cents):', totalCost)

Cancelling a fleet

Cancelling stops all pending items immediately. In-flight items run to completion. Results collected so far remain available via fleet.results().

await fleet.cancel()

const status = await fleet.status()
console.log(status.status)  class="cb-cmt">// 'cancelled'

Example: 500-item batch

batch-classify.ts
import { Theazo } from class="cb-str">'theazo'

const theazo = new Theazo({ apiKey: class="cb-str">'th_live_...' })
const session = await theazo.sessions.forUser(class="cb-str">'user_789', {
  limits: {
    maxCost: { amount: class="cb-num">20000, currency: class="cb-str">'usd', period: class="cb-str">'day' },
  },
})

// Build 500 inputs from your database
const tickets = await db.supportTickets.findMany({ limit: class="cb-num">500 })
const inputs = tickets.map(t => ({
  id: t.id,
  subject: t.subject,
  body: t.body,
}))

// Dispatch with concurrency of 10 and a $50 cost cap
const fleet = await session.fleets.dispatch({
  agent: class="cb-str">'ticket-classifier',
  inputs,
  concurrency: class="cb-num">10,
  timeout: class="cb-num">60,
  maxCost: { amount: class="cb-num">5000, currency: class="cb-str">'usd' },
  failurePolicy: class="cb-str">'continue',
})

console.log(class="cb-str">`Fleet ${fleet.id} dispatched with ${inputs.length} items`)

// Stream results and write to DB as they arrive
const results: Record = {}

for await (const item of fleet.stream()) {
  const ticket = tickets[item.index]

  if (item.status === class="cb-str">'completed') {
    results[ticket.id] = item.output.category
    await db.supportTickets.update({
      where: { id: ticket.id },
      data:  { category: item.output.category, classifiedAt: new Date() },
    })
  } else {
    console.error(class="cb-str">`Ticket ${ticket.id} failed: ${item.error}`)
  }
}

const final = await fleet.status()
console.log(class="cb-str">'Completed:', final.completed, class="cb-str">'/', final.total)
console.log(class="cb-str">'Failed:',    final.failed)
console.log(class="cb-str">'Total cost:', final.cost)  class="cb-cmt">// { amount: 3241, currency: 'usd' }

API reference

session.fleets.dispatch(config)Promise<Fleet>Dispatch a fleet. Starts agents immediately up to concurrency limit.
fleet.status()Promise<FleetStatus>Summary: total, completed, failed, running, pending, cost, overall status.
fleet.results({ status, limit, cursor })Promise<FleetResultPage>Paginated item results. Filter by status: completed | failed | pending.
fleet.stream()AsyncIterable<FleetItem>Yields items as they complete. Ends when all items are done or fleet is cancelled.
fleet.cancel()Promise<void>Stop pending items. In-flight items run to completion.
session.fleets.list()Promise<Fleet[]>List all fleets for this session, newest first.
session.fleets.get(fleetId)Promise<Fleet>Fetch a fleet by ID.
Was this page helpful?
Ask anything...⌘I