A3S Code
Multi-Machine Distribution
Horizontal scaling with external task distribution across multiple workers
A3S Code supports external task distribution for horizontal scaling. Offload tool execution to external workers running on multiple machines, containers, or cloud regions.
Key Features: 3 execution modes | 4 priority lanes | Multi-language workers | Built-in timeout handling
Overview
External task distribution enables:
- Horizontal scaling — Add workers to increase throughput
- Resource isolation — Separate heavy computation from main process
- Custom environments — Run tools in containers, VMs, or specialized hardware
- Multi-region deployment — Execute tasks in different geographic regions
- Language-agnostic workers — Implement workers in Rust, Python, TypeScript, or any language
Architecture
┌───────────────────────────────────────────────────────────────┐
│                         Agent Session                         │
│  ┌─────────────────────────────────────────────────────────┐  │
│  │              Session Lane Queue (a3s-lane)              │  │
│  │ ┌───────────┐ ┌───────────┐ ┌───────────┐ ┌───────────┐ │  │
│  │ │  Control  │ │   Query   │ │  Execute  │ │ Generate  │ │  │
│  │ │   (P0)    │ │   (P1)    │ │   (P2)    │ │   (P3)    │ │  │
│  │ └─────┬─────┘ └─────┬─────┘ └─────┬─────┘ └─────┬─────┘ │  │
│  └───────┼─────────────┼─────────────┼─────────────┼───────┘  │
└──────────┼─────────────┼─────────────┼─────────────┼──────────┘
           │             │             │             │
           │      ┌──────┴──────┐      │             │
           │      │  External   │      │             │
           │      │   Workers   │      │             │
           │      └──────┬──────┘      │             │
           │             │             │             │
           ▼             ▼             ▼             ▼
     ┌───────────┐ ┌───────────┐ ┌───────────┐ ┌───────────┐
     │ Worker 1  │ │ Worker 2  │ │ Worker 3  │ │ Worker N  │
     │  (Local)  │ │ (Remote)  │ │(Container)│ │  (Cloud)  │
     └───────────┘ └───────────┘ └───────────┘ └───────────┘

Task Handler Modes
A3S Code supports 3 task handler modes per lane:
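- Internal: tasks run inside the agent process.
- External: tasks are queued for external workers, which poll for them and report results back.
- Hybrid: tasks run internally while an external system is notified when a task starts, completes, or fails.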
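To make the three modes concrete, here is a minimal Python sketch of how a lane might dispatch a task under each mode. It is an illustration only, not the a3s-lane implementation: `Mode`, `Lane`, `run_locally`, and the `notifications` list are hypothetical stand-ins, and the Hybrid branch follows the behaviour described later on this page (execute internally, notify an external system).

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Callable, Optional


class Mode(Enum):
    INTERNAL = "internal"
    EXTERNAL = "external"
    HYBRID = "hybrid"


@dataclass
class Lane:
    """Hypothetical stand-in for one session lane."""
    mode: Mode
    pending_external: list = field(default_factory=list)  # tasks awaiting a worker
    notifications: list = field(default_factory=list)     # events for an external observer

    def submit(self, task: dict, run_locally: Callable[[dict], dict]) -> Optional[dict]:
        """Return a result when the task ran in-process, or None when it was queued."""
        if self.mode is Mode.EXTERNAL:
            # External: park the task until a worker polls for it and completes it.
            self.pending_external.append(task)
            return None
        if self.mode is Mode.HYBRID:
            self.notifications.append(("started", task["id"]))
        result = run_locally(task)
        if self.mode is Mode.HYBRID:
            event = "completed" if result.get("success") else "failed"
            self.notifications.append((event, task["id"]))
        return result
```

In this model only External mode leaves work for a worker to pick up; Internal and Hybrid both return a result immediately, and Hybrid additionally records lifecycle events.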
Session Lanes
Tasks are routed to 4 priority lanes:
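- Control (P0): control operations
- Query (P1): read operations (grep, read, glob, ls)
- Execute (P2): write operations and bash commands
- Generate (P3): LLM calls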
The Query lane is the best candidate for external distribution: read operations are naturally parallelizable and do not modify state.
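The P0 to P3 labels suggest that lower-numbered lanes are served first. As a rough model, assuming strict priority between lanes and first-in, first-out order within a lane (neither is confirmed on this page), the ordering can be sketched with a heap. `LaneQueue` and its methods are hypothetical names, not part of the A3S Code API.

```python
import heapq
import itertools

# Lane priorities as labelled in the architecture diagram (lower runs first).
PRIORITY = {"control": 0, "query": 1, "execute": 2, "generate": 3}


class LaneQueue:
    """Toy model: strict priority across lanes, FIFO within a lane."""

    def __init__(self):
        self._heap = []
        self._seq = itertools.count()  # tie-breaker that preserves insertion order

    def push(self, lane: str, task: str) -> None:
        heapq.heappush(self._heap, (PRIORITY[lane], next(self._seq), task))

    def pop(self) -> str:
        _, _, task = heapq.heappop(self._heap)
        return task

    def __len__(self) -> int:
        return len(self._heap)


queue = LaneQueue()
queue.push("generate", "llm-call")
queue.push("query", "grep-1")
queue.push("control", "cancel")
queue.push("query", "grep-2")

order = [queue.pop() for _ in range(len(queue))]
print(order)  # ['cancel', 'grep-1', 'grep-2', 'llm-call']
```

The sequence counter matters: without it, two tasks in the same lane would be compared by their payload rather than by arrival order.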
Complete Example
Step 1: Configure Session
Rust:
use a3s_code_core::{Agent, SessionOptions, SessionQueueConfig};
use a3s_code_core::queue::{SessionLane, LaneHandlerConfig, TaskHandlerMode};

let agent = Agent::new("agent.hcl").await?;

// Configure queue with high concurrency
let queue_config = SessionQueueConfig {
    query_max_concurrency: 20,
    execute_max_concurrency: 5,
    enable_metrics: true,
    enable_dlq: true,
    ..Default::default()
};

let session = agent.session(".", Some(
    SessionOptions::new()
        .with_queue_config(queue_config)
))?;

// Configure Query lane for external processing
session.set_lane_handler(SessionLane::Query, LaneHandlerConfig {
    mode: TaskHandlerMode::External,
    timeout_ms: 120_000, // 2 minutes
}).await;

Python:
from a3s_code import Agent, SessionOptions, SessionQueueConfig
from a3s_code import SessionLane, LaneHandlerConfig, TaskHandlerMode

agent = Agent.create("agent.hcl")

# Configure queue
queue_config = SessionQueueConfig(
    query_max_concurrency=20,
    execute_max_concurrency=5,
    enable_metrics=True,
    enable_dlq=True
)

session = agent.session(".", SessionOptions(
    queue_config=queue_config
))

# Configure Query lane for external processing
session.set_lane_handler(
    SessionLane.QUERY,
    LaneHandlerConfig(
        mode=TaskHandlerMode.EXTERNAL,
        timeout_ms=120_000  # 2 minutes
    )
)

TypeScript:
import { Agent, SessionOptions, SessionQueueConfig } from '@a3s-lab/code';
import { SessionLane, LaneHandlerConfig, TaskHandlerMode } from '@a3s-lab/code';

const agent = await Agent.create('agent.hcl');

const session = agent.session('.', {
  queueConfig: {
    queryMaxConcurrency: 20,
    executeMaxConcurrency: 5,
    enableMetrics: true,
    enableDlq: true
  }
});

// Configure Query lane for external processing
await session.setLaneHandler(SessionLane.Query, {
  mode: TaskHandlerMode.External,
  timeoutMs: 120_000 // 2 minutes
});

Step 2: Implement Worker
Rust:
use std::sync::Arc;

use a3s_code_core::{AgentSession, ExternalTask, ExternalTaskResult};
use tokio::time::{sleep, Duration};

// `Result` is whichever alias your crate uses (for example `anyhow::Result`).
async fn worker_loop(session: Arc<AgentSession>) -> Result<()> {
    loop {
        // Poll for pending tasks
        let tasks = session.pending_external_tasks().await;
        if tasks.is_empty() {
            sleep(Duration::from_millis(100)).await;
            continue;
        }

        // Process tasks in parallel
        let handles: Vec<_> = tasks.into_iter()
            .map(|task| {
                let session = session.clone();
                tokio::spawn(async move {
                    let result = execute_task(&task).await;
                    session.complete_external_task(&task.task_id, result).await
                })
            })
            .collect();

        // Wait for the whole batch before polling again
        for handle in handles {
            let _ = handle.await;
        }
    }
}

// Build a failed result that carries an error message
fn failure(message: String) -> ExternalTaskResult {
    ExternalTaskResult {
        success: false,
        result: serde_json::json!({}),
        error: Some(message),
    }
}

async fn execute_task(task: &ExternalTask) -> ExternalTaskResult {
    match task.command_type.as_str() {
        "read" => {
            // Reject malformed payloads instead of panicking
            let Some(path) = task.payload["path"].as_str() else {
                return failure("read task is missing a string `path`".to_string());
            };
            match tokio::fs::read_to_string(path).await {
                Ok(content) => ExternalTaskResult {
                    success: true,
                    result: serde_json::json!({ "content": content }),
                    error: None,
                },
                Err(e) => failure(e.to_string()),
            }
        }
        "bash" => {
            let Some(command) = task.payload["command"].as_str() else {
                return failure("bash task is missing a string `command`".to_string());
            };
            match tokio::process::Command::new("sh")
                .arg("-c")
                .arg(command)
                .output()
                .await
            {
                Ok(output) => ExternalTaskResult {
                    success: output.status.success(),
                    result: serde_json::json!({
                        "stdout": String::from_utf8_lossy(&output.stdout),
                        "stderr": String::from_utf8_lossy(&output.stderr),
                        "exit_code": output.status.code()
                    }),
                    error: None,
                },
                Err(e) => failure(e.to_string()),
            }
        }
        _ => failure(format!("Unknown command type: {}", task.command_type)),
    }
}

Python:
import asyncio
from pathlib import Path

from a3s_code import AgentSession, ExternalTaskResult


async def worker_loop(session: AgentSession):
    """Worker loop - can run on multiple machines"""
    while True:
        # Poll for pending tasks
        tasks = await session.pending_external_tasks()
        if not tasks:
            await asyncio.sleep(0.1)
            continue

        # Process tasks in parallel
        await asyncio.gather(*[
            process_task(session, task) for task in tasks
        ])


async def process_task(session: AgentSession, task):
    """Process a single task"""
    result = await execute_task(task)
    await session.complete_external_task(task.task_id, result)


async def execute_task(task):
    """Execute task based on command type"""
    if task.command_type == "read":
        try:
            path = task.payload["path"]
            # Read in a thread so file I/O does not block the event loop
            content = await asyncio.to_thread(Path(path).read_text)
            return ExternalTaskResult(
                success=True,
                result={"content": content},
                error=None
            )
        except Exception as e:
            return ExternalTaskResult(
                success=False,
                result={},
                error=str(e)
            )
    elif task.command_type == "bash":
        try:
            command = task.payload["command"]
            proc = await asyncio.create_subprocess_shell(
                command,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE
            )
            stdout, stderr = await proc.communicate()
            return ExternalTaskResult(
                success=proc.returncode == 0,
                result={
                    # Replace undecodable bytes rather than failing the task
                    "stdout": stdout.decode(errors="replace"),
                    "stderr": stderr.decode(errors="replace"),
                    "exit_code": proc.returncode
                },
                error=None
            )
        except Exception as e:
            return ExternalTaskResult(
                success=False,
                result={},
                error=str(e)
            )
    else:
        return ExternalTaskResult(
            success=False,
            result={},
            error=f"Unknown command type: {task.command_type}"
        )

TypeScript:
import { AgentSession, ExternalTask, ExternalTaskResult } from '@a3s-lab/code';
import { exec } from 'child_process';
import { promisify } from 'util';
import { readFile } from 'fs/promises';

const execAsync = promisify(exec);

async function workerLoop(session: AgentSession): Promise<void> {
  while (true) {
    // Poll for pending tasks
    const tasks = await session.pendingExternalTasks();
    if (tasks.length === 0) {
      await new Promise(resolve => setTimeout(resolve, 100));
      continue;
    }

    // Process tasks in parallel
    await Promise.all(tasks.map(task => processTask(session, task)));
  }
}

async function processTask(session: AgentSession, task: ExternalTask): Promise<void> {
  const result = await executeTask(task);
  await session.completeExternalTask(task.taskId, result);
}

async function executeTask(task: ExternalTask): Promise<ExternalTaskResult> {
  switch (task.commandType) {
    case 'read': {
      const path = task.payload.path;
      try {
        const content = await readFile(path, 'utf-8');
        return {
          success: true,
          result: { content },
          error: null
        };
      } catch (e) {
        return {
          success: false,
          result: {},
          // Caught values are `unknown` under strict TypeScript settings
          error: (e as Error).message
        };
      }
    }
    case 'bash': {
      const command = task.payload.command;
      try {
        // execAsync resolves only when the command exits with code 0
        const { stdout, stderr } = await execAsync(command);
        return {
          success: true,
          result: { stdout, stderr, exitCode: 0 },
          error: null
        };
      } catch (e) {
        // On a non-zero exit the rejection carries stdout, stderr, and the exit code
        const err = e as Error & { stdout?: string; stderr?: string; code?: number };
        return {
          success: false,
          result: { stdout: err.stdout, stderr: err.stderr, exitCode: err.code },
          error: err.message
        };
      }
    }
    default:
      return {
        success: false,
        result: {},
        error: `Unknown command type: ${task.commandType}`
      };
  }
}

Step 3: Deploy Workers
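The commands in this step pass the session ID either as a `--session-id` flag or as a `SESSION_ID` environment variable. A worker binary that accepts both could resolve the value as in the sketch below. The function name `resolve_session_id` and the precedence (flag first, then environment) are assumptions for illustration, and how a worker attaches to a session from that ID is not covered on this page.

```python
import argparse
from typing import Mapping, Sequence


def resolve_session_id(argv: Sequence[str], env: Mapping[str, str]) -> str:
    """Prefer --session-id, fall back to SESSION_ID, otherwise exit with an error."""
    parser = argparse.ArgumentParser(prog="worker")
    parser.add_argument("--session-id", default=None)
    args = parser.parse_args(list(argv))

    session_id = args.session_id or env.get("SESSION_ID")
    if not session_id:
        raise SystemExit("worker: pass --session-id or set SESSION_ID")
    return session_id


print(resolve_session_id(["--session-id", "abc123"], {}))  # flag form
print(resolve_session_id([], {"SESSION_ID": "abc123"}))    # environment form
```

In a real entry point the arguments would be `sys.argv[1:]` and `os.environ`; passing them in explicitly keeps the function easy to test.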
# Worker 1 (local machine)
./worker --session-id abc123
# Worker 2 (remote server)
ssh user@remote "cd /app && ./worker --session-id abc123"
# Worker 3 (container)
docker run -e SESSION_ID=abc123 myapp/worker
# Worker 4 (cloud)
kubectl run worker --image=myapp/worker --env="SESSION_ID=abc123"

Use Cases
1. Parallel Code Search
// Configure Query lane for external processing
session.set_lane_handler(SessionLane::Query, LaneHandlerConfig {
    mode: TaskHandlerMode::External,
    timeout_ms: 120_000,
}).await;

// Agent sends message
let result = session.send("Search for all authentication functions").await?;

// Behind the scenes:
// - Agent generates multiple grep/read tasks
// - Tasks are distributed to workers
// - Workers execute in parallel across machines
// - Results are aggregated and returned

2. Distributed Test Execution
// Configure Execute lane for external processing
session.set_lane_handler(SessionLane::Execute, LaneHandlerConfig {
    mode: TaskHandlerMode::External,
    timeout_ms: 300_000, // 5 minutes
}).await;

// Agent sends message
let result = session.send("Run all tests in parallel").await?;

// Behind the scenes:
// - Agent generates bash tasks for each test suite
// - Tasks are distributed to workers
// - Workers run tests in parallel
// - Results are collected and reported

3. Multi-Region File Processing
// Workers in different regions
// Worker 1: US-East
// Worker 2: EU-West
// Worker 3: Asia-Pacific
// Each worker polls for tasks and executes locally
// Reduces latency for region-specific file operations

4. Hybrid Mode for Monitoring
// Execute internally but notify external monitoring system
session.set_lane_handler(SessionLane::Execute, LaneHandlerConfig {
    mode: TaskHandlerMode::Hybrid,
    timeout_ms: 60_000,
}).await;

// External system receives notifications for:
// - Task started
// - Task completed
// - Task failed
// Can be used for logging, metrics, alerting

Best Practices
Choose the Right Lane
- Query lane — Best for parallelizable read operations (grep, read, glob, ls)
- Execute lane — For write operations and bash commands
- Control lane — Keep internal for control operations
- Generate lane — Keep internal for LLM calls (unless you have custom LLM infrastructure)
Set Appropriate Timeouts
// Short timeout for fast operations
LaneHandlerConfig {
    mode: TaskHandlerMode::External,
    timeout_ms: 30_000, // 30 seconds
}

// Long timeout for heavy operations
LaneHandlerConfig {
    mode: TaskHandlerMode::External,
    timeout_ms: 600_000, // 10 minutes
}

Handle Timeouts Gracefully
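use tokio::time::{timeout, Duration};

// Wraps `execute_task` from Step 2. Assumes `remaining_ms()` returns milliseconds as a u64.
async fn execute_task_with_deadline(task: &ExternalTask) -> ExternalTaskResult {
    // Skip work that has already exceeded its lane timeout
    if task.is_timed_out() {
        return ExternalTaskResult {
            success: false,
            result: serde_json::json!({}),
            error: Some("Task timed out before execution".to_string()),
        };
    }

    // Bound execution by the time remaining
    let remaining = Duration::from_millis(task.remaining_ms());
    match timeout(remaining, execute_task(task)).await {
        Ok(result) => result,
        Err(_) => ExternalTaskResult {
            success: false,
            result: serde_json::json!({}),
            error: Some("Task timed out during execution".to_string()),
        },
    }
}

Monitor Queue Metrics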
// Get queue statistics
let stats = session.queue_stats().await;
println!("Pending: {}, Active: {}, External: {}",
    stats.pending_count,
    stats.active_count,
    stats.external_count
);

// Get detailed metrics
if let Some(metrics) = session.queue_metrics().await {
    println!("Total processed: {}", metrics.total_processed);
    println!("Success rate: {:.2}%", metrics.success_rate * 100.0);
}

Implement Retry Logic
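use tokio::time::{sleep, Duration};

// `max_retries` is the total number of attempts. Only retry tasks that are safe to repeat.
async fn execute_task_with_retry(task: &ExternalTask, max_retries: u32) -> ExternalTaskResult {
    let mut last_error = None;
    for attempt in 0..max_retries {
        let result = execute_task(task).await;
        if result.success {
            return result;
        }
        last_error = result.error;

        // Exponential backoff (1s, 2s, 4s, ...) between attempts, but not after the last one.
        // `attempt + 1 < max_retries` avoids the u32 underflow of `max_retries - 1`.
        if attempt + 1 < max_retries {
            sleep(Duration::from_secs(2_u64.pow(attempt))).await;
        }
    }

    ExternalTaskResult {
        success: false,
        result: serde_json::json!({}),
        error: Some(format!(
            "Failed after {} attempts: {}",
            max_retries,
            last_error.unwrap_or_else(|| "unknown error".to_string())
        )),
    }
}

Performance Considerations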
Concurrency Limits
let queue_config = SessionQueueConfig {
    query_max_concurrency: 20,  // High for read operations
    execute_max_concurrency: 5, // Lower for write operations
    ..Default::default()
};

Worker Scaling
- Start with 1 worker per machine
- Monitor queue depth and task latency
- Add workers if queue depth grows
- Each worker can process multiple tasks concurrently
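Since each worker can process several tasks at once, it helps to cap the number in flight so that one large poll result does not overload the machine. A minimal sketch with `asyncio.Semaphore` follows; `run_bounded`, `handler`, and the default limit of 4 are illustrative choices, not part of the A3S Code API.

```python
import asyncio


async def run_bounded(tasks, handler, max_in_flight: int = 4):
    """Run handler(task) for every task, never more than max_in_flight at once."""
    semaphore = asyncio.Semaphore(max_in_flight)

    async def guarded(task):
        # Wait for a free slot, then hold it for the duration of the task
        async with semaphore:
            return await handler(task)

    # gather preserves input order in the returned list
    return await asyncio.gather(*(guarded(task) for task in tasks))
```

In the Python worker loop from Step 2, the call to `asyncio.gather` could be swapped for `run_bounded(tasks, lambda t: process_task(session, t))` to apply the cap per poll.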
Network Latency
- External mode adds network round-trip latency
- Use for tasks that take > 100ms to execute
- For very fast operations (< 10ms), internal mode is faster
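The thresholds above follow from simple arithmetic: the round trip is a fixed cost, so its share of total time shrinks as the task gets longer. The sketch below assumes a round trip of 20 ms, which is a placeholder rather than a measured figure for A3S Code; substitute the latency you observe between the session and your workers.

```python
def external_overhead(exec_ms: float, round_trip_ms: float) -> float:
    """Fraction of total wall-clock time spent on the network round trip."""
    return round_trip_ms / (exec_ms + round_trip_ms)


ASSUMED_ROUND_TRIP_MS = 20  # placeholder value, measure your own network

for exec_ms in (5, 100, 1000):
    share = external_overhead(exec_ms, ASSUMED_ROUND_TRIP_MS)
    print(f"{exec_ms:>5} ms task: {share:.0%} of time spent on the network")
# Prints 80% for the 5 ms task, 17% for 100 ms, and 2% for 1000 ms.
```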
Monitoring and Debugging
Enable Metrics
let queue_config = SessionQueueConfig {
    enable_metrics: true,
    enable_alerts: true,
    ..Default::default()
};

Check Queue Stats
let stats = session.queue_stats().await;
println!("Queue Stats:");
println!("  Pending: {}", stats.pending_count);
println!("  Active: {}", stats.active_count);
println!("  External: {}", stats.external_count);
println!("  Per lane: {:?}", stats.per_lane);

Monitor Task Latency
if let Some(metrics) = session.queue_metrics().await {
    println!("Average latency: {:?}", metrics.avg_latency);
    println!("P95 latency: {:?}", metrics.p95_latency);
    println!("P99 latency: {:?}", metrics.p99_latency);
}

Summary
A3S Code's external task distribution enables:
- ✅ Multi-machine parallelization — Distribute tasks across multiple workers
- ✅ Flexible execution modes — Internal, External, or Hybrid per lane
- ✅ Priority-based scheduling — 4 lanes with configurable priorities
- ✅ Built-in timeout handling — Automatic timeout detection and cleanup
- ✅ Metrics and monitoring — Track queue depth, latency, success rate
- ✅ Language-agnostic workers — Implement workers in Rust, Python, TypeScript, or any language
This makes A3S Code suitable for:
- Large-scale code analysis across multiple repositories
- Distributed test execution
- Multi-region deployments
- Custom execution environments (containers, VMs, specialized hardware)
- Horizontal scaling of tool execution
Related
- Architecture — System design and components
- Lane Queue — Priority routing and task distribution
- Tasks — Planning and parallel execution
API Reference
LaneHandlerConfig
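Fields used in the examples on this page:
- mode: TaskHandlerMode (Internal, External, or Hybrid)
- timeout_ms: lane timeout in milliseconds (timeoutMs in TypeScript)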
ExternalTask fields
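Fields used in the examples on this page:
- task_id: identifier passed to complete_external_task (taskId in TypeScript)
- command_type: command name, such as "read" or "bash" (commandType in TypeScript)
- payload: JSON arguments for the command, such as path or command
The Rust timeout example also calls is_timed_out() and remaining_ms() on a task.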
ExternalTaskResult fields
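Fields used in the examples on this page:
- success: boolean, true when the task completed successfully
- result: JSON value holding the task output (an empty object on failure)
- error: optional error message (None or null on success)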
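The timeout example earlier calls `is_timed_out()` and `remaining_ms()` on a task, but how the library computes them is not described here. One plausible model, assuming each task carries its lane timeout and a creation timestamp, is sketched below. The `timeout_ms` and `created_at` fields on the task, and the optional `now` parameter, are hypothetical additions for illustration; the remaining fields mirror the ones used in the examples.

```python
import time
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class ExternalTask:
    task_id: str
    command_type: str
    payload: dict[str, Any]
    timeout_ms: int  # hypothetical: copied from the lane's LaneHandlerConfig
    created_at: float = field(default_factory=time.monotonic)  # hypothetical, in seconds

    def remaining_ms(self, now: Optional[float] = None) -> int:
        """Milliseconds left before the timeout, never negative."""
        now = time.monotonic() if now is None else now
        elapsed_ms = int((now - self.created_at) * 1000)
        return max(0, self.timeout_ms - elapsed_ms)

    def is_timed_out(self, now: Optional[float] = None) -> bool:
        return self.remaining_ms(now) == 0


@dataclass
class ExternalTaskResult:
    success: bool
    result: dict[str, Any]
    error: Optional[str] = None
```

A monotonic clock is used so that wall-clock adjustments on the worker cannot make a task appear to gain or lose time.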