Compare commits

..

No commits in common. "896ff001eb0f5ae89f10bf83c1163e5a3a32a829" and "03484cf5b74c08edb1f3032d376d9b415e99598b" have entirely different histories.

2 changed files with 148 additions and 557 deletions

71
CLAUDE.md Normal file
View File

@ -0,0 +1,71 @@
# Agent Orchestra
Multi-agent system with Claude agents communicating via Slack.
Each agent runs inside its own Docker container with the Claude Agent SDK.
## Quick Start
```bash
# Start all services
docker-compose up -d
# Or for local development:
uv sync
cp .env.example .env # Fill in ANTHROPIC_API_KEY, Slack tokens, Gitea tokens
uv run python -m orchestra.main
```
## Architecture
- **Orchestrator**: Lightweight service that routes Slack messages to agents via HTTP
- **Agent Containers**: Each runs Claude SDK + HTTP API for receiving messages
- **Tools**: Built-in (Read/Write/Bash run in container) + custom MCP tools
- **Permissions**: PreToolUse hooks enforce agent-specific restrictions
- **Communication**: Orchestrator → HTTP → Agent containers
```
Orchestrator (Slack listener) --HTTP--> Agent Containers (SDK + HTTP API)
```
## Claude Agent SDK Usage
Each agent container runs `ClaudeSDKClient` for persistent conversations.
The orchestrator communicates with agents via HTTP API.
```python
# In agent container
from claude_agent_sdk import ClaudeSDKClient, ClaudeAgentOptions
async with ClaudeSDKClient(options) as client:
await client.query(message)
async for msg in client.receive_response():
process(msg)
```
Custom tools use `@tool` decorator and `create_sdk_mcp_server()`.
## Key Files
- `config/orchestra.yml` - Global config (Slack, agent endpoints)
- `config/agents/*.yml` - Agent definitions (tools, permissions, prompts)
- `src/orchestra/core/orchestrator.py` - Slack listener, HTTP routing
- `src/orchestra/agent/agent.py` - Agent service with SDK + HTTP API
- `src/orchestra/tools/` - Custom MCP tool implementations
## Agent Permissions
Each agent has:
- `allowed_tools` / `disallowed_tools` - Tool access
- `permissions.filesystem` - Path restrictions
- `permissions.git` - Branch push/merge restrictions
Enforced via PreToolUse hooks that check before execution.
## Testing
```bash
uv run pytest
uv run pytest tests/test_agent.py -v
```
## Common Tasks
- **Add agent**: Create YAML in config/agents/, add to docker-compose
- **Add tool**: Use @tool decorator in src/orchestra/tools/, register in server
- **Debug agent**: Check container logs: `docker logs agent-dev`

634
PLAN.md
View File

@ -8,16 +8,11 @@ A multi-agent system where Claude-powered agents collaborate via Slack, each run
### Key SDK Concepts ### Key SDK Concepts
1. **Two APIs**: 1. **Two APIs**:
- `query()` - One-shot, creates new session each time - `query()` - One-shot, creates new session each time
- `ClaudeSDKClient` - Persistent session, maintains conversation history - `ClaudeSDKClient` - Persistent session, maintains conversation history
We use `query()` with explicit memory injection because: We use `ClaudeSDKClient` because agents need to remember context across messages.
- Full control over what context goes into each request
- Memory persists in files (survives restarts)
- Inspectable/debuggable - can read memory.md to see agent state
- More cost-effective - only sends relevant context, not full history
- Selective memory - can summarize, prune, or prioritize what's included
2. **Built-in Tools**: The SDK includes Claude Code's tools: 2. **Built-in Tools**: The SDK includes Claude Code's tools:
- `Read`, `Write`, `Edit` - File operations - `Read`, `Write`, `Edit` - File operations
@ -89,16 +84,16 @@ We choose **Option B** because:
- Cleaner permission model: container boundaries enforce filesystem isolation - Cleaner permission model: container boundaries enforce filesystem isolation
- More realistic: tools like Bash run in the agent's actual environment - More realistic: tools like Bash run in the agent's actual environment
### Setting Up SDK with query() and Memory Injection ### Setting Up SDK with Agent Options
```python ```python
from claude_agent_sdk import query, ClaudeAgentOptions, tool, create_sdk_mcp_server from claude_agent_sdk import ClaudeSDKClient, ClaudeAgentOptions, tool, create_sdk_mcp_server
# Create custom tools # Create custom tools
@tool("slack_send", "Send Slack message", {"channel": str, "text": str}) @tool("slack_send", "Send Slack message", {"channel": str, "text": str})
async def slack_send(args): async def slack_send(args):
await slack_client.chat_postMessage( await slack_client.chat_postMessage(
channel=args["channel"], channel=args["channel"],
text=args["text"] text=args["text"]
) )
return {"content": [{"type": "text", "text": "Sent"}]} return {"content": [{"type": "text", "text": "Sent"}]}
@ -109,51 +104,41 @@ tools_server = create_sdk_mcp_server(
tools=[slack_send] tools=[slack_send]
) )
# Configure agent options # Configure agent
options = ClaudeAgentOptions( options = ClaudeAgentOptions(
system_prompt=agent_config.system_prompt, system_prompt=agent_config.system_prompt,
model="sonnet", model="sonnet",
# Built-in tools # Built-in tools
allowed_tools=["Read", "Write", "Edit", "Bash", "Glob", "Grep"], allowed_tools=["Read", "Write", "Edit", "Bash", "Glob", "Grep"],
disallowed_tools=["Task"], # Don't allow spawning subagents disallowed_tools=["Task"], # Don't allow spawning subagents
# Custom tools via MCP # Custom tools via MCP
mcp_servers={"tools": tools_server}, mcp_servers={"tools": tools_server},
# Permission mode # Permission mode
permission_mode="acceptEdits", permission_mode="acceptEdits",
# Working directory (mapped to container volume) # Working directory (mapped to container volume)
cwd="/data/workspaces/developer", cwd="/data/workspaces/developer",
# Hooks for fine-grained control # Hooks for fine-grained control
hooks={ hooks={
"PreToolUse": [HookMatcher(hooks=[permission_checker])] "PreToolUse": [HookMatcher(hooks=[permission_checker])]
} }
) )
# Query pattern: inject memory + task, get response, update memory # Create persistent client
async def process_task(task: str, memory_path: str) -> str: async with ClaudeSDKClient(options) as client:
# 1. Load relevant memory # Send message (client remembers context)
memory_content = read_memory(memory_path) await client.query("Create a hello.py file")
async for msg in client.receive_response():
# 2. Construct prompt with memory context handle_message(msg)
prompt = f"""## Your Memory
{memory_content} # Follow-up (same session, remembers previous)
await client.query("Add a main function")
## Current Task async for msg in client.receive_response():
{task} handle_message(msg)
After completing the task, note any important information to remember."""
# 3. One-shot query (no session state)
response = await query(prompt, options=options)
# 4. Extract and update memory from response
update_memory(memory_path, response)
return response
``` ```
--- ---
@ -195,16 +180,16 @@ After completing the task, note any important information to remember."""
│ │ - Claude CLI│ │ - Claude CLI│ │ - Claude CLI│ │ │ │ - Claude CLI│ │ - Claude CLI│ │ - Claude CLI│ │
│ │ - Node.js │ │ - Node.js │ │ - Node.js │ │ │ │ - Node.js │ │ - Node.js │ │ - Node.js │ │
│ │ - memory/ │ │ - memory/ │ │ - memory/ │ │ │ │ - memory/ │ │ - memory/ │ │ - memory/ │ │
│ │ - workspace/│ │ - workspace/│ │ - workspace/│ │ │ │ - git clone │ │ - git clone │ │ - git clone │ │
│ │ - HTTP API │ │ - HTTP API │ │ - HTTP API │ │ │ │ - HTTP API │ │ - HTTP API │ │ - HTTP API │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │ │ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │ │ │ │ │ │ │ │ │
│ └────────────────────┼────────────────────┘ │ │ └────────────────────┼────────────────────┘ │
│ │ git clone/push │
│ ▼ │ │ ▼ │
│ ┌─────────────────┐ │ │ ┌─────────────────┐ │
│ │ Gitea Server │ │ │ │ Shared Volume │ │
│ │ (external) │ │ │ │ - /repos/ │ │
│ │ - /projects/ │ │
│ └─────────────────┘ │ │ └─────────────────┘ │
└─────────────────────────────────────────────────────────────────┘ └─────────────────────────────────────────────────────────────────┘
``` ```
@ -250,12 +235,6 @@ agent-orchestra/
│ │ ├── events.py # Event handlers, routing │ │ ├── events.py # Event handlers, routing
│ │ └── formatter.py # Message formatting │ │ └── formatter.py # Message formatting
│ │ │ │
│ ├── queue/
│ │ ├── __init__.py
│ │ ├── redis_client.py # AgentQueue class for message handling
│ │ ├── publisher.py # Orchestrator-side message publishing
│ │ └── consumer.py # Agent-side message consumption
│ │
│ ├── tools/ │ ├── tools/
│ │ ├── __init__.py │ │ ├── __init__.py
│ │ ├── server.py # create_sdk_mcp_server setup │ │ ├── server.py # create_sdk_mcp_server setup
@ -269,11 +248,11 @@ agent-orchestra/
│ └── manager.py # Memory file read/write │ └── manager.py # Memory file read/write
├── data/ # Mounted volumes ├── data/ # Mounted volumes
│ ├── workspaces/ # Per-agent working directories (each clones from Gitea) │ ├── workspaces/ # Per-agent working directories
│ │ ├── ceo/
│ │ ├── developer/ │ │ ├── developer/
│ │ ├── product_manager/ │ │ ├── product_manager/
│ │ └── tech_lead/ │ │ └── tech_lead/
│ ├── repos/ # Shared git repository clone
│ ├── projects/ # Project/task JSON files │ ├── projects/ # Project/task JSON files
│ │ └── boards.json │ │ └── boards.json
│ └── memory/ # Agent memory folders │ └── memory/ # Agent memory folders
@ -470,6 +449,7 @@ disallowed_tools:
permissions: permissions:
filesystem: filesystem:
allowed_paths: allowed_paths:
- "/repos/**"
- "/workspace/**" - "/workspace/**"
git: git:
can_push_to: [] # Tech lead reviews, doesn't push can_push_to: [] # Tech lead reviews, doesn't push
@ -508,8 +488,8 @@ system_prompt: |
3. Implement the solution with tests 3. Implement the solution with tests
4. Create a PR and notify tech_lead for review 4. Create a PR and notify tech_lead for review
Clone repositories from Gitea into /workspace. You have access to the shared repository at /repos/main.
Use the Slack tools to communicate with your team. Use the Slack tools to communicate with your team.
Use the task tools to update your task status. Use the task tools to update your task status.
@ -541,9 +521,10 @@ disallowed_tools:
permissions: permissions:
filesystem: filesystem:
allowed_paths: allowed_paths:
- "/repos/**"
- "/workspace/**" - "/workspace/**"
denied_paths: denied_paths:
- "/workspace/**/.git/config" - "/repos/.git/config"
- "**/.env" - "**/.env"
- "**/secrets/**" - "**/secrets/**"
@ -573,105 +554,6 @@ container:
- "PYTHONPATH=/workspace" - "PYTHONPATH=/workspace"
``` ```
### Designer Config (`config/agents/designer.yml`)
```yaml
id: designer
name: "Designer Dani"
slack_user_id: "${SLACK_DESIGNER_USER_ID}"
model: "sonnet"
max_turns: 50
system_prompt: |
You are Designer Dani, responsible for UI/UX design and frontend aesthetics.
Your responsibilities:
- Create and refine UI designs based on requirements
- Implement CSS/styling changes
- Ensure consistent design system usage
- Review frontend PRs for design consistency
- Provide design feedback and mockup descriptions
When working on design tasks:
1. Understand the user experience goals
2. Reference the existing design system (shadcn/ui, Tailwind)
3. Implement pixel-perfect styling
4. Ensure responsive design across breakpoints
5. Document design decisions in comments
You work closely with developers on frontend implementation.
allowed_tools:
- Read
- Write
- Edit
- Bash
- Glob
- Grep
- WebSearch
- WebFetch
# Slack tools
- mcp__agent__slack_send_message
- mcp__agent__slack_reply_thread
# Task tools
- mcp__agent__tasks_view
- mcp__agent__tasks_update_status
- mcp__agent__tasks_add_comment
# Git tools - can create PRs for design changes
- mcp__agent__gitea_create_pr
- mcp__agent__gitea_list_prs
- mcp__agent__gitea_add_comment
disallowed_tools:
- mcp__agent__slack_create_channel
- mcp__agent__slack_delete_channel
- mcp__agent__tasks_create_project
- mcp__agent__tasks_create_board
- mcp__agent__gitea_merge_pr
permissions:
filesystem:
allowed_paths:
- "/workspace/**/*.css"
- "/workspace/**/*.scss"
- "/workspace/**/*.tsx"
- "/workspace/**/*.jsx"
- "/workspace/**/components/**"
- "/workspace/**/styles/**"
- "/workspace/**/public/**"
denied_paths:
- "/workspace/**/.git/config"
- "**/.env"
- "**/secrets/**"
- "/workspace/**/api/**"
- "/workspace/**/server/**"
git:
branch_pattern: "design/*"
can_push_to:
- "design/*"
cannot_push_to:
- "main"
- "dev"
- "feature/*"
memory:
path: "/memory/designer"
files:
- "memory.md"
can_mention:
- developer
- product_manager
- tech_lead
container:
resources:
memory: "2g"
cpus: "1.0"
```
### Product Manager Config (`config/agents/product_manager.yml`) ### Product Manager Config (`config/agents/product_manager.yml`)
```yaml ```yaml
@ -728,9 +610,10 @@ disallowed_tools:
permissions: permissions:
filesystem: filesystem:
allowed_paths: allowed_paths:
- "/repos/**"
- "/workspace/**" - "/workspace/**"
- "/projects/**" - "/projects/**"
git: git:
can_merge_to: can_merge_to:
- "dev" - "dev"
@ -786,29 +669,32 @@ The agent exposes an HTTP API for the orchestrator to send messages.
""" """
Agent service running inside container: Agent service running inside container:
- Exposes HTTP API for receiving messages from orchestrator - Exposes HTTP API for receiving messages from orchestrator
- Uses query() with explicit memory injection (stateless) - Uses ClaudeSDKClient for persistent conversation
- Custom tools via SDK MCP servers (in-process) - Custom tools via SDK MCP servers (in-process)
- Tools operate directly on container filesystem - Tools operate directly on container filesystem
- Memory persisted to files, injected into each query - Memory management local to container
""" """
from claude_agent_sdk import ( from claude_agent_sdk import (
query, ClaudeSDKClient,
ClaudeAgentOptions, ClaudeAgentOptions,
tool, tool,
create_sdk_mcp_server, create_sdk_mcp_server,
AssistantMessage,
TextBlock,
ToolUseBlock,
HookMatcher HookMatcher
) )
class AgentService: class AgentService:
"""HTTP service that uses stateless query() with memory injection.""" """HTTP service that wraps the Claude SDK client."""
id: str id: str
config: AgentConfig config: AgentConfig
options: ClaudeAgentOptions # Reused for each query client: ClaudeSDKClient # Persistent session
memory: MemoryManager memory: MemoryManager
app: FastAPI # HTTP server for orchestrator communication app: FastAPI # HTTP server for orchestrator communication
async def start(self) -> None: async def start(self) -> None:
"""Initialize SDK options and start HTTP server.""" """Initialize SDK client and start HTTP server."""
# Create in-process MCP server for custom tools # Create in-process MCP server for custom tools
tools_server = create_sdk_mcp_server( tools_server = create_sdk_mcp_server(
name=f"{self.id}-tools", name=f"{self.id}-tools",
@ -816,7 +702,7 @@ class AgentService:
tools=self._build_tools() tools=self._build_tools()
) )
self.options = ClaudeAgentOptions( options = ClaudeAgentOptions(
system_prompt=self.config.system_prompt, system_prompt=self.config.system_prompt,
model=self.config.model, # e.g., "sonnet", "opus" model=self.config.model, # e.g., "sonnet", "opus"
allowed_tools=self.config.allowed_tools, allowed_tools=self.config.allowed_tools,
@ -835,6 +721,9 @@ class AgentService:
hooks=self._build_hooks() hooks=self._build_hooks()
) )
self.client = ClaudeSDKClient(options)
await self.client.connect()
# Start HTTP server for orchestrator # Start HTTP server for orchestrator
await self._start_http_server() await self._start_http_server()
@ -852,68 +741,39 @@ class AgentService:
return {"status": "ok", "agent_id": self.id} return {"status": "ok", "agent_id": self.id}
async def process_message(self, message: str, context: dict) -> str: async def process_message(self, message: str, context: dict) -> str:
""" """Send message and collect response. Client maintains conversation history."""
Stateless query with memory injection. await self.client.query(message)
Flow: response_text = ""
1. Load memory from file async for msg in self.client.receive_response():
2. Construct prompt with memory + message if isinstance(msg, AssistantMessage):
3. Execute query() for block in msg.content:
4. Update memory file with new learnings if isinstance(block, TextBlock):
5. Return response response_text += block.text
"""
# Load current memory state
memory_content = await self.memory.load()
# Construct prompt with injected memory return response_text
prompt = self._build_prompt(message, memory_content, context)
# Stateless query - no session maintained
response = await query(prompt, options=self.options)
# Extract and persist any memory updates
await self.memory.update_from_response(response)
return self._extract_response_text(response)
def _build_prompt(self, message: str, memory: str, context: dict) -> str:
"""Construct prompt with memory context."""
thread_context = ""
if context.get("thread_messages"):
thread_context = "\n## Recent Thread Context\n"
for msg in context["thread_messages"][-5:]: # Last 5 messages
thread_context += f"- {msg['author']}: {msg['text']}\n"
return f"""## Your Memory
{memory}
{thread_context}
## Current Message
{message}
Complete this task. If you learn anything important (team preferences, project
details, decisions made), update your memory file at /memory/memory.md."""
async def stop(self) -> None: async def stop(self) -> None:
"""Stop HTTP server.""" """Disconnect the client and stop HTTP server."""
pass # No client to disconnect with stateless query() await self.client.disconnect()
def _build_tools(self) -> list: def _build_tools(self) -> list:
"""Build custom tools based on agent config permissions.""" """Build custom tools based on agent config permissions."""
tools = [] tools = []
if "slack" in self.config.tools: if "slack" in self.config.tools:
tools.extend(self._create_slack_tools()) tools.extend(self._create_slack_tools())
if "tasks" in self.config.tools: if "tasks" in self.config.tools:
tools.extend(self._create_task_tools()) tools.extend(self._create_task_tools())
return tools return tools
def _build_hooks(self) -> dict: def _build_hooks(self) -> dict:
"""Build permission hooks to enforce tool restrictions.""" """Build permission hooks to enforce tool restrictions."""
async def check_permissions(input_data, tool_use_id, context): async def check_permissions(input_data, tool_use_id, context):
tool_name = input_data.get("tool_name") tool_name = input_data.get("tool_name")
tool_input = input_data.get("tool_input", {}) tool_input = input_data.get("tool_input", {})
# Check git branch restrictions # Check git branch restrictions
if tool_name == "Bash" and "git push" in tool_input.get("command", ""): if tool_name == "Bash" and "git push" in tool_input.get("command", ""):
branch = self._extract_branch(tool_input["command"]) branch = self._extract_branch(tool_input["command"])
@ -926,7 +786,7 @@ details, decisions made), update your memory file at /memory/memory.md."""
} }
} }
return {} return {}
return { return {
"PreToolUse": [HookMatcher(hooks=[check_permissions])] "PreToolUse": [HookMatcher(hooks=[check_permissions])]
} }
@ -1213,164 +1073,6 @@ class GiteaClient:
--- ---
## Message Queue Architecture (Redis)
Redis provides reliable message queuing between the orchestrator and agent containers, ensuring messages aren't lost if an agent is temporarily unavailable.
### Queue Structure
```
Redis Keys:
├── queue:agent:{agent_id}:inbox # Messages waiting to be processed
├── queue:agent:{agent_id}:processing # Currently being processed (for reliability)
├── queue:agent:{agent_id}:outbox # Responses ready for orchestrator
├── agent:{agent_id}:status # Agent health status (heartbeat)
└── agent:{agent_id}:current_task # Current task ID (for recovery)
```
### Message Flow with Redis
```
┌─────────────────────────────────────────────────────────────────┐
│ Orchestrator │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────┐ │
│ │ Slack │───▶│ Router │───▶│ Redis Publisher │ │
│ │ Listener │ │ │ │ (LPUSH to inbox) │ │
│ └─────────────┘ └─────────────┘ └─────────────────────┘ │
│ │ │
│ ┌─────────────────────────────────┐ │ │
│ │ Response Handler │◀────────────┘ │
│ │ (BRPOP from outbox queues) │ │
│ └─────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
│ Redis
┌─────────────────────────────────────────────────────────────────┐
│ Agent Container │
│ ┌─────────────────────────────────┐ │
│ │ Message Consumer │ │
│ │ (BRPOPLPUSH inbox→processing) │ │
│ └───────────────┬─────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────┐ │
│ │ Claude SDK query() │ │
│ │ - Inject memory │ │
│ │ - Execute tools │ │
│ │ - Update memory │ │
│ └───────────────┬─────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────┐ │
│ │ Response Publisher │ │
│ │ (LPUSH to outbox, DEL from │ │
│ │ processing) │ │
│ └─────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
```
### Reliability Features
1. **At-least-once delivery**: Using `BRPOPLPUSH` moves message to processing queue atomically
2. **Recovery on crash**: On startup, agents check processing queue for incomplete tasks
3. **Heartbeat monitoring**: Agents publish heartbeat to `agent:{id}:status` every 10s
4. **Dead letter queue**: Messages that fail 3 times move to `queue:dead_letter`
### Message Format
```python
# Inbox message (orchestrator → agent)
{
"id": "msg-uuid-123",
"timestamp": "2025-01-15T10:00:00Z",
"type": "slack_message",
"payload": {
"text": "@developer please implement the login feature",
"channel": "proj-website",
"thread_ts": "1234567890.123456",
"user": "U123456", # Slack user who sent it
"context": {
"thread_messages": [...], # Recent thread history
"mentioned_agents": ["developer"]
}
},
"reply_to": "queue:orchestrator:responses",
"retry_count": 0
}
# Outbox message (agent → orchestrator)
{
"id": "resp-uuid-456",
"request_id": "msg-uuid-123",
"timestamp": "2025-01-15T10:00:05Z",
"agent_id": "developer",
"payload": {
"text": "I'll start working on the login feature...",
"channel": "proj-website",
"thread_ts": "1234567890.123456",
"mentions": ["tech_lead"] # Agents to notify
}
}
```
### Redis Client Implementation
```python
"""
src/orchestra/queue/redis_client.py
"""
import redis.asyncio as redis
from typing import Optional
import json
class AgentQueue:
def __init__(self, agent_id: str, redis_url: str = "redis://localhost:6379"):
self.agent_id = agent_id
self.redis = redis.from_url(redis_url)
self.inbox = f"queue:agent:{agent_id}:inbox"
self.processing = f"queue:agent:{agent_id}:processing"
self.outbox = f"queue:agent:{agent_id}:outbox"
async def receive(self, timeout: int = 0) -> Optional[dict]:
"""Block until message available, move to processing atomically."""
result = await self.redis.brpoplpush(
self.inbox, self.processing, timeout=timeout
)
if result:
return json.loads(result)
return None
async def complete(self, message_id: str, response: dict) -> None:
"""Mark message complete and publish response."""
pipe = self.redis.pipeline()
# Remove from processing
pipe.lrem(self.processing, 1, message_id)
# Publish response
pipe.lpush(self.outbox, json.dumps(response))
await pipe.execute()
async def recover_incomplete(self) -> list[dict]:
"""On startup, recover any messages left in processing."""
messages = []
while True:
msg = await self.redis.rpoplpush(self.processing, self.inbox)
if not msg:
break
messages.append(json.loads(msg))
return messages
async def heartbeat(self) -> None:
"""Update agent status."""
await self.redis.set(
f"agent:{self.agent_id}:status",
json.dumps({"status": "healthy", "timestamp": datetime.utcnow().isoformat()}),
ex=30 # Expire after 30s if no heartbeat
)
```
---
## Slack Integration ## Slack Integration
### Event Flow ### Event Flow
@ -1390,17 +1092,12 @@ Slack Event (message/mention)
┌─────────────────┐ ┌─────────────────┐
Redis Queue │ ← LPUSH to agent inbox (reliable delivery) Agent Queue │ ← Each agent has async message queue
└────────┬────────┘ └────────┬────────┘
┌─────────────────┐ ┌─────────────────┐
│ Agent Process │ ← BRPOPLPUSH, Claude SDK processes │ Agent Process │ ← Claude SDK processes, may invoke tools
└────────┬────────┘
┌─────────────────┐
│ Redis Response │ ← LPUSH to outbox
└────────┬────────┘ └────────┬────────┘
@ -1426,30 +1123,6 @@ Agents post messages with a consistent format:
## Memory System ## Memory System
The memory system provides persistent context across stateless `query()` calls.
Each query injects relevant memory, and agents update their memory file after completing tasks.
### Memory Flow
```
┌──────────────────────────────────────────────────────────────┐
│ Each query() call │
├──────────────────────────────────────────────────────────────┤
│ 1. Load memory.md │
│ 2. Construct prompt: │
│ ┌────────────────────────────────────────────────────┐ │
│ │ ## Your Memory │ │
│ │ {contents of memory.md} │ │
│ │ │ │
│ │ ## Current Task │ │
│ │ {slack message / task description} │ │
│ └────────────────────────────────────────────────────┘ │
│ 3. Execute query() - agent has Read/Write tools │
│ 4. Agent updates /memory/memory.md if needed (via Write) │
│ 5. Return response │
└──────────────────────────────────────────────────────────────┘
```
### Structure ### Structure
``` ```
@ -1495,54 +1168,6 @@ Each query injects relevant memory, and agents update their memory file after co
- Using Next.js App Router (agreed with Terry) - Using Next.js App Router (agreed with Terry)
``` ```
### Memory Manager (`src/orchestra/memory/manager.py`)
```python
"""
Memory manager for loading and updating agent memory files.
Agents update their own memory via the Write tool during query execution.
"""
class MemoryManager:
def __init__(self, agent_id: str, memory_path: str = "/memory"):
self.agent_id = agent_id
self.memory_file = f"{memory_path}/memory.md"
async def load(self) -> str:
"""Load memory content for injection into query prompt."""
try:
with open(self.memory_file, "r") as f:
return f.read()
except FileNotFoundError:
# Initialize empty memory structure
return self._initial_memory()
def _initial_memory(self) -> str:
"""Create initial memory structure for new agent."""
return f"""# Agent Memory - {self.agent_id}
## Active Context
- No active tasks
## Project Knowledge
(none yet)
## Team Preferences
(none yet)
## Learnings
(none yet)
"""
async def update_from_response(self, response: str) -> None:
"""
Optional: Parse response for explicit memory updates.
In practice, agents use the Write tool to update memory.md directly.
This method can be used for automated memory extraction if needed.
"""
pass # Agents self-manage memory via Write tool
```
--- ---
## Implementation Phases ## Implementation Phases
@ -1613,22 +1238,6 @@ class MemoryManager:
version: '3.8' version: '3.8'
services: services:
# Redis for reliable message queuing between orchestrator and agents
redis:
image: redis:7-alpine
command: redis-server --appendonly yes
volumes:
- redis-data:/data
networks:
- orchestra-net
restart: unless-stopped
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 10s
timeout: 5s
retries: 5
start_period: 10s
orchestrator: orchestrator:
build: build:
context: . context: .
@ -1636,31 +1245,15 @@ services:
environment: environment:
- SLACK_APP_TOKEN=${SLACK_APP_TOKEN} - SLACK_APP_TOKEN=${SLACK_APP_TOKEN}
- SLACK_BOT_TOKEN=${SLACK_BOT_TOKEN} - SLACK_BOT_TOKEN=${SLACK_BOT_TOKEN}
- REDIS_URL=redis://redis:6379
volumes: volumes:
- ./config:/app/config:ro - ./config:/app/config:ro
networks: networks:
- orchestra-net - orchestra-net
restart: unless-stopped
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
interval: 30s
timeout: 10s
retries: 3
start_period: 30s
depends_on: depends_on:
redis: - agent-ceo
condition: service_healthy - agent-pm
agent-ceo: - agent-dev
condition: service_healthy - agent-techlead
agent-pm:
condition: service_healthy
agent-dev:
condition: service_healthy
agent-techlead:
condition: service_healthy
agent-designer:
condition: service_healthy
# Full agent containers (SDK, Node.js, CLI) # Full agent containers (SDK, Node.js, CLI)
agent-ceo: agent-ceo:
@ -1673,26 +1266,16 @@ services:
- ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY} - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
- GITEA_URL=${GITEA_URL} - GITEA_URL=${GITEA_URL}
- GITEA_API_TOKEN=${GITEA_API_TOKEN} - GITEA_API_TOKEN=${GITEA_API_TOKEN}
- REDIS_URL=redis://redis:6379
volumes: volumes:
- ./config/agents/ceo.yml:/app/config/agent.yml:ro - ./config/agents/ceo.yml:/app/config/agent.yml:ro
- ./data/workspaces/ceo:/workspace - ./data/workspaces/ceo:/workspace
- ./data/repos:/repos
- ./data/memory/ceo:/memory - ./data/memory/ceo:/memory
- ./data/projects:/projects - ./data/projects:/projects
ports: ports:
- "8001:8001" - "8001:8001"
networks: networks:
- orchestra-net - orchestra-net
restart: unless-stopped
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8001/health"]
interval: 30s
timeout: 10s
retries: 3
start_period: 60s
depends_on:
redis:
condition: service_healthy
agent-pm: agent-pm:
build: build:
@ -1704,26 +1287,16 @@ services:
- ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY} - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
- GITEA_URL=${GITEA_URL} - GITEA_URL=${GITEA_URL}
- GITEA_API_TOKEN=${GITEA_API_TOKEN} - GITEA_API_TOKEN=${GITEA_API_TOKEN}
- REDIS_URL=redis://redis:6379
volumes: volumes:
- ./config/agents/product_manager.yml:/app/config/agent.yml:ro - ./config/agents/product_manager.yml:/app/config/agent.yml:ro
- ./data/workspaces/product_manager:/workspace - ./data/workspaces/product_manager:/workspace
- ./data/repos:/repos
- ./data/memory/product_manager:/memory - ./data/memory/product_manager:/memory
- ./data/projects:/projects - ./data/projects:/projects
ports: ports:
- "8002:8002" - "8002:8002"
networks: networks:
- orchestra-net - orchestra-net
restart: unless-stopped
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8002/health"]
interval: 30s
timeout: 10s
retries: 3
start_period: 60s
depends_on:
redis:
condition: service_healthy
agent-dev: agent-dev:
build: build:
@ -1735,25 +1308,15 @@ services:
- ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY} - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
- GITEA_URL=${GITEA_URL} - GITEA_URL=${GITEA_URL}
- GITEA_API_TOKEN=${GITEA_API_TOKEN} - GITEA_API_TOKEN=${GITEA_API_TOKEN}
- REDIS_URL=redis://redis:6379
volumes: volumes:
- ./config/agents/developer.yml:/app/config/agent.yml:ro - ./config/agents/developer.yml:/app/config/agent.yml:ro
- ./data/workspaces/developer:/workspace - ./data/workspaces/developer:/workspace
- ./data/repos:/repos
- ./data/memory/developer:/memory - ./data/memory/developer:/memory
ports: ports:
- "8003:8003" - "8003:8003"
networks: networks:
- orchestra-net - orchestra-net
restart: unless-stopped
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8003/health"]
interval: 30s
timeout: 10s
retries: 3
start_period: 60s
depends_on:
redis:
condition: service_healthy
agent-techlead: agent-techlead:
build: build:
@ -1765,59 +1328,18 @@ services:
- ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY} - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
- GITEA_URL=${GITEA_URL} - GITEA_URL=${GITEA_URL}
- GITEA_API_TOKEN=${GITEA_API_TOKEN} - GITEA_API_TOKEN=${GITEA_API_TOKEN}
- REDIS_URL=redis://redis:6379
volumes: volumes:
- ./config/agents/tech_lead.yml:/app/config/agent.yml:ro - ./config/agents/tech_lead.yml:/app/config/agent.yml:ro
- ./data/workspaces/tech_lead:/workspace - ./data/workspaces/tech_lead:/workspace
- ./data/repos:/repos:ro # Read-only for tech lead
- ./data/memory/tech_lead:/memory - ./data/memory/tech_lead:/memory
ports: ports:
- "8004:8004" - "8004:8004"
networks: networks:
- orchestra-net - orchestra-net
restart: unless-stopped
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8004/health"]
interval: 30s
timeout: 10s
retries: 3
start_period: 60s
depends_on:
redis:
condition: service_healthy
agent-designer:
build:
context: .
dockerfile: Dockerfile.agent
environment:
- AGENT_ID=designer
- AGENT_PORT=8005
- ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
- GITEA_URL=${GITEA_URL}
- GITEA_API_TOKEN=${GITEA_API_TOKEN}
- REDIS_URL=redis://redis:6379
volumes:
- ./config/agents/designer.yml:/app/config/agent.yml:ro
- ./data/workspaces/designer:/workspace
- ./data/memory/designer:/memory
ports:
- "8005:8005"
networks:
- orchestra-net
restart: unless-stopped
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8005/health"]
interval: 30s
timeout: 10s
retries: 3
start_period: 60s
depends_on:
redis:
condition: service_healthy
volumes: volumes:
orchestra-data: orchestra-data:
redis-data:
networks: networks:
orchestra-net: orchestra-net:
@ -1872,7 +1394,7 @@ COPY src/orchestra/agent/ ./src/orchestra/agent/
COPY src/orchestra/tools/ ./src/orchestra/tools/ COPY src/orchestra/tools/ ./src/orchestra/tools/
# Create workspace structure # Create workspace structure
RUN mkdir -p /workspace /memory RUN mkdir -p /workspace /repos /memory
# Expose HTTP API port # Expose HTTP API port
EXPOSE 8001 EXPOSE 8001
@ -1903,7 +1425,6 @@ dependencies = [
"anyio>=4.0.0", "anyio>=4.0.0",
"fastapi>=0.109.0", # HTTP API for agent containers "fastapi>=0.109.0", # HTTP API for agent containers
"uvicorn>=0.27.0", # ASGI server for FastAPI "uvicorn>=0.27.0", # ASGI server for FastAPI
"redis>=5.0.0", # Async Redis client for message queuing
] ]
[project.optional-dependencies] [project.optional-dependencies]
@ -1912,7 +1433,6 @@ dev = [
"pytest-asyncio>=0.23.0", "pytest-asyncio>=0.23.0",
"pytest-mock>=3.12.0", "pytest-mock>=3.12.0",
"ruff>=0.1.0", "ruff>=0.1.0",
"fakeredis>=2.20.0", # Redis mock for testing
] ]
``` ```