Get Started

Workflows

Multi-agent pipelines with DAG execution. Chain agents together, run branches in parallel, wait for human approvals, and pass data between steps using JSONPath.

Workflows require primitivesEnabled: true on your platform config. They are available on Cloud, Team, and Enterprise plans.

Creating a workflow

Define a workflow once. Run it many times with different inputs. Steps are executed as a DAG — independent steps run in parallel automatically.

workflow.ts
import { Theazo } from class="cb-str">'theazo'

const theazo = new Theazo({ apiKey: class="cb-str">'th_live_...' })

const workflow = await theazo.workflows.create({
  name: class="cb-str">'lead-enrichment',
  onFailure: class="cb-str">'stop',   class="cb-cmt">// 'stop' | 'continue' | 'retry'
  retries: class="cb-num">2,
  timeout: class="cb-num">3600,       class="cb-cmt">// seconds

  steps: [
    {
      id: class="cb-str">'scrape',
      type: class="cb-str">'agent',
      agent: class="cb-str">'web-researcher',
      task: class="cb-str">'Scrape company info from {{ input.website }}',
    },
    {
      id: class="cb-str">'enrich',
      type: class="cb-str">'agent',
      agent: class="cb-str">'data-enricher',
      dependsOn: [class="cb-str">'scrape'],
      task: class="cb-str">'Enrich lead data with LinkedIn and Clearbit',
      inputMap: {
        companyData: class="cb-str">'$.scrape.output.company',
        rawHtml:     class="cb-str">'$.scrape.output.html',
      },
    },
    {
      id: class="cb-str">'score',
      type: class="cb-str">'agent',
      agent: class="cb-str">'lead-scorer',
      dependsOn: [class="cb-str">'enrich'],
      task: class="cb-str">'Score this lead 0-100 based on ICP fit',
      inputMap: {
        enrichedLead: class="cb-str">'$.enrich.output.lead',
      },
    },
  ],
})

Step types

Each step has a type that controls how it executes.

agentdefault

Runs an agent. Requires agent and task fields.

parallel

Fans out to multiple sub-steps simultaneously. All must complete before dependents proceed.

{
  id: class="cb-str">'research-all',
  type: class="cb-str">'parallel',
  steps: [
    { id: class="cb-str">'research-twitter', type: class="cb-str">'agent', agent: class="cb-str">'twitter-agent', task: class="cb-str">'...' },
    { id: class="cb-str">'research-linkedin', type: class="cb-str">'agent', agent: class="cb-str">'linkedin-agent', task: class="cb-str">'...' },
    { id: class="cb-str">'research-news', type: class="cb-str">'agent', agent: class="cb-str">'news-agent', task: class="cb-str">'...' },
  ],
}
condition

Branches based on a JSONPath expression. Evaluates to truthy/falsy.

{
  id: class="cb-str">'check-score',
  type: class="cb-str">'condition',
  expression: class="cb-str">'$.score.output.score >= 70',
  dependsOn: [class="cb-str">'score'],
  onTrue:  { next: class="cb-str">'send-to-crm' },
  onFalse: { next: class="cb-str">'archive' },
}
delay

Pauses execution for a fixed duration before proceeding.

{
  id: class="cb-str">'wait-24h',
  type: class="cb-str">'delay',
  duration: class="cb-num">86400,   class="cb-cmt">// seconds
  dependsOn: [class="cb-str">'send-intro-email'],
}
approval

Pauses and waits for human approval before continuing. Integrates with the Approvals API.

{
  id: class="cb-str">'approve-send',
  type: class="cb-str">'approval',
  action: class="cb-str">'send_email',
  timeout: class="cb-str">'24h',
  defaultAction: class="cb-str">'deny',
  dependsOn: [class="cb-str">'draft-email'],
}

Data passing with JSONPath

Steps pass data forward using inputMap. Keys become variables available to the step. Values are JSONPath expressions evaluated against the full run state.

// JSONPath syntax: $.{stepId}.output.{field}
inputMap: {
  leads:       class="cb-str">'$.scrape.output.leads',          class="cb-cmt">// array from scrape step
  companyName: class="cb-str">'$.scrape.output.company.name',   class="cb-cmt">// nested field
  rawInput:    class="cb-str">'$.input.website',                class="cb-cmt">// original workflow input
}

All inputMap values are JSONPath strings — they serialize to JSONB in the database and are safe to store. Never use JavaScript functions in step definitions.

Running a workflow

run.ts
const run = await workflow.run({
  userId: class="cb-str">'user_123',
  input: {
    website: class="cb-str">'https://acme.com'="cb-cmt">//acme.com',
    priority: class="cb-str">'high',
  },
})

console.log(run.id)      class="cb-cmt">// 'wfrun_abc123'
console.log(run.status)  class="cb-cmt">// 'running'

Checking run status

const run = await theazo.workflows.getRun(class="cb-str">'wfrun_abc123')

console.log(run.status)   class="cb-cmt">// 'completed' | 'running' | 'failed' | 'cancelled'
console.log(run.steps)    class="cb-cmt">// per-step status, cost, output
console.log(run.cost)     class="cb-cmt">// { amount: 2340, currency: 'usd' }
console.log(run.duration) class="cb-cmt">// 184 (seconds)

Cancelling a run

await theazo.workflows.cancelRun(class="cb-str">'wfrun_abc123')
// Any in-progress agents are paused; partial results preserved

Complete example

A full lead enrichment pipeline: scrape in parallel, merge, score, conditionally route to CRM or archive.

lead-pipeline.ts
import { Theazo } from class="cb-str">'theazo'

const theazo = new Theazo({ apiKey: class="cb-str">'th_live_...' })

const workflow = await theazo.workflows.create({
  name: class="cb-str">'lead-pipeline',
  onFailure: class="cb-str">'stop',
  retries: class="cb-num">1,
  timeout: class="cb-num">7200,

  steps: [
    class="cb-cmt">// Step 1: parallel research across three sources
    {
      id: class="cb-str">'research',
      type: class="cb-str">'parallel',
      steps: [
        { id: class="cb-str">'twitter',  type: class="cb-str">'agent', agent: class="cb-str">'social-agent',   task: class="cb-str">'Find Twitter presence for {{ input.company }}' },
        { id: class="cb-str">'linkedin', type: class="cb-str">'agent', agent: class="cb-str">'social-agent',   task: class="cb-str">'Find LinkedIn profile for {{ input.company }}' },
        { id: class="cb-str">'news',     type: class="cb-str">'agent', agent: class="cb-str">'news-agent',     task: class="cb-str">'Find recent news for {{ input.company }}' },
      ],
    },

    class="cb-cmt">// Step 2: merge all research into one enriched record
    {
      id: class="cb-str">'merge',
      type: class="cb-str">'agent',
      agent: class="cb-str">'data-merger',
      dependsOn: [class="cb-str">'research'],
      task: class="cb-str">'Merge all research into a single enriched lead record',
      inputMap: {
        twitter:  class="cb-str">'$.twitter.output',
        linkedin: class="cb-str">'$.linkedin.output',
        news:     class="cb-str">'$.news.output',
        company:  class="cb-str">'$.input.company',
      },
    },

    class="cb-cmt">// Step 3: score the lead
    {
      id: class="cb-str">'score',
      type: class="cb-str">'agent',
      agent: class="cb-str">'scorer',
      dependsOn: [class="cb-str">'merge'],
      task: class="cb-str">'Score this lead 0-100 for ICP fit. Output JSON with score and reasoning.',
      inputMap: { lead: class="cb-str">'$.merge.output.lead' },
    },

    class="cb-cmt">// Step 4: route based on score
    {
      id: class="cb-str">'route',
      type: class="cb-str">'condition',
      dependsOn: [class="cb-str">'score'],
      expression: class="cb-str">'$.score.output.score >= 70',
      onTrue:  { next: class="cb-str">'send-to-crm' },
      onFalse: { next: class="cb-str">'archive' },
    },

    class="cb-cmt">// Step 5a: high-score → push to CRM with approval
    {
      id: class="cb-str">'send-to-crm',
      type: class="cb-str">'approval',
      action: class="cb-str">'update_crm',
      timeout: class="cb-str">'4h',
      defaultAction: class="cb-str">'deny',
    },

    class="cb-cmt">// Step 5b: low-score → archive
    {
      id: class="cb-str">'archive',
      type: class="cb-str">'agent',
      agent: class="cb-str">'archiver',
      task: class="cb-str">'Archive this lead with reason for future reference',
      inputMap: { lead: class="cb-str">'$.merge.output.lead', score: class="cb-str">'$.score.output' },
    },
  ],
})

// Run it for a specific user input
const run = await workflow.run({
  userId: class="cb-str">'user_456',
  input: { company: class="cb-str">'Stripe', website: class="cb-str">'https://stripe.com'cb-cmt">//stripe.com' },
})

// Poll for completion
while (run.status === class="cb-str">'running') {
  await new Promise(r => setTimeout(r, class="cb-num">5000))
  const updated = await theazo.workflows.getRun(run.id)
  console.log(class="cb-str">'Steps done:', updated.steps.filter(s => s.status === class="cb-str">'completed').length)
}

const final = await theazo.workflows.getRun(run.id)
console.log(class="cb-str">'Status:', final.status)
console.log(class="cb-str">'Total cost:', final.cost)  class="cb-cmt">// { amount: 3820, currency: 'usd' }

API reference

theazo.workflows.create(config)Promise<Workflow>Define a new workflow. Returns a Workflow object you can call .run() on.
theazo.workflows.list()Promise<Workflow[]>List all workflow definitions for this platform.
theazo.workflows.get(workflowId)Promise<Workflow>Fetch a workflow definition by ID.
theazo.workflows.delete(workflowId)Promise<void>Delete a workflow definition. Does not affect in-progress runs.
workflow.run({ userId, input })Promise<WorkflowRun>Start a new run of this workflow.
theazo.workflows.getRun(runId)Promise<WorkflowRun>Fetch current status and step results for a run.
theazo.workflows.listRuns(workflowId)Promise<WorkflowRun[]>List all runs for a given workflow, newest first.
theazo.workflows.cancelRun(runId)Promise<void>Cancel an in-progress run. In-flight agents are paused.
Was this page helpful?
Ask anything...⌘I