Workflows
Multi-agent pipelines with DAG execution. Chain agents together, run branches in parallel, wait for human approvals, and pass data between steps using JSONPath.
primitivesEnabled: true on your platform config. They are available on Cloud, Team, and Enterprise plans.Creating a workflow
Define a workflow once. Run it many times with different inputs. Steps are executed as a DAG — independent steps run in parallel automatically.
import { Theazo } from class="cb-str">'theazo'
const theazo = new Theazo({ apiKey: class="cb-str">'th_live_...' })
const workflow = await theazo.workflows.create({
name: class="cb-str">'lead-enrichment',
onFailure: class="cb-str">'stop', class="cb-cmt">// 'stop' | 'continue' | 'retry'
retries: class="cb-num">2,
timeout: class="cb-num">3600, class="cb-cmt">// seconds
steps: [
{
id: class="cb-str">'scrape',
type: class="cb-str">'agent',
agent: class="cb-str">'web-researcher',
task: class="cb-str">'Scrape company info from {{ input.website }}',
},
{
id: class="cb-str">'enrich',
type: class="cb-str">'agent',
agent: class="cb-str">'data-enricher',
dependsOn: [class="cb-str">'scrape'],
task: class="cb-str">'Enrich lead data with LinkedIn and Clearbit',
inputMap: {
companyData: class="cb-str">'$.scrape.output.company',
rawHtml: class="cb-str">'$.scrape.output.html',
},
},
{
id: class="cb-str">'score',
type: class="cb-str">'agent',
agent: class="cb-str">'lead-scorer',
dependsOn: [class="cb-str">'enrich'],
task: class="cb-str">'Score this lead 0-100 based on ICP fit',
inputMap: {
enrichedLead: class="cb-str">'$.enrich.output.lead',
},
},
],
})Step types
Each step has a type that controls how it executes.
agentdefaultRuns an agent. Requires agent and task fields.
parallelFans out to multiple sub-steps simultaneously. All must complete before dependents proceed.
{
id: class="cb-str">'research-all',
type: class="cb-str">'parallel',
steps: [
{ id: class="cb-str">'research-twitter', type: class="cb-str">'agent', agent: class="cb-str">'twitter-agent', task: class="cb-str">'...' },
{ id: class="cb-str">'research-linkedin', type: class="cb-str">'agent', agent: class="cb-str">'linkedin-agent', task: class="cb-str">'...' },
{ id: class="cb-str">'research-news', type: class="cb-str">'agent', agent: class="cb-str">'news-agent', task: class="cb-str">'...' },
],
}conditionBranches based on a JSONPath expression. Evaluates to truthy/falsy.
{
id: class="cb-str">'check-score',
type: class="cb-str">'condition',
expression: class="cb-str">'$.score.output.score >= 70',
dependsOn: [class="cb-str">'score'],
onTrue: { next: class="cb-str">'send-to-crm' },
onFalse: { next: class="cb-str">'archive' },
}delayPauses execution for a fixed duration before proceeding.
{
id: class="cb-str">'wait-24h',
type: class="cb-str">'delay',
duration: class="cb-num">86400, class="cb-cmt">// seconds
dependsOn: [class="cb-str">'send-intro-email'],
}approvalPauses and waits for human approval before continuing. Integrates with the Approvals API.
{
id: class="cb-str">'approve-send',
type: class="cb-str">'approval',
action: class="cb-str">'send_email',
timeout: class="cb-str">'24h',
defaultAction: class="cb-str">'deny',
dependsOn: [class="cb-str">'draft-email'],
}Data passing with JSONPath
Steps pass data forward using inputMap. Keys become variables available to the step. Values are JSONPath expressions evaluated against the full run state.
// JSONPath syntax: $.{stepId}.output.{field}
inputMap: {
leads: class="cb-str">'$.scrape.output.leads', class="cb-cmt">// array from scrape step
companyName: class="cb-str">'$.scrape.output.company.name', class="cb-cmt">// nested field
rawInput: class="cb-str">'$.input.website', class="cb-cmt">// original workflow input
}All inputMap values are JSONPath strings — they serialize to JSONB in the database and are safe to store. Never use JavaScript functions in step definitions.
Running a workflow
const run = await workflow.run({
userId: class="cb-str">'user_123',
input: {
website: class="cb-str">'https://acme.com'="cb-cmt">//acme.com',
priority: class="cb-str">'high',
},
})
console.log(run.id) class="cb-cmt">// 'wfrun_abc123'
console.log(run.status) class="cb-cmt">// 'running'Checking run status
const run = await theazo.workflows.getRun(class="cb-str">'wfrun_abc123')
console.log(run.status) class="cb-cmt">// 'completed' | 'running' | 'failed' | 'cancelled'
console.log(run.steps) class="cb-cmt">// per-step status, cost, output
console.log(run.cost) class="cb-cmt">// { amount: 2340, currency: 'usd' }
console.log(run.duration) class="cb-cmt">// 184 (seconds)Cancelling a run
await theazo.workflows.cancelRun(class="cb-str">'wfrun_abc123')
// Any in-progress agents are paused; partial results preservedComplete example
A full lead enrichment pipeline: scrape in parallel, merge, score, conditionally route to CRM or archive.
import { Theazo } from class="cb-str">'theazo'
const theazo = new Theazo({ apiKey: class="cb-str">'th_live_...' })
const workflow = await theazo.workflows.create({
name: class="cb-str">'lead-pipeline',
onFailure: class="cb-str">'stop',
retries: class="cb-num">1,
timeout: class="cb-num">7200,
steps: [
class="cb-cmt">// Step 1: parallel research across three sources
{
id: class="cb-str">'research',
type: class="cb-str">'parallel',
steps: [
{ id: class="cb-str">'twitter', type: class="cb-str">'agent', agent: class="cb-str">'social-agent', task: class="cb-str">'Find Twitter presence for {{ input.company }}' },
{ id: class="cb-str">'linkedin', type: class="cb-str">'agent', agent: class="cb-str">'social-agent', task: class="cb-str">'Find LinkedIn profile for {{ input.company }}' },
{ id: class="cb-str">'news', type: class="cb-str">'agent', agent: class="cb-str">'news-agent', task: class="cb-str">'Find recent news for {{ input.company }}' },
],
},
class="cb-cmt">// Step 2: merge all research into one enriched record
{
id: class="cb-str">'merge',
type: class="cb-str">'agent',
agent: class="cb-str">'data-merger',
dependsOn: [class="cb-str">'research'],
task: class="cb-str">'Merge all research into a single enriched lead record',
inputMap: {
twitter: class="cb-str">'$.twitter.output',
linkedin: class="cb-str">'$.linkedin.output',
news: class="cb-str">'$.news.output',
company: class="cb-str">'$.input.company',
},
},
class="cb-cmt">// Step 3: score the lead
{
id: class="cb-str">'score',
type: class="cb-str">'agent',
agent: class="cb-str">'scorer',
dependsOn: [class="cb-str">'merge'],
task: class="cb-str">'Score this lead 0-100 for ICP fit. Output JSON with score and reasoning.',
inputMap: { lead: class="cb-str">'$.merge.output.lead' },
},
class="cb-cmt">// Step 4: route based on score
{
id: class="cb-str">'route',
type: class="cb-str">'condition',
dependsOn: [class="cb-str">'score'],
expression: class="cb-str">'$.score.output.score >= 70',
onTrue: { next: class="cb-str">'send-to-crm' },
onFalse: { next: class="cb-str">'archive' },
},
class="cb-cmt">// Step 5a: high-score → push to CRM with approval
{
id: class="cb-str">'send-to-crm',
type: class="cb-str">'approval',
action: class="cb-str">'update_crm',
timeout: class="cb-str">'4h',
defaultAction: class="cb-str">'deny',
},
class="cb-cmt">// Step 5b: low-score → archive
{
id: class="cb-str">'archive',
type: class="cb-str">'agent',
agent: class="cb-str">'archiver',
task: class="cb-str">'Archive this lead with reason for future reference',
inputMap: { lead: class="cb-str">'$.merge.output.lead', score: class="cb-str">'$.score.output' },
},
],
})
// Run it for a specific user input
const run = await workflow.run({
userId: class="cb-str">'user_456',
input: { company: class="cb-str">'Stripe', website: class="cb-str">'https://stripe.com'cb-cmt">//stripe.com' },
})
// Poll for completion
while (run.status === class="cb-str">'running') {
await new Promise(r => setTimeout(r, class="cb-num">5000))
const updated = await theazo.workflows.getRun(run.id)
console.log(class="cb-str">'Steps done:', updated.steps.filter(s => s.status === class="cb-str">'completed').length)
}
const final = await theazo.workflows.getRun(run.id)
console.log(class="cb-str">'Status:', final.status)
console.log(class="cb-str">'Total cost:', final.cost) class="cb-cmt">// { amount: 3820, currency: 'usd' }API reference
theazo.workflows.create(config)Promise<Workflow>Define a new workflow. Returns a Workflow object you can call .run() on.theazo.workflows.list()Promise<Workflow[]>List all workflow definitions for this platform.theazo.workflows.get(workflowId)Promise<Workflow>Fetch a workflow definition by ID.theazo.workflows.delete(workflowId)Promise<void>Delete a workflow definition. Does not affect in-progress runs.workflow.run({ userId, input })Promise<WorkflowRun>Start a new run of this workflow.theazo.workflows.getRun(runId)Promise<WorkflowRun>Fetch current status and step results for a run.theazo.workflows.listRuns(workflowId)Promise<WorkflowRun[]>List all runs for a given workflow, newest first.theazo.workflows.cancelRun(runId)Promise<void>Cancel an in-progress run. In-flight agents are paused.