Fleets
Run the same agent across hundreds of inputs in parallel. Fleets handle concurrency limits, cost caps, failure policies, and streaming results as items complete.
primitivesEnabled: true. Concurrency is capped at the lower of your requested value and your provider's maxConcurrent limit.Dispatching a fleet
Call session.fleets.dispatch() with an agent definition, an array of inputs, and your concurrency and cost constraints.
import { Theazo } from class="cb-str">'theazo'
const theazo = new Theazo({ apiKey: class="cb-str">'th_live_...' })
const session = await theazo.sessions.forUser(class="cb-str">'user_123')
const fleet = await session.fleets.dispatch({
agent: class="cb-str">'content-writer',
inputs: [
{ topic: class="cb-str">'AI infrastructure', tone: class="cb-str">'technical' },
{ topic: class="cb-str">'Founder fundraising', tone: class="cb-str">'narrative' },
{ topic: class="cb-str">'YC application tips', tone: class="cb-str">'direct' },
class="cb-cmt">// ... up to thousands of items
],
concurrency: class="cb-num">10,
timeout: class="cb-num">300, class="cb-cmt">// per-item timeout in seconds
maxCost: { amount: class="cb-num">5000, currency: class="cb-str">'usd' },
failurePolicy: class="cb-str">'continue', class="cb-cmt">// 'continue' | 'stop-on-first'
})
console.log(fleet.id) class="cb-cmt">// 'fleet_abc123'
console.log(fleet.status) class="cb-cmt">// 'running'Failure policies
continueFailed items are recorded but the fleet keeps running. Completed items are still available. Use this when partial results are acceptable.
stop-on-firstAny failure immediately cancels all pending and in-progress items. Use this when all-or-nothing semantics matter.
Fleet status
Poll fleet.status() to get a summary of progress.
const status = await fleet.status()
console.log(status.total) class="cb-cmt">// 500
console.log(status.completed) class="cb-cmt">// 234
console.log(status.failed) class="cb-cmt">// 3
console.log(status.running) class="cb-cmt">// 10
console.log(status.pending) class="cb-cmt">// 253
console.log(status.cost) class="cb-cmt">// { amount: 1870, currency: 'usd' }
console.log(status.status) class="cb-cmt">// 'running' | 'completed' | 'failed' | 'cancelled'Reading results
Paginated results
Fetch completed results in pages. You can filter by status to get only failed items for retry logic.
// First page of completed results
const page = await fleet.results({
status: class="cb-str">'completed',
limit: class="cb-num">50,
})
console.log(page.items) class="cb-cmt">// array of FleetItem results
console.log(page.nextCursor) class="cb-cmt">// pass to next call, null if done
// Fetch next page
const nextPage = await fleet.results({
status: class="cb-str">'completed',
limit: class="cb-num">50,
cursor: page.nextCursor,
})
// FleetItem shape:
// {
// index: 0,
// input: { topic: 'AI infrastructure', tone: 'technical' },
// output: '...',
// status: 'completed',
// cost: { amount: 12, currency: 'usd' },
// duration: 18, // seconds
// error: null,
// }Streaming results
Use fleet.stream() to process results as they complete rather than polling. This is an async iterable — each yielded item is a completed (or failed) fleet item.
const fleet = await session.fleets.dispatch({
agent: class="cb-str">'classifier',
inputs: emails, class="cb-cmt">// array of 500 email objects
concurrency: class="cb-num">20,
maxCost: { amount: class="cb-num">10000, currency: class="cb-str">'usd' },
failurePolicy: class="cb-str">'continue',
})
let totalCost = class="cb-num">0
for await (const item of fleet.stream()) {
if (item.status === class="cb-str">'completed') {
console.log(class="cb-str">`[${item.index}] ${item.output.category} — ${item.duration}s`)
totalCost += item.cost.amount
} else {
console.error(class="cb-str">`[${item.index}] Failed: ${item.error}`)
}
}
console.log(class="cb-str">'Total cost (cents):', totalCost)Cancelling a fleet
Cancelling stops all pending items immediately. In-flight items run to completion. Results collected so far remain available via fleet.results().
await fleet.cancel()
const status = await fleet.status()
console.log(status.status) class="cb-cmt">// 'cancelled'Example: 500-item batch
import { Theazo } from class="cb-str">'theazo'
const theazo = new Theazo({ apiKey: class="cb-str">'th_live_...' })
const session = await theazo.sessions.forUser(class="cb-str">'user_789', {
limits: {
maxCost: { amount: class="cb-num">20000, currency: class="cb-str">'usd', period: class="cb-str">'day' },
},
})
// Build 500 inputs from your database
const tickets = await db.supportTickets.findMany({ limit: class="cb-num">500 })
const inputs = tickets.map(t => ({
id: t.id,
subject: t.subject,
body: t.body,
}))
// Dispatch with concurrency of 10 and a $50 cost cap
const fleet = await session.fleets.dispatch({
agent: class="cb-str">'ticket-classifier',
inputs,
concurrency: class="cb-num">10,
timeout: class="cb-num">60,
maxCost: { amount: class="cb-num">5000, currency: class="cb-str">'usd' },
failurePolicy: class="cb-str">'continue',
})
console.log(class="cb-str">`Fleet ${fleet.id} dispatched with ${inputs.length} items`)
// Stream results and write to DB as they arrive
const results: Record = {}
for await (const item of fleet.stream()) {
const ticket = tickets[item.index]
if (item.status === class="cb-str">'completed') {
results[ticket.id] = item.output.category
await db.supportTickets.update({
where: { id: ticket.id },
data: { category: item.output.category, classifiedAt: new Date() },
})
} else {
console.error(class="cb-str">`Ticket ${ticket.id} failed: ${item.error}`)
}
}
const final = await fleet.status()
console.log(class="cb-str">'Completed:', final.completed, class="cb-str">'/', final.total)
console.log(class="cb-str">'Failed:', final.failed)
console.log(class="cb-str">'Total cost:', final.cost) class="cb-cmt">// { amount: 3241, currency: 'usd' } API reference
session.fleets.dispatch(config)Promise<Fleet>Dispatch a fleet. Starts agents immediately up to concurrency limit.fleet.status()Promise<FleetStatus>Summary: total, completed, failed, running, pending, cost, overall status.fleet.results({ status, limit, cursor })Promise<FleetResultPage>Paginated item results. Filter by status: completed | failed | pending.fleet.stream()AsyncIterable<FleetItem>Yields items as they complete. Ends when all items are done or fleet is cancelled.fleet.cancel()Promise<void>Stop pending items. In-flight items run to completion.session.fleets.list()Promise<Fleet[]>List all fleets for this session, newest first.session.fleets.get(fleetId)Promise<Fleet>Fetch a fleet by ID.