A3S Docs
A3S Flow

SDK Reference

Complete API reference for Rust, Python, and TypeScript SDKs

SDK Reference

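A3S Flow provides SDKs for Rust, Python, and TypeScript. All three wrap the same Rust runtime and expose the same operations (start, pause, resume, terminate, state, and subscribe), following each language's conventions: for example, result fields are snake_case in Rust and Python and camelCase in TypeScript.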

Prop

Type


Rust SDK

Installation

[dependencies]
a3s-flow = "0.3"
tokio = { version = "1", features = ["full"] }
serde_json = "1"

FlowEngine

Entry point for creating and managing workflow executions.

use std::sync::Arc;

use a3s_flow::{FlowEngine, NodeRegistry};

// Create engine with default nodes
let engine = FlowEngine::new(NodeRegistry::with_defaults());

// Create engine with custom registry
let mut registry = NodeRegistry::new();
registry.register(Arc::new(MyCustomNode));
let engine = FlowEngine::new(registry);

Methods

// List registered node types
fn node_types(&self) -> Vec<String>

// Start workflow execution
async fn start(
    &self,
    definition: &Value,
    variables: HashMap<String, Value>
) -> Result<Uuid>

// Pause execution at next wave boundary
async fn pause(&self, id: Uuid) -> Result<()>

// Resume paused execution
async fn resume(&self, id: Uuid) -> Result<()>

// Terminate execution
async fn terminate(&self, id: Uuid) -> Result<()>

// Query execution state
async fn state(&self, id: Uuid) -> Result<ExecutionState>

// Subscribe to execution events
async fn subscribe(&self, id: Uuid) -> Result<Receiver<FlowEvent>>

ExecutionState

pub enum ExecutionState {
    Running,
    Paused,
    Completed(FlowResult),
    Failed(String),
    Terminated,
}

impl ExecutionState {
    pub fn as_str(&self) -> &str;
    pub fn is_terminal(&self) -> bool;
}

FlowResult

pub struct FlowResult {
    pub execution_id: Uuid,
    pub outputs: HashMap<String, Value>,
    pub completed_nodes: HashSet<String>,
    pub skipped_nodes: HashSet<String>,
    pub context: HashMap<String, Value>,
}

FlowEvent

pub enum FlowEvent {
    NodeStarted { node_id: String },
    NodeCompleted { node_id: String, output: Value },
    NodeSkipped { node_id: String },
    NodeFailed { node_id: String, error: String },
    WaveCompleted { wave_index: usize, node_count: usize },
    ExecutionPaused,
    ExecutionResumed,
    ExecutionCompleted { result: FlowResult },
    ExecutionFailed { error: String },
    ExecutionTerminated,
}

Python SDK

Installation

pip install a3s-flow

FlowEngine

from a3s_flow import FlowEngine

# Create engine
engine = FlowEngine()

# Start workflow
execution_id = await engine.start(definition, variables)

# Pause execution
await engine.pause(execution_id)

# Resume execution
await engine.resume(execution_id)

# Terminate execution
await engine.terminate(execution_id)

# Query state
state = await engine.state(execution_id)

# Subscribe to events
async for event in engine.subscribe(execution_id):
    print(f"Event: {event}")

State Object

{
    "status": "running" | "paused" | "completed" | "failed" | "terminated",
    "result": {  # Only present when status == "completed"
        "execution_id": str,
        "outputs": dict,
        "completed_nodes": list[str],
        "skipped_nodes": list[str],
        "context": dict
    },
    "error": str  # Only present when status == "failed"
}

Event Object

{
    "type": "node_started" | "node_completed" | "node_skipped" | "node_failed" |
            "wave_completed" | "execution_paused" | "execution_resumed" |
            "execution_completed" | "execution_failed" | "execution_terminated",
    "node_id": str,  # For node events
    "output": any,   # For node_completed
    "error": str,    # For node_failed / execution_failed
    "wave_index": int,  # For wave_completed
    "node_count": int,  # For wave_completed
    "result": dict   # For execution_completed
}

TypeScript SDK

Installation

npm install @a3s-lab/flow

FlowEngine

import { FlowEngine, ExecutionState, FlowEvent } from '@a3s-lab/flow';

// Create engine
const engine = new FlowEngine();

// Start workflow
const executionId = await engine.start(definition, variables);

// Pause execution
await engine.pause(executionId);

// Resume execution
await engine.resume(executionId);

// Terminate execution
await engine.terminate(executionId);

// Query state
const state = await engine.state(executionId);

// Subscribe to events
for await (const event of engine.subscribe(executionId)) {
  console.log('Event:', event);
}

ExecutionState

type ExecutionState =
  | { status: 'running' }
  | { status: 'paused' }
  | { status: 'completed'; result: FlowResult }
  | { status: 'failed'; error: string }
  | { status: 'terminated' };

interface FlowResult {
  executionId: string;
  outputs: Record<string, any>;
  completedNodes: string[];
  skippedNodes: string[];
  context: Record<string, any>;
}

FlowEvent

type FlowEvent =
  | { type: 'node_started'; nodeId: string }
  | { type: 'node_completed'; nodeId: string; output: any }
  | { type: 'node_skipped'; nodeId: string }
  | { type: 'node_failed'; nodeId: string; error: string }
  | { type: 'wave_completed'; waveIndex: number; nodeCount: number }
  | { type: 'execution_paused' }
  | { type: 'execution_resumed' }
  | { type: 'execution_completed'; result: FlowResult }
  | { type: 'execution_failed'; error: string }
  | { type: 'execution_terminated' };

Workflow Definition

All SDKs accept the same JSON workflow definition format. Its shape is described below using TypeScript interfaces:

interface WorkflowDefinition {
  nodes: Node[];
  edges: Edge[];
}

interface Node {
  id: string;
  type: string;
  data?: Record<string, any>;
}

interface Edge {
  source: string;
  target: string;
}

Conditional Execution

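Add a run_if object inside a node's data to execute that node conditionally. In the example below, the notify node runs only when the branch field of the check node's output equals "success":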

{
  "id": "notify",
  "type": "http-request",
  "data": {
    "url": "https://hooks.example.com/success",
    "method": "POST",
    "run_if": {
      "from": "check",
      "path": "branch",
      "op": "eq",
      "value": "success"
    }
  }
}

Prop

Type


Error Handling

Rust

match engine.start(&definition, variables).await {
    Ok(id) => println!("Started: {}", id),
    Err(FlowError::InvalidDefinition(msg)) => eprintln!("Invalid workflow: {}", msg),
    Err(FlowError::CyclicDependency) => eprintln!("Workflow contains cycles"),
    Err(e) => eprintln!("Error: {}", e),
}

Python

try:
    execution_id = await engine.start(definition, variables)
except ValueError as e:
    print(f"Invalid workflow: {e}")
except RuntimeError as e:
    print(f"Execution error: {e}")

TypeScript

try {
  const executionId = await engine.start(definition, variables);
} catch (error) {
  if (error.message.includes('cycle')) {
    console.error('Workflow contains cycles');
  } else {
    console.error('Error:', error.message);
  }
}

On this page