This commit is contained in:
Pierre Wessman 2025-11-27 11:37:26 +01:00
parent 03484cf5b7
commit 59ac78ca39
2 changed files with 174 additions and 142 deletions

View File

@ -1,71 +0,0 @@
# Agent Orchestra
Multi-agent system with Claude agents communicating via Slack.
Each agent runs inside its own Docker container with the Claude Agent SDK.
## Quick Start
```bash
# Start all services
docker-compose up -d
# Or for local development:
uv sync
cp .env.example .env # Fill in ANTHROPIC_API_KEY, Slack tokens, Gitea tokens
uv run python -m orchestra.main
```
## Architecture
- **Orchestrator**: Lightweight service that routes Slack messages to agents via HTTP
- **Agent Containers**: Each runs Claude SDK + HTTP API for receiving messages
- **Tools**: Built-in (Read/Write/Bash run in container) + custom MCP tools
- **Permissions**: PreToolUse hooks enforce agent-specific restrictions
- **Communication**: Orchestrator → HTTP → Agent containers
```
Orchestrator (Slack listener) --HTTP--> Agent Containers (SDK + HTTP API)
```
## Claude Agent SDK Usage
Each agent container runs `ClaudeSDKClient` for persistent conversations.
The orchestrator communicates with agents via HTTP API.
```python
# In agent container
from claude_agent_sdk import ClaudeSDKClient, ClaudeAgentOptions
async with ClaudeSDKClient(options) as client:
await client.query(message)
async for msg in client.receive_response():
process(msg)
```
Custom tools use `@tool` decorator and `create_sdk_mcp_server()`.
## Key Files
- `config/orchestra.yml` - Global config (Slack, agent endpoints)
- `config/agents/*.yml` - Agent definitions (tools, permissions, prompts)
- `src/orchestra/core/orchestrator.py` - Slack listener, HTTP routing
- `src/orchestra/agent/agent.py` - Agent service with SDK + HTTP API
- `src/orchestra/tools/` - Custom MCP tool implementations
## Agent Permissions
Each agent has:
- `allowed_tools` / `disallowed_tools` - Tool access
- `permissions.filesystem` - Path restrictions
- `permissions.git` - Branch push/merge restrictions
Enforced via PreToolUse hooks that check before execution.
## Testing
```bash
uv run pytest
uv run pytest tests/test_agent.py -v
```
## Common Tasks
- **Add agent**: Create YAML in config/agents/, add to docker-compose
- **Add tool**: Use @tool decorator in src/orchestra/tools/, register in server
- **Debug agent**: Check container logs: `docker logs agent-dev`

245
PLAN.md
View File

@ -8,11 +8,16 @@ A multi-agent system where Claude-powered agents collaborate via Slack, each run
### Key SDK Concepts
1. **Two APIs**:
1. **Two APIs**:
- `query()` - One-shot, creates new session each time
- `ClaudeSDKClient` - Persistent session, maintains conversation history
We use `ClaudeSDKClient` because agents need to remember context across messages.
We use `query()` with explicit memory injection because:
- Full control over what context goes into each request
- Memory persists in files (survives restarts)
- Inspectable/debuggable - can read memory.md to see agent state
- More cost-effective - only sends relevant context, not full history
- Selective memory - can summarize, prune, or prioritize what's included
2. **Built-in Tools**: The SDK includes Claude Code's tools:
- `Read`, `Write`, `Edit` - File operations
@ -84,16 +89,16 @@ We choose **Option B** because:
- Cleaner permission model: container boundaries enforce filesystem isolation
- More realistic: tools like Bash run in the agent's actual environment
### Setting Up SDK with Agent Options
### Setting Up SDK with query() and Memory Injection
```python
from claude_agent_sdk import ClaudeSDKClient, ClaudeAgentOptions, tool, create_sdk_mcp_server
from claude_agent_sdk import query, ClaudeAgentOptions, tool, create_sdk_mcp_server
# Create custom tools
@tool("slack_send", "Send Slack message", {"channel": str, "text": str})
async def slack_send(args):
await slack_client.chat_postMessage(
channel=args["channel"],
channel=args["channel"],
text=args["text"]
)
return {"content": [{"type": "text", "text": "Sent"}]}
@ -104,41 +109,51 @@ tools_server = create_sdk_mcp_server(
tools=[slack_send]
)
# Configure agent
# Configure agent options
options = ClaudeAgentOptions(
system_prompt=agent_config.system_prompt,
model="sonnet",
# Built-in tools
allowed_tools=["Read", "Write", "Edit", "Bash", "Glob", "Grep"],
disallowed_tools=["Task"], # Don't allow spawning subagents
# Custom tools via MCP
mcp_servers={"tools": tools_server},
# Permission mode
permission_mode="acceptEdits",
# Working directory (mapped to container volume)
cwd="/data/workspaces/developer",
# Hooks for fine-grained control
hooks={
"PreToolUse": [HookMatcher(hooks=[permission_checker])]
}
)
# Create persistent client
async with ClaudeSDKClient(options) as client:
# Send message (client remembers context)
await client.query("Create a hello.py file")
async for msg in client.receive_response():
handle_message(msg)
# Follow-up (same session, remembers previous)
await client.query("Add a main function")
async for msg in client.receive_response():
handle_message(msg)
# Query pattern: inject memory + task, get response, update memory
async def process_task(task: str, memory_path: str) -> str:
# 1. Load relevant memory
memory_content = read_memory(memory_path)
# 2. Construct prompt with memory context
prompt = f"""## Your Memory
{memory_content}
## Current Task
{task}
After completing the task, note any important information to remember."""
# 3. One-shot query (no session state)
response = await query(prompt, options=options)
# 4. Extract and update memory from response
update_memory(memory_path, response)
return response
```
---
@ -180,16 +195,16 @@ async with ClaudeSDKClient(options) as client:
│ │ - Claude CLI│ │ - Claude CLI│ │ - Claude CLI│ │
│ │ - Node.js │ │ - Node.js │ │ - Node.js │ │
│ │ - memory/ │ │ - memory/ │ │ - memory/ │ │
│ │ - git clone │ │ - git clone │ │ - git clone │ │
│ │ - workspace/│ │ - workspace/│ │ - workspace/│ │
│ │ - HTTP API │ │ - HTTP API │ │ - HTTP API │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │ │ │ │
│ └────────────────────┼────────────────────┘ │
│ │ git clone/push │
│ ▼ │
│ ┌─────────────────┐ │
│ │ Shared Volume │ │
│ │ - /repos/ │ │
│ │ - /projects/ │ │
│ │ Gitea Server │ │
│ │ (external) │ │
│ └─────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
```
@ -248,11 +263,11 @@ agent-orchestra/
│ └── manager.py # Memory file read/write
├── data/ # Mounted volumes
│ ├── workspaces/ # Per-agent working directories
│ ├── workspaces/ # Per-agent working directories (each clones from Gitea)
│ │ ├── ceo/
│ │ ├── developer/
│ │ ├── product_manager/
│ │ └── tech_lead/
│ ├── repos/ # Shared git repository clone
│ ├── projects/ # Project/task JSON files
│ │ └── boards.json
│ └── memory/ # Agent memory folders
@ -449,7 +464,6 @@ disallowed_tools:
permissions:
filesystem:
allowed_paths:
- "/repos/**"
- "/workspace/**"
git:
can_push_to: [] # Tech lead reviews, doesn't push
@ -488,8 +502,8 @@ system_prompt: |
3. Implement the solution with tests
4. Create a PR and notify tech_lead for review
You have access to the shared repository at /repos/main.
Clone repositories from Gitea into /workspace.
Use the Slack tools to communicate with your team.
Use the task tools to update your task status.
@ -521,10 +535,9 @@ disallowed_tools:
permissions:
filesystem:
allowed_paths:
- "/repos/**"
- "/workspace/**"
denied_paths:
- "/repos/.git/config"
- "/workspace/**/.git/config"
- "**/.env"
- "**/secrets/**"
@ -610,10 +623,9 @@ disallowed_tools:
permissions:
filesystem:
allowed_paths:
- "/repos/**"
- "/workspace/**"
- "/projects/**"
git:
can_merge_to:
- "dev"
@ -669,32 +681,29 @@ The agent exposes an HTTP API for the orchestrator to send messages.
"""
Agent service running inside container:
- Exposes HTTP API for receiving messages from orchestrator
- Uses ClaudeSDKClient for persistent conversation
- Uses query() with explicit memory injection (stateless)
- Custom tools via SDK MCP servers (in-process)
- Tools operate directly on container filesystem
- Memory management local to container
- Memory persisted to files, injected into each query
"""
from claude_agent_sdk import (
ClaudeSDKClient,
query,
ClaudeAgentOptions,
tool,
create_sdk_mcp_server,
AssistantMessage,
TextBlock,
ToolUseBlock,
HookMatcher
)
class AgentService:
"""HTTP service that wraps the Claude SDK client."""
"""HTTP service that uses stateless query() with memory injection."""
id: str
config: AgentConfig
client: ClaudeSDKClient # Persistent session
options: ClaudeAgentOptions # Reused for each query
memory: MemoryManager
app: FastAPI # HTTP server for orchestrator communication
async def start(self) -> None:
"""Initialize SDK client and start HTTP server."""
"""Initialize SDK options and start HTTP server."""
# Create in-process MCP server for custom tools
tools_server = create_sdk_mcp_server(
name=f"{self.id}-tools",
@ -702,7 +711,7 @@ class AgentService:
tools=self._build_tools()
)
options = ClaudeAgentOptions(
self.options = ClaudeAgentOptions(
system_prompt=self.config.system_prompt,
model=self.config.model, # e.g., "sonnet", "opus"
allowed_tools=self.config.allowed_tools,
@ -721,9 +730,6 @@ class AgentService:
hooks=self._build_hooks()
)
self.client = ClaudeSDKClient(options)
await self.client.connect()
# Start HTTP server for orchestrator
await self._start_http_server()
@ -741,39 +747,68 @@ class AgentService:
return {"status": "ok", "agent_id": self.id}
async def process_message(self, message: str, context: dict) -> str:
"""Send message and collect response. Client maintains conversation history."""
await self.client.query(message)
"""
Stateless query with memory injection.
response_text = ""
async for msg in self.client.receive_response():
if isinstance(msg, AssistantMessage):
for block in msg.content:
if isinstance(block, TextBlock):
response_text += block.text
Flow:
1. Load memory from file
2. Construct prompt with memory + message
3. Execute query()
4. Update memory file with new learnings
5. Return response
"""
# Load current memory state
memory_content = await self.memory.load()
return response_text
# Construct prompt with injected memory
prompt = self._build_prompt(message, memory_content, context)
# Stateless query - no session maintained
response = await query(prompt, options=self.options)
# Extract and persist any memory updates
await self.memory.update_from_response(response)
return self._extract_response_text(response)
def _build_prompt(self, message: str, memory: str, context: dict) -> str:
"""Construct prompt with memory context."""
thread_context = ""
if context.get("thread_messages"):
thread_context = "\n## Recent Thread Context\n"
for msg in context["thread_messages"][-5:]: # Last 5 messages
thread_context += f"- {msg['author']}: {msg['text']}\n"
return f"""## Your Memory
{memory}
{thread_context}
## Current Message
{message}
Complete this task. If you learn anything important (team preferences, project
details, decisions made), update your memory file at /memory/memory.md."""
async def stop(self) -> None:
"""Disconnect the client and stop HTTP server."""
await self.client.disconnect()
"""Stop HTTP server."""
pass # No client to disconnect with stateless query()
def _build_tools(self) -> list:
"""Build custom tools based on agent config permissions."""
tools = []
if "slack" in self.config.tools:
tools.extend(self._create_slack_tools())
if "tasks" in self.config.tools:
tools.extend(self._create_task_tools())
return tools
def _build_hooks(self) -> dict:
"""Build permission hooks to enforce tool restrictions."""
async def check_permissions(input_data, tool_use_id, context):
tool_name = input_data.get("tool_name")
tool_input = input_data.get("tool_input", {})
# Check git branch restrictions
if tool_name == "Bash" and "git push" in tool_input.get("command", ""):
branch = self._extract_branch(tool_input["command"])
@ -786,7 +821,7 @@ class AgentService:
}
}
return {}
return {
"PreToolUse": [HookMatcher(hooks=[check_permissions])]
}
@ -1123,6 +1158,30 @@ Agents post messages with a consistent format:
## Memory System
The memory system provides persistent context across stateless `query()` calls.
Each query injects relevant memory, and agents update their memory file after completing tasks.
### Memory Flow
```
┌──────────────────────────────────────────────────────────────┐
│ Each query() call │
├──────────────────────────────────────────────────────────────┤
│ 1. Load memory.md │
│ 2. Construct prompt: │
│ ┌────────────────────────────────────────────────────┐ │
│ │ ## Your Memory │ │
│ │ {contents of memory.md} │ │
│ │ │ │
│ │ ## Current Task │ │
│ │ {slack message / task description} │ │
│ └────────────────────────────────────────────────────┘ │
│ 3. Execute query() - agent has Read/Write tools │
│ 4. Agent updates /memory/memory.md if needed (via Write) │
│ 5. Return response │
└──────────────────────────────────────────────────────────────┘
```
### Structure
```
@ -1168,6 +1227,54 @@ Agents post messages with a consistent format:
- Using Next.js App Router (agreed with Terry)
```
### Memory Manager (`src/orchestra/memory/manager.py`)
```python
"""
Memory manager for loading and updating agent memory files.
Agents update their own memory via the Write tool during query execution.
"""
class MemoryManager:
def __init__(self, agent_id: str, memory_path: str = "/memory"):
self.agent_id = agent_id
self.memory_file = f"{memory_path}/memory.md"
async def load(self) -> str:
"""Load memory content for injection into query prompt."""
try:
with open(self.memory_file, "r") as f:
return f.read()
except FileNotFoundError:
# Initialize empty memory structure
return self._initial_memory()
def _initial_memory(self) -> str:
"""Create initial memory structure for new agent."""
return f"""# Agent Memory - {self.agent_id}
## Active Context
- No active tasks
## Project Knowledge
(none yet)
## Team Preferences
(none yet)
## Learnings
(none yet)
"""
async def update_from_response(self, response: str) -> None:
"""
Optional: Parse response for explicit memory updates.
In practice, agents use the Write tool to update memory.md directly.
This method can be used for automated memory extraction if needed.
"""
pass # Agents self-manage memory via Write tool
```
---
## Implementation Phases
@ -1269,7 +1376,6 @@ services:
volumes:
- ./config/agents/ceo.yml:/app/config/agent.yml:ro
- ./data/workspaces/ceo:/workspace
- ./data/repos:/repos
- ./data/memory/ceo:/memory
- ./data/projects:/projects
ports:
@ -1290,7 +1396,6 @@ services:
volumes:
- ./config/agents/product_manager.yml:/app/config/agent.yml:ro
- ./data/workspaces/product_manager:/workspace
- ./data/repos:/repos
- ./data/memory/product_manager:/memory
- ./data/projects:/projects
ports:
@ -1311,7 +1416,6 @@ services:
volumes:
- ./config/agents/developer.yml:/app/config/agent.yml:ro
- ./data/workspaces/developer:/workspace
- ./data/repos:/repos
- ./data/memory/developer:/memory
ports:
- "8003:8003"
@ -1331,7 +1435,6 @@ services:
volumes:
- ./config/agents/tech_lead.yml:/app/config/agent.yml:ro
- ./data/workspaces/tech_lead:/workspace
- ./data/repos:/repos:ro # Read-only for tech lead
- ./data/memory/tech_lead:/memory
ports:
- "8004:8004"
@ -1394,7 +1497,7 @@ COPY src/orchestra/agent/ ./src/orchestra/agent/
COPY src/orchestra/tools/ ./src/orchestra/tools/
# Create workspace structure
RUN mkdir -p /workspace /repos /memory
RUN mkdir -p /workspace /memory
# Expose HTTP API port
EXPOSE 8001