@async/pipeline

API Reference

This is the first public API surface for @async/pipeline.

Imports

Use the public package for normal authoring:

import { cache, defineCache, definePipeline, dependsOn, env, fileCache, job, memoryCache, redisCache, sh, source, task, trigger } from "@async/pipeline";

Subpaths are available for advanced use:

import { definePipeline } from "@async/pipeline/core";
import { runJob } from "@async/pipeline/node";
import { LimaCommandExecutor } from "@async/pipeline/lima";
import { createRuntime, defineRuntime } from "@async/pipeline/runtime";

definePipeline

definePipeline({
  name: "app",
  env: {},
  cache: "file:local",
  namedInputs: {},
  taskDefaults: {},
  triggers: {},
  sync: {},
  sources: {},
  tasks: {},
  jobs: {}
});

Fields:

Field Purpose
name Pipeline name written into execution records.
env Runtime environment inherited by every job. Values can be literals, env.secret(...), or env.var(...).
cache Optional cache registry or default cache ref. Built-in stores are file and memory.
namedInputs Reusable input groups referenced by task inputs.
taskDefaults Defaults applied by exact task id or task name segment.
triggers Named trigger declarations.
sync Generated files that should stay current.
sources Explicit local or git repos whose pipeline can be composed into this graph.
tasks Task map.
jobs Job map.

Pipeline definitions are metadata. Importing a pipeline, calling definePipeline, using directives, or reading metadata does not execute tasks, open cache connections, start cron, clone repos, or evaluate function steps.

Unknown fields in the pipeline, tasks, taskDefaults, jobs, or a job’s github config are rejected with ASYNC_PIPELINE_UNKNOWN_FIELD, so a typo such as timout fails loudly instead of silently changing behavior. Fields that are accepted but only declare metadata are documented as such on this page.
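The strict-field check described above can be sketched as follows. This is an illustration only: assertKnownFields, UnknownFieldError, and the TASK_FIELDS allow-list are hypothetical names, not the library's validator.

```typescript
// Hypothetical error type carrying the documented error code.
class UnknownFieldError extends Error {
  readonly code = "ASYNC_PIPELINE_UNKNOWN_FIELD";
  constructor(scope: string, field: string) {
    super(`${scope}: unknown field "${field}"`);
  }
}

// Illustrative allow-list built from the task fields shown on this page.
const TASK_FIELDS = new Set([
  "description", "dependsOn", "inputs", "outputs", "cache",
  "retry", "timeout", "requires", "run", "steps"
]);

// Throws on the first key that is not in the allow-list.
function assertKnownFields(
  scope: string,
  value: Record<string, unknown>,
  allowed: Set<string>
): void {
  for (const key of Object.keys(value)) {
    if (!allowed.has(key)) throw new UnknownFieldError(scope, key);
  }
}
```

With this shape, a config containing timout is rejected before any task metadata is built, while timeout passes.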

env

env is the runtime process environment for a pipeline job. Pipeline-level env is inherited by every job. Job-level env overrides pipeline-level env by key.

import { definePipeline, env, job, sh, task } from "@async/pipeline";

export default definePipeline({
  name: "app",
  env: {
    NODE_ENV: env.var("NODE_ENV", { default: "dev" })
  },
  tasks: {
    deploy: task({
      run: sh`deploy --target "$API_URL"`
    })
  },
  jobs: {
    deploy: job({
      target: "deploy",
      env: {
        API_URL: env.var("NODE_ENV", {
          prod: "https://api.example.com",
          dev: "http://localhost:3000"
        }, {
          default: "dev"
        }),
        NODE_AUTH_TOKEN: env.secret("NPM_TOKEN")
      }
    })
  }
});

Env values:

Value Runtime behavior
"literal" Uses the literal string.
env.secret("NAME") Reads a secret source. Locally this is process.env.NAME; generated GitHub Actions renders ${{ secrets.NAME }} into the destination env key.
env.var("NAME") Reads a variable source. Locally this is process.env.NAME; generated GitHub Actions renders ${{ vars.NAME }} into the destination env key.
env.var("NAME", { default: "dev" }) Reads NAME, or uses the default when missing.
env.var("NAME", { prod, dev }, { default }) Reads NAME, optionally defaults the selector, then maps it to a runtime value.

Missing secrets, missing vars without defaults, and unmapped variable values fail before the task command runs. Error messages name the env key and source, but do not print secret values.
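A minimal sketch of the resolution rules above, under stated assumptions: the EnvValue shape, resolveEnvValue, and resolveEnv are hypothetical stand-ins for whatever env.secret(...) and env.var(...) produce internally.

```typescript
// Hypothetical model of the env values listed above.
type EnvValue =
  | string
  | { kind: "secret"; name: string }
  | { kind: "var"; name: string; map?: Record<string, string>; default?: string };

type EnvSource = Record<string, string | undefined>;

function resolveEnvValue(key: string, value: EnvValue, source: EnvSource): string {
  if (typeof value === "string") return value; // literal
  const raw = source[value.name];
  if (value.kind === "secret") {
    // The message names the key and the source, never the secret value.
    if (raw === undefined) throw new Error(`env ${key}: missing secret ${value.name}`);
    return raw;
  }
  const selected = raw ?? value.default;
  if (selected === undefined) throw new Error(`env ${key}: missing var ${value.name}`);
  if (!value.map) return selected;
  if (!Object.prototype.hasOwnProperty.call(value.map, selected)) {
    throw new Error(`env ${key}: var ${value.name} has no mapping for "${selected}"`);
  }
  return value.map[selected];
}

// Job env overrides pipeline env by key, then every value is resolved
// before any task command would run.
function resolveEnv(
  pipelineEnv: Record<string, EnvValue>,
  jobEnv: Record<string, EnvValue>,
  source: EnvSource
): Record<string, string> {
  const merged = { ...pipelineEnv, ...jobEnv };
  const resolved: Record<string, string> = {};
  for (const key of Object.keys(merged)) {
    resolved[key] = resolveEnvValue(key, merged[key], source);
  }
  return resolved;
}
```

Resolving everything up front is what lets a missing secret fail the job before the shell command starts.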

The GitHub Actions generator reads environment and requires as portable job metadata. Runtime env still belongs in env:

job({
  target: "publish",
  environment: {
    name: "npm-publish",
    url: "https://www.npmjs.com/package/@async/pipeline"
  },
  requires: {
    provenance: true
  },
  env: {
    NODE_AUTH_TOKEN: env.secret("NPM_TOKEN")
  }
});

The generated workflow renders:

environment: "npm-publish"
permissions:
  contents: read
  id-token: write
steps:
  - name: Run pipeline job
    run: pnpm async-pipeline run publish
    env:
      CI: true
      NODE_AUTH_TOKEN: ${{ secrets.NPM_TOKEN }}

Local tests can mock the same job without GitHub Actions by passing an env into runJob(...):

import assert from "node:assert/strict";
import { runJob } from "@async/pipeline/node";
import pipeline from "../pipeline.js";

const record = await runJob(pipeline, {
  id: "publish",
  mode: "ci",
  cwd: process.cwd(),
  env: {
    ...process.env,
    NPM_TOKEN: "fake-token"
  }
});

assert.equal(record.status, "passed");

To test the already-rendered GitHub shape, set the destination key instead:

process.env.NODE_AUTH_TOKEN = "fake-token";

For env: { NODE_AUTH_TOKEN: env.secret("NPM_TOKEN") }, the runner accepts either the source key (NPM_TOKEN) or the rendered destination key (NODE_AUTH_TOKEN). This lets tests cover the same runtime step locally and in CI.

The core model is:

tasks     = what can run
jobs      = named entrypoints
triggers  = when jobs should run
sync      = generated files to keep current

Triggers describe when jobs should run. Sync describes which generated files should be kept current.

task

task({
  description: "Build the app",
  dependsOn: ["typecheck"],
  inputs: ["src/**/*.ts", "package.json"],
  outputs: ["dist/**"],
  cache: "file:local",
  retry: { attempts: 2, delayMs: 500 },
  timeout: "2m",
  requires: { tools: ["node", "pnpm"] },
  run: sh`pnpm build`
})

Task overloads:

task(config);
task(config, sh`pnpm test`);
task(config, [cache.use("file:local"), sh`pnpm test`]);

If config.run is set and a second argument is also passed, task throws ASYNC_PIPELINE_TASK_ARGUMENT_CONFLICT.

Fields:

Field Purpose
dependsOn Task ids that must run first. Use <source>:<task> for declared source tasks.
inputs Files or named input groups that affect cache keys. .git/, .async/, node_modules/, and this task’s declared outputs are ignored by input resolution.
outputs Files produced by the task. File cache snapshots and restores these files on a cache hit.
cache true, false, a cache ref such as "file:local", or cache options.
retry Total attempts as a number, or { attempts, delayMs }. retry: 2 means at most two attempts (one retry); retry: 1 or omitting it disables retries.
timeout Milliseconds or a duration string such as 500ms, 30s, 5m, 1h.
requires Tool, secret, or runtime declarations.
run One shell command/function step or an array of steps/directives.
steps Multiple shell commands, function steps, or static directives.
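The retry and timeout rows can be read as the normalization below. This is a sketch, not the library's parser: parseTimeout and normalizeRetry are hypothetical names, and treating values below 1 as a single attempt is an assumption.

```typescript
// Milliseconds per supported duration unit.
const UNIT_MS: Record<string, number> = { ms: 1, s: 1_000, m: 60_000, h: 3_600_000 };

// Accepts milliseconds or a duration string such as "500ms", "30s", "5m", "1h".
function parseTimeout(value: number | string): number {
  if (typeof value === "number") return value;
  const match = /^(\d+)(ms|s|m|h)$/.exec(value.trim());
  if (!match) throw new Error(`invalid timeout: ${value}`);
  return Number(match[1]) * UNIT_MS[match[2]];
}

interface RetryPolicy {
  attempts: number; // total attempts, including the first run
  delayMs: number;
}

// retry counts total attempts, so 2 means one retry and 1 (or omitted) means none.
function normalizeRetry(retry?: number | { attempts: number; delayMs?: number }): RetryPolicy {
  if (retry === undefined) return { attempts: 1, delayMs: 0 };
  if (typeof retry === "number") return { attempts: Math.max(1, retry), delayMs: 0 };
  return { attempts: Math.max(1, retry.attempts), delayMs: retry.delayMs ?? 0 };
}
```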

dependsOn is the author-facing dependency keyword.

Directive form is available for reusable stacks:

task({}, [
  dependsOn("build"),
  cache.use("file:local"),
  sh`pnpm test`
])

Normalization lifts directives into task metadata. Metadata readers inspect directives but never invoke user functions.

defineCache

const caches = defineCache({
  default: "file:local",
  stores: {
    memory: memoryCache(),
    file: fileCache({ root: ".async/cache/tasks" }),
    redis: redisCache({ url: { env: "REDIS_URL" } })
  }
});

Use the registry at pipeline level:

definePipeline({
  name: "app",
  cache: caches,
  tasks: {
    test: task({ cache: "file:local", run: sh`pnpm test` })
  },
  jobs: {
    verify: job({ target: "test" })
  }
});

Built-in runner support:

Store Behavior
file Persistent local task cache under .async/cache/tasks by default. Output-producing tasks store outputs.json and copied output files next to result.json.
memory Process-local task cache. Output-producing hits are honored only while the previously observed output files still exist.

Remote stores can be declared as runtime metadata, but @async/pipeline does not ship a Redis dependency.

Cache keys include direct dependency cache fingerprints, so changing a dependency invalidates its dependents without hashing every task’s inputs into every key. ttlMs is enforced for built-in stores; expired entries rerun.
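To illustrate why direct dependency fingerprints are enough, here is a hedged sketch. TaskNode, fingerprint, and cacheKey are hypothetical, the graph is assumed acyclic, and FNV-1a stands in for whatever content hash the runner actually uses.

```typescript
// FNV-1a over the joined parts; a stand-in hash for illustration only.
function fingerprint(parts: string[]): string {
  const text = parts.join("\u0000");
  let hash = 0x811c9dc5;
  for (let i = 0; i < text.length; i++) {
    hash ^= text.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193) >>> 0;
  }
  return hash.toString(16).padStart(8, "0");
}

interface TaskNode {
  id: string;
  command: string;
  inputHashes: string[]; // hashes of this task's own resolved inputs
  dependsOn: string[];
}

// A task's key covers its own inputs plus the keys of its direct dependencies.
// Because each dependency key already covers that dependency's own inputs and
// dependencies, a change anywhere upstream propagates without re-hashing it here.
function cacheKey(
  id: string,
  tasks: Map<string, TaskNode>,
  memo: Map<string, string> = new Map()
): string {
  const known = memo.get(id);
  if (known !== undefined) return known;
  const task = tasks.get(id);
  if (!task) throw new Error(`unknown task: ${id}`);
  const dependencyKeys = task.dependsOn.map((dep) => cacheKey(dep, tasks, memo));
  const key = fingerprint([task.id, task.command, ...task.inputHashes, ...dependencyKeys]);
  memo.set(id, key);
  return key;
}
```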

source

source.path({
  path: "../admin",
  pipeline: "pipeline.ts",
  writable: true,
  prepare: [sh`pnpm install --frozen-lockfile`]
});

source.git({
  url: "https://github.com/acme/storefront.git",
  ref: "main",
  pipeline: "pipeline.ts",
  prepare: [
    sh`pnpm install --frozen-lockfile`,
    sh((ctx) => sh`pnpm add @acme/design-system@file:${ctx.candidate.dir}`)
  ]
});

Sources are explicit. @async/pipeline does not infer reverse dependencies from package manifests, lockfiles, npm metadata, or GitHub search.

Use namespaced refs from root tasks:

task({
  dependsOn: ["storefront:test", "admin:test-design-system"]
})
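A namespaced ref of the form <source>:<task> can be split as sketched below. parseTaskRef is a hypothetical helper, and it assumes root task ids do not themselves contain a colon.

```typescript
interface TaskRef {
  source?: string; // declared source name, absent for root tasks
  task: string;
}

// Splits on the first colon; refs without a colon are root task ids.
function parseTaskRef(ref: string): TaskRef {
  const index = ref.indexOf(":");
  if (index === -1) return { task: ref };
  const source = ref.slice(0, index);
  const task = ref.slice(index + 1);
  if (!source || !task) throw new Error(`invalid task ref: ${ref}`);
  return { source, task };
}
```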

Path sources with prepare require writable: true in v1. Git sources use warm checkouts under .async/sources.

sh

task({
  run: sh`pnpm test`
})

sh creates a shell step. The host runner executes it from the task working directory.

Use deferred sh only when runtime context is needed:

sh((ctx) => sh`pnpm add @acme/design-system@file:${ctx.candidate.dir}`)

Deferred shell callbacks are metadata-safe. They are not evaluated when a pipeline is imported or read through metadata.

Function Steps

task({
  async run(ctx) {
    ctx.log(`running ${ctx.taskId}`);
    ctx.meta({ checked: true });
  }
})

Function steps receive:

Field Purpose
taskId Current task id.
runId Current execution id.
cwd Current task working directory. Root tasks use the root repo; source tasks use the source checkout.
env Process environment.
root.dir Root pipeline directory.
candidate Candidate repo context: dir, fingerprint, optional git facts.
source Source repo context for namespaced source tasks and prepare steps.
meta Add task metadata to the execution record.
log Append to the task log.
sh Create shell command values.

job

job({
  description: "Full verification",
  target: "build",
  trigger: ["push"],
  environment: {
    name: "npm-publish",
    url: "https://www.npmjs.com/package/@async/pipeline"
  },
  requires: {
    provenance: true
  },
  env: {
    NODE_AUTH_TOKEN: env.secret("NPM_TOKEN"),
    API_URL: env.var("NODE_ENV", {
      prod: "https://api.example.com",
      dev: "http://localhost:3000"
    }, {
      default: "dev"
    })
  }
})

Fields:

Field Purpose
target Task id or task ids used as the job entrypoint.
trigger Trigger ids attached to the job.
environment Optional deployment/environment metadata, either a string name or { name, url }. GitHub lowers this to job environment.
requires Optional portable job requirements. requires.provenance lowers to GitHub id-token: write.
env Runtime environment for this job. Job env overrides pipeline env by key.
github Optional GitHub-specific escape hatch for generated job config not covered by portable metadata, such as platform environment and permissions.
mode Optional manual or ci mode.

github.permissions accepts contents, idToken, issues, packages, and pullRequests (pullRequests renders as pull-requests). When a job grants any permission, the generator restates contents: read unless you set contents yourself, because job-level permissions replace the workflow defaults. Unknown permission fields fail with ASYNC_PIPELINE_UNKNOWN_FIELD.
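The permission rules above can be sketched like this. renderPermissions and the key map are hypothetical; rendering idToken as id-token is inferred from the generated workflow shown in the env section.

```typescript
type Access = "read" | "write" | "none";

// Author-facing field name to rendered YAML key.
const PERMISSION_KEYS = new Map<string, string>([
  ["contents", "contents"],
  ["idToken", "id-token"],
  ["issues", "issues"],
  ["packages", "packages"],
  ["pullRequests", "pull-requests"]
]);

function renderPermissions(permissions: Record<string, Access>): Record<string, Access> {
  const rendered: Record<string, Access> = {};
  const names = Object.keys(permissions);
  if (names.length === 0) return rendered;
  // Job-level permissions replace the workflow defaults, so restate
  // contents: read unless the author set contents explicitly.
  rendered.contents = "read";
  for (const name of names) {
    const yamlKey = PERMISSION_KEYS.get(name);
    if (yamlKey === undefined) {
      throw new Error(`ASYNC_PIPELINE_UNKNOWN_FIELD: github.permissions.${name}`);
    }
    rendered[yamlKey] = permissions[name];
  }
  return rendered;
}
```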

See env for local, GitHub Actions, and test behavior.

trigger

trigger.manual();
trigger.github({ events: ["push", "pull_request"], branches: ["main"] });
trigger.cron("0 9 * * 1");
trigger.schedule("0 9 * * 1"); // compatibility alias

Triggers are declarations. Use async-pipeline github generate to render them into committed GitHub Actions YAML. GitHub cannot start a cron or push workflow from TypeScript alone.

sync

definePipeline({
  name: "app",
  sync: {
    github: true,
    tasks: true
  },
  tasks: {},
  jobs: {}
});

sync.github: true uses the default generated paths:

.github/workflows/async-pipeline.yml
.github/async-pipeline.lock.json

Use object form to render elsewhere or tune the generated workflow:

sync: {
  github: {
    workflow: ".tmp/async-pipeline.yml",
    lock: ".tmp/async-pipeline.lock.json",
    nodeVersion: 24,
    cache: true
  }
}

nodeVersion selects the Node version installed by the generated workflow (default 24). cache: true (the default) adds a pinned actions/cache step that restores .async/cache across CI runs so warm tasks resolve as cached; set cache: false to keep CI cold.

Omitting nodeVersion and cache keeps their defaults:

sync: {
  github: {
    workflow: ".tmp/async-pipeline.yml",
    lock: ".tmp/async-pipeline.lock.json"
  }
}

sync.tasks: true syncs all jobs, not raw tasks, into the root package-manager manifest with the pipeline prefix. Package manifests receive scripts; Deno manifests receive tasks.

{
  "scripts": {
    "pipeline:verify": "async-pipeline run verify"
  }
}

Use object form for explicit targets:

sync: {
  tasks: {
    prefix: "pipeline",
    runners: ["package"],
    targets: [
      { package: "@acme/app" },
      { path: "tools/worker/deno.json" }
    ],
    jobs: ["verify"],
    tasks: ["typecheck"],
    scripts: {
      "sync:check": "sync check"
    }
  }
}

Package targets match package.json#name. Path targets must point at package.json, deno.json, or deno.jsonc. Raw task sync is opt-in and generates names like pipeline:task:typecheck.

Task sync writes .async-pipeline/tasks.lock.json. The lock records the generator version, config path, prefix, runners, targets, resolved manifest paths, generated command names and values, and a rendered hash. Existing unmanaged scripts or Deno tasks are never overwritten; conflicts throw ASYNC_PIPELINE_SYNC_CONFLICT.
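A hedged sketch of the no-overwrite rule: mergeScripts and SyncConflictError are hypothetical, and two details are assumptions rather than documented behavior. The set of managed names is taken to come from the lock file, and an unmanaged script whose command already matches is not treated as a conflict.

```typescript
class SyncConflictError extends Error {
  readonly code = "ASYNC_PIPELINE_SYNC_CONFLICT";
}

// Merges generated commands into a manifest's scripts without touching
// entries the generator does not own.
function mergeScripts(
  existing: Record<string, string>,
  generated: Record<string, string>,
  managed: Set<string> // names recorded as generated in the lock (assumption)
): Record<string, string> {
  const next: Record<string, string> = { ...existing };
  for (const name of Object.keys(generated)) {
    const present = Object.prototype.hasOwnProperty.call(existing, name);
    if (present && !managed.has(name) && existing[name] !== generated[name]) {
      throw new SyncConflictError(`script "${name}" exists and is not managed by the lock`);
    }
    next[name] = generated[name];
  }
  return next;
}
```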

CLI Commands

async-pipeline sync list
async-pipeline sync generate
async-pipeline sync check
async-pipeline sync github list
async-pipeline sync github generate [--workflow <path>] [--lock <path>]
async-pipeline sync github check [--workflow <path>] [--lock <path>]
async-pipeline sync tasks list
async-pipeline sync tasks generate
async-pipeline sync tasks check
async-pipeline github generate [--workflow <path>] [--lock <path>]
async-pipeline github check [--workflow <path>] [--lock <path>]
async-pipeline github run [--job <id>] [--concurrency <n>]
async-pipeline cache clear
async-pipeline gc [--keep <n>] [--cache-days <n>]

github generate and github check are compatibility aliases for the GitHub sync implementation.

github generate writes .github/workflows/async-pipeline.yml and .github/async-pipeline.lock.json unless paths are overridden.

github check fails when generated files are stale.

github run reads the GitHub event context and runs matching jobs. On workflow_dispatch, only jobs with a manual trigger run implicitly; select others explicitly with --job <id>. Pass --concurrency <n> to bound how many ready tasks execute in parallel.

run --format json emits the execution record.

cache clear resets the task cache.

gc prunes run records and cache entries unused for --cache-days days. Separately, run records are auto-pruned after each run to ASYNC_PIPELINE_KEEP_RUNS (default 50, 0 disables).

In-memory task output buffers are capped at ASYNC_PIPELINE_MAX_LOG_BYTES (default 8 MiB per stream, 0 = unlimited); stored logs keep the tail with a truncation marker.

Runtime Subpath

The runtime API is additive and advanced. It is for embeddable workflows, not the primary pipeline.ts MVP path:

import { branch, cache, compose, createRuntime, defineRuntime, parallel, task } from "@async/pipeline/runtime";

const work = defineRuntime([
  task({ id: "verify" }, compose(
    async (ctx, next) => {
      ctx.state.started = true;
      return next();
    },
    [
      async (_ctx, next) => next(),
      async (_ctx, next) => next()
    ],
    parallel([
      async () => "typecheck",
      async () => "test"
    ]),
    branch(
      (ctx) => Boolean(ctx.input),
      async () => "with-input",
      async () => "without-input"
    )
  )),
  task({ id: "sync", dependsOn: ["verify"] }, [
    cache.use("memory:session"),
    async (ctx, next) => {
      ctx.state.synced = true;
      return next();
    }
  ])
]);

const runtime = createRuntime(work);
const result = await runtime.run();
await runtime.start();
await runtime.stop();

compose(...) is the low-level runtime primitive: functions receive (ctx, next), nested arrays are sequential groups, and parallel(items) or parallel(options, items) is explicit fan-out. task(...) is the opinionated runtime boundary for ids, dependencies, cache directives, inspection, and structured error results.
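For readers unfamiliar with the (ctx, next) convention, here is a generic middleware-chaining sketch. composeSteps is a hypothetical stand-in that covers only plain function steps; it does not model nested arrays, parallel, or branch, and it is not the runtime's implementation.

```typescript
type Next = () => Promise<unknown>;
type Step<C> = (ctx: C, next: Next) => Promise<unknown>;

// Chains steps so each one decides when (and whether) the rest of the chain runs.
function composeSteps<C>(...steps: Step<C>[]): Step<C> {
  return (ctx, next) => {
    const dispatch = (index: number): Promise<unknown> =>
      index === steps.length
        ? next() // chain exhausted: hand off to the caller's continuation
        : steps[index](ctx, () => dispatch(index + 1));
    return dispatch(0);
  };
}
```

A step that awaits next() can run code both before and after everything downstream, which is what makes the pattern useful for timing, state setup, and error wrapping.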

runJob

runJob(...) executes one job from a normalized pipeline. The job id is id; target stays inside the job definition and points at the requested task graph endpoint.

import { runJob } from "@async/pipeline/node";
import pipeline from "./pipeline.js";

const record = await runJob(pipeline, {
  id: "verify",
  concurrency: 2,
  cwd: process.cwd(),
  env: process.env
});

cwd defaults to process.cwd() and env to process.env. Pass sandbox to run inside a declared or inline sandbox.

interface RunOptions {
  id: string;
  mode?: "manual" | "ci";
  concurrency?: number;
  force?: boolean;
  echo?: boolean;
  cwd?: string;
  env?: NodeJS.ProcessEnv;
  commands?: PipelineCommands;
  executor?: CommandExecutor;
  sandbox?: SandboxId | SandboxDefinition;
}

interface CommandExecutor {
  name: string;
  runShell(command: string, options: {
    cwd: string;
    env: NodeJS.ProcessEnv;
    task: NormalizedTask;
    timeoutMs?: number;
  }): Promise<CommandResult>;
  checkTool?(tool: string): Promise<boolean>;
}

When omitted, concurrency uses a bounded local default. On the first task failure, the scheduler stops starting new tasks and lets already-running tasks finish before writing the final failed run record.
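The failure behavior described above can be sketched with a small scheduler. SchedTask and runGraph are hypothetical; the real scheduler also handles caching, retries, and run records, none of which appear here.

```typescript
interface SchedTask {
  id: string;
  dependsOn: string[];
  run: () => Promise<void>;
}

async function runGraph(
  tasks: SchedTask[],
  concurrency: number
): Promise<{ status: "passed" | "failed"; started: string[] }> {
  const pending = new Map(tasks.map((task) => [task.id, task] as const));
  const done = new Set<string>();
  const running = new Set<Promise<void>>();
  const started: string[] = [];
  let failed = false;

  const launch = (task: SchedTask): void => {
    pending.delete(task.id);
    started.push(task.id);
    const tracked: Promise<void> = task
      .run()
      .then(() => { done.add(task.id); }, () => { failed = true; })
      .finally(() => { running.delete(tracked); });
    running.add(tracked);
  };

  while (true) {
    // After the first failure, stop starting new tasks.
    if (!failed) {
      for (const task of Array.from(pending.values())) {
        if (running.size >= concurrency) break;
        if (task.dependsOn.every((dep) => done.has(dep))) launch(task);
      }
    }
    if (running.size === 0) break;
    // Let already-running tasks finish one at a time.
    await Promise.race(Array.from(running));
  }
  // Tasks that never became ready also leave the run failed.
  return { status: failed || pending.size > 0 ? "failed" : "passed", started };
}
```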

The host workspace uses the real filesystem and shell. Tests can provide a custom env, CommandExecutor, or command policy without touching global process state.

sandboxes

Declare inspectable sandbox profiles in definePipeline(...) for opt-in isolated local runs. The default is always the host; a sandbox only applies when selected:

import { definePipeline, sandbox } from "@async/pipeline";

export default definePipeline({
  name: "app",
  sandboxes: {
    lima: sandbox.lima({ vm: "async-pipeline" }),
    docker: sandbox.docker({ image: "node:24" })
  },
  tasks: {},
  jobs: {}
});

Run a job inside a selected sandbox:

async-pipeline run verify --sandbox docker
async-pipeline run verify --sandbox lima

Programmatic runs select sandboxes the same way: by id from the pipeline’s sandboxes, or with an inline definition.

import { sandbox } from "@async/pipeline";
import { runJob } from "@async/pipeline/node";
import pipeline from "./pipeline.js";

await runJob(pipeline, { id: "verify", sandbox: "docker" });

await runJob(pipeline, {
  id: "verify",
  sandbox: sandbox.docker({
    image: "node:24",
    cwd: process.cwd()
  })
});

await runJob(pipeline, {
  id: "verify",
  sandbox: sandbox.lima({ vm: "async-pipeline" }),
  cwd: process.cwd()
});

command policy

commands governs CLI/tool/agent command boundaries. It is separate from task shell execution, which still uses the run’s command executor.

import { command, definePipeline } from "@async/pipeline";

export default definePipeline({
  name: "app",
  commands: command.policy({
    rules: [
      command.rule({
        prefix: ["npm", "publish"],
        action: command.deny()
      }),
      command.rule({
        exact: ["async-pipeline", "github", "check"],
        action: command.mock({
          code: 0,
          stdout: "GitHub workflow is current.\n"
        })
      })
    ],
    fallback: command.allow(),
    record: true,
    output: {
      maxBytes: 20_000,
      redactSecrets: true
    }
  }),
  tasks: {},
  jobs: {}
});

Use runPipelineCli(...) to exercise the CLI without spawning a subprocess:

import { runPipelineCli } from "@async/pipeline";

const result = await runPipelineCli({
  args: ["github", "check"],
  cwd: process.cwd()
});

Rules only affect matching commands. Unmatched commands use fallback, and fallback defaults to command.allow().
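Rule matching can be pictured as below. This is a sketch under assumptions: the Action and Rule shapes and the decide function are hypothetical, and first-match-wins ordering is assumed rather than documented.

```typescript
type Action =
  | { type: "allow" }
  | { type: "deny" }
  | { type: "mock"; code: number; stdout: string };

interface Rule {
  prefix?: string[]; // matches when argv starts with these parts
  exact?: string[];  // matches only when argv equals these parts
  action: Action;
}

function matchesRule(rule: Rule, argv: string[]): boolean {
  if (rule.exact) {
    return rule.exact.length === argv.length && rule.exact.every((part, i) => part === argv[i]);
  }
  if (rule.prefix) {
    return rule.prefix.length <= argv.length && rule.prefix.every((part, i) => part === argv[i]);
  }
  return false;
}

// Assumption: the first matching rule wins; unmatched commands use the fallback.
function decide(rules: Rule[], argv: string[], fallback: Action = { type: "allow" }): Action {
  for (const rule of rules) {
    if (matchesRule(rule, argv)) return rule.action;
  }
  return fallback;
}
```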

Execution Record Shape

Runs are written to:

.async/runs/<run-id>/execution.json

The record includes:

interface ExecutionRecord {
  id: string;
  pipelineName: string;
  jobId: string;
  cwd: string;
  startedAt: string;
  finishedAt?: string;
  status: "running" | "passed" | "failed";
  mode: "manual" | "ci";
  tasks: TaskResult[];
  sources?: Record<string, ExecutionSourceRecord>;
}

Task results include status, attempts, cache key, cache hit, timings, error, and metadata.

Metadata

Read metadata without running anything:

async-pipeline metadata --format json
async-pipeline metadata --format json --include-sources

Metadata reads do not clone sources, run source prepare, execute tasks, or evaluate deferred shell callbacks. --include-sources only loads source pipeline metadata from already-available path sources or previously synced git checkouts.

Run Lock

run and run-task hold .async/run.lock for the duration of a run. A second run in the same project fails fast with ASYNC_PIPELINE_RUN_ACTIVE instead of racing the task cache and run records. A lock whose holder process is dead is reclaimed automatically, so crashed runs never require manual cleanup.
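The stale-lock reclaim can be sketched as follows. The LockFile shape and acquireRunLock are hypothetical, and the liveness probe is injected so the logic stays testable; in Node, process.kill(pid, 0) is one common way to probe whether a process exists.

```typescript
// Hypothetical lock contents; the real file format is not documented here.
interface LockFile {
  pid: number;
  startedAt: string;
}

function acquireRunLock(
  existing: LockFile | undefined,
  selfPid: number,
  isAlive: (pid: number) => boolean
): LockFile {
  if (existing && existing.pid !== selfPid && isAlive(existing.pid)) {
    // A live holder means another run is in progress: fail fast.
    const error = new Error(`run already active (pid ${existing.pid})`) as Error & { code: string };
    error.code = "ASYNC_PIPELINE_RUN_ACTIVE";
    throw error;
  }
  // No lock, or the holder is dead: take (or reclaim) the lock.
  return { pid: selfPid, startedAt: new Date().toISOString() };
}
```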

Execution Record Schema

Execution records (.async/runs/<run-id>/execution.json) and stored cache results carry schemaVersion (currently 1), and records include the owning pid so doctor can tell a crashed run from a live one. Consumers should ignore unknown fields; schemaVersion increments only on breaking shape changes.

Exit Codes

Code Meaning
0 Run passed or command succeeded.
1 Run failed, configuration error, or unexpected error.
130 Interrupted by SIGINT (Ctrl-C); tasks were terminated and the execution record finalized.
141 CLI output pipe closed (EPIPE, e.g. piping into head); tasks were terminated and the record finalized.
143 Terminated by SIGTERM; same shutdown path as SIGINT.

Task-level timeouts surface as command exit code 124 inside the task result; the run itself exits 1.
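The signal-related codes in the table follow the common shell convention of 128 plus the signal number (SIGINT 2, SIGPIPE 13, SIGTERM 15). A small sketch of that mapping, with a hypothetical Outcome type and exitCodeFor helper:

```typescript
const SIGNAL_NUMBERS = { SIGINT: 2, SIGPIPE: 13, SIGTERM: 15 } as const;

type Outcome =
  | { kind: "passed" }
  | { kind: "failed" } // failed run, configuration error, or unexpected error
  | { kind: "interrupted"; signal: "SIGINT" | "SIGTERM" }
  | { kind: "pipe-closed" }; // EPIPE on CLI output

function exitCodeFor(outcome: Outcome): number {
  switch (outcome.kind) {
    case "passed":
      return 0;
    case "failed":
      return 1;
    case "interrupted":
      return 128 + SIGNAL_NUMBERS[outcome.signal];
    case "pipe-closed":
      return 128 + SIGNAL_NUMBERS.SIGPIPE;
  }
}
```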

Environment Variables

Variable Effect
ASYNC_PIPELINE_KEEP_RUNS Run-record auto-prune limit applied after each run (default 50, 0 disables).
ASYNC_PIPELINE_MAX_LOG_BYTES Per-stream task output buffer cap in bytes (default 8 MiB, 0 = unlimited, minimum 4096).
ASYNC_PIPELINE_ENVIRONMENT Environment name checked by command.requireEnvironment(...).
CI When set, runs record mode: "ci".
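A sketch of how the two numeric variables above might be interpreted. resolveMaxLogBytes and resolveKeepRuns are hypothetical helpers; clamping small values up to the 4096 minimum and falling back to the default on unparsable input are assumptions.

```typescript
const DEFAULT_MAX_LOG_BYTES = 8 * 1024 * 1024; // 8 MiB per stream
const MIN_MAX_LOG_BYTES = 4096;
const DEFAULT_KEEP_RUNS = 50;

// Parses a non-negative integer, or returns undefined when unset or invalid.
function parseCount(raw: string | undefined): number | undefined {
  if (raw === undefined || raw.trim() === "") return undefined;
  const value = Number(raw);
  return Number.isInteger(value) && value >= 0 ? value : undefined;
}

// 0 means unlimited; other values are clamped to the documented minimum.
function resolveMaxLogBytes(raw: string | undefined): number {
  const value = parseCount(raw);
  if (value === undefined) return DEFAULT_MAX_LOG_BYTES;
  if (value === 0) return Infinity;
  return Math.max(MIN_MAX_LOG_BYTES, value);
}

// 0 disables auto-prune; callers would skip pruning when this returns 0.
function resolveKeepRuns(raw: string | undefined): number {
  const value = parseCount(raw);
  return value === undefined ? DEFAULT_KEEP_RUNS : value;
}
```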