Workflows
Multi-agent pipelines with DAG execution. Define a graph of steps once, run it many times with different inputs. Independent branches execute in parallel automatically. Steps can fan out, branch on conditions, wait for humans, call webhooks, transform data, and dynamically splice new steps into a running DAG.
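To make the scheduling model concrete, here is a minimal sketch of how steps could be grouped into parallel batches using only their dependsOn lists. It illustrates the idea and is not Theazo's scheduler; StepNode and planLevels are hypothetical names.

```typescript
// Hypothetical minimal step shape: only the fields needed for scheduling.
interface StepNode {
  id: string;
  dependsOn?: string[];
}

// Group steps into batches. Every step in a batch has all of its
// dependencies satisfied by earlier batches, so a batch can run in parallel.
function planLevels(steps: StepNode[]): string[][] {
  const done = new Set<string>();
  const levels: string[][] = [];
  let remaining = [...steps];

  while (remaining.length > 0) {
    const ready = remaining.filter((s) =>
      (s.dependsOn ?? []).every((dep) => done.has(dep)),
    );
    if (ready.length === 0) {
      // Nothing is runnable: a cycle or a reference to an unknown step.
      throw new Error('Steps do not form a valid DAG');
    }
    levels.push(ready.map((s) => s.id));
    for (const s of ready) done.add(s.id);
    remaining = remaining.filter((s) => !done.has(s.id));
  }
  return levels;
}

const demoLevels = planLevels([
  { id: 'scrape' },
  { id: 'news' },
  { id: 'enrich', dependsOn: ['scrape'] },
  { id: 'score', dependsOn: ['enrich', 'news'] },
]);
console.log(demoLevels); // [['scrape', 'news'], ['enrich'], ['score']]
```

Here scrape and news share a batch because neither depends on the other, while score waits for both of its upstream branches.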
Requires primitivesEnabled: true on your platform config. Available on Cloud, Team, and Enterprise plans.
Creating a workflow
Use the typed builder API to construct a workflow. Call .build() at the end to produce a WorkflowCreateOpts object you pass to theazo.workflows.create().
import { Theazo, workflow } from 'theazo'
const theazo = new Theazo({ apiKey: 'th_live_...' })
const wf = await theazo.workflows.create(
  workflow('lead-enrichment')
    .step({
      id: 'scrape',
      type: 'agent',
      agent: 'web-researcher',
      task: 'Scrape company info from {{ input.website }}',
    })
    .step({
      id: 'enrich',
      type: 'agent',
      agent: 'data-enricher',
      dependsOn: ['scrape'],
      task: 'Enrich lead data with LinkedIn and Clearbit',
      inputMap: {
        companyData: '$.scrape.output.company',
        rawHtml: '$.scrape.output.html',
      },
    })
    .condition({
      id: 'check-fit',
      dependsOn: ['enrich'],
      expression: '$.enrich.output.icp_score >= 70',
      onTrue: { next: 'send-to-crm' },
      onFalse: { next: 'archive' },
    })
    .build()
)
console.log(wf.id) // 'wf_a1b2c3'
console.log(wf.version) // 1
Input and output schemas
Attach JSON Schema validators to inputSchema and outputSchema. Theazo validates the run input at trigger time and the final output before marking the run complete. Invalid inputs are rejected with a 422 before any agents start.
const wf = await theazo.workflows.create(
  workflow('lead-enrichment')
    .inputSchema({
      type: 'object',
      required: ['website', 'userId'],
      properties: {
        website: { type: 'string', format: 'uri' },
        userId: { type: 'string' },
        priority: { type: 'string', enum: ['low', 'high'] },
      },
    })
    .outputSchema({
      type: 'object',
      required: ['score', 'enrichedLead'],
      properties: {
        score: { type: 'number', minimum: 0, maximum: 100 },
        enrichedLead: { type: 'object' },
      },
    })
    // ... steps ...
    .build()
)
Step types
Each step has a type that controls how it executes. All step types share id, dependsOn, inputMap, and onStepFailure. There are 10 step types.
agent (default)
Runs an agent definition against a task prompt. The agent gets its own sandbox and model loop.
{
  id: 'scrape',
  type: 'agent',
  agent: 'web-researcher',
  task: 'Scrape company info from {{ input.website }}',
  inputMap: { website: '$.input.website' },
}
parallel
Fans out to multiple sub-steps simultaneously. All sub-steps must complete (or fail per policy) before downstream dependents proceed.
{
  id: 'research-all',
  type: 'parallel',
  steps: [
    { id: 'twitter', type: 'agent', agent: 'social-agent', task: 'Twitter presence for {{ input.company }}' },
    { id: 'linkedin', type: 'agent', agent: 'social-agent', task: 'LinkedIn profile for {{ input.company }}' },
    { id: 'news', type: 'agent', agent: 'news-agent', task: 'Recent press for {{ input.company }}' },
  ],
}
condition
Branches execution based on a JSONPath expression evaluated against the current run state. Resolves to the onTrue or onFalse branch.
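As a rough illustration of how such an expression could resolve to a branch, the sketch below evaluates a comparison of the form path, operator, number against a run-state object. The supported grammar is an assumption (Theazo's expression language may accept more), and ConditionSpec, lookupPath, and pickBranch are hypothetical names.

```typescript
// Hypothetical shape for illustration; not the SDK's exported type.
interface ConditionSpec {
  expression: string;
  onTrue: { next: string };
  onFalse: { next: string };
}

// Follow a dot-separated path such as "$.score.output.score".
function lookupPath(path: string, runState: unknown): unknown {
  return path
    .split('.')
    .slice(1) // drop the leading "$"
    .reduce<unknown>(
      (node, key) =>
        node !== null && typeof node === 'object'
          ? (node as Record<string, unknown>)[key]
          : undefined,
      runState,
    );
}

// Assumption: only "<path> <operator> <number>" is supported here.
function pickBranch(spec: ConditionSpec, runState: unknown): string {
  const match = /^(\$\S*)\s*(>=|<=|==|!=|>|<)\s*(-?\d+(?:\.\d+)?)$/.exec(
    spec.expression.trim(),
  );
  if (!match) {
    throw new Error(`Unsupported expression: ${spec.expression}`);
  }
  const left = Number(lookupPath(match[1] ?? '', runState));
  const operator = match[2] ?? '';
  const right = Number(match[3]);
  const outcomes: Record<string, boolean> = {
    '>=': left >= right,
    '<=': left <= right,
    '==': left === right,
    '!=': left !== right,
    '>': left > right,
    '<': left < right,
  };
  // A missing value becomes NaN, so every comparison except != is false.
  return outcomes[operator] ? spec.onTrue.next : spec.onFalse.next;
}

const routeSpec: ConditionSpec = {
  expression: '$.score.output.score >= 70',
  onTrue: { next: 'send-to-crm' },
  onFalse: { next: 'archive' },
};
console.log(pickBranch(routeSpec, { score: { output: { score: 82 } } })); // send-to-crm
console.log(pickBranch(routeSpec, { score: { output: { score: 55 } } })); // archive
```

In this sketch a missing upstream output falls through to the onFalse branch, which is one reasonable default but not necessarily what the platform does.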
{
  id: 'check-score',
  type: 'condition',
  dependsOn: ['score'],
  expression: '$.score.output.score >= 70',
  onTrue: { next: 'send-to-crm' },
  onFalse: { next: 'archive' },
}
delay
Pauses execution for a fixed duration before proceeding. The agent sandbox is released during the wait, so no compute cost accrues.
{
  id: 'wait-24h',
  type: 'delay',
  duration: 86400, // seconds
  dependsOn: ['send-intro-email'],
}
approval
Pauses the run and waits for a human approval decision via the Approvals API or dashboard. On timeout, the defaultAction applies.
{
  id: 'approve-send',
  type: 'approval',
  action: 'send_email',
  timeout: '4h',
  defaultAction: 'deny',
  dependsOn: ['draft-email'],
}
webhook
POSTs a payload to an external URL and either waits for a callback or fires and forgets. The body is constructed from inputMap values.
{
  id: 'notify-crm',
  type: 'webhook',
  url: 'https://hooks.yourapp.com/lead-created',
  method: 'POST',
  waitForCallback: false,
  inputMap: { lead: '$.enrich.output.lead' },
  headers: { 'X-Source': 'theazo' },
}
transform
Runs a pure JSONPath reshaping operation with no agent and no sandbox. Use it to restructure data between steps without paying for compute. Transform expressions are serializable JSONB; never use JS functions here.
{
  id: 'reshape',
  type: 'transform',
  dependsOn: ['research-all'],
  output: {
    twitterHandle: '$.twitter.output.handle',
    linkedinUrl: '$.linkedin.output.url',
    latestHeadline: '$.news.output.articles[0].title',
    company: '$.input.company',
  },
}
map
Iterates over an array from a previous step and runs a sub-step for each element. Concurrency is capped by the concurrency field. Produces an array output in the same order as the input.
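The combination of a concurrency cap and order-preserving output can be sketched with a small worker-pool helper. This is a generic illustration of the technique, not Theazo's implementation; mapWithConcurrency is a hypothetical name and the timer stands in for an agent call.

```typescript
// Order-preserving async map with a concurrency limit.
async function mapWithConcurrency<T, R>(
  items: readonly T[],
  concurrency: number,
  worker: (item: T, index: number) => Promise<R>,
): Promise<R[]> {
  const results = new Array<R>(items.length);
  let nextIndex = 0;

  // Each runner claims the next unprocessed index until the input is exhausted.
  async function runner(): Promise<void> {
    while (nextIndex < items.length) {
      const index = nextIndex++;
      // Writing by index keeps output order equal to input order.
      results[index] = await worker(items[index] as T, index);
    }
  }

  const runnerCount = Math.max(1, Math.min(concurrency, items.length));
  await Promise.all(Array.from({ length: runnerCount }, () => runner()));
  return results;
}

// Demo: track how many workers are active at once.
let inFlight = 0;
let peakInFlight = 0;
const demoScores = mapWithConcurrency(['a', 'bb', 'ccc', 'dddd'], 2, async (lead) => {
  inFlight++;
  peakInFlight = Math.max(peakInFlight, inFlight);
  await new Promise<void>((resolve) => setTimeout(resolve, 5)); // simulated agent call
  inFlight--;
  return lead.length;
});
demoScores.then((scores) => console.log(scores, 'peak concurrency:', peakInFlight));
```

Because each result is written to the slot matching its input index, slower elements never reorder the output, even though at most two workers are active at a time in this demo.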
{
  id: 'score-each',
  type: 'map',
  dependsOn: ['scrape'],
  over: '$.scrape.output.leads', // JSONPath to the array
  as: 'lead', // binding name inside sub-step
  concurrency: 5,
  step: {
    id: 'score-lead',
    type: 'agent',
    agent: 'scorer',
    task: 'Score this lead: {{ lead.company }}',
    inputMap: { lead: '$.lead' },
  },
}
planner
Dynamic DAG expansion. Runs an agent whose output is a list of new step definitions. Theazo splices those steps into the running DAG at runtime. Useful when the number or shape of tasks is not known at workflow-definition time.
{
  id: 'plan',
  type: 'planner',
  agent: 'task-planner',
  task: 'Given these {{ input.count }} leads, produce a list of research tasks',
  // Agent must output: { steps: WorkflowStep[] }
  // Theazo validates the schema and splices them into the DAG
  dependsOn: ['pre-filter'],
}
wait
Suspends the run until a named external event arrives via POST /v1/workflows/runs/:runId/resume. Optionally times out and routes to a fallback step. Use for long-running async integrations.
{
  id: 'wait-payment',
  type: 'wait',
  event: 'payment.confirmed',
  timeout: '48h',
  onTimeout: { next: 'send-reminder' },
  dependsOn: ['send-invoice'],
}
Data passing with JSONPath
Steps pass data forward using inputMap. Keys become named variables available to the step. Values are JSONPath strings evaluated against the full run state. All values are serializable to JSONB — never use JavaScript functions in step definitions.
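To show what evaluating an inputMap against the run state amounts to, the sketch below resolves a small JSONPath subset (root, dot keys, and numeric indexes) against a sample state object. It only mirrors the reference patterns used in this section and is an assumption about behavior, not Theazo's evaluator; resolvePath and resolveInputMap are hypothetical names.

```typescript
// Resolve a small JSONPath subset ($, .key, [index]) against run state.
// A full JSONPath engine supports far more (filters, wildcards, slices).
function resolvePath(path: string, runState: unknown): unknown {
  if (!path.startsWith('$')) {
    throw new Error(`Path must start with "$": ${path}`);
  }
  const rest = path.slice(1);
  // Tokens are either .key or [index].
  const tokens: string[] = rest.match(/\.[^.[\]]+|\[\d+\]/g) ?? [];
  if (tokens.join('') !== rest) {
    throw new Error(`Unsupported path syntax: ${path}`);
  }
  let current: unknown = runState;
  for (const token of tokens) {
    if (current === null || typeof current !== 'object') return undefined;
    const key = token.startsWith('[') ? Number(token.slice(1, -1)) : token.slice(1);
    current = (current as Record<string | number, unknown>)[key];
  }
  return current;
}

// Build the named variables a step would receive from its inputMap.
function resolveInputMap(
  inputMap: Record<string, string>,
  runState: unknown,
): Record<string, unknown> {
  const resolved: Record<string, unknown> = {};
  for (const [variable, path] of Object.entries(inputMap)) {
    resolved[variable] = resolvePath(path, runState);
  }
  return resolved;
}

// Sample run state: the workflow input plus one completed step.
const demoState = {
  input: { website: 'https://acme.com' },
  scrape: { output: { company: { name: 'Acme' }, leads: [{ email: 'a@acme.com' }] } },
};

const demoVars = resolveInputMap(
  {
    companyName: '$.scrape.output.company.name',
    rawInput: '$.input.website',
    firstItem: '$.scrape.output.leads[0]',
  },
  demoState,
);
console.log(demoVars);
// { companyName: 'Acme', rawInput: 'https://acme.com', firstItem: { email: 'a@acme.com' } }
```

Because every value is a plain string path, the whole inputMap stays serializable, which is the property the JSONB requirement depends on.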
// JSONPath reference patterns
inputMap: {
  leads: '$.scrape.output.leads', // array from prior step
  companyName: '$.scrape.output.company.name', // nested field
  rawInput: '$.input.website', // original workflow input
  firstItem: '$.scrape.output.leads[0]', // array index
}
Shared state
Steps can declare which keys they read and write from a shared state bag. The state bag persists across the entire run and survives hibernation. Declare stateReads and stateWrites per step so Theazo can detect conflicts and serialize concurrent writers.
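One way to picture conflict detection is to look for steps that write the same key while having no ordering between them in the DAG. The sketch below is an assumption about what such a check might look like, not Theazo's algorithm; StateStep, dependsOnStep, and findWriteConflicts are hypothetical names.

```typescript
// Hypothetical step shape: only the fields relevant to conflict detection.
interface StateStep {
  id: string;
  dependsOn?: string[];
  stateWrites?: string[];
}

interface WriteConflict {
  key: string;
  steps: [string, string];
}

// True if step `from` depends on step `target`, directly or transitively.
function dependsOnStep(from: string, target: string, byId: Map<string, StateStep>): boolean {
  const pending = [...(byId.get(from)?.dependsOn ?? [])];
  const seen = new Set<string>();
  while (pending.length > 0) {
    const id = pending.pop() as string;
    if (id === target) return true;
    if (seen.has(id)) continue;
    seen.add(id);
    pending.push(...(byId.get(id)?.dependsOn ?? []));
  }
  return false;
}

// Two steps conflict when neither is ordered before the other
// and both declare a write to the same key.
function findWriteConflicts(steps: StateStep[]): WriteConflict[] {
  const byId = new Map<string, StateStep>();
  for (const s of steps) byId.set(s.id, s);
  const conflicts: WriteConflict[] = [];
  for (let i = 0; i < steps.length; i++) {
    for (let j = i + 1; j < steps.length; j++) {
      const a = steps[i] as StateStep;
      const b = steps[j] as StateStep;
      if (dependsOnStep(a.id, b.id, byId) || dependsOnStep(b.id, a.id, byId)) continue;
      for (const key of a.stateWrites ?? []) {
        if ((b.stateWrites ?? []).includes(key)) {
          conflicts.push({ key, steps: [a.id, b.id] });
        }
      }
    }
  }
  return conflicts;
}

const demoConflicts = findWriteConflicts([
  { id: 'fetch', stateWrites: ['company'] },
  { id: 'enrich-a', dependsOn: ['fetch'], stateWrites: ['metadata'] },
  { id: 'enrich-b', dependsOn: ['fetch'], stateWrites: ['metadata'] },
  { id: 'merge', dependsOn: ['enrich-a', 'enrich-b'], stateWrites: ['metadata'] },
]);
console.log(demoConflicts); // [{ key: 'metadata', steps: ['enrich-a', 'enrich-b'] }]
```

Only the two sibling branches are flagged: merge also writes metadata, but it is ordered after both, so its write cannot race with theirs.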
const wf = await theazo.workflows.create(
  workflow('pipeline')
    .step({
      id: 'writer',
      type: 'agent',
      agent: 'data-agent',
      task: 'Fetch and normalize the company record',
      stateWrites: ['company', 'metadata'], // keys this step writes
    })
    .step({
      id: 'reader',
      type: 'agent',
      agent: 'scorer',
      task: 'Score this company',
      dependsOn: ['writer'],
      stateReads: ['company'], // keys this step reads
    })
    .build()
)
// Pass initial state on run
const run = await wf.run({
  userId: 'user_123',
  input: { website: 'https://acme.com' },
  sharedState: { retryCount: 0, campaign: 'q2-outbound' },
})
Workflow policy
Attach a WorkflowPolicy to enforce tool restrictions, cost budgets, and approval gates across all steps. Policy applies to every agent launched by the workflow — individual step overrides are not supported.
const wf = await theazo.workflows.create(
  workflow('secure-pipeline')
    .policy({
      allowTools: ['web_search', 'read_file', 'http_get'],
      denyTools: ['shell_exec', 'write_file'],
      requireApprovalFor: ['send_email', 'update_crm'],
      maxCostPerStep: { amount: 50, currency: 'usd' }, // $0.50 per step
      maxTotalCost: { amount: 500, currency: 'usd' }, // $5.00 per run
    })
    // ... steps ...
    .build()
)
If a step exceeds maxCostPerStep, it is paused and a workflow.step.cost_exceeded webhook fires. If the run total exceeds maxTotalCost, the entire run is cancelled.
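The two budget rules can be summarized as a small decision function. This is a sketch under stated assumptions: amounts are in minor units (cents), as the comments in the policy example suggest, both caps use the same currency, and the run-level cap is checked first. CostPolicy, BudgetDecision, and checkBudget are hypothetical names.

```typescript
// Money in minor units, matching the policy comments (50 means $0.50).
interface Money {
  amount: number;
  currency: string;
}

interface CostPolicy {
  maxCostPerStep?: Money;
  maxTotalCost?: Money;
}

type BudgetDecision = 'ok' | 'pause-step' | 'cancel-run';

// Decide what happens after a step reports its accumulated cost.
function checkBudget(policy: CostPolicy, stepCost: number, runTotal: number): BudgetDecision {
  // Assumption: the run-level cap takes priority over the per-step cap.
  if (policy.maxTotalCost && runTotal > policy.maxTotalCost.amount) {
    return 'cancel-run';
  }
  if (policy.maxCostPerStep && stepCost > policy.maxCostPerStep.amount) {
    return 'pause-step';
  }
  return 'ok';
}

const demoPolicy: CostPolicy = {
  maxCostPerStep: { amount: 50, currency: 'usd' },
  maxTotalCost: { amount: 500, currency: 'usd' },
};

console.log(checkBudget(demoPolicy, 30, 200)); // ok
console.log(checkBudget(demoPolicy, 75, 200)); // pause-step
console.log(checkBudget(demoPolicy, 30, 520)); // cancel-run
```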
Per-step error routing
By default a failing step stops the run (respecting the top-level onFailure policy). Override per step with onStepFailure to route to a recovery step instead.
{
  id: 'enrich',
  type: 'agent',
  agent: 'data-enricher',
  dependsOn: ['scrape'],
  task: 'Enrich lead from external APIs',
  onStepFailure: { action: 'goto', target: 'enrich-fallback' },
},
{
  id: 'enrich-fallback',
  type: 'agent',
  agent: 'data-enricher-lite',
  task: 'Minimal enrichment using only public data',
  inputMap: { company: '$.scrape.output.company' },
}
Running a workflow
const run = await wf.run({
  userId: 'user_123',
  input: {
    website: 'https://acme.com',
    priority: 'high',
  },
  idempotencyKey: 'lead-acme-2026-06-05', // safe to retry; won't double-run
})
console.log(run.id) // 'wfrun_abc123'
console.log(run.status) // 'running'
idempotencyKey is scoped per platform. Submitting the same key within 24 hours returns the existing run rather than starting a new one. Use it to safely retry on network errors.
Checking run status
const run = await theazo.workflows.getRun('wfrun_abc123')
console.log(run.status) // 'completed' | 'running' | 'failed' | 'partial' | 'cancelled'
console.log(run.steps) // per-step: { id, status, cost, output, startedAt, completedAt }
console.log(run.cost) // { amount: 2340, currency: 'usd' }
console.log(run.duration) // 184 (seconds)
console.log(run.canRetryFrom) // 'enrich': resume from this step on partial failure
Partial results
When a non-critical step fails and the run continues, the run status becomes partial. The canRetryFrom field names the first failed step so you can re-run from that point without re-executing completed steps.
if (run.status === 'partial') {
  // Resume from the failed step; completed steps are not re-run
  const resumed = await theazo.workflows.retryFrom(run.id, run.canRetryFrom!)
  console.log(resumed.id) // new run ID, shares upstream step outputs
}
Cancelling a run
await theazo.workflows.cancelRun('wfrun_abc123')
// In-progress agents are paused; partial results preserved
SSE streaming
Subscribe to a run's live event stream using theazo.workflows.streamRun(runId). Events are Server-Sent Events delivered as an AsyncIterable<StreamEvent>.
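If you consume the raw text/event-stream endpoint instead of the SDK iterator, the frames need to be parsed first. The sketch below parses a simplified subset of the standard Server-Sent Events format (event and data fields, comment lines, blank-line dispatch). That Theazo puts the event type in the event field and a JSON payload in data is an assumption, and parseSse is a hypothetical helper.

```typescript
interface SseFrame {
  event: string;
  data: string;
}

// Parse complete SSE text into frames: "event:" and "data:" fields,
// ":" comment lines, and a blank line to dispatch the buffered frame.
function parseSse(text: string): SseFrame[] {
  const frames: SseFrame[] = [];
  let eventName = 'message'; // default event type in the SSE format
  let dataLines: string[] = [];

  for (const line of text.split(/\r\n|\n|\r/)) {
    if (line === '') {
      // Blank line: dispatch the buffered frame if it carries data.
      if (dataLines.length > 0) {
        frames.push({ event: eventName, data: dataLines.join('\n') });
      }
      eventName = 'message';
      dataLines = [];
      continue;
    }
    if (line.startsWith(':')) continue; // comment or keep-alive
    const colon = line.indexOf(':');
    const field = colon === -1 ? line : line.slice(0, colon);
    let value = colon === -1 ? '' : line.slice(colon + 1);
    if (value.startsWith(' ')) value = value.slice(1); // one optional leading space
    if (field === 'event') eventName = value;
    else if (field === 'data') dataLines.push(value);
  }
  return frames;
}

// Assumed wire format for two of the events listed in this section.
const demoStream = [
  'event: step.started',
  'data: {"stepId":"scrape"}',
  '',
  ': keep-alive',
  'event: step.completed',
  'data: {"stepId":"scrape","cost":12}',
  '',
  '',
].join('\n');

const demoFrames = parseSse(demoStream);
for (const frame of demoFrames) {
  console.log(frame.event, JSON.parse(frame.data));
}
```

A production client would also buffer partial chunks across network reads and handle reconnection, both of which this sketch leaves out.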
for await (const event of theazo.workflows.streamRun('wfrun_abc123')) {
  switch (event.type) {
    case 'step.started':
      console.log('Started:', event.stepId)
      break
    case 'step.completed':
      console.log('Done:', event.stepId, 'cost:', event.cost)
      break
    case 'step.failed':
      console.error('Failed:', event.stepId, event.error)
      break
    case 'run.completed':
      console.log('Workflow done. Total cost:', event.totalCost)
      break
    case 'run.partial':
      console.warn('Partial result. Retry from:', event.canRetryFrom)
      break
    case 'approval.pending':
      console.log('Waiting for approval on:', event.action)
      break
  }
}
Cost estimation
Before running an expensive workflow, call theazo.workflows.estimate() with a sample input. Theazo runs a dry-pass — resolving the DAG, counting expected steps and agent calls — and returns a cost estimate without executing any agents.
const estimate = await theazo.workflows.estimate('wf_a1b2c3', {
  input: { website: 'https://acme.com', priority: 'high' },
})
console.log(estimate.minCost) // { amount: 80, currency: 'usd' }
console.log(estimate.maxCost) // { amount: 420, currency: 'usd' }
console.log(estimate.steps) // per-step estimate breakdown
Versioning
Every call to theazo.workflows.update() increments the version integer. In-flight runs always execute against the version they started on. New runs pick up the latest version automatically. You can pin a run to a specific version using the version field on .run().
// Update the workflow definition; the version increments atomically
const updated = await theazo.workflows.update('wf_a1b2c3', {
  steps: [ /* new step list */ ],
})
console.log(updated.version) // 2
// Pin a run to an older version
const run = await wf.run({
  userId: 'user_123',
  input: { website: 'https://acme.com' },
  version: 1, // use the previous definition
})
Complete example
A full lead pipeline: parallel research, data transform, map-score each lead, conditional routing, and approval gate before CRM push.
import { Theazo, workflow } from 'theazo'
const theazo = new Theazo({ apiKey: 'th_live_...' })
const wf = await theazo.workflows.create(
  workflow('lead-pipeline')
    .inputSchema({
      type: 'object',
      required: ['company', 'website'],
      properties: {
        company: { type: 'string' },
        website: { type: 'string', format: 'uri' },
      },
    })
    .policy({
      allowTools: ['web_search', 'http_get'],
      requireApprovalFor: ['update_crm'],
      maxTotalCost: { amount: 1000, currency: 'usd' },
    })
    // Step 1: parallel social + news research
    .step({
      id: 'research',
      type: 'parallel',
      steps: [
        { id: 'twitter', type: 'agent', agent: 'social-agent', task: 'Twitter for {{ input.company }}' },
        { id: 'linkedin', type: 'agent', agent: 'social-agent', task: 'LinkedIn for {{ input.company }}' },
        { id: 'news', type: 'agent', agent: 'news-agent', task: 'Recent news for {{ input.company }}' },
      ],
    })
    // Step 2: transform into a clean shape (no agent, no cost)
    .step({
      id: 'reshape',
      type: 'transform',
      dependsOn: ['research'],
      output: {
        twitter: '$.twitter.output',
        linkedin: '$.linkedin.output',
        news: '$.news.output.articles',
        company: '$.input.company',
      },
    })
    // Step 3: score each news article in parallel (concurrency=3)
    .step({
      id: 'score-news',
      type: 'map',
      dependsOn: ['reshape'],
      over: '$.reshape.output.news',
      as: 'article',
      concurrency: 3,
      step: {
        id: 'score-article',
        type: 'agent',
        agent: 'relevance-scorer',
        task: 'Rate relevance of this article for ICP fit: {{ article.title }}',
        inputMap: { article: '$.article' },
      },
    })
    // Step 4: final scoring with all signals
    .step({
      id: 'score',
      type: 'agent',
      agent: 'lead-scorer',
      dependsOn: ['score-news'],
      task: 'Score this lead 0-100 for ICP fit',
      inputMap: {
        twitter: '$.reshape.output.twitter',
        linkedin: '$.reshape.output.linkedin',
        newsScores: '$.score-news.output',
        company: '$.input.company',
      },
      onStepFailure: { action: 'goto', target: 'fallback-score' },
    })
    // Step 4b: fallback if scorer fails
    .step({
      id: 'fallback-score',
      type: 'agent',
      agent: 'lead-scorer-lite',
      task: 'Minimal score based only on LinkedIn data',
      inputMap: { linkedin: '$.reshape.output.linkedin' },
    })
    // Step 5: conditional routing
    .condition({
      id: 'route',
      dependsOn: ['score'],
      expression: '$.score.output.score >= 70',
      onTrue: { next: 'approve-crm' },
      onFalse: { next: 'archive' },
    })
    // Step 6a: high score → approval gate before CRM
    .step({
      id: 'approve-crm',
      type: 'approval',
      action: 'update_crm',
      timeout: '4h',
      defaultAction: 'deny',
    })
    // Step 6b: low score → archive
    .step({
      id: 'archive',
      type: 'agent',
      agent: 'archiver',
      task: 'Archive this lead with ICP miss reason',
      inputMap: { lead: '$.reshape.output', score: '$.score.output' },
    })
    .build()
)
// Estimate cost before running
const estimate = await theazo.workflows.estimate(wf.id, {
  input: { company: 'Stripe', website: 'https://stripe.com' },
})
console.log('Estimated cost:', estimate.maxCost) // { amount: 420, currency: 'usd' }
// Run it
const run = await wf.run({
  userId: 'user_456',
  input: { company: 'Stripe', website: 'https://stripe.com' },
  idempotencyKey: 'lead-stripe-2026-06-05',
})
// Stream live events
for await (const event of theazo.workflows.streamRun(run.id)) {
  if (event.type === 'step.completed') {
    console.log(event.stepId, 'done, cost:', event.cost)
  }
  if (event.type === 'run.completed') {
    console.log('Total cost:', event.totalCost)
    break
  }
}
API reference
POST /v1/workflows
Create a workflow definition. Returns the workflow with version=1.
Parameters
name (string, required): Human-readable workflow name.
steps (WorkflowStep[], required): Array of step definitions. Must form a valid DAG.
inputSchema (JSONSchema): JSON Schema for the run input. Validated at trigger time.
outputSchema (JSONSchema): JSON Schema for the final output.
policy (WorkflowPolicy): Tool allow/deny lists, approval gates, cost caps.
onFailure (string): 'stop' | 'continue' | 'retry'. Default: 'stop'.
retries (number): Number of retries on failure. Default: 0.
timeout (number): Max run duration in seconds.
Response
Workflow
Example
curl -X POST https://api.theazo.com/v1/workflows \
  -H "Authorization: Bearer th_live_..." \
  -H "Content-Type: application/json" \
  -d '{
    "name": "lead-enrichment",
    "steps": [{ "id": "scrape", "type": "agent", "agent": "web-researcher", "task": "..." }]
  }'
GET /v1/workflows
List all workflow definitions for this platform. Ordered by createdAt descending.
Response
Workflow[]
Example
curl https://api.theazo.com/v1/workflows \
  -H "Authorization: Bearer th_live_..."
GET /v1/workflows/:id
Fetch a single workflow definition by ID.
Response
Workflow
Example
curl https://api.theazo.com/v1/workflows/wf_a1b2c3 \
  -H "Authorization: Bearer th_live_..."
PATCH /v1/workflows/:id
Update a workflow definition. Increments version atomically. In-flight runs are unaffected.
Parameters
steps (WorkflowStep[]): New step list. Replaces the existing steps entirely.
policy (WorkflowPolicy): Updated policy.
name (string): Rename the workflow.
Response
Workflow
Example
curl -X PATCH https://api.theazo.com/v1/workflows/wf_a1b2c3 \
  -H "Authorization: Bearer th_live_..." \
  -H "Content-Type: application/json" \
  -d '{ "name": "lead-enrichment-v2" }'
DELETE /v1/workflows/:id
Delete a workflow definition. Does not cancel in-progress runs.
Response
void
Example
curl -X DELETE https://api.theazo.com/v1/workflows/wf_a1b2c3 \
  -H "Authorization: Bearer th_live_..."
POST /v1/workflows/:id/runs
Start a new run of a workflow. Returns 202 with the run object. Execution is async.
Parameters
userId (string, required): End-user ID scoping this run.
input (object, required): Run input. Validated against inputSchema if defined.
idempotencyKey (string): Deduplicate retries. Scoped per platform, 24h TTL.
version (number): Pin run to a specific workflow version. Defaults to latest.
sharedState (object): Initial shared state bag available to all steps.
Response
WorkflowRun
Example
curl -X POST https://api.theazo.com/v1/workflows/wf_a1b2c3/runs \
  -H "Authorization: Bearer th_live_..." \
  -H "Content-Type: application/json" \
  -d '{
    "userId": "user_123",
    "input": { "website": "https://acme.com" },
    "idempotencyKey": "lead-acme-2026-06-05"
  }'
GET /v1/workflows/:id/runs
List all runs for a workflow. Ordered by createdAt descending.
Parameters
status (string): Filter by status: running | completed | failed | partial | cancelled.
limit (number): Max results to return. Default 50.
Response
WorkflowRun[]
Example
curl "https://api.theazo.com/v1/workflows/wf_a1b2c3/runs?status=completed&limit=20" \
  -H "Authorization: Bearer th_live_..."
GET /v1/workflows/runs/:runId
Fetch current status, per-step results, and cost for a run.
Response
WorkflowRun
Example
curl https://api.theazo.com/v1/workflows/runs/wfrun_abc123 \
  -H "Authorization: Bearer th_live_..."
GET /v1/workflows/runs/:runId/stream
SSE stream of run events. Returns text/event-stream. Events: step.started, step.completed, step.failed, approval.pending, run.completed, run.partial.
Response
text/event-stream
Example
curl https://api.theazo.com/v1/workflows/runs/wfrun_abc123/stream \
  -H "Authorization: Bearer th_live_..." \
  -H "Accept: text/event-stream"
POST /v1/workflows/runs/:runId/cancel
Cancel an in-progress run. In-flight agents are paused, partial results preserved.
Response
WorkflowRun
Example
curl -X POST https://api.theazo.com/v1/workflows/runs/wfrun_abc123/cancel \
  -H "Authorization: Bearer th_live_..."
POST /v1/workflows/runs/:runId/resume
Resume a run paused by a wait step. Supply the event name and optional payload.
Parameters
event (string, required): Event name matching the wait step's event field.
payload (object): Data to inject into the run state as the event payload.
Response
WorkflowRun
Example
curl -X POST https://api.theazo.com/v1/workflows/runs/wfrun_abc123/resume \
  -H "Authorization: Bearer th_live_..." \
  -H "Content-Type: application/json" \
  -d '{ "event": "payment.confirmed", "payload": { "txId": "txn_999" } }'
POST /v1/workflows/:id/estimate
Dry-run cost estimation. No agents are executed. Returns min/max cost and per-step breakdown.
Parameters
input (object, required): Sample input used to resolve the DAG shape.
Response
WorkflowEstimate
Example
curl -X POST https://api.theazo.com/v1/workflows/wf_a1b2c3/estimate \
  -H "Authorization: Bearer th_live_..." \
  -H "Content-Type: application/json" \
  -d '{ "input": { "website": "https://acme.com" } }'
SDK method reference
workflow(name) → WorkflowBuilder: Create a typed builder. Chain .step(), .condition(), .transform(), .policy(), .inputSchema(), .outputSchema(), then .build().
theazo.workflows.create(opts) → Promise<Workflow>: Create a workflow definition. Accepts the output of workflow().build() or a raw WorkflowCreateOpts.
theazo.workflows.list() → Promise<Workflow[]>: List all workflow definitions for this platform.
theazo.workflows.get(workflowId) → Promise<Workflow>: Fetch a workflow definition by ID.
theazo.workflows.update(workflowId, opts) → Promise<Workflow>: Update steps, policy, or name. Version increments atomically.
theazo.workflows.delete(workflowId) → Promise<void>: Delete a workflow definition. Does not cancel in-progress runs.
theazo.workflows.estimate(workflowId, { input }) → Promise<WorkflowEstimate>: Dry-run cost estimate. No agents are executed.
workflow.run({ userId, input, idempotencyKey?, version?, sharedState? }) → Promise<WorkflowRun>: Start a new run. Async; returns 202 immediately.
theazo.workflows.getRun(runId) → Promise<WorkflowRun>: Fetch current status, per-step results, and total cost.
theazo.workflows.listRuns(workflowId, opts?) → Promise<WorkflowRun[]>: List runs for a workflow, newest first. Filterable by status.
theazo.workflows.cancelRun(runId) → Promise<void>: Cancel an in-progress run. In-flight agents are paused.
theazo.workflows.retryFrom(runId, stepId) → Promise<WorkflowRun>: Resume a partial run from a named step. Completed upstream steps are not re-run.
theazo.workflows.streamRun(runId) → AsyncIterable<StreamEvent>: SSE stream of run events: step.started, step.completed, step.failed, approval.pending, run.completed, run.partial.