Fleets
Run the same agent across hundreds of inputs in parallel. Fleets handle concurrency limits, cost caps, failure policies, and streaming results as items complete.
primitivesEnabled: true. Concurrency is capped at the lower of your requested value and your provider's maxConcurrent limit.
Dispatching a fleet
Call session.fleets.dispatch() with an agent definition, an array of inputs, and your concurrency and cost constraints.
import { Theazo } from 'theazo'
const theazo = new Theazo({ apiKey: 'th_live_...' })
const session = await theazo.sessions.forUser('user_123')
const fleet = await session.fleets.dispatch({
  agent: 'content-writer',
  inputs: [
    { topic: 'AI infrastructure', tone: 'technical' },
    { topic: 'Founder fundraising', tone: 'narrative' },
    { topic: 'YC application tips', tone: 'direct' },
    // ... up to thousands of items
  ],
  concurrency: 10,
  timeout: 300, // per-item timeout in seconds
  maxCost: { amount: 5000, currency: 'usd' },
  failurePolicy: 'continue', // 'continue' | 'stop-on-first' | 'pause'
})
console.log(fleet.id) // 'fleet_abc123'
console.log(fleet.status) // 'running'
Failure policies
continue: Failed items are recorded but the fleet keeps running. Completed items are still available. Use this when partial results are acceptable.
stop-on-first: Any failure immediately cancels all pending and in-progress items. Use this when all-or-nothing semantics matter.
pause: Stops processing on failure and preserves all completed items. The fleet enters a paused state and can be manually resumed after investigation. Use this when you want to inspect failures before deciding whether to continue.
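To make the difference between the three policies concrete, here is a toy, sequential model of what happens to the remaining items after the first failure. It is a sketch for intuition only, not Theazo's scheduler: the simulate function and the Outcome type are hypothetical names, and real fleets run items concurrently, so in-progress items add cases this model ignores.

```typescript
type FailurePolicy = 'continue' | 'stop-on-first' | 'pause'
type Outcome = 'completed' | 'failed' | 'cancelled' | 'pending'

// Hypothetical helper: walks items in order and applies the policy the
// first time an item fails. failsAt[i] says whether item i would fail.
function simulate(policy: FailurePolicy, failsAt: boolean[]): Outcome[] {
  const outcomes: Outcome[] = []
  let halted: Outcome | null = null // set once the fleet stops taking new work
  for (const willFail of failsAt) {
    if (halted !== null) {
      outcomes.push(halted) // everything after the halt shares one outcome
      continue
    }
    if (!willFail) {
      outcomes.push('completed')
      continue
    }
    outcomes.push('failed')
    if (policy === 'stop-on-first') halted = 'cancelled' // remaining work is cancelled
    if (policy === 'pause') halted = 'pending' // remaining work waits for a resume
  }
  return outcomes
}

const failsAt = [false, true, false]
console.log(simulate('continue', failsAt).join(', '))      // completed, failed, completed
console.log(simulate('stop-on-first', failsAt).join(', ')) // completed, failed, cancelled
console.log(simulate('pause', failsAt).join(', '))         // completed, failed, pending
```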
Fleet status
Poll fleet.status() to get a summary of progress.
const status = await fleet.status()
console.log(status.total) // 500
console.log(status.completed) // 234
console.log(status.failed) // 3
console.log(status.running) // 10
console.log(status.pending) // 253
console.log(status.cost) // { amount: 1870, currency: 'usd' }
console.log(status.status) // 'running' | 'completed' | 'failed' | 'cancelled'
Reading results
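Results are often read only once the fleet has stopped running. A minimal polling sketch for that case follows, assuming nothing beyond the fleet.status() shape shown above. The names waitForFleet, StatusSource, intervalMs, and maxPolls are hypothetical and not part of the SDK, and the terminal set is taken from the status values listed above.

```typescript
// Minimal structural type: anything with a status() method shaped like fleet.status().
interface StatusSource {
  status(): Promise<{ status: string; completed: number; total: number }>
}

// Statuses after which the fleet will not make further progress (assumed from the docs above).
const TERMINAL = new Set(['completed', 'failed', 'cancelled'])

// Hypothetical helper: polls until a terminal status is seen or maxPolls is exhausted.
async function waitForFleet(
  fleet: StatusSource,
  intervalMs = 2000,
  maxPolls = 1000,
): Promise<string> {
  for (let i = 0; i < maxPolls; i++) {
    const s = await fleet.status()
    if (TERMINAL.has(s.status)) return s.status
    await new Promise<void>((resolve) => setTimeout(resolve, intervalMs))
  }
  throw new Error(`Fleet still not terminal after ${maxPolls} polls`)
}

// Stand-in fleet for demonstration: reports 'running' twice, then 'completed'.
function fakeFleet(): StatusSource {
  let calls = 0
  return {
    async status() {
      calls++
      return { status: calls < 3 ? 'running' : 'completed', completed: calls, total: 3 }
    },
  }
}

waitForFleet(fakeFleet(), 10).then((finalStatus) => {
  console.log('final status:', finalStatus)
})
```

With a real fleet you would pass the object returned by dispatch() in place of fakeFleet(). For long-running fleets, fleet.stream() avoids the polling delay altogether.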
Paginated results
Fetch completed results in pages. You can filter by status to get only failed items for retry logic.
// First page of completed results
const page = await fleet.results({
  status: 'completed',
  limit: 50,
})
console.log(page.items) // array of FleetItem results
console.log(page.nextCursor) // pass to next call, null if done
// Fetch next page
const nextPage = await fleet.results({
  status: 'completed',
  limit: 50,
  cursor: page.nextCursor,
})
// FleetItem shape:
// {
//   index: 0,
//   input: { topic: 'AI infrastructure', tone: 'technical' },
//   output: '...',
//   status: 'completed',
//   cost: { amount: 12, currency: 'usd' },
//   duration: 18, // seconds
//   error: null,
// }
Streaming results
Use fleet.stream() to process results as they complete rather than polling. This is an async iterable — each yielded item is a completed (or failed) fleet item.
const fleet = await session.fleets.dispatch({
  agent: 'classifier',
  inputs: emails, // array of 500 email objects
  concurrency: 20,
  maxCost: { amount: 10000, currency: 'usd' },
  failurePolicy: 'continue',
})
let totalCost = 0
for await (const item of fleet.stream()) {
  if (item.status === 'completed') {
    console.log(`[${item.index}] ${item.output.category} — ${item.duration}s`)
    totalCost += item.cost.amount
  } else {
    console.error(`[${item.index}] Failed: ${item.error}`)
  }
}
console.log('Total cost (cents):', totalCost)
SSE transport
Under the hood, fleet.stream() uses Server-Sent Events (SSE) via the GET /v1/fleets/:id/stream endpoint. You can consume SSE directly if you need lower-level control — for example, from a browser or a non-TypeScript client.
// The SSE endpoint emits four event types:
// 1. item.completed: fired each time an item finishes successfully
//    data: { index: 0, input: {...}, output: '...', status: 'completed',
//            cost: { amount: 12, currency: 'usd' }, duration: 18, error: null }
// 2. fleet.progress: periodic summary of fleet progress
//    data: { total: 500, completed: 234, failed: 3, running: 10, pending: 253,
//            cost: { amount: 1870, currency: 'usd' } }
// 3. fleet.completed: all items are done (terminal)
//    data: { total: 500, completed: 497, failed: 3,
//            cost: { amount: 3241, currency: 'usd' } }
// 4. fleet.failed: fleet-level failure (terminal)
//    data: { error: 'Cost cap exceeded', failedAt: 312 }
Consuming the SSE endpoint directly with EventSource:
const url = 'https://api.theazo.com/v1/fleets/fleet_abc123/stream'
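// Note: the browser's built-in EventSource does not accept custom headers.
// This snippet assumes an EventSource implementation that supports a
// `headers` option (for example, a Node.js polyfill).
const source = new EventSource(url, {
  headers: { 'Authorization': 'Bearer th_live_...' },
})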
source.addEventListener('item.completed', (e) => {
  const item = JSON.parse(e.data)
  console.log(`[${item.index}] done — ${item.duration}s`)
})
source.addEventListener('fleet.progress', (e) => {
  const progress = JSON.parse(e.data)
  console.log(`${progress.completed}/${progress.total} complete`)
})
source.addEventListener('fleet.completed', (e) => {
  const summary = JSON.parse(e.data)
  console.log('Fleet finished:', summary)
  source.close()
})
source.addEventListener('fleet.failed', (e) => {
  const failure = JSON.parse(e.data)
  console.error('Fleet failed:', failure.error)
  source.close()
})
Last-Event-ID: on reconnect, only events after the last received ID are sent. No items are missed and no duplicates are delivered.
Cancelling a fleet
Cancelling stops all pending items immediately. In-flight items run to completion. Results collected so far remain available via fleet.results().
await fleet.cancel()
const status = await fleet.status()
console.log(status.status) // 'cancelled'
Example: 500-item batch
import { Theazo } from 'theazo'
const theazo = new Theazo({ apiKey: 'th_live_...' })
const session = await theazo.sessions.forUser('user_789', {
  limits: {
    maxCost: { amount: 20000, currency: 'usd', period: 'day' },
  },
})
// Build 500 inputs from your database
const tickets = await db.supportTickets.findMany({ limit: 500 })
const inputs = tickets.map(t => ({
  id: t.id,
  subject: t.subject,
  body: t.body,
}))
// Dispatch with concurrency of 10 and a $50 cost cap
const fleet = await session.fleets.dispatch({
  agent: 'ticket-classifier',
  inputs,
  concurrency: 10,
  timeout: 60,
  maxCost: { amount: 5000, currency: 'usd' },
  failurePolicy: 'continue',
})
console.log(`Fleet ${fleet.id} dispatched with ${inputs.length} items`)
// Stream results and write to DB as they arrive
const results: Record<string, string> = {}
for await (const item of fleet.stream()) {
  const ticket = tickets[item.index]
  if (item.status === 'completed') {
    results[ticket.id] = item.output.category
    await db.supportTickets.update({
      where: { id: ticket.id },
      data: { category: item.output.category, classifiedAt: new Date() },
    })
  } else {
    console.error(`Ticket ${ticket.id} failed: ${item.error}`)
  }
}
const final = await fleet.status()
console.log('Completed:', final.completed, '/', final.total)
console.log('Failed:', final.failed)
console.log('Total cost:', final.cost) // { amount: 3241, currency: 'usd' }
API reference
Method | Returns | Description
session.fleets.dispatch(config) | Promise<Fleet> | Dispatch a fleet. Starts agents immediately, up to the concurrency limit.
fleet.status() | Promise<FleetStatus> | Summary: total, completed, failed, running, pending, cost, overall status.
fleet.results({ status, limit, cursor }) | Promise<FleetResultPage> | Paginated item results. Filter by status: completed, failed, or pending.
fleet.stream() | AsyncIterable<FleetItem> | Yields items as they complete. Ends when all items are done or the fleet is cancelled.
fleet.cancel() | Promise<void> | Stop pending items. In-flight items run to completion.
session.fleets.list() | Promise<Fleet[]> | List all fleets for this session, newest first.
session.fleets.get(fleetId) | Promise<Fleet> | Fetch a fleet by ID.
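As a reading aid, the shapes used on this page can be written down as TypeScript types, together with a helper that follows nextCursor through every page of fleet.results(). The field names come from the examples above, but the SDK's actual exported types may differ. ResultsSource, collectResults, and the in-memory fakeFleet are hypothetical and exist only so the sketch runs on its own; typing output as nullable for failed items is also an assumption.

```typescript
interface Money { amount: number; currency: string } // amounts appear to be in cents
type ItemStatus = 'completed' | 'failed' | 'pending'

interface FleetItem<I = unknown, O = unknown> {
  index: number
  input: I
  output: O | null // assumption: no output for failed items
  status: ItemStatus
  cost: Money
  duration: number // seconds
  error: string | null
}

interface FleetResultPage<I, O> {
  items: FleetItem<I, O>[]
  nextCursor: string | null
}

// Structural stand-in for the part of Fleet that the helper needs.
interface ResultsSource<I, O> {
  results(q: { status?: ItemStatus; limit?: number; cursor?: string }): Promise<FleetResultPage<I, O>>
}

// Follows nextCursor until it is null and returns every matching item.
async function collectResults<I, O>(
  fleet: ResultsSource<I, O>,
  status: ItemStatus,
  limit = 50,
): Promise<FleetItem<I, O>[]> {
  const all: FleetItem<I, O>[] = []
  let cursor: string | undefined
  do {
    const page = await fleet.results({ status, limit, cursor })
    all.push(...page.items)
    cursor = page.nextCursor ?? undefined
  } while (cursor !== undefined)
  return all
}

// In-memory fake so the sketch runs without the SDK; cursors are array offsets.
function fakeFleet(items: FleetItem<string, string>[]): ResultsSource<string, string> {
  return {
    async results({ status, limit = 50, cursor }) {
      const matching = items.filter((it) => status === undefined || it.status === status)
      const start = cursor ? Number(cursor) : 0
      const end = start + limit
      return {
        items: matching.slice(start, end),
        nextCursor: end < matching.length ? String(end) : null,
      }
    },
  }
}

// Five demo items: even indexes completed, odd indexes failed.
const demo = [0, 1, 2, 3, 4].map((i): FleetItem<string, string> => ({
  index: i,
  input: `input-${i}`,
  output: i % 2 === 0 ? `out-${i}` : null,
  status: i % 2 === 0 ? 'completed' : 'failed',
  cost: { amount: 12, currency: 'usd' },
  duration: 18,
  error: i % 2 === 0 ? null : 'timeout',
}))

collectResults(fakeFleet(demo), 'failed', 1).then((failed) => {
  // The inputs of failed items could be passed to a new dispatch() call as a retry.
  console.log(failed.map((it) => it.input).join(', ')) // input-1, input-3
})
```

With the real SDK, the object returned by session.fleets.dispatch() would take the place of fakeFleet(demo), provided its results() method matches the signature listed above.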