bots-and-co/PLAN.md
Pierre Wessman 896ff001eb .
2025-11-27 12:06:29 +01:00

# Agent Orchestra
A multi-agent system where Claude-powered agents collaborate via Slack, each running in isolated Docker containers with role-based permissions and tools.
---
## Claude Agent SDK Integration Notes
### Key SDK Concepts
1. **Two APIs**:
- `query()` - One-shot, creates new session each time
- `ClaudeSDKClient` - Persistent session, maintains conversation history
We use `query()` with explicit memory injection because:
- Full control over what context goes into each request
- Memory persists in files (survives restarts)
- Inspectable/debuggable - can read memory.md to see agent state
- More cost-effective - only sends relevant context, not full history
- Selective memory - can summarize, prune, or prioritize what's included
2. **Built-in Tools**: The SDK includes Claude Code's tools:
- `Read`, `Write`, `Edit` - File operations
- `Bash` - Shell commands
- `Glob`, `Grep` - File search
- `WebSearch`, `WebFetch` - Web access
- `Task` - Subagent spawning
- `NotebookEdit` - Jupyter notebooks
3. **Custom Tools via SDK MCP Servers**:
- Use `@tool` decorator and `create_sdk_mcp_server()`
- Runs in-process (no subprocess overhead)
- Tools are prefixed: `mcp__<server>__<tool>`
4. **Hooks for Permissions**:
- `PreToolUse` - Check before tool execution, can deny
- `PostToolUse` - Log/audit after execution
- Defined via `HookMatcher` in `ClaudeAgentOptions`
5. **Permission Modes**:
- `"default"` - Standard permission behavior
- `"acceptEdits"` - Auto-accept file edits
- `"bypassPermissions"` - Skip all checks (use with caution)
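To make the `mcp__<server>__<tool>` naming convention concrete, here is a minimal sketch of two helpers that build and split such names. The helper names are hypothetical and not part of the SDK. The `<server>` part is assumed to be the key under which the server is registered in `mcp_servers`.

```python
from typing import Optional, Tuple


def mcp_tool_name(server: str, tool: str) -> str:
    """Build the fully qualified name that an agent config lists in allowed_tools."""
    return f"mcp__{server}__{tool}"


def split_mcp_tool_name(name: str) -> Optional[Tuple[str, str]]:
    """Return (server, tool) for an MCP tool name, or None for built-in tools."""
    if not name.startswith("mcp__"):
        return None
    server, sep, tool = name[len("mcp__"):].partition("__")
    if not sep or not server or not tool:
        return None
    return server, tool
```

Built-in tools such as `Bash` carry no prefix, so `split_mcp_tool_name` returns `None` for them.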
### Architecture Decision: Orchestrator vs. Per-Container SDK
**Option A: Orchestrator runs SDK**
```
┌─────────────────────────────────────────┐
│              Orchestrator               │
│  - Runs ClaudeSDKClient per agent       │
│  - Routes Slack messages                │
│  - Agents execute via SDK tools         │
│    (Bash, Write, etc. run in container) │
└─────────────────────────────────────────┘
                     │ docker exec
            ┌─────────────────┐
            │ Agent Container │
            │ - Workspace     │
            │ - Git clone     │
            │ - No SDK here   │
            └─────────────────┘
```
**Option B: SDK runs inside each container (chosen)**
```
         ┌─────────────────────┐
         │    Orchestrator     │
         │  - Slack listener   │
         │  - Message routing  │
         │  - HTTP API         │
         └─────────────────────┘
                    │ HTTP
┌─────────────────┐   ┌─────────────────┐
│ Agent Container │   │ Agent Container │
│ - SDK Client    │   │ - SDK Client    │
│ - Claude CLI    │   │ - Claude CLI    │
│ - Node.js       │   │ - Node.js       │
│ - Workspace     │   │ - Workspace     │
└─────────────────┘   └─────────────────┘
```
We choose **Option B** because:
- True isolation: each agent's SDK tools operate directly on their container filesystem
- Simpler architecture: no `docker exec` complexity or volume mounting tricks
- Better scalability: agents can run on different hosts
- Cleaner permission model: container boundaries enforce filesystem isolation
- More realistic: tools like Bash run in the agent's actual environment
### Setting Up SDK with query() and Memory Injection
```python
from claude_agent_sdk import (
    ClaudeAgentOptions,
    HookMatcher,
    ResultMessage,
    create_sdk_mcp_server,
    query,
    tool,
)

# slack_client, agent_config, permission_checker, read_memory and
# update_memory are defined elsewhere in the project.


# Create custom tools
@tool("slack_send", "Send Slack message", {"channel": str, "text": str})
async def slack_send(args):
    await slack_client.chat_postMessage(
        channel=args["channel"],
        text=args["text"],
    )
    return {"content": [{"type": "text", "text": "Sent"}]}


# Create MCP server
tools_server = create_sdk_mcp_server(
    name="agent-tools",
    tools=[slack_send],
)

# Configure agent options
options = ClaudeAgentOptions(
    system_prompt=agent_config.system_prompt,
    model="sonnet",
    # Built-in tools, plus the custom tool under its mcp__<server>__<tool> name
    allowed_tools=[
        "Read", "Write", "Edit", "Bash", "Glob", "Grep",
        "mcp__tools__slack_send",
    ],
    disallowed_tools=["Task"],  # Don't allow spawning subagents
    # Custom tools via MCP (the key "tools" is the <server> part of the prefix)
    mcp_servers={"tools": tools_server},
    # Permission mode
    permission_mode="acceptEdits",
    # Working directory (mapped to container volume)
    cwd="/data/workspaces/developer",
    # Hooks for fine-grained control
    hooks={
        "PreToolUse": [HookMatcher(hooks=[permission_checker])]
    },
)


# Query pattern: inject memory + task, get response, update memory
async def process_task(task: str, memory_path: str) -> str:
    # 1. Load relevant memory
    memory_content = read_memory(memory_path)
    # 2. Construct prompt with memory context
    prompt = f"""## Your Memory
{memory_content}

## Current Task
{task}

After completing the task, note any important information to remember."""
    # 3. One-shot query (no session state). query() yields messages as an
    #    async iterator; the final ResultMessage carries the result text.
    response = ""
    async for message in query(prompt=prompt, options=options):
        if isinstance(message, ResultMessage) and message.result:
            response = message.result
    # 4. Extract and update memory from response
    update_memory(memory_path, response)
    return response
```
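The `read_memory` and `update_memory` helpers used above are not defined in this plan. A minimal sketch of what they might look like follows. It assumes the agent is prompted to put its notes under a `## Memory Update` heading. That marker is a hypothetical convention, not something the SDK provides.

```python
from pathlib import Path

MEMORY_MARKER = "## Memory Update"  # hypothetical convention for agent notes


def read_memory(memory_path: str) -> str:
    """Return the memory file contents, or an empty string if it does not exist."""
    path = Path(memory_path)
    return path.read_text(encoding="utf-8") if path.exists() else ""


def update_memory(memory_path: str, response: str) -> None:
    """Append whatever follows the marker in the response to the memory file."""
    _, found, notes = response.partition(MEMORY_MARKER)
    notes = notes.strip()
    if not found or not notes:
        return  # the agent reported nothing worth remembering
    path = Path(memory_path)
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("a", encoding="utf-8") as fh:
        fh.write(notes + "\n")
```

Appending keeps the file inspectable, but it grows without bound. Summarizing or pruning would need a separate step.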
---
## Project Overview
**Goal:** Build a server that orchestrates multiple async Claude agents (using Claude Agent SDK) that communicate through Slack, manage projects together, and can delegate work to each other.
**Key Concepts:**
- Agents defined via YAML (roles, tools, permissions, model)
- Slack as the communication layer (threads, channels, mentions)
- Each agent runs in its own persistent Docker container
- Shared git repo with branch-based access control
- Simple JSON-based project/task management
- CEO agent as supervisor/orchestrator
---
## Architecture
```
┌─────────────────────────────────────────────────────────────────┐
│ Host Machine │
│ ┌─────────────────────────────────────────────────────────────┐│
│ │ Orchestrator Service ││
│ │ - Slack event listener (Socket Mode) ││
│ │ - HTTP API for agent communication ││
│ │ - Message routing to agent containers ││
│ │ - Agent lifecycle management ││
│ └─────────────────────────────────────────────────────────────┘│
│ │ HTTP │
│ ┌────────────────────┼────────────────────┐ │
│ ▼ ▼ ▼ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ CEO Agent │ │ PM Agent │ │ Dev Agent │ │
│ │ Container │ │ Container │ │ Container │ │
│ │ │ │ │ │ │ │
│ │ - SDK Client│ │ - SDK Client│ │ - SDK Client│ │
│ │ - Claude CLI│ │ - Claude CLI│ │ - Claude CLI│ │
│ │ - Node.js │ │ - Node.js │ │ - Node.js │ │
│ │ - memory/ │ │ - memory/ │ │ - memory/ │ │
│ │ - workspace/│ │ - workspace/│ │ - workspace/│ │
│ │ - HTTP API │ │ - HTTP API │ │ - HTTP API │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │ │ │ │
│ └────────────────────┼────────────────────┘ │
│ │ git clone/push │
│ ▼ │
│ ┌─────────────────┐ │
│ │ Gitea Server │ │
│ │ (external) │ │
│ └─────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
```
---
## Directory Structure
```
agent-orchestra/
├── CLAUDE.md # Claude Code instructions
├── README.md
├── pyproject.toml
├── docker-compose.yml
├── Dockerfile.orchestrator # Lightweight routing service
├── Dockerfile.agent # Full agent container (SDK, Node.js, CLI)
├── config/
│ ├── orchestra.yml # Global config (Slack, Gitea, Docker)
│ └── agents/
│ ├── ceo.yml
│ ├── product_manager.yml
│ ├── developer.yml
│ ├── tech_lead.yml
│ └── designer.yml
├── src/
│ └── orchestra/
│ ├── __init__.py
│ ├── main.py # Entry point
│ │
│ ├── core/
│ │ ├── __init__.py
│ │ ├── orchestrator.py # Main service, Slack listener
│ │ ├── agent.py # AgentService: stateless query() wrapper
│ │ ├── config.py # YAML config → Pydantic models
│ │ ├── container.py # Docker workspace management
│ │ └── permissions.py # PreToolUse hooks
│ │
│ ├── slack/
│ │ ├── __init__.py
│ │ ├── client.py # Slack API wrapper
│ │ ├── events.py # Event handlers, routing
│ │ └── formatter.py # Message formatting
│ │
│ ├── queue/
│ │ ├── __init__.py
│ │ ├── redis_client.py # AgentQueue class for message handling
│ │ ├── publisher.py # Orchestrator-side message publishing
│ │ └── consumer.py # Agent-side message consumption
│ │
│ ├── tools/
│ │ ├── __init__.py
│ │ ├── server.py # create_sdk_mcp_server setup
│ │ ├── slack_tools.py # @tool decorated Slack operations
│ │ ├── task_tools.py # @tool decorated task management
│ │ ├── gitea_tools.py # @tool decorated Gitea/PR operations
│ │ └── gitea_client.py # Gitea API client (httpx)
│ │
│ └── memory/
│ ├── __init__.py
│ └── manager.py # Memory file read/write
├── data/ # Mounted volumes
│ ├── workspaces/ # Per-agent working directories (each clones from Gitea)
│ │ ├── ceo/
│ │ ├── developer/
│ │ ├── product_manager/
│ │ └── tech_lead/
│ ├── projects/ # Project/task JSON files
│ │ └── boards.json
│ └── memory/ # Agent memory folders
│ ├── ceo/
│ │ └── memory.md
│ ├── product_manager/
│ │ └── memory.md
│ └── developer/
│ └── memory.md
└── tests/
├── __init__.py
├── test_agent.py
├── test_tools.py
├── test_permissions.py
└── test_slack.py
```
---
## Configuration Schema
### Global Config (`config/orchestra.yml`)
```yaml
slack:
  app_token: "${SLACK_APP_TOKEN}"
  bot_token: "${SLACK_BOT_TOKEN}"
  default_channel: "general"
git:
  provider: "gitea"
  base_url: "${GITEA_URL}"  # e.g., https://git.example.com
  api_token: "${GITEA_API_TOKEN}"
  repo_owner: "${GITEA_REPO_OWNER}"
  repo_name: "${GITEA_REPO_NAME}"
  main_branch: "main"
  dev_branch: "dev"
docker:
  network: "orchestra-net"
  base_image: "agent-orchestra:latest"
mcp:
  expose_port: 8080
  external_servers:
    - name: "filesystem"
      url: "stdio://mcp-filesystem"
```
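The `${VAR}` placeholders in this config need to be resolved from the environment when the file is loaded. A minimal sketch of that step is shown below, operating on the already-parsed config (nested dicts and lists). The function name `expand_env` is hypothetical, and raising on a missing variable is a design assumption, not something the plan specifies.

```python
import os
import re

_ENV_PATTERN = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}")


def expand_env(value, env=None):
    """Recursively replace ${VAR} placeholders in strings inside dicts and lists."""
    env = os.environ if env is None else env
    if isinstance(value, str):
        def _substitute(match):
            name = match.group(1)
            if name not in env:
                raise KeyError(f"Missing environment variable: {name}")
            return env[name]
        return _ENV_PATTERN.sub(_substitute, value)
    if isinstance(value, dict):
        return {key: expand_env(item, env) for key, item in value.items()}
    if isinstance(value, list):
        return [expand_env(item, env) for item in value]
    return value  # numbers, booleans and None pass through unchanged
```

Failing fast on a missing variable avoids starting the orchestrator with an empty Slack or Gitea token.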
### CEO Agent Config (`config/agents/ceo.yml`)
```yaml
id: ceo
name: "CEO Chris"
slack_user_id: "${SLACK_CEO_USER_ID}"
model: "sonnet"
max_turns: 100
system_prompt: |
You are CEO Chris, the supervisor of the agent team.
You have broad oversight and can step in when needed.
Your responsibilities:
- Monitor team progress and intervene if blocked
- Make high-level decisions when there's disagreement
- Ensure quality standards are maintained
- Approve major architectural decisions
- Merge approved changes to main branch
You generally let the team operate autonomously but
provide guidance when asked or when you notice issues.
You have access to all tools and can perform any operation.
# All tools allowed
allowed_tools:
- Read
- Write
- Edit
- Bash
- Glob
- Grep
- WebSearch
- WebFetch
- Task
# All Slack tools
- mcp__agent__slack_send_message
- mcp__agent__slack_reply_thread
- mcp__agent__slack_create_channel
- mcp__agent__slack_delete_channel
- mcp__agent__slack_invite_to_channel
# All task tools
- mcp__agent__tasks_create_project
- mcp__agent__tasks_create_board
- mcp__agent__tasks_create
- mcp__agent__tasks_assign
- mcp__agent__tasks_update_status
- mcp__agent__tasks_view
- mcp__agent__tasks_delete
# All git tools
- mcp__agent__gitea_create_pr
- mcp__agent__gitea_list_prs
- mcp__agent__gitea_get_pr
- mcp__agent__gitea_merge_pr
- mcp__agent__gitea_add_review
- mcp__agent__gitea_merge_to_main
disallowed_tools: [] # CEO has no restrictions
permissions:
filesystem:
allowed_paths:
- "/**"
git:
can_push_to:
- "*"
can_merge_to:
- "main"
- "dev"
memory:
path: "/memory/ceo"
files:
- "memory.md"
can_mention:
- "*"
can_delegate_to:
- "*"
container:
resources:
memory: "2g"
cpus: "1.0"
```
### Tech Lead Config (`config/agents/tech_lead.yml`)
```yaml
id: tech_lead
name: "Tech Lead Terry"
slack_user_id: "${SLACK_TECHLEAD_USER_ID}"
model: "sonnet"
max_turns: 50
system_prompt: |
You are Tech Lead Terry, responsible for code quality and architecture.
Your responsibilities:
- Review PRs from developers
- Make architectural decisions
- Ensure code quality and test coverage
- Mentor developers on best practices
- Approve PRs for merge to dev
When reviewing code:
1. Check for correctness and edge cases
2. Verify tests exist and are meaningful
3. Look for security issues
4. Ensure code follows project conventions
5. Provide constructive feedback
allowed_tools:
- Read
- Write
- Edit
- Bash
- Glob
- Grep
- WebSearch
- WebFetch
# Slack tools
- mcp__agent__slack_send_message
- mcp__agent__slack_reply_thread
# Task tools
- mcp__agent__tasks_view
- mcp__agent__tasks_update_status
- mcp__agent__tasks_create # Can create tech debt tasks
- mcp__agent__tasks_add_comment
# Git tools - can review but not merge
- mcp__agent__gitea_list_prs
- mcp__agent__gitea_get_pr
- mcp__agent__gitea_add_review
- mcp__agent__gitea_add_comment
disallowed_tools:
- mcp__agent__gitea_merge_pr
- mcp__agent__slack_create_channel
permissions:
filesystem:
allowed_paths:
- "/workspace/**"
git:
can_push_to: [] # Tech lead reviews, doesn't push
can_merge_to: [] # PM merges to dev, CEO merges to main
memory:
path: "/memory/tech_lead"
files:
- "memory.md"
can_mention:
- developer
- product_manager
- ceo
```
### Developer Config (`config/agents/developer.yml`)
```yaml
id: developer
name: "Dev Dana"
slack_user_id: "${SLACK_DEV_USER_ID}" # Bot user for this agent
# Claude Agent SDK settings
model: "sonnet" # sonnet, opus, haiku
max_turns: 50
system_prompt: |
You are Dev Dana, a skilled software developer working in a team.
You write clean, well-tested code. You work on feature branches
and create PRs for review. You communicate progress in Slack.
When assigned a task, you:
1. Understand requirements fully (ask PM if unclear)
2. Create a feature branch from dev
3. Implement the solution with tests
4. Create a PR and notify tech_lead for review
Clone repositories from Gitea into /workspace.
Use the Slack tools to communicate with your team.
Use the task tools to update your task status.
# Built-in Claude Code tools (Read, Write, Edit, Bash, Glob, Grep, WebSearch, etc.)
allowed_tools:
- Read
- Write
- Edit
- Bash
- Glob
- Grep
- WebSearch
- WebFetch
# Custom MCP tools (prefixed with mcp__<server>__)
- mcp__agent__slack_send_message
- mcp__agent__slack_reply_thread
- mcp__agent__tasks_view
- mcp__agent__tasks_update_status
- mcp__agent__gitea_create_pr
- mcp__agent__gitea_list_prs
disallowed_tools:
- mcp__agent__slack_create_channel
- mcp__agent__slack_delete_channel
- mcp__agent__tasks_create_project
- mcp__agent__gitea_merge_pr
# Permission hooks configuration
permissions:
filesystem:
allowed_paths:
- "/workspace/**"
denied_paths:
- "/workspace/**/.git/config"
- "**/.env"
- "**/secrets/**"
git:
branch_pattern: "feature/*"
can_push_to:
- "feature/*"
cannot_push_to:
- "main"
- "dev"
memory:
path: "/memory/developer"
files:
- "memory.md"
can_mention:
- tech_lead
- product_manager
- ceo
container:
resources:
memory: "2g"
cpus: "1.0"
environment:
- "PYTHONPATH=/workspace"
```
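The `permissions` block above could be evaluated with simple glob matching. The sketch below hard-codes the developer's patterns and uses `fnmatch`, where `*` also matches `/`. That is an assumption about how the patterns are meant to be interpreted. The function names are hypothetical.

```python
import posixpath
from fnmatch import fnmatchcase

# Patterns copied from the developer config above
ALLOWED_PATHS = ["/workspace/**"]
DENIED_PATHS = ["/workspace/**/.git/config", "**/.env", "**/secrets/**"]
CAN_PUSH_TO = ["feature/*"]
CANNOT_PUSH_TO = ["main", "dev"]


def _matches(value, patterns):
    return any(fnmatchcase(value, pattern) for pattern in patterns)


def is_path_allowed(path):
    """Allow a path only if it matches an allowed pattern and no denied pattern."""
    path = posixpath.normpath(path)  # collapse ".." before matching
    return _matches(path, ALLOWED_PATHS) and not _matches(path, DENIED_PATHS)


def can_push_to(branch):
    """Allow a push only to branches matching can_push_to and not cannot_push_to."""
    return _matches(branch, CAN_PUSH_TO) and not _matches(branch, CANNOT_PUSH_TO)
```

Denied patterns take precedence over allowed ones here. Symlinks are not resolved, so a real hook would likely need `os.path.realpath` as well.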
### Designer Config (`config/agents/designer.yml`)
```yaml
id: designer
name: "Designer Dani"
slack_user_id: "${SLACK_DESIGNER_USER_ID}"
model: "sonnet"
max_turns: 50
system_prompt: |
You are Designer Dani, responsible for UI/UX design and frontend aesthetics.
Your responsibilities:
- Create and refine UI designs based on requirements
- Implement CSS/styling changes
- Ensure consistent design system usage
- Review frontend PRs for design consistency
- Provide design feedback and mockup descriptions
When working on design tasks:
1. Understand the user experience goals
2. Reference the existing design system (shadcn/ui, Tailwind)
3. Implement pixel-perfect styling
4. Ensure responsive design across breakpoints
5. Document design decisions in comments
You work closely with developers on frontend implementation.
allowed_tools:
- Read
- Write
- Edit
- Bash
- Glob
- Grep
- WebSearch
- WebFetch
# Slack tools
- mcp__agent__slack_send_message
- mcp__agent__slack_reply_thread
# Task tools
- mcp__agent__tasks_view
- mcp__agent__tasks_update_status
- mcp__agent__tasks_add_comment
# Git tools - can create PRs for design changes
- mcp__agent__gitea_create_pr
- mcp__agent__gitea_list_prs
- mcp__agent__gitea_add_comment
disallowed_tools:
- mcp__agent__slack_create_channel
- mcp__agent__slack_delete_channel
- mcp__agent__tasks_create_project
- mcp__agent__tasks_create_board
- mcp__agent__gitea_merge_pr
permissions:
filesystem:
allowed_paths:
- "/workspace/**/*.css"
- "/workspace/**/*.scss"
- "/workspace/**/*.tsx"
- "/workspace/**/*.jsx"
- "/workspace/**/components/**"
- "/workspace/**/styles/**"
- "/workspace/**/public/**"
denied_paths:
- "/workspace/**/.git/config"
- "**/.env"
- "**/secrets/**"
- "/workspace/**/api/**"
- "/workspace/**/server/**"
git:
branch_pattern: "design/*"
can_push_to:
- "design/*"
cannot_push_to:
- "main"
- "dev"
- "feature/*"
memory:
path: "/memory/designer"
files:
- "memory.md"
can_mention:
- developer
- product_manager
- tech_lead
container:
resources:
memory: "2g"
cpus: "1.0"
```
### Product Manager Config (`config/agents/product_manager.yml`)
```yaml
id: product_manager
name: "PM Paula"
slack_user_id: "${SLACK_PM_USER_ID}"
model: "sonnet"
max_turns: 50
system_prompt: |
You are PM Paula, a product manager coordinating the team.
You translate requirements into actionable tasks, create project
channels, and ensure smooth delivery.
When given a new feature request:
1. Break it down into tasks
2. Create a project channel if needed
3. Assign tasks to appropriate team members
4. Track progress and remove blockers
You can merge approved PRs to the dev branch.
You cannot merge to main - that requires CEO approval.
allowed_tools:
- Read
- Write
- Glob
- Grep
- Bash
- WebSearch
- WebFetch
# Slack tools
- mcp__agent__slack_send_message
- mcp__agent__slack_reply_thread
- mcp__agent__slack_create_channel
- mcp__agent__slack_invite_to_channel
# Task tools
- mcp__agent__tasks_create_project
- mcp__agent__tasks_create_board
- mcp__agent__tasks_create
- mcp__agent__tasks_assign
- mcp__agent__tasks_update_status
- mcp__agent__tasks_view
# Git/Gitea tools
- mcp__agent__gitea_list_prs
- mcp__agent__gitea_get_pr
- mcp__agent__gitea_merge_pr
- mcp__agent__gitea_add_review
disallowed_tools:
- mcp__agent__gitea_merge_to_main
permissions:
filesystem:
allowed_paths:
- "/workspace/**"
- "/projects/**"
git:
can_merge_to:
- "dev"
cannot_merge_to:
- "main"
memory:
path: "/memory/product_manager"
files:
- "memory.md"
can_mention:
- developer
- tech_lead
- designer
- ceo
can_delegate_to:
- developer
- designer
```
---
## Core Components
### 1. Orchestrator (`src/orchestra/core/orchestrator.py`)
```python
"""
Lightweight orchestrator that:
- Listens to Slack events (Socket Mode)
- Routes messages to agent containers via HTTP
- Manages container lifecycle (start/stop)
- Does NOT run the Claude SDK (agents do)
"""
class Orchestrator:
    async def start(self) -> None: ...
    async def stop(self) -> None: ...
    async def route_message(self, event: SlackEvent) -> None: ...
    async def send_to_agent(self, agent_id: str, message: str) -> str: ...
    async def start_agent_container(self, agent_id: str) -> None: ...
    async def stop_agent_container(self, agent_id: str) -> None: ...
```
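As a rough illustration of what `route_message` and `send_to_agent` might do, the sketch below extracts `<@USER_ID>` mentions from a Slack message, maps them to agent IDs, and builds the HTTP request for an agent container without sending it. The user-ID mapping, the `agent-<id>` hostname and port 8000 are all assumptions made for the example.

```python
import json
import re
import urllib.request

# Hypothetical mapping from Slack bot user IDs to agent IDs
SLACK_USER_TO_AGENT = {"U0DEV": "developer", "U0PM": "product_manager"}
MENTION_PATTERN = re.compile(r"<@([A-Z0-9]+)>")


def mentioned_agents(text):
    """Return agent IDs mentioned in a Slack message, in order, without duplicates."""
    agents = []
    for user_id in MENTION_PATTERN.findall(text):
        agent_id = SLACK_USER_TO_AGENT.get(user_id)
        if agent_id and agent_id not in agents:
            agents.append(agent_id)
    return agents


def build_agent_request(agent_id, text, context):
    """Build (but do not send) the POST /message request for one agent container."""
    url = f"http://agent-{agent_id}:8000/message"  # assumed hostname and port
    body = json.dumps({"text": text, "context": context}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
```

An async HTTP client would be the natural choice inside the orchestrator itself. `urllib` is used here only to keep the sketch dependency-free.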
### 2. Agent (`src/orchestra/core/agent.py`)
Each agent runs inside its own Docker container with the Claude Agent SDK.
The agent exposes an HTTP API for the orchestrator to send messages.
```python
"""
Agent service running inside container:
- Exposes HTTP API for receiving messages from orchestrator
- Uses query() with explicit memory injection (stateless)
- Custom tools via SDK MCP servers (in-process)
- Tools operate directly on container filesystem
- Memory persisted to files, injected into each query
"""
import uvicorn
from claude_agent_sdk import (
    query,
    ClaudeAgentOptions,
    ResultMessage,
    tool,
    create_sdk_mcp_server,
    HookMatcher,
)
from fastapi import FastAPI
from pydantic import BaseModel

# AgentConfig and MemoryManager are project types defined in
# core/config.py and memory/manager.py.


class MessageRequest(BaseModel):
    text: str
    context: dict = {}


class MessageResponse(BaseModel):
    text: str


class AgentService:
    """HTTP service that uses stateless query() with memory injection."""

    id: str
    config: AgentConfig
    options: ClaudeAgentOptions  # Reused for each query
    memory: MemoryManager
    app: FastAPI  # HTTP server for orchestrator communication

    async def start(self) -> None:
        """Initialize SDK options and start HTTP server."""
        # Create in-process MCP server for custom tools
        tools_server = create_sdk_mcp_server(
            name=f"{self.id}-tools",
            version="1.0.0",
            tools=self._build_tools(),
        )
        self.options = ClaudeAgentOptions(
            system_prompt=self.config.system_prompt,
            model=self.config.model,  # e.g., "sonnet", "opus"
            allowed_tools=self.config.allowed_tools,
            disallowed_tools=self.config.disallowed_tools,
            mcp_servers={
                "agent": tools_server,
                # External MCP servers for git, etc.
                "gitea": {
                    "type": "stdio",
                    "command": "gitea-mcp-server",
                    "args": ["--url", self.config.gitea_url],
                },
            },
            permission_mode="acceptEdits",  # Auto-accept in containers
            cwd="/workspace",  # Agent's local workspace
            hooks=self._build_hooks(),
        )
        # Start HTTP server for orchestrator
        await self._start_http_server()

    async def _start_http_server(self) -> None:
        """Start FastAPI server to receive messages from orchestrator."""
        self.app = FastAPI()

        @self.app.post("/message")
        async def handle_message(request: MessageRequest) -> MessageResponse:
            response = await self.process_message(request.text, request.context)
            return MessageResponse(text=response)

        @self.app.get("/health")
        async def health():
            return {"status": "ok", "agent_id": self.id}

        # Serve until shutdown. Host and port are placeholders.
        server = uvicorn.Server(uvicorn.Config(self.app, host="0.0.0.0", port=8000))
        await server.serve()

    async def process_message(self, message: str, context: dict) -> str:
        """
        Stateless query with memory injection.
        Flow:
        1. Load memory from file
        2. Construct prompt with memory + message
        3. Execute query()
        4. Update memory file with new learnings
        5. Return response
        """
        # Load current memory state
        memory_content = await self.memory.load()
        # Construct prompt with injected memory
        prompt = self._build_prompt(message, memory_content, context)
        # Stateless query - no session maintained. query() is an async
        # iterator; the final ResultMessage carries the result text.
        response_text = ""
        async for msg in query(prompt=prompt, options=self.options):
            if isinstance(msg, ResultMessage) and msg.result:
                response_text = msg.result
        # Extract and persist any memory updates
        await self.memory.update_from_response(response_text)
        return response_text

    def _build_prompt(self, message: str, memory: str, context: dict) -> str:
        """Construct prompt with memory context."""
        thread_context = ""
        if context.get("thread_messages"):
            thread_context = "\n## Recent Thread Context\n"
            for msg in context["thread_messages"][-5:]:  # Last 5 messages
                thread_context += f"- {msg['author']}: {msg['text']}\n"
        return f"""## Your Memory
{memory}
{thread_context}
## Current Message
{message}

Complete this task. If you learn anything important (team preferences, project
details, decisions made), update your memory file at /memory/memory.md."""

    async def stop(self) -> None:
        """Stop HTTP server."""
        pass  # No client to disconnect with stateless query()

    def _build_tools(self) -> list:
        """Build custom tools based on agent config permissions."""
        # The YAML configs have no `tools` field, so derive the tool groups
        # from the mcp__agent__* entries in allowed_tools.
        allowed = self.config.allowed_tools
        tools = []
        if any(name.startswith("mcp__agent__slack_") for name in allowed):
            tools.extend(self._create_slack_tools())
        if any(name.startswith("mcp__agent__tasks_") for name in allowed):
            tools.extend(self._create_task_tools())
        return tools

    def _build_hooks(self) -> dict:
        """Build permission hooks to enforce tool restrictions."""

        async def check_permissions(input_data, tool_use_id, context):
            tool_name = input_data.get("tool_name")
            tool_input = input_data.get("tool_input", {})
            # Check git branch restrictions
            # (_extract_branch and _can_push_to_branch are helpers not shown here)
            if tool_name == "Bash" and "git push" in tool_input.get("command", ""):
                branch = self._extract_branch(tool_input["command"])
                if not self._can_push_to_branch(branch):
                    return {
                        "hookSpecificOutput": {
                            "hookEventName": "PreToolUse",
                            "permissionDecision": "deny",
                            "permissionDecisionReason": f"Cannot push to {branch}",
                        }
                    }
            return {}

        return {
            "PreToolUse": [HookMatcher(hooks=[check_permissions])]
        }
```
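The permission hook relies on an `_extract_branch` helper that the plan does not define. A best-effort sketch of such a parser is given below as a standalone function with a hypothetical name. It only handles the common `git push <remote> <refspec>` form and returns `None` when the branch cannot be determined from the command text.

```python
import shlex
from typing import Optional


def extract_pushed_branch(command: str) -> Optional[str]:
    """Best-effort parse of the target branch from a `git push` shell command."""
    try:
        tokens = shlex.split(command)
    except ValueError:
        return None  # unbalanced quotes and similar
    if "git" not in tokens or "push" not in tokens:
        return None
    # Positional arguments after `push`, ignoring flags such as -u or --force
    after_push = tokens[tokens.index("push") + 1:]
    positional = [token for token in after_push if not token.startswith("-")]
    if len(positional) < 2:
        return None  # bare `git push`: the branch depends on repository state
    refspec = positional[1]
    # For a refspec like HEAD:dev the destination is the part after the colon
    target = refspec.split(":", 1)[-1].lstrip("+")
    prefix = "refs/heads/"
    return target[len(prefix):] if target.startswith(prefix) else target
```

Because the parse is heuristic (flags that take a value, aliases and scripts can all defeat it), treating `None` as a denial is the conservative choice. Server-side branch protection in Gitea remains the stronger guarantee.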
### 3. Custom Tools with SDK MCP Server
The Claude Agent SDK supports in-process MCP servers, eliminating subprocess overhead.
```python
"""
Custom tools implemented as SDK MCP servers.
These run in-process for better performance.
"""
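import json
from typing import Any

from claude_agent_sdk import tool, create_sdk_mcp_server

# slack_client, task_manager, gitea_client and AgentConfig are
# module-level objects defined elsewhere in the project.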
# Slack Tools
@tool("slack_send_message", "Send a message to a Slack channel or thread", {
"channel": str,
"text": str,
"thread_ts": str # Optional, for threading
})
async def slack_send_message(args: dict[str, Any]) -> dict[str, Any]:
# Implementation uses slack_sdk
channel = args["channel"]
text = args["text"]
thread_ts = args.get("thread_ts")
result = await slack_client.chat_postMessage(
channel=channel,
text=text,
thread_ts=thread_ts
)
return {
"content": [{
"type": "text",
"text": f"Message sent to {channel}"
}]
}
@tool("slack_create_channel", "Create a new Slack channel", {
"name": str,
"description": str
})
async def slack_create_channel(args: dict[str, Any]) -> dict[str, Any]:
result = await slack_client.conversations_create(
name=args["name"]
)
return {
"content": [{
"type": "text",
"text": f"Created channel #{args['name']}"
}]
}
# Task Management Tools
@tool("tasks_create", "Create a new task", {
"title": str,
"description": str,
"assignee": str,
"board_id": str
})
async def tasks_create(args: dict[str, Any]) -> dict[str, Any]:
task = await task_manager.create_task(
title=args["title"],
description=args["description"],
assignee=args["assignee"],
board_id=args["board_id"]
)
return {
"content": [{
"type": "text",
"text": f"Created task {task['id']}: {task['title']}"
}]
}
@tool("tasks_list", "List tasks with optional filters", {
"board_id": str,
"status": str, # Optional: pending, in_progress, done
"assignee": str # Optional
})
async def tasks_list(args: dict[str, Any]) -> dict[str, Any]:
tasks = await task_manager.list_tasks(**args)
return {
"content": [{
"type": "text",
"text": json.dumps(tasks, indent=2)
}]
}
# Gitea Tools (PR management)
@tool("gitea_create_pr", "Create a pull request", {
"title": str,
"body": str,
"head": str, # Source branch
"base": str # Target branch (usually "dev")
})
async def gitea_create_pr(args: dict[str, Any]) -> dict[str, Any]:
pr = await gitea_client.create_pull_request(**args)
return {
"content": [{
"type": "text",
"text": f"Created PR #{pr['number']}: {pr['title']}"
}]
}
@tool("gitea_merge_pr", "Merge a pull request", {
"pr_number": int,
"merge_style": str # merge, rebase, squash
})
async def gitea_merge_pr(args: dict[str, Any]) -> dict[str, Any]:
result = await gitea_client.merge_pull_request(
args["pr_number"],
args["merge_style"]
)
return {
"content": [{
"type": "text",
"text": f"Merged PR #{args['pr_number']}"
}]
}
# Create server with all tools
def create_agent_tools_server(agent_config: AgentConfig):
"""Create MCP server with tools filtered by agent permissions."""
all_tools = {
"slack_send_message": slack_send_message,
"slack_create_channel": slack_create_channel,
"tasks_create": tasks_create,
"tasks_list": tasks_list,
"gitea_create_pr": gitea_create_pr,
"gitea_merge_pr": gitea_merge_pr,
}
# Filter based on agent's allowed operations
allowed = []
for tool_name, tool_fn in all_tools.items():
if agent_config.can_use_tool(tool_name):
allowed.append(tool_fn)
return create_sdk_mcp_server(
name=f"{agent_config.id}-tools",
version="1.0.0",
tools=allowed
)
```
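`create_agent_tools_server` depends on an `AgentConfig.can_use_tool` method that is not specified. One possible reading, sketched below, is that a short tool name is usable when its `mcp__agent__`-prefixed form is in `allowed_tools` and not in `disallowed_tools`. The `AgentToolPolicy` class is a hypothetical stand-in for the relevant part of `AgentConfig`.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class AgentToolPolicy:
    """Hypothetical stand-in for the tool lists of an agent's YAML config."""

    allowed_tools: List[str] = field(default_factory=list)
    disallowed_tools: List[str] = field(default_factory=list)
    server: str = "agent"  # key used in mcp_servers, hence the name prefix

    def can_use_tool(self, short_name: str) -> bool:
        """Check a short tool name against the prefixed names in the config."""
        full_name = f"mcp__{self.server}__{short_name}"
        return full_name in self.allowed_tools and full_name not in self.disallowed_tools


developer = AgentToolPolicy(
    allowed_tools=["Read", "mcp__agent__gitea_create_pr", "mcp__agent__tasks_view"],
    disallowed_tools=["mcp__agent__gitea_merge_pr"],
)
```

With this policy the developer's server would expose `gitea_create_pr` but not `gitea_merge_pr`, so the merge tool is never registered for that agent in the first place.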
### 4. Container Manager (`src/orchestra/core/container.py`)
```python
"""
Docker container lifecycle management:
- Build agent images
- Start/stop containers
- Volume mounting
- Network configuration
"""
class ContainerManager:
    async def create_container(self, agent_config: AgentConfig) -> Container: ...
    async def start_container(self, container_id: str) -> None: ...
    async def stop_container(self, container_id: str) -> None: ...
    async def exec_in_container(self, container_id: str, cmd: str) -> str: ...
```
---
## Tool Implementations
### Tasks Tool (`src/orchestra/tools/task_tools.py`)
JSON-based project management. File: `/projects/boards.json`. Schema:
```json
{
  "projects": {
    "project-123": {
      "id": "project-123",
      "name": "Company Website",
      "channel": "proj-website",
      "created_by": "product_manager",
      "created_at": "2025-01-15T10:00:00Z"
    }
  },
  "boards": {
    "board-456": {
      "id": "board-456",
      "project_id": "project-123",
      "name": "Sprint 1",
      "columns": ["todo", "in_progress", "review", "done"]
    }
  },
  "tasks": {
    "task-789": {
      "id": "task-789",
      "board_id": "board-456",
      "title": "Implement homepage",
      "description": "Create responsive homepage with hero section",
      "status": "in_progress",
      "assignee": "developer",
      "created_by": "product_manager",
      "branch": "feature/task-789",
      "pr_url": null,
      "comments": [
        {
          "author": "developer",
          "text": "Started working on this",
          "timestamp": "2025-01-15T11:00:00Z"
        }
      ]
    }
  }
}
```
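A status update against this schema might look like the sketch below. It validates the new status against the board's `columns` and writes the file via a temporary file plus rename, so a crash cannot leave a half-written `boards.json`. The function name is hypothetical, and no cross-container locking is attempted.

```python
import json
import os
import tempfile


def update_task_status(boards_path, task_id, new_status):
    """Set a task's status after checking it is a valid column of its board."""
    with open(boards_path, encoding="utf-8") as fh:
        data = json.load(fh)
    task = data["tasks"][task_id]
    columns = data["boards"][task["board_id"]]["columns"]
    if new_status not in columns:
        raise ValueError(f"Unknown status {new_status!r}; expected one of {columns}")
    task["status"] = new_status
    # Write to a temp file in the same directory, then replace atomically
    directory = os.path.dirname(os.path.abspath(boards_path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    with os.fdopen(fd, "w", encoding="utf-8") as fh:
        json.dump(data, fh, indent=2)
    os.replace(tmp_path, boards_path)
    return task
```

Since several agent containers may share this file, concurrent read-modify-write cycles can still overwrite each other. A file lock or routing all writes through one service would be needed to rule that out.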
### Git Tool (`src/orchestra/tools/git.py`)
```python
"""
Git operations with permission checking.
Enforces branch patterns and operation restrictions.
Uses Gitea API for PR operations.
"""
class GitTool:
# Local git operations
async def checkout_branch(self, branch: str) -> str: ...
async def create_branch(self, branch: str, from_ref: str = "dev") -> str: ...
async def commit(self, message: str, files: list[str]) -> str: ...
async def push(self, branch: str) -> str: ...
async def pull(self, branch: str) -> str: ...
async def status(self) -> str: ...
async def diff(self, ref: str = "HEAD") -> str: ...
# Gitea API operations
async def create_pr(self, title: str, body: str, head: str, base: str = "dev") -> dict: ...
async def list_prs(self, state: str = "open") -> list[dict]: ...
async def get_pr(self, pr_number: int) -> dict: ...
async def add_pr_comment(self, pr_number: int, body: str) -> dict: ...
async def add_pr_review(self, pr_number: int, body: str, approved: bool) -> dict: ...
async def merge_pr(self, pr_number: int, merge_style: str = "merge") -> dict: ... # Permission checked
class GiteaClient:
"""
Async client for Gitea API.
Docs: https://docs.gitea.com/development/api-usage
"""
def __init__(self, base_url: str, token: str, owner: str, repo: str):
self.base_url = base_url.rstrip("/")
self.token = token
self.owner = owner
self.repo = repo
async def create_pull_request(
self, title: str, body: str, head: str, base: str
) -> dict:
"""POST /repos/{owner}/{repo}/pulls"""
...
async def get_pull_request(self, number: int) -> dict:
"""GET /repos/{owner}/{repo}/pulls/{index}"""
...
async def merge_pull_request(
self, number: int, merge_style: str = "merge" # merge, rebase, squash
) -> dict:
"""POST /repos/{owner}/{repo}/pulls/{index}/merge"""
...
async def create_review(
self, number: int, body: str, event: str # APPROVE, REQUEST_CHANGES, COMMENT
) -> dict:
"""POST /repos/{owner}/{repo}/pulls/{index}/reviews"""
...
```
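The `create_pull_request` stub could be filled in roughly as follows. This sketch only builds the request, using the standard library instead of the `httpx` client the plan names, so that it stays dependency-free. It assumes the Gitea REST API is served under `/api/v1` and authenticated with an `Authorization: token ...` header. The function name and example values are hypothetical.

```python
import json
import urllib.request


def build_create_pr_request(base_url, token, owner, repo, title, body, head, base="dev"):
    """Build (but do not send) a POST .../repos/{owner}/{repo}/pulls request."""
    url = f"{base_url.rstrip('/')}/api/v1/repos/{owner}/{repo}/pulls"
    payload = {"title": title, "body": body, "head": head, "base": base}
    return urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Authorization": f"token {token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


request = build_create_pr_request(
    "https://git.example.com/", "secret", "acme", "site",
    title="Add homepage", body="Implements task-789", head="feature/task-789",
)
```

Separating request construction from sending makes the client easy to unit-test without a running Gitea instance.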
---
## Message Queue Architecture (Redis)
Redis provides reliable message queuing between the orchestrator and agent containers, ensuring messages aren't lost if an agent is temporarily unavailable.
### Queue Structure
```
Redis Keys:
├── queue:agent:{agent_id}:inbox # Messages waiting to be processed
├── queue:agent:{agent_id}:processing # Currently being processed (for reliability)
├── queue:agent:{agent_id}:outbox # Responses ready for orchestrator
├── agent:{agent_id}:status # Agent health status (heartbeat)
└── agent:{agent_id}:current_task # Current task ID (for recovery)
```
### Message Flow with Redis
```
┌─────────────────────────────────────────────────────────────────┐
│ Orchestrator │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────┐ │
│ │ Slack │───▶│ Router │───▶│ Redis Publisher │ │
│ │ Listener │ │ │ │ (LPUSH to inbox) │ │
│ └─────────────┘ └─────────────┘ └─────────────────────┘ │
│ │ │
│ ┌─────────────────────────────────┐ │ │
│ │ Response Handler │◀────────────┘ │
│ │ (BRPOP from outbox queues) │ │
│ └─────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
│ Redis
┌─────────────────────────────────────────────────────────────────┐
│ Agent Container │
│ ┌─────────────────────────────────┐ │
│ │ Message Consumer │ │
│ │ (BRPOPLPUSH inbox→processing) │ │
│ └───────────────┬─────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────┐ │
│ │ Claude SDK query() │ │
│ │ - Inject memory │ │
│ │ - Execute tools │ │
│ │ - Update memory │ │
│ └───────────────┬─────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────┐ │
│ │ Response Publisher │ │
│ │ (LPUSH to outbox, LREM from │ │
│ │ processing) │ │
│ └─────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
```
### Reliability Features
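1. **At-least-once delivery**: `BRPOPLPUSH` atomically moves a message from the inbox to the processing queue, so a crash mid-task cannot lose it. A message may be delivered more than once, so handlers should tolerate duplicates.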
2. **Recovery on crash**: On startup, agents check processing queue for incomplete tasks
3. **Heartbeat monitoring**: Agents publish heartbeat to `agent:{id}:status` every 10s
4. **Dead letter queue**: Messages that fail 3 times move to `queue:dead_letter`
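
The retry and dead-letter rule can be sketched independently of Redis. The example below uses in-memory deques in place of the Redis lists and a hypothetical `handle_failure` function. It assumes the `retry_count` field from the message format counts failed attempts.

```python
from collections import deque

MAX_RETRIES = 3  # "fail 3 times" from the reliability features above


def handle_failure(message, inbox, dead_letter):
    """Requeue a failed message, or dead-letter it after MAX_RETRIES failures."""
    updated = {**message, "retry_count": message.get("retry_count", 0) + 1}
    if updated["retry_count"] >= MAX_RETRIES:
        dead_letter.appendleft(updated)  # stands in for LPUSH queue:dead_letter
    else:
        inbox.appendleft(updated)  # stands in for LPUSH back onto the inbox
    return updated
```

In the Redis version the same decision would be made by the agent before removing the message from its processing list.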
### Message Format
```python
# Inbox message (orchestrator → agent)
{
    "id": "msg-uuid-123",
    "timestamp": "2025-01-15T10:00:00Z",
    "type": "slack_message",
    "payload": {
        "text": "@developer please implement the login feature",
        "channel": "proj-website",
        "thread_ts": "1234567890.123456",
        "user": "U123456",  # Slack user who sent it
        "context": {
            "thread_messages": [...],  # Recent thread history
            "mentioned_agents": ["developer"]
        }
    },
    "reply_to": "queue:orchestrator:responses",
    "retry_count": 0
}

# Outbox message (agent → orchestrator)
{
    "id": "resp-uuid-456",
    "request_id": "msg-uuid-123",
    "timestamp": "2025-01-15T10:00:05Z",
    "agent_id": "developer",
    "payload": {
        "text": "I'll start working on the login feature...",
        "channel": "proj-website",
        "thread_ts": "1234567890.123456",
        "mentions": ["tech_lead"]  # Agents to notify
    }
}
```
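
To show how the two shapes relate, here is a small sketch that derives an outbox message from an inbox message. `build_response` is a hypothetical helper, not part of the plan; it assumes the reply goes to the same channel and thread as the request and that ids follow the `resp-<uuid>` pattern used in the example.

```python
import uuid
from datetime import datetime, timezone
from typing import Optional


def build_response(
    request: dict, agent_id: str, text: str, mentions: Optional[list] = None
) -> dict:
    """Build an outbox message that answers the given inbox message."""
    payload = request["payload"]
    return {
        "id": f"resp-{uuid.uuid4()}",
        "request_id": request["id"],  # Lets the orchestrator correlate the reply
        "timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "agent_id": agent_id,
        "payload": {
            "text": text,
            # Reply in the channel and thread the request came from
            "channel": payload["channel"],
            "thread_ts": payload["thread_ts"],
            "mentions": list(mentions or []),
        },
    }
```
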
### Redis Client Implementation
```python
"""
src/orchestra/queue/redis_client.py
"""
import json
from datetime import datetime, timezone
from typing import Optional

import redis.asyncio as redis


class AgentQueue:
    def __init__(self, agent_id: str, redis_url: str = "redis://localhost:6379"):
        self.agent_id = agent_id
        self.redis = redis.from_url(redis_url)
        self.inbox = f"queue:agent:{agent_id}:inbox"
        self.processing = f"queue:agent:{agent_id}:processing"
        self.outbox = f"queue:agent:{agent_id}:outbox"
        # Raw payloads of in-flight messages, keyed by message id.
        # LREM matches on the exact stored value, so the original bytes are kept.
        self._in_flight: dict[str, bytes] = {}

    async def receive(self, timeout: int = 0) -> Optional[dict]:
        """Block until message available, move to processing atomically."""
        # BRPOPLPUSH is deprecated since Redis 6.2; BLMOVE is its replacement.
        raw = await self.redis.brpoplpush(
            self.inbox, self.processing, timeout=timeout
        )
        if raw is None:
            return None
        message = json.loads(raw)
        self._in_flight[message["id"]] = raw
        return message

    async def complete(self, message_id: str, response: dict) -> None:
        """Mark message complete and publish response."""
        raw = self._in_flight.pop(message_id)
        pipe = self.redis.pipeline()
        # Remove from processing (LREM needs the stored payload, not the id)
        pipe.lrem(self.processing, 1, raw)
        # Publish response
        pipe.lpush(self.outbox, json.dumps(response))
        await pipe.execute()

    async def recover_incomplete(self) -> list[dict]:
        """On startup, recover any messages left in processing."""
        messages = []
        while True:
            # Move each leftover message back to the inbox for reprocessing
            msg = await self.redis.rpoplpush(self.processing, self.inbox)
            if not msg:
                break
            messages.append(json.loads(msg))
        return messages

    async def heartbeat(self) -> None:
        """Update agent status."""
        now = datetime.now(timezone.utc).isoformat()
        await self.redis.set(
            f"agent:{self.agent_id}:status",
            json.dumps({"status": "healthy", "timestamp": now}),
            ex=30,  # Expire after 30s if no heartbeat
        )
```
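
A consumer built on this client might follow the receive, handle, complete order sketched below. This is an illustration under assumptions: `process_next` and `InMemoryQueue` are hypothetical names, and the in-memory class only mimics the `receive`/`complete` shape of `AgentQueue` so the sketch runs without Redis.

```python
import asyncio
from typing import Awaitable, Callable


class InMemoryQueue:
    """Hypothetical stand-in with the same receive/complete shape as AgentQueue."""

    def __init__(self, messages: list):
        self.inbox = list(messages)
        self.processing: list = []
        self.outbox: list = []

    async def receive(self, timeout: int = 0):
        if not self.inbox:
            return None
        message = self.inbox.pop()
        self.processing.append(message)  # Mirrors the move into "processing"
        return message

    async def complete(self, message_id: str, response: dict) -> None:
        self.processing = [m for m in self.processing if m["id"] != message_id]
        self.outbox.append(response)


async def process_next(
    queue, handler: Callable[[dict], Awaitable[str]], timeout: int = 5
) -> bool:
    """Handle at most one message; return True if one was processed."""
    message = await queue.receive(timeout=timeout)
    if message is None:
        return False  # Nothing arrived before the timeout
    reply_text = await handler(message)
    response = {"request_id": message["id"], "payload": {"text": reply_text}}
    # The message leaves "processing" only after the handler succeeds, so a
    # crash inside the handler leaves it in place for recovery on restart.
    await queue.complete(message["id"], response)
    return True
```

Because `complete()` runs last, a failure in the handler leaves the message in the processing list, which is the property the at-least-once guarantee relies on.
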
---
## Slack Integration
### Event Flow
```
Slack Event (message/mention)
         ▼
┌─────────────────┐
│  Event Handler  │ ← Parses mentions, extracts thread context
└────────┬────────┘
         ▼
┌─────────────────┐
│     Router      │ ← Determines which agent(s) should respond
└────────┬────────┘
         ▼
┌─────────────────┐
│   Redis Queue   │ ← LPUSH to agent inbox (reliable delivery)
└────────┬────────┘
         ▼
┌─────────────────┐
│  Agent Process  │ ← BRPOPLPUSH, Claude SDK processes
└────────┬────────┘
         ▼
┌─────────────────┐
│ Redis Response  │ ← LPUSH to outbox
└────────┬────────┘
         ▼
┌─────────────────┐
│ Slack Response  │ ← Posts reply in thread, may mention others
└─────────────────┘
```
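
The Router step can be sketched as mention extraction followed by a lookup of the target inbox. This is a simplified illustration: it assumes agents are addressed by plain `@agent_id` text, as in the message example, and `extract_mentioned_agents` and `inbox_for` are hypothetical names.

```python
import re

# Agent ids in this plan are lowercase with underscores, e.g. tech_lead
MENTION_PATTERN = re.compile(r"@([a-z_]+)")


def extract_mentioned_agents(text: str, known_agents: set) -> list:
    """Return known agent ids mentioned in text, in order, without duplicates."""
    mentioned = []
    for name in MENTION_PATTERN.findall(text.lower()):
        if name in known_agents and name not in mentioned:
            mentioned.append(name)
    return mentioned


def inbox_for(agent_id: str) -> str:
    """Redis list the orchestrator pushes to for a given agent."""
    return f"queue:agent:{agent_id}:inbox"
```

Unknown names are dropped rather than routed, so a stray `@someone` in a message does not create a queue for a non-existent agent.
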
### Message Format
Agents post messages with a consistent format:
```
🤖 *Dev Dana*
> Working on task-789: Implement homepage
>
> Created branch `feature/task-789` and started implementation.
> Will have a PR ready for review in ~2 hours.
>
> @tech_lead heads up, will need your review later
```
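
A formatter for this layout could look like the sketch below. `format_agent_message` is a hypothetical helper; it assumes the header is the robot emoji plus the bold display name and that every body line becomes a blockquote line.

```python
def format_agent_message(display_name: str, body: str) -> str:
    """Render an agent reply as a bold header followed by a quoted body."""
    # Empty lines become a bare ">" so the quote block stays unbroken
    quoted = "\n".join(f"> {line}" if line else ">" for line in body.splitlines())
    return f"🤖 *{display_name}*\n{quoted}"
```
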
---
## Memory System
The memory system provides persistent context across stateless `query()` calls.
Each query injects relevant memory, and agents update their memory file after completing tasks.
### Memory Flow
```
┌──────────────────────────────────────────────────────────────┐
│                      Each query() call                       │
├──────────────────────────────────────────────────────────────┤
│ 1. Load memory.md                                            │
│ 2. Construct prompt:                                         │
│    ┌────────────────────────────────────────────────────┐    │
│    │ ## Your Memory                                     │    │
│    │ {contents of memory.md}                            │    │
│    │                                                    │    │
│    │ ## Current Task                                    │    │
│    │ {slack message / task description}                 │    │
│    └────────────────────────────────────────────────────┘    │
│ 3. Execute query() - agent has Read/Write tools              │
│ 4. Agent updates /memory/memory.md if needed (via Write)     │
│ 5. Return response                                           │
└──────────────────────────────────────────────────────────────┘
```
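
Step 2 of the flow amounts to simple string assembly. The sketch below uses the two headings from the diagram; `build_prompt` is a hypothetical name and the exact whitespace is an assumption.

```python
def build_prompt(memory: str, task: str) -> str:
    """Combine persistent memory and the incoming task into one prompt."""
    return (
        "## Your Memory\n"
        f"{memory.strip()}\n"
        "\n"
        "## Current Task\n"
        f"{task.strip()}\n"
    )
```
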
### Structure
```
/memory/
├── ceo/
│   └── memory.md              # CEO's persistent memory
├── product_manager/
│   └── memory.md              # PM's persistent memory
├── developer/
│   ├── memory.md              # General memory
│   ├── project-website.md     # Project-specific (future)
│   └── learnings.md           # Technical learnings (future)
└── tech_lead/
    └── memory.md
### Memory Format (`memory.md`)
```markdown
# Agent Memory - Dev Dana
## Active Context
- Currently working on: Company Website project
- Current branch: feature/task-789
- Pending PR: None
## Project Knowledge
### Company Website
- Tech stack: React, Tailwind, Next.js
- Design system: Uses shadcn/ui components
- PM: Paula, Tech Lead: Terry
## Team Preferences
- Paula prefers detailed task breakdowns
- Terry wants tests for all new features
- Code style: Prefer functional components
## Learnings
- 2025-01-15: This repo uses pnpm, not npm
- 2025-01-14: API endpoints are in /api folder
## Recent Decisions
- Using Next.js App Router (agreed with Terry)
```
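
In the plan, agents edit this file themselves with the Write tool. If a programmatic update were ever needed, it might look like the sketch below, which inserts a dated entry at the top of the `## Learnings` section (newest first, as in the example). `add_learning` is a hypothetical helper, not part of the design.

```python
from datetime import date


def add_learning(memory: str, learning: str, day: date) -> str:
    """Return memory text with a dated bullet added under '## Learnings'."""
    lines = memory.splitlines()
    entry = f"- {day.isoformat()}: {learning}"
    for index, line in enumerate(lines):
        if line.strip() == "## Learnings":
            lines.insert(index + 1, entry)  # Newest entry goes first
            break
    else:
        # No Learnings section yet, so append one at the end
        lines += ["## Learnings", entry]
    return "\n".join(lines) + "\n"
```
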
### Memory Manager (`src/orchestra/memory/manager.py`)
```python
"""
Memory manager for loading and updating agent memory files.
Agents update their own memory via the Write tool during query execution.
"""


class MemoryManager:
    def __init__(self, agent_id: str, memory_path: str = "/memory"):
        self.agent_id = agent_id
        self.memory_file = f"{memory_path}/memory.md"

    async def load(self) -> str:
        """Load memory content for injection into query prompt."""
        try:
            with open(self.memory_file, "r") as f:
                return f.read()
        except FileNotFoundError:
            # Initialize empty memory structure
            return self._initial_memory()

    def _initial_memory(self) -> str:
        """Create initial memory structure for new agent."""
        return f"""# Agent Memory - {self.agent_id}
## Active Context
- No active tasks
## Project Knowledge
(none yet)
## Team Preferences
(none yet)
## Learnings
(none yet)
"""

    async def update_from_response(self, response: str) -> None:
        """
        Optional: Parse response for explicit memory updates.
        In practice, agents use the Write tool to update memory.md directly.
        This method can be used for automated memory extraction if needed.
        """
        pass  # Agents self-manage memory via Write tool
```
---
## Implementation Phases
### Phase 1: Foundation (MVP)
**Goal:** Single agent responding to Slack messages
1. Set up project structure with pyproject.toml
2. Implement config loading (YAML → Pydantic models)
3. Basic Slack integration (listen to events, post messages)
4. Wrap Claude Agent SDK in Agent class
5. Simple filesystem tool
6. Memory loading/saving
7. Basic Docker container for one agent

**Deliverable:** Can @mention one agent in Slack and get responses
### Phase 2: Multi-Agent
**Goal:** Multiple agents running and communicating
1. Container manager for multiple agents
2. Message routing based on mentions
3. Agent-to-agent communication via Slack
4. Thread context preservation
5. All agents defined and running

**Deliverable:** Can have PM delegate to Developer via Slack
### Phase 3: Tools & Permissions
**Goal:** Full toolset with permission enforcement
1. Tool registry with permission checking
2. Git tool implementation
3. Tasks/project management tool
4. Code execution tool
5. Web search tool
6. Slack tools (create_channel, etc.)

**Deliverable:** Agents can do real work with proper restrictions
### Phase 4: Workflow & Polish
**Goal:** Smooth end-to-end workflows
1. PR creation and review flow
2. Task lifecycle (create → assign → progress → complete)
3. CEO supervision capabilities
4. Better memory management
5. Error handling and recovery
6. MCP server exposure

**Deliverable:** Full workflow from request → implementation → review → merge
### Phase 5: Production Ready
**Goal:** Deployable and maintainable
1. Docker Compose setup
2. Health checks and monitoring
3. Logging and observability
4. Configuration validation
5. Documentation
6. Integration tests
---
## Docker Compose Structure
```yaml
version: '3.8'

services:
  # Redis for reliable message queuing between orchestrator and agents
  redis:
    image: redis:7-alpine
    command: redis-server --appendonly yes
    volumes:
      - redis-data:/data
    networks:
      - orchestra-net
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 5s
      retries: 5
      start_period: 10s

  orchestrator:
    build:
      context: .
      dockerfile: Dockerfile.orchestrator
    environment:
      - SLACK_APP_TOKEN=${SLACK_APP_TOKEN}
      - SLACK_BOT_TOKEN=${SLACK_BOT_TOKEN}
      - REDIS_URL=redis://redis:6379
    volumes:
      - ./config:/app/config:ro
    networks:
      - orchestra-net
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 30s
      timeout: 10s
      retries: 3
      start_period: 30s
    depends_on:
      redis:
        condition: service_healthy
      agent-ceo:
        condition: service_healthy
      agent-pm:
        condition: service_healthy
      agent-dev:
        condition: service_healthy
      agent-techlead:
        condition: service_healthy
      agent-designer:
        condition: service_healthy

  # Full agent containers (SDK, Node.js, CLI)
  agent-ceo:
    build:
      context: .
      dockerfile: Dockerfile.agent
    environment:
      - AGENT_ID=ceo
      - AGENT_PORT=8001
      - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
      - GITEA_URL=${GITEA_URL}
      - GITEA_API_TOKEN=${GITEA_API_TOKEN}
      - REDIS_URL=redis://redis:6379
    volumes:
      - ./config/agents/ceo.yml:/app/config/agent.yml:ro
      - ./data/workspaces/ceo:/workspace
      - ./data/memory/ceo:/memory
      - ./data/projects:/projects
    ports:
      - "8001:8001"
    networks:
      - orchestra-net
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8001/health"]
      interval: 30s
      timeout: 10s
      retries: 3
      start_period: 60s
    depends_on:
      redis:
        condition: service_healthy

  agent-pm:
    build:
      context: .
      dockerfile: Dockerfile.agent
    environment:
      - AGENT_ID=product_manager
      - AGENT_PORT=8002
      - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
      - GITEA_URL=${GITEA_URL}
      - GITEA_API_TOKEN=${GITEA_API_TOKEN}
      - REDIS_URL=redis://redis:6379
    volumes:
      - ./config/agents/product_manager.yml:/app/config/agent.yml:ro
      - ./data/workspaces/product_manager:/workspace
      - ./data/memory/product_manager:/memory
      - ./data/projects:/projects
    ports:
      - "8002:8002"
    networks:
      - orchestra-net
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8002/health"]
      interval: 30s
      timeout: 10s
      retries: 3
      start_period: 60s
    depends_on:
      redis:
        condition: service_healthy

  agent-dev:
    build:
      context: .
      dockerfile: Dockerfile.agent
    environment:
      - AGENT_ID=developer
      - AGENT_PORT=8003
      - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
      - GITEA_URL=${GITEA_URL}
      - GITEA_API_TOKEN=${GITEA_API_TOKEN}
      - REDIS_URL=redis://redis:6379
    volumes:
      - ./config/agents/developer.yml:/app/config/agent.yml:ro
      - ./data/workspaces/developer:/workspace
      - ./data/memory/developer:/memory
    ports:
      - "8003:8003"
    networks:
      - orchestra-net
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8003/health"]
      interval: 30s
      timeout: 10s
      retries: 3
      start_period: 60s
    depends_on:
      redis:
        condition: service_healthy

  agent-techlead:
    build:
      context: .
      dockerfile: Dockerfile.agent
    environment:
      - AGENT_ID=tech_lead
      - AGENT_PORT=8004
      - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
      - GITEA_URL=${GITEA_URL}
      - GITEA_API_TOKEN=${GITEA_API_TOKEN}
      - REDIS_URL=redis://redis:6379
    volumes:
      - ./config/agents/tech_lead.yml:/app/config/agent.yml:ro
      - ./data/workspaces/tech_lead:/workspace
      - ./data/memory/tech_lead:/memory
    ports:
      - "8004:8004"
    networks:
      - orchestra-net
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8004/health"]
      interval: 30s
      timeout: 10s
      retries: 3
      start_period: 60s
    depends_on:
      redis:
        condition: service_healthy

  agent-designer:
    build:
      context: .
      dockerfile: Dockerfile.agent
    environment:
      - AGENT_ID=designer
      - AGENT_PORT=8005
      - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
      - GITEA_URL=${GITEA_URL}
      - GITEA_API_TOKEN=${GITEA_API_TOKEN}
      - REDIS_URL=redis://redis:6379
    volumes:
      - ./config/agents/designer.yml:/app/config/agent.yml:ro
      - ./data/workspaces/designer:/workspace
      - ./data/memory/designer:/memory
    ports:
      - "8005:8005"
    networks:
      - orchestra-net
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8005/health"]
      interval: 30s
      timeout: 10s
      retries: 3
      start_period: 60s
    depends_on:
      redis:
        condition: service_healthy

volumes:
  orchestra-data:
  redis-data:

networks:
  orchestra-net:
    driver: bridge
```
### Dockerfile.orchestrator
```dockerfile
FROM python:3.12-slim
# Lightweight orchestrator - no Node.js/SDK needed
RUN apt-get update && apt-get install -y \
    curl \
    && rm -rf /var/lib/apt/lists/*
# Install Python dependencies
WORKDIR /app
COPY pyproject.toml .
RUN pip install uv && uv sync
COPY src/orchestra/ ./src/orchestra/
COPY config/ ./config/
CMD ["uv", "run", "python", "-m", "orchestra.main"]
```
### Dockerfile.agent
```dockerfile
FROM python:3.12-slim
# Full agent container with Claude SDK and tools
RUN apt-get update && apt-get install -y \
    curl \
    git \
    jq \
    && curl -fsSL https://deb.nodesource.com/setup_20.x | bash - \
    && apt-get install -y nodejs \
    && rm -rf /var/lib/apt/lists/*
# Install Claude Code CLI
RUN npm install -g @anthropic-ai/claude-code
# Install Python dependencies
WORKDIR /app
COPY pyproject.toml .
RUN pip install uv && uv sync
# Copy agent service code
COPY src/orchestra/agent/ ./src/orchestra/agent/
COPY src/orchestra/tools/ ./src/orchestra/tools/
# Create workspace structure
RUN mkdir -p /workspace /memory
# Expose HTTP API port
EXPOSE 8001
# Run agent service
CMD ["uv", "run", "python", "-m", "orchestra.agent.main"]
```
---
## Dependencies
```toml
[project]
name = "agent-orchestra"
version = "0.1.0"
requires-python = ">=3.10"
dependencies = [
"claude-agent-sdk>=0.1.0", # Official Claude Agent SDK
"slack-sdk>=3.27.0",
"slack-bolt>=1.18.0",
"pydantic>=2.5.0",
"pydantic-settings>=2.1.0",
"pyyaml>=6.0.0",
"docker>=7.0.0",
"aiohttp>=3.9.0",
"httpx>=0.26.0",
"anyio>=4.0.0",
"fastapi>=0.109.0", # HTTP API for agent containers
"uvicorn>=0.27.0", # ASGI server for FastAPI
"redis>=5.0.0", # Async Redis client for message queuing
]
[project.optional-dependencies]
dev = [
"pytest>=8.0.0",
"pytest-asyncio>=0.23.0",
"pytest-mock>=3.12.0",
"ruff>=0.1.0",
"fakeredis>=2.20.0", # Redis mock for testing
]
```
**Prerequisites:**
- Python 3.10+
- Node.js (required by Claude Agent SDK in agent containers)
- Claude Code CLI 2.0.0+: `npm install -g @anthropic-ai/claude-code` (in agent containers)
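
A startup check for these prerequisites could be sketched as follows. `check_prerequisites` is a hypothetical helper, and it assumes the Claude Code CLI is installed under the command name `claude`; it does not verify the CLI version.

```python
import shutil
import sys


def check_prerequisites(
    min_python: tuple = (3, 10), commands: tuple = ("node", "claude")
) -> list:
    """Return a list of human-readable problems; empty means all checks passed."""
    problems = []
    if sys.version_info < min_python:
        required = ".".join(str(part) for part in min_python)
        problems.append(f"Python {required}+ is required")
    for command in commands:
        # shutil.which returns None when the command is not on PATH
        if shutil.which(command) is None:
            problems.append(f"{command} not found on PATH")
    return problems
```

An agent container could call this once at startup and exit with a clear message instead of failing later inside the SDK.
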
---