
Multi-Machine Distribution

Horizontal scaling with external task distribution across multiple workers


A3S Code supports external task distribution for horizontal scaling: tool execution can be offloaded to external workers running on separate machines, in containers, or in different cloud regions.

Key Features: 3 execution modes | 4 priority lanes | Multi-language workers | Built-in timeout handling

Overview

External task distribution enables:

  • Horizontal scaling — Add workers to increase throughput
  • Resource isolation — Separate heavy computation from main process
  • Custom environments — Run tools in containers, VMs, or specialized hardware
  • Multi-region deployment — Execute tasks in different geographic regions
  • Language-agnostic workers — Implement workers in Rust, Python, TypeScript, or any language

Architecture

┌─────────────────────────────────────────────────────────────┐
│                        Agent Session                         │
│  ┌────────────────────────────────────────────────────────┐ │
│  │              Session Lane Queue (a3s-lane)             │ │
│  │  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌────────┐│ │
│  │  │ Control  │  │  Query   │  │ Execute  │  │Generate││ │
│  │  │   (P0)   │  │   (P1)   │  │   (P2)   │  │  (P3)  ││ │
│  │  └────┬─────┘  └────┬─────┘  └────┬─────┘  └───┬────┘│ │
│  └───────┼─────────────┼─────────────┼─────────────┼─────┘ │
└──────────┼─────────────┼─────────────┼─────────────┼───────┘
           │             │             │             │
           │      ┌──────┴──────┐      │             │
           │      │   External  │      │             │
           │      │   Workers   │      │             │
           │      └─────────────┘      │             │
           │             │             │             │
           ▼             ▼             ▼             ▼
    ┌──────────┐  ┌──────────┐  ┌───────────┐  ┌──────────┐
    │ Worker 1 │  │ Worker 2 │  │ Worker 3  │  │ Worker N │
    │ (Local)  │  │ (Remote) │  │(Container)│  │ (Cloud)  │
    └──────────┘  └──────────┘  └───────────┘  └──────────┘

Task Handler Modes

A3S Code supports three task handler modes, configurable per lane:

  • Internal — Tasks are executed in-process by the session (the default)
  • External — Tasks are handed off to external workers, which poll, execute, and report results back
  • Hybrid — Tasks are executed in-process while an external system is notified of task lifecycle events

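The routing semantics of the three modes can be sketched as a small dispatch function. This is an illustrative model, not the actual A3S API; the callback names are placeholders:

```python
from enum import Enum

class TaskHandlerMode(Enum):
    INTERNAL = "internal"   # run in-process (default)
    EXTERNAL = "external"   # offload to an external worker
    HYBRID = "hybrid"       # run in-process, notify external observers

def dispatch(mode, task, run_local, enqueue_external, notify=lambda *a: None):
    """Route a task according to the lane's handler mode."""
    if mode is TaskHandlerMode.EXTERNAL:
        enqueue_external(task)          # a worker will poll and complete it
        return None
    if mode is TaskHandlerMode.HYBRID:
        notify("started", task)         # external system observes lifecycle
        result = run_local(task)
        notify("completed", task)
        return result
    return run_local(task)              # INTERNAL: plain in-process execution
```

External mode returns nothing immediately: the result arrives later, when a worker calls the completion API for that task.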
Session Lanes

Tasks are routed to four priority lanes:

  • Control (P0) — Session control operations; highest priority
  • Query (P1) — Read operations such as grep, read, glob, and ls
  • Execute (P2) — Write operations and bash commands
  • Generate (P3) — LLM calls; lowest priority

The Query lane is the best candidate for external distribution — read operations are free of side effects and naturally parallelizable.
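
Priority routing can be modeled as a heap keyed by lane priority: P0 drains before P1, and FIFO order is preserved within a lane. This is a simplified sketch, not the a3s-lane implementation:

```python
import heapq
import itertools

# Lane priorities as in the table above: lower number drains first
LANES = {"control": 0, "query": 1, "execute": 2, "generate": 3}

class LaneQueue:
    def __init__(self):
        self._heap = []
        self._seq = itertools.count()  # tie-breaker keeps FIFO within a lane

    def push(self, lane, task):
        heapq.heappush(self._heap, (LANES[lane], next(self._seq), task))

    def pop(self):
        return heapq.heappop(self._heap)[2]

    def __len__(self):
        return len(self._heap)
```

A Control task pushed last is still popped first, which is why long-running Execute work never starves session control.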

Complete Example

Step 1: Configure Session

Rust

use a3s_code_core::{Agent, SessionOptions, SessionQueueConfig};
use a3s_code_core::queue::{SessionLane, LaneHandlerConfig, TaskHandlerMode};

let agent = Agent::new("agent.hcl").await?;

// Configure queue with high concurrency
let queue_config = SessionQueueConfig {
    query_max_concurrency: 20,
    execute_max_concurrency: 5,
    enable_metrics: true,
    enable_dlq: true,
    ..Default::default()
};

let session = agent.session(".", Some(
    SessionOptions::new()
        .with_queue_config(queue_config)
))?;

// Configure Query lane for external processing
session.set_lane_handler(SessionLane::Query, LaneHandlerConfig {
    mode: TaskHandlerMode::External,
    timeout_ms: 120_000,  // 2 minutes
}).await;

Python

from a3s_code import Agent, SessionOptions, SessionQueueConfig
from a3s_code import SessionLane, LaneHandlerConfig, TaskHandlerMode

agent = Agent.create("agent.hcl")

# Configure queue
queue_config = SessionQueueConfig(
    query_max_concurrency=20,
    execute_max_concurrency=5,
    enable_metrics=True,
    enable_dlq=True
)

session = agent.session(".", SessionOptions(
    queue_config=queue_config
))

# Configure Query lane for external processing
session.set_lane_handler(
    SessionLane.QUERY,
    LaneHandlerConfig(
        mode=TaskHandlerMode.EXTERNAL,
        timeout_ms=120_000
    )
)

TypeScript

import { Agent, SessionOptions, SessionQueueConfig } from '@a3s-lab/code';
import { SessionLane, LaneHandlerConfig, TaskHandlerMode } from '@a3s-lab/code';

const agent = await Agent.create('agent.hcl');

const session = agent.session('.', {
  queueConfig: {
    queryMaxConcurrency: 20,
    executeMaxConcurrency: 5,
    enableMetrics: true,
    enableDlq: true
  }
});

await session.setLaneHandler(SessionLane.Query, {
  mode: TaskHandlerMode.External,
  timeoutMs: 120_000
});

Step 2: Implement Worker

Rust

use std::sync::Arc;
use a3s_code_core::{AgentSession, ExternalTask, ExternalTaskResult};
use tokio::time::{sleep, Duration};

async fn worker_loop(session: Arc<AgentSession>) -> Result<()> {
    loop {
        // Poll for pending tasks
        let tasks = session.pending_external_tasks().await;

        if tasks.is_empty() {
            sleep(Duration::from_millis(100)).await;
            continue;
        }

        // Process tasks in parallel
        let handles: Vec<_> = tasks.into_iter()
            .map(|task| {
                let session = session.clone();
                tokio::spawn(async move {
                    let result = execute_task(&task).await;
                    session.complete_external_task(&task.task_id, result).await
                })
            })
            .collect();

        for handle in handles {
            let _ = handle.await;
        }
    }
}

async fn execute_task(task: &ExternalTask) -> ExternalTaskResult {
    match task.command_type.as_str() {
        "read" => {
            let path = task.payload["path"].as_str().unwrap();
            match tokio::fs::read_to_string(path).await {
                Ok(content) => ExternalTaskResult {
                    success: true,
                    result: serde_json::json!({ "content": content }),
                    error: None,
                },
                Err(e) => ExternalTaskResult {
                    success: false,
                    result: serde_json::json!({}),
                    error: Some(e.to_string()),
                },
            }
        }
        "bash" => {
            let command = task.payload["command"].as_str().unwrap();
            match tokio::process::Command::new("sh")
                .arg("-c")
                .arg(command)
                .output()
                .await
            {
                Ok(output) => ExternalTaskResult {
                    success: output.status.success(),
                    result: serde_json::json!({
                        "stdout": String::from_utf8_lossy(&output.stdout),
                        "stderr": String::from_utf8_lossy(&output.stderr),
                        "exit_code": output.status.code()
                    }),
                    error: None,
                },
                Err(e) => ExternalTaskResult {
                    success: false,
                    result: serde_json::json!({}),
                    error: Some(e.to_string()),
                },
            }
        }
        _ => ExternalTaskResult {
            success: false,
            result: serde_json::json!({}),
            error: Some(format!("Unknown command type: {}", task.command_type)),
        },
    }
}

Python

import asyncio
from a3s_code import AgentSession, ExternalTaskResult

async def worker_loop(session: AgentSession):
    """Worker loop - can run on multiple machines"""
    while True:
        # Poll for pending tasks
        tasks = await session.pending_external_tasks()

        if not tasks:
            await asyncio.sleep(0.1)
            continue

        # Process tasks in parallel
        await asyncio.gather(*[
            process_task(session, task) for task in tasks
        ])

async def process_task(session: AgentSession, task):
    """Process a single task"""
    result = await execute_task(task)
    await session.complete_external_task(task.task_id, result)

async def execute_task(task):
    """Execute task based on command type"""
    if task.command_type == "read":
        path = task.payload["path"]
        try:
            with open(path, 'r') as f:
                content = f.read()
            return ExternalTaskResult(
                success=True,
                result={"content": content},
                error=None
            )
        except Exception as e:
            return ExternalTaskResult(
                success=False,
                result={},
                error=str(e)
            )

    elif task.command_type == "bash":
        command = task.payload["command"]
        try:
            proc = await asyncio.create_subprocess_shell(
                command,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE
            )
            stdout, stderr = await proc.communicate()
            return ExternalTaskResult(
                success=proc.returncode == 0,
                result={
                    "stdout": stdout.decode(),
                    "stderr": stderr.decode(),
                    "exit_code": proc.returncode
                },
                error=None
            )
        except Exception as e:
            return ExternalTaskResult(
                success=False,
                result={},
                error=str(e)
            )

    else:
        return ExternalTaskResult(
            success=False,
            result={},
            error=f"Unknown command type: {task.command_type}"
        )

TypeScript

import { AgentSession, ExternalTask, ExternalTaskResult } from '@a3s-lab/code';
import { exec } from 'child_process';
import { promisify } from 'util';
import { readFile } from 'fs/promises';

const execAsync = promisify(exec);

async function workerLoop(session: AgentSession): Promise<void> {
  while (true) {
    // Poll for pending tasks
    const tasks = await session.pendingExternalTasks();

    if (tasks.length === 0) {
      await new Promise(resolve => setTimeout(resolve, 100));
      continue;
    }

    // Process tasks in parallel
    await Promise.all(tasks.map(task => processTask(session, task)));
  }
}

async function processTask(session: AgentSession, task: ExternalTask): Promise<void> {
  const result = await executeTask(task);
  await session.completeExternalTask(task.taskId, result);
}

async function executeTask(task: ExternalTask): Promise<ExternalTaskResult> {
  switch (task.commandType) {
    case 'read': {
      const path = task.payload.path;
      try {
        const content = await readFile(path, 'utf-8');
        return {
          success: true,
          result: { content },
          error: null
        };
      } catch (e: any) {
        return {
          success: false,
          result: {},
          error: e.message
        };
      }
    }

    case 'bash': {
      const command = task.payload.command;
      try {
        const { stdout, stderr } = await execAsync(command);
        return {
          success: true,
          result: { stdout, stderr, exitCode: 0 },
          error: null
        };
      } catch (e: any) {
        return {
          success: false,
          result: { stdout: e.stdout, stderr: e.stderr, exitCode: e.code },
          error: e.message
        };
      }
    }

    default:
      return {
        success: false,
        result: {},
        error: `Unknown command type: ${task.commandType}`
      };
  }
}

Step 3: Deploy Workers

# Worker 1 (local machine)
./worker --session-id abc123

# Worker 2 (remote server)
ssh user@remote "cd /app && ./worker --session-id abc123"

# Worker 3 (container)
docker run -e SESSION_ID=abc123 myapp/worker

# Worker 4 (cloud)
kubectl run worker --image=myapp/worker --env="SESSION_ID=abc123"

Use Cases

1. Distributed Code Search

// Configure Query lane for external processing
session.set_lane_handler(SessionLane::Query, LaneHandlerConfig {
    mode: TaskHandlerMode::External,
    timeout_ms: 120_000,
}).await;

// Agent sends message
let result = session.send("Search for all authentication functions").await?;

// Behind the scenes:
// - Agent generates multiple grep/read tasks
// - Tasks are distributed to workers
// - Workers execute in parallel across machines
// - Results are aggregated and returned

2. Distributed Test Execution

// Configure Execute lane for external processing
session.set_lane_handler(SessionLane::Execute, LaneHandlerConfig {
    mode: TaskHandlerMode::External,
    timeout_ms: 300_000,  // 5 minutes
}).await;

// Agent sends message
let result = session.send("Run all tests in parallel").await?;

// Behind the scenes:
// - Agent generates bash tasks for each test suite
// - Tasks are distributed to workers
// - Workers run tests in parallel
// - Results are collected and reported

3. Multi-Region File Processing

// Workers in different regions
// Worker 1: US-East
// Worker 2: EU-West
// Worker 3: Asia-Pacific

// Each worker polls for tasks and executes locally
// Reduces latency for region-specific file operations
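
One way to implement region affinity is to tag task payloads with a region hint and have each worker claim only matching tasks. The `region` field here is an assumption for illustration — A3S does not prescribe a tagging scheme:

```python
def claim_for_region(tasks, worker_region):
    """Keep tasks tagged for this worker's region; untagged tasks can run anywhere."""
    return [t for t in tasks if t.get("region", worker_region) == worker_region]
```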

4. Hybrid Mode for Monitoring

// Execute internally but notify external monitoring system
session.set_lane_handler(SessionLane::Execute, LaneHandlerConfig {
    mode: TaskHandlerMode::Hybrid,
    timeout_ms: 60_000,
}).await;

// External system receives notifications for:
// - Task started
// - Task completed
// - Task failed
// Can be used for logging, metrics, alerting

Best Practices

Choose the Right Lane

  • Query lane — Best for parallelizable read operations (grep, read, glob, ls)
  • Execute lane — For write operations and bash commands
  • Control lane — Keep internal for control operations
  • Generate lane — Keep internal for LLM calls (unless you have custom LLM infrastructure)

Set Appropriate Timeouts

// Short timeout for fast operations
LaneHandlerConfig {
    mode: TaskHandlerMode::External,
    timeout_ms: 30_000,  // 30 seconds
}

// Long timeout for heavy operations
LaneHandlerConfig {
    mode: TaskHandlerMode::External,
    timeout_ms: 600_000,  // 10 minutes
}

Handle Timeouts Gracefully

async fn execute_task(task: &ExternalTask) -> ExternalTaskResult {
    // Check if task is already timed out
    if task.is_timed_out() {
        return ExternalTaskResult {
            success: false,
            result: serde_json::json!({}),
            error: Some("Task timed out before execution".to_string()),
        };
    }

    // Execute with remaining time
    let remaining = task.remaining_ms();
    // ... execute with timeout
}
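
The same budget pattern can be sketched in Python with a monotonic deadline and asyncio.wait_for. The dict shape mirrors ExternalTaskResult but is illustrative:

```python
import asyncio
import time

async def run_with_deadline(coro_factory, deadline):
    """Execute a task coroutine within whatever time remains before `deadline`."""
    remaining = deadline - time.monotonic()
    if remaining <= 0:
        # Mirrors the is_timed_out() check: don't even start the work
        return {"success": False, "result": {}, "error": "Task timed out before execution"}
    try:
        result = await asyncio.wait_for(coro_factory(), timeout=remaining)
        return {"success": True, "result": result, "error": None}
    except asyncio.TimeoutError:
        return {"success": False, "result": {}, "error": "Task timed out during execution"}
```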

Monitor Queue Metrics

// Get queue statistics
let stats = session.queue_stats().await;
println!("Pending: {}, Active: {}, External: {}",
    stats.pending_count,
    stats.active_count,
    stats.external_count
);

// Get detailed metrics
if let Some(metrics) = session.queue_metrics().await {
    println!("Total processed: {}", metrics.total_processed);
    println!("Success rate: {:.2}%", metrics.success_rate * 100.0);
}

Implement Retry Logic

async fn execute_task_with_retry(task: &ExternalTask, max_retries: u32) -> ExternalTaskResult {
    for attempt in 0..max_retries {
        let result = execute_task(task).await;
        if result.success {
            return result;
        }

        if attempt < max_retries - 1 {
            tokio::time::sleep(Duration::from_secs(2_u64.pow(attempt))).await;
        }
    }

    ExternalTaskResult {
        success: false,
        result: serde_json::json!({}),
        error: Some(format!("Failed after {} retries", max_retries)),
    }
}

Performance Considerations

Concurrency Limits

let queue_config = SessionQueueConfig {
    query_max_concurrency: 20,      // High for read operations
    execute_max_concurrency: 5,     // Lower for write operations
    ..Default::default()
};

Worker Scaling

  • Start with 1 worker per machine
  • Monitor queue depth and task latency
  • Add workers if queue depth grows
  • Each worker can process multiple tasks concurrently
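
A rough sizing rule follows from Little's law (L = λ·W): the steady-state number of in-flight tasks equals arrival rate times average task duration. This is a back-of-the-envelope model that ignores queueing variance:

```python
import math

def workers_needed(arrivals_per_sec, avg_task_sec, tasks_per_worker):
    """Little's law: in-flight tasks L = lambda * W, divided by per-worker concurrency."""
    in_flight = arrivals_per_sec * avg_task_sec
    return max(1, math.ceil(in_flight / tasks_per_worker))
```

For example, 50 tasks/s averaging 400 ms each keeps ~20 tasks in flight; at 5 concurrent tasks per worker, that calls for 4 workers.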

Network Latency

  • External mode adds network round-trip latency
  • Use for tasks that take > 100ms to execute
  • For very fast operations (< 10ms), internal mode is faster
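
The > 100 ms guidance falls out of simple arithmetic: the round trip is a fixed cost, so its share of end-to-end latency shrinks as tasks get longer. The 50 ms round-trip figure here is an assumption for illustration:

```python
def overhead_fraction(task_ms, rtt_ms=50):
    """Fraction of end-to-end latency spent on the network round trip."""
    return rtt_ms / (task_ms + rtt_ms)
```

At 10 ms of work the round trip dominates (~83% overhead, so internal mode wins); at 500 ms it is under 10%, and distribution pays off.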

Monitoring and Debugging

Enable Metrics

let queue_config = SessionQueueConfig {
    enable_metrics: true,
    enable_alerts: true,
    ..Default::default()
};

Check Queue Stats

let stats = session.queue_stats().await;
println!("Queue Stats:");
println!("  Pending: {}", stats.pending_count);
println!("  Active: {}", stats.active_count);
println!("  External: {}", stats.external_count);
println!("  Per lane: {:?}", stats.per_lane);

Monitor Task Latency

if let Some(metrics) = session.queue_metrics().await {
    println!("Average latency: {:?}", metrics.avg_latency);
    println!("P95 latency: {:?}", metrics.p95_latency);
    println!("P99 latency: {:?}", metrics.p99_latency);
}
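
If you aggregate worker-side latency samples yourself, a nearest-rank percentile is usually enough. This is one of several percentile conventions (NumPy's default interpolates between ranks instead):

```python
def percentile(samples, p):
    """Nearest-rank percentile over a list of latency samples."""
    if not samples:
        raise ValueError("no samples")
    s = sorted(samples)
    idx = round(p / 100 * (len(s) - 1))
    return s[int(idx)]
```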

Summary

A3S Code's external task distribution enables:

  • ✅ Multi-machine parallelization — Distribute tasks across multiple workers
  • ✅ Flexible execution modes — Internal, External, or Hybrid per lane
  • ✅ Priority-based scheduling — 4 lanes with configurable priorities
  • ✅ Built-in timeout handling — Automatic timeout detection and cleanup
  • ✅ Metrics and monitoring — Track queue depth, latency, success rate
  • ✅ Language-agnostic workers — Implement workers in Rust, Python, TypeScript, or any language

This makes A3S Code suitable for:

  • Large-scale code analysis across multiple repositories
  • Distributed test execution
  • Multi-region deployments
  • Custom execution environments (containers, VMs, specialized hardware)
  • Horizontal scaling of tool execution

See Also

  • Architecture — System design and components
  • Lane Queue — Priority routing and task distribution
  • Tasks — Planning and parallel execution

API Reference

LaneHandlerConfig

  • mode — TaskHandlerMode: Internal, External, or Hybrid
  • timeout_ms — Maximum task execution time in milliseconds

ExternalTask fields

  • task_id — Unique identifier of the task
  • command_type — Tool command to execute (e.g. "read", "bash")
  • payload — JSON arguments for the command

ExternalTaskResult fields

  • success — Whether the task completed successfully
  • result — JSON payload containing the task output
  • error — Error message when the task failed, otherwise absent
