Compare commits
2 Commits
03484cf5b7
...
896ff001eb
| Author | SHA1 | Date |
|---|---|---|
| | 896ff001eb | |
| | 59ac78ca39 | |
71
CLAUDE.md
@ -1,71 +0,0 @@
|
||||
# Agent Orchestra
|
||||
|
||||
Multi-agent system with Claude agents communicating via Slack.
|
||||
Each agent runs inside its own Docker container with the Claude Agent SDK.
|
||||
|
||||
## Quick Start
|
||||
```bash
|
||||
# Start all services
|
||||
docker-compose up -d
|
||||
|
||||
# Or for local development:
|
||||
uv sync
|
||||
cp .env.example .env # Fill in ANTHROPIC_API_KEY, Slack tokens, Gitea tokens
|
||||
uv run python -m orchestra.main
|
||||
```
|
||||
|
||||
## Architecture
|
||||
|
||||
- **Orchestrator**: Lightweight service that routes Slack messages to agents via HTTP
|
||||
- **Agent Containers**: Each runs Claude SDK + HTTP API for receiving messages
|
||||
- **Tools**: Built-in (Read/Write/Bash run in container) + custom MCP tools
|
||||
- **Permissions**: PreToolUse hooks enforce agent-specific restrictions
|
||||
- **Communication**: Orchestrator → HTTP → Agent containers
|
||||
|
||||
```
|
||||
Orchestrator (Slack listener) --HTTP--> Agent Containers (SDK + HTTP API)
|
||||
```
|
||||
|
||||
## Claude Agent SDK Usage
|
||||
|
||||
Each agent container runs `ClaudeSDKClient` for persistent conversations.
|
||||
The orchestrator communicates with agents via HTTP API.
|
||||
|
||||
```python
|
||||
# In agent container
|
||||
from claude_agent_sdk import ClaudeSDKClient, ClaudeAgentOptions
|
||||
|
||||
async with ClaudeSDKClient(options) as client:
|
||||
await client.query(message)
|
||||
async for msg in client.receive_response():
|
||||
process(msg)
|
||||
```
|
||||
|
||||
Custom tools use `@tool` decorator and `create_sdk_mcp_server()`.
|
||||
|
||||
## Key Files
|
||||
- `config/orchestra.yml` - Global config (Slack, agent endpoints)
|
||||
- `config/agents/*.yml` - Agent definitions (tools, permissions, prompts)
|
||||
- `src/orchestra/core/orchestrator.py` - Slack listener, HTTP routing
|
||||
- `src/orchestra/agent/agent.py` - Agent service with SDK + HTTP API
|
||||
- `src/orchestra/tools/` - Custom MCP tool implementations
|
||||
|
||||
## Agent Permissions
|
||||
|
||||
Each agent has:
|
||||
- `allowed_tools` / `disallowed_tools` - Tool access
|
||||
- `permissions.filesystem` - Path restrictions
|
||||
- `permissions.git` - Branch push/merge restrictions
|
||||
|
||||
Enforced via PreToolUse hooks that check before execution.
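
The filesystem part of such a check could look roughly like the sketch below. It is an illustration only: `is_path_allowed` is a hypothetical helper, not part of the Claude Agent SDK, and it assumes glob lists shaped like the `allowed_paths` / `denied_paths` entries in the agent YAML, with deny rules taking precedence.

```python
import posixpath
from fnmatch import fnmatchcase


def is_path_allowed(path: str, allowed: list[str], denied: list[str]) -> bool:
    """Return True if `path` matches an allowed glob and no denied glob."""
    # Normalize first so "../" segments cannot escape an allowed prefix.
    normalized = posixpath.normpath(path)
    # fnmatch's "*" also matches "/", so "**" behaves as "any depth" here.
    if any(fnmatchcase(normalized, pattern) for pattern in denied):
        return False
    return any(fnmatchcase(normalized, pattern) for pattern in allowed)


# Example lists modelled on the agent configs (assumed values).
ALLOWED = ["/workspace/**"]
DENIED = ["/workspace/**/.git/config", "**/.env", "**/secrets/**"]

if __name__ == "__main__":
    for candidate in ("/workspace/app/main.py", "/workspace/app/.env"):
        print(candidate, is_path_allowed(candidate, ALLOWED, DENIED))
```

A hook would call a helper like this with the path taken from the tool input and deny the tool call when it returns `False`.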
|
||||
|
||||
## Testing
|
||||
```bash
|
||||
uv run pytest
|
||||
uv run pytest tests/test_agent.py -v
|
||||
```
|
||||
|
||||
## Common Tasks
|
||||
- **Add agent**: Create YAML in config/agents/, add to docker-compose
|
||||
- **Add tool**: Use @tool decorator in src/orchestra/tools/, register in server
|
||||
- **Debug agent**: Check container logs: `docker logs agent-dev`
|
||||
600
PLAN.md
@ -12,7 +12,12 @@ A multi-agent system where Claude-powered agents collaborate via Slack, each run
|
||||
- `query()` - One-shot, creates new session each time
|
||||
- `ClaudeSDKClient` - Persistent session, maintains conversation history
|
||||
|
||||
We use `ClaudeSDKClient` because agents need to remember context across messages.
|
||||
We use `query()` with explicit memory injection because:
|
||||
- Full control over what context goes into each request
|
||||
- Memory persists in files (survives restarts)
|
||||
- Inspectable/debuggable - can read memory.md to see agent state
|
||||
- More cost-effective - only sends relevant context, not full history
|
||||
- Selective memory - can summarize, prune, or prioritize what's included
|
||||
|
||||
2. **Built-in Tools**: The SDK includes Claude Code's tools:
|
||||
- `Read`, `Write`, `Edit` - File operations
|
||||
@ -84,10 +89,10 @@ We choose **Option B** because:
|
||||
- Cleaner permission model: container boundaries enforce filesystem isolation
|
||||
- More realistic: tools like Bash run in the agent's actual environment
|
||||
|
||||
### Setting Up SDK with Agent Options
|
||||
### Setting Up SDK with query() and Memory Injection
|
||||
|
||||
```python
|
||||
from claude_agent_sdk import ClaudeSDKClient, ClaudeAgentOptions, tool, create_sdk_mcp_server
|
||||
from claude_agent_sdk import query, ClaudeAgentOptions, tool, create_sdk_mcp_server
|
||||
|
||||
# Create custom tools
|
||||
@tool("slack_send", "Send Slack message", {"channel": str, "text": str})
|
||||
@ -104,7 +109,7 @@ tools_server = create_sdk_mcp_server(
|
||||
tools=[slack_send]
|
||||
)
|
||||
|
||||
# Configure agent
|
||||
# Configure agent options
|
||||
options = ClaudeAgentOptions(
|
||||
system_prompt=agent_config.system_prompt,
|
||||
model="sonnet",
|
||||
@ -128,17 +133,27 @@ options = ClaudeAgentOptions(
|
||||
}
|
||||
)
|
||||
|
||||
# Create persistent client
|
||||
async with ClaudeSDKClient(options) as client:
|
||||
# Send message (client remembers context)
|
||||
await client.query("Create a hello.py file")
|
||||
async for msg in client.receive_response():
|
||||
handle_message(msg)
|
||||
# Query pattern: inject memory + task, get response, update memory
|
||||
async def process_task(task: str, memory_path: str) -> str:
|
||||
# 1. Load relevant memory
|
||||
memory_content = read_memory(memory_path)
|
||||
|
||||
# Follow-up (same session, remembers previous)
|
||||
await client.query("Add a main function")
|
||||
async for msg in client.receive_response():
|
||||
handle_message(msg)
|
||||
# 2. Construct prompt with memory context
|
||||
prompt = f"""## Your Memory
|
||||
{memory_content}
|
||||
|
||||
## Current Task
|
||||
{task}
|
||||
|
||||
After completing the task, note any important information to remember."""
|
||||
|
||||
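# 3. One-shot query (no session state). query() returns an async iterator
#    of messages, so collect the text blocks instead of awaiting the call.
response = ""
async for msg in query(prompt=prompt, options=options):
    for block in getattr(msg, "content", []):
        response += getattr(block, "text", "")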
|
||||
|
||||
# 4. Extract and update memory from response
|
||||
update_memory(memory_path, response)
|
||||
|
||||
return response
|
||||
```
|
||||
|
||||
---
|
||||
@ -180,16 +195,16 @@ async with ClaudeSDKClient(options) as client:
|
||||
│ │ - Claude CLI│ │ - Claude CLI│ │ - Claude CLI│ │
|
||||
│ │ - Node.js │ │ - Node.js │ │ - Node.js │ │
|
||||
│ │ - memory/ │ │ - memory/ │ │ - memory/ │ │
|
||||
│ │ - git clone │ │ - git clone │ │ - git clone │ │
|
||||
│ │ - workspace/│ │ - workspace/│ │ - workspace/│ │
|
||||
│ │ - HTTP API │ │ - HTTP API │ │ - HTTP API │ │
|
||||
│ └─────────────┘ └─────────────┘ └─────────────┘ │
|
||||
│ │ │ │ │
|
||||
│ └────────────────────┼────────────────────┘ │
|
||||
│ │ git clone/push │
|
||||
│ ▼ │
|
||||
│ ┌─────────────────┐ │
|
||||
│ │ Shared Volume │ │
|
||||
│ │ - /repos/ │ │
|
||||
│ │ - /projects/ │ │
|
||||
│ │ Gitea Server │ │
|
||||
│ │ (external) │ │
|
||||
│ └─────────────────┘ │
|
||||
└─────────────────────────────────────────────────────────────────┘
|
||||
```
|
||||
@ -235,6 +250,12 @@ agent-orchestra/
|
||||
│ │ ├── events.py # Event handlers, routing
|
||||
│ │ └── formatter.py # Message formatting
|
||||
│ │
|
||||
│ ├── queue/
|
||||
│ │ ├── __init__.py
|
||||
│ │ ├── redis_client.py # AgentQueue class for message handling
|
||||
│ │ ├── publisher.py # Orchestrator-side message publishing
|
||||
│ │ └── consumer.py # Agent-side message consumption
|
||||
│ │
|
||||
│ ├── tools/
|
||||
│ │ ├── __init__.py
|
||||
│ │ ├── server.py # create_sdk_mcp_server setup
|
||||
@ -248,11 +269,11 @@ agent-orchestra/
|
||||
│ └── manager.py # Memory file read/write
|
||||
│
|
||||
├── data/ # Mounted volumes
|
||||
│ ├── workspaces/ # Per-agent working directories
|
||||
│ ├── workspaces/ # Per-agent working directories (each clones from Gitea)
|
||||
│ │ ├── ceo/
|
||||
│ │ ├── developer/
|
||||
│ │ ├── product_manager/
|
||||
│ │ └── tech_lead/
|
||||
│ ├── repos/ # Shared git repository clone
|
||||
│ ├── projects/ # Project/task JSON files
|
||||
│ │ └── boards.json
|
||||
│ └── memory/ # Agent memory folders
|
||||
@ -449,7 +470,6 @@ disallowed_tools:
|
||||
permissions:
|
||||
filesystem:
|
||||
allowed_paths:
|
||||
- "/repos/**"
|
||||
- "/workspace/**"
|
||||
git:
|
||||
can_push_to: [] # Tech lead reviews, doesn't push
|
||||
@ -488,7 +508,7 @@ system_prompt: |
|
||||
3. Implement the solution with tests
|
||||
4. Create a PR and notify tech_lead for review
|
||||
|
||||
You have access to the shared repository at /repos/main.
|
||||
Clone repositories from Gitea into /workspace.
|
||||
|
||||
Use the Slack tools to communicate with your team.
|
||||
Use the task tools to update your task status.
|
||||
@ -521,10 +541,9 @@ disallowed_tools:
|
||||
permissions:
|
||||
filesystem:
|
||||
allowed_paths:
|
||||
- "/repos/**"
|
||||
- "/workspace/**"
|
||||
denied_paths:
|
||||
- "/repos/.git/config"
|
||||
- "/workspace/**/.git/config"
|
||||
- "**/.env"
|
||||
- "**/secrets/**"
|
||||
|
||||
@ -554,6 +573,105 @@ container:
|
||||
- "PYTHONPATH=/workspace"
|
||||
```
|
||||
|
||||
### Designer Config (`config/agents/designer.yml`)
|
||||
|
||||
```yaml
|
||||
id: designer
|
||||
name: "Designer Dani"
|
||||
slack_user_id: "${SLACK_DESIGNER_USER_ID}"
|
||||
|
||||
model: "sonnet"
|
||||
max_turns: 50
|
||||
|
||||
system_prompt: |
|
||||
You are Designer Dani, responsible for UI/UX design and frontend aesthetics.
|
||||
|
||||
Your responsibilities:
|
||||
- Create and refine UI designs based on requirements
|
||||
- Implement CSS/styling changes
|
||||
- Ensure consistent design system usage
|
||||
- Review frontend PRs for design consistency
|
||||
- Provide design feedback and mockup descriptions
|
||||
|
||||
When working on design tasks:
|
||||
1. Understand the user experience goals
|
||||
2. Reference the existing design system (shadcn/ui, Tailwind)
|
||||
3. Implement pixel-perfect styling
|
||||
4. Ensure responsive design across breakpoints
|
||||
5. Document design decisions in comments
|
||||
|
||||
You work closely with developers on frontend implementation.
|
||||
|
||||
allowed_tools:
|
||||
- Read
|
||||
- Write
|
||||
- Edit
|
||||
- Bash
|
||||
- Glob
|
||||
- Grep
|
||||
- WebSearch
|
||||
- WebFetch
|
||||
# Slack tools
|
||||
- mcp__agent__slack_send_message
|
||||
- mcp__agent__slack_reply_thread
|
||||
# Task tools
|
||||
- mcp__agent__tasks_view
|
||||
- mcp__agent__tasks_update_status
|
||||
- mcp__agent__tasks_add_comment
|
||||
# Git tools - can create PRs for design changes
|
||||
- mcp__agent__gitea_create_pr
|
||||
- mcp__agent__gitea_list_prs
|
||||
- mcp__agent__gitea_add_comment
|
||||
|
||||
disallowed_tools:
|
||||
- mcp__agent__slack_create_channel
|
||||
- mcp__agent__slack_delete_channel
|
||||
- mcp__agent__tasks_create_project
|
||||
- mcp__agent__tasks_create_board
|
||||
- mcp__agent__gitea_merge_pr
|
||||
|
||||
permissions:
|
||||
filesystem:
|
||||
allowed_paths:
|
||||
- "/workspace/**/*.css"
|
||||
- "/workspace/**/*.scss"
|
||||
- "/workspace/**/*.tsx"
|
||||
- "/workspace/**/*.jsx"
|
||||
- "/workspace/**/components/**"
|
||||
- "/workspace/**/styles/**"
|
||||
- "/workspace/**/public/**"
|
||||
denied_paths:
|
||||
- "/workspace/**/.git/config"
|
||||
- "**/.env"
|
||||
- "**/secrets/**"
|
||||
- "/workspace/**/api/**"
|
||||
- "/workspace/**/server/**"
|
||||
|
||||
git:
|
||||
branch_pattern: "design/*"
|
||||
can_push_to:
|
||||
- "design/*"
|
||||
cannot_push_to:
|
||||
- "main"
|
||||
- "dev"
|
||||
- "feature/*"
|
||||
|
||||
memory:
|
||||
path: "/memory/designer"
|
||||
files:
|
||||
- "memory.md"
|
||||
|
||||
can_mention:
|
||||
- developer
|
||||
- product_manager
|
||||
- tech_lead
|
||||
|
||||
container:
|
||||
resources:
|
||||
memory: "2g"
|
||||
cpus: "1.0"
|
||||
```
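
The config above uses `${SLACK_DESIGNER_USER_ID}` as a placeholder. How the loader resolves it is not shown in this plan; one possible approach, sketched here with a hypothetical `expand_placeholders` helper, is to substitute `${NAME}` from the environment and fail loudly when a variable is missing.

```python
import re
from typing import Mapping

# Matches ${NAME} where NAME is a conventional environment variable name.
_PLACEHOLDER = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}")


def expand_placeholders(value: str, env: Mapping[str, str]) -> str:
    """Replace each ${NAME} in `value` with env[NAME]; raise if it is unset."""

    def _lookup(match: re.Match) -> str:
        name = match.group(1)
        if name not in env:
            raise KeyError(f"missing environment variable: {name}")
        return env[name]

    return _PLACEHOLDER.sub(_lookup, value)


if __name__ == "__main__":
    # "U0DESIGNER" is a made-up value for illustration.
    print(expand_placeholders("${SLACK_DESIGNER_USER_ID}", {"SLACK_DESIGNER_USER_ID": "U0DESIGNER"}))
```

In a real loader, `env` would typically be `os.environ`, and the function would be applied to every string value after the YAML is parsed.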
|
||||
|
||||
### Product Manager Config (`config/agents/product_manager.yml`)
|
||||
|
||||
```yaml
|
||||
@ -610,7 +728,6 @@ disallowed_tools:
|
||||
permissions:
|
||||
filesystem:
|
||||
allowed_paths:
|
||||
- "/repos/**"
|
||||
- "/workspace/**"
|
||||
- "/projects/**"
|
||||
|
||||
@ -669,32 +786,29 @@ The agent exposes an HTTP API for the orchestrator to send messages.
|
||||
"""
|
||||
Agent service running inside container:
|
||||
- Exposes HTTP API for receiving messages from orchestrator
|
||||
- Uses ClaudeSDKClient for persistent conversation
|
||||
- Uses query() with explicit memory injection (stateless)
|
||||
- Custom tools via SDK MCP servers (in-process)
|
||||
- Tools operate directly on container filesystem
|
||||
- Memory management local to container
|
||||
- Memory persisted to files, injected into each query
|
||||
"""
|
||||
from claude_agent_sdk import (
|
||||
ClaudeSDKClient,
|
||||
query,
|
||||
ClaudeAgentOptions,
|
||||
tool,
|
||||
create_sdk_mcp_server,
|
||||
AssistantMessage,
|
||||
TextBlock,
|
||||
ToolUseBlock,
|
||||
HookMatcher
|
||||
)
|
||||
|
||||
class AgentService:
|
||||
"""HTTP service that wraps the Claude SDK client."""
|
||||
"""HTTP service that uses stateless query() with memory injection."""
|
||||
id: str
|
||||
config: AgentConfig
|
||||
client: ClaudeSDKClient # Persistent session
|
||||
options: ClaudeAgentOptions # Reused for each query
|
||||
memory: MemoryManager
|
||||
app: FastAPI # HTTP server for orchestrator communication
|
||||
|
||||
async def start(self) -> None:
|
||||
"""Initialize SDK client and start HTTP server."""
|
||||
"""Initialize SDK options and start HTTP server."""
|
||||
# Create in-process MCP server for custom tools
|
||||
tools_server = create_sdk_mcp_server(
|
||||
name=f"{self.id}-tools",
|
||||
@ -702,7 +816,7 @@ class AgentService:
|
||||
tools=self._build_tools()
|
||||
)
|
||||
|
||||
options = ClaudeAgentOptions(
|
||||
self.options = ClaudeAgentOptions(
|
||||
system_prompt=self.config.system_prompt,
|
||||
model=self.config.model, # e.g., "sonnet", "opus"
|
||||
allowed_tools=self.config.allowed_tools,
|
||||
@ -721,9 +835,6 @@ class AgentService:
|
||||
hooks=self._build_hooks()
|
||||
)
|
||||
|
||||
self.client = ClaudeSDKClient(options)
|
||||
await self.client.connect()
|
||||
|
||||
# Start HTTP server for orchestrator
|
||||
await self._start_http_server()
|
||||
|
||||
@ -741,21 +852,50 @@ class AgentService:
|
||||
return {"status": "ok", "agent_id": self.id}
|
||||
|
||||
async def process_message(self, message: str, context: dict) -> str:
|
||||
"""Send message and collect response. Client maintains conversation history."""
|
||||
await self.client.query(message)
|
||||
"""
|
||||
Stateless query with memory injection.
|
||||
|
||||
response_text = ""
|
||||
async for msg in self.client.receive_response():
|
||||
if isinstance(msg, AssistantMessage):
|
||||
for block in msg.content:
|
||||
if isinstance(block, TextBlock):
|
||||
response_text += block.text
|
||||
Flow:
|
||||
1. Load memory from file
|
||||
2. Construct prompt with memory + message
|
||||
3. Execute query()
|
||||
4. Update memory file with new learnings
|
||||
5. Return response
|
||||
"""
|
||||
# Load current memory state
|
||||
memory_content = await self.memory.load()
|
||||
|
||||
return response_text
|
||||
# Construct prompt with injected memory
|
||||
prompt = self._build_prompt(message, memory_content, context)
|
||||
|
||||
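# Stateless query - no session maintained. query() returns an async
# iterator of messages, so gather them instead of awaiting the call.
response = [msg async for msg in query(prompt=prompt, options=self.options)]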
|
||||
|
||||
# Extract and persist any memory updates
|
||||
await self.memory.update_from_response(response)
|
||||
|
||||
return self._extract_response_text(response)
|
||||
|
||||
def _build_prompt(self, message: str, memory: str, context: dict) -> str:
|
||||
"""Construct prompt with memory context."""
|
||||
thread_context = ""
|
||||
if context.get("thread_messages"):
|
||||
thread_context = "\n## Recent Thread Context\n"
|
||||
for msg in context["thread_messages"][-5:]: # Last 5 messages
|
||||
thread_context += f"- {msg['author']}: {msg['text']}\n"
|
||||
|
||||
return f"""## Your Memory
|
||||
{memory}
|
||||
{thread_context}
|
||||
## Current Message
|
||||
{message}
|
||||
|
||||
Complete this task. If you learn anything important (team preferences, project
|
||||
details, decisions made), update your memory file at /memory/memory.md."""
|
||||
|
||||
async def stop(self) -> None:
|
||||
"""Disconnect the client and stop HTTP server."""
|
||||
await self.client.disconnect()
|
||||
"""Stop HTTP server."""
|
||||
pass # No client to disconnect with stateless query()
|
||||
|
||||
def _build_tools(self) -> list:
|
||||
"""Build custom tools based on agent config permissions."""
|
||||
@ -1073,6 +1213,164 @@ class GiteaClient:
|
||||
|
||||
---
|
||||
|
||||
## Message Queue Architecture (Redis)
|
||||
|
||||
Redis provides reliable message queuing between the orchestrator and agent containers, ensuring messages aren't lost if an agent is temporarily unavailable.
|
||||
|
||||
### Queue Structure
|
||||
|
||||
```
|
||||
Redis Keys:
|
||||
├── queue:agent:{agent_id}:inbox # Messages waiting to be processed
|
||||
├── queue:agent:{agent_id}:processing # Currently being processed (for reliability)
|
||||
├── queue:agent:{agent_id}:outbox # Responses ready for orchestrator
|
||||
├── agent:{agent_id}:status # Agent health status (heartbeat)
|
||||
└── agent:{agent_id}:current_task # Current task ID (for recovery)
|
||||
```
|
||||
|
||||
### Message Flow with Redis
|
||||
|
||||
```
|
||||
┌─────────────────────────────────────────────────────────────────┐
|
||||
│ Orchestrator │
|
||||
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────┐ │
|
||||
│ │ Slack │───▶│ Router │───▶│ Redis Publisher │ │
|
||||
│ │ Listener │ │ │ │ (LPUSH to inbox) │ │
|
||||
│ └─────────────┘ └─────────────┘ └─────────────────────┘ │
|
||||
│ │ │
|
||||
│ ┌─────────────────────────────────┐ │ │
|
||||
│ │ Response Handler │◀────────────┘ │
|
||||
│ │ (BRPOP from outbox queues) │ │
|
||||
│ └─────────────────────────────────┘ │
|
||||
└─────────────────────────────────────────────────────────────────┘
|
||||
│
|
||||
│ Redis
|
||||
▼
|
||||
┌─────────────────────────────────────────────────────────────────┐
|
||||
│ Agent Container │
|
||||
│ ┌─────────────────────────────────┐ │
|
||||
│ │ Message Consumer │ │
|
||||
│ │ (BRPOPLPUSH inbox→processing) │ │
|
||||
│ └───────────────┬─────────────────┘ │
|
||||
│ │ │
|
||||
│ ▼ │
|
||||
│ ┌─────────────────────────────────┐ │
|
||||
│ │ Claude SDK query() │ │
|
||||
│ │ - Inject memory │ │
|
||||
│ │ - Execute tools │ │
|
||||
│ │ - Update memory │ │
|
||||
│ └───────────────┬─────────────────┘ │
|
||||
│ │ │
|
||||
│ ▼ │
|
||||
│ ┌─────────────────────────────────┐ │
|
||||
│ │ Response Publisher │ │
|
||||
│ │ (LPUSH to outbox, DEL from │ │
|
||||
│ │ processing) │ │
|
||||
│ └─────────────────────────────────┘ │
|
||||
└─────────────────────────────────────────────────────────────────┘
|
||||
```
|
||||
|
||||
### Reliability Features
|
||||
|
||||
1. **At-least-once delivery**: `BRPOPLPUSH` atomically moves each message from the inbox to the processing queue, so a crash mid-task cannot lose it
|
||||
2. **Recovery on crash**: On startup, agents check processing queue for incomplete tasks
|
||||
3. **Heartbeat monitoring**: Agents publish heartbeat to `agent:{id}:status` every 10s
|
||||
4. **Dead letter queue**: Messages that fail 3 times move to `queue:dead_letter`
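
The retry and dead-letter rule in item 4 can be sketched as a pure function. This is one assumed interpretation, not the project's implementation: `route_failed_message` is a hypothetical helper, and it treats `retry_count` as the number of failures so far.

```python
import copy

MAX_RETRIES = 3
DEAD_LETTER_QUEUE = "queue:dead_letter"


def route_failed_message(message: dict, agent_id: str) -> tuple[str, dict]:
    """Return (target_key, updated_message) for a message whose processing failed."""
    # Work on a copy so the caller's message is left untouched.
    updated = copy.deepcopy(message)
    updated["retry_count"] = updated.get("retry_count", 0) + 1
    if updated["retry_count"] >= MAX_RETRIES:
        # Third failure: park the message for manual inspection.
        return DEAD_LETTER_QUEUE, updated
    # Otherwise put it back on the agent's inbox for another attempt.
    return f"queue:agent:{agent_id}:inbox", updated


if __name__ == "__main__":
    key, msg = route_failed_message({"id": "msg-uuid-123", "retry_count": 2}, "developer")
    print(key, msg["retry_count"])
```

The caller would then `LPUSH` the serialized `updated_message` onto the returned key and remove the original from the processing list.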
|
||||
|
||||
### Message Format
|
||||
|
||||
```python
|
||||
# Inbox message (orchestrator → agent)
|
||||
{
|
||||
"id": "msg-uuid-123",
|
||||
"timestamp": "2025-01-15T10:00:00Z",
|
||||
"type": "slack_message",
|
||||
"payload": {
|
||||
"text": "@developer please implement the login feature",
|
||||
"channel": "proj-website",
|
||||
"thread_ts": "1234567890.123456",
|
||||
"user": "U123456", # Slack user who sent it
|
||||
"context": {
|
||||
"thread_messages": [...], # Recent thread history
|
||||
"mentioned_agents": ["developer"]
|
||||
}
|
||||
},
|
||||
"reply_to": "queue:orchestrator:responses",
|
||||
"retry_count": 0
|
||||
}
|
||||
|
||||
# Outbox message (agent → orchestrator)
|
||||
{
|
||||
"id": "resp-uuid-456",
|
||||
"request_id": "msg-uuid-123",
|
||||
"timestamp": "2025-01-15T10:00:05Z",
|
||||
"agent_id": "developer",
|
||||
"payload": {
|
||||
"text": "I'll start working on the login feature...",
|
||||
"channel": "proj-website",
|
||||
"thread_ts": "1234567890.123456",
|
||||
"mentions": ["tech_lead"] # Agents to notify
|
||||
}
|
||||
}
|
||||
```
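
A small builder for the inbox message shape above might look like the following sketch. `build_inbox_message` and the `MENTION` pattern are hypothetical; the pattern assumes agents are addressed as plain `@name` text, as in the example payload, whereas real Slack events may encode mentions differently.

```python
import re
import uuid
from datetime import datetime, timezone
from typing import Optional

# Assumption: agents are addressed as plain "@name" text, as in the example above.
MENTION = re.compile(r"@([a-z_]+)")


def build_inbox_message(
    text: str,
    channel: str,
    thread_ts: str,
    user: str,
    known_agents: set[str],
    thread_messages: Optional[list[dict]] = None,
) -> dict:
    """Build an orchestrator-to-agent message in the shape shown above."""
    # Keep first-seen order, drop duplicates and names that are not agents.
    mentioned = [
        name for name in dict.fromkeys(MENTION.findall(text)) if name in known_agents
    ]
    return {
        "id": f"msg-{uuid.uuid4()}",
        "timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "type": "slack_message",
        "payload": {
            "text": text,
            "channel": channel,
            "thread_ts": thread_ts,
            "user": user,
            "context": {
                "thread_messages": thread_messages or [],
                "mentioned_agents": mentioned,
            },
        },
        "reply_to": "queue:orchestrator:responses",
        "retry_count": 0,
    }
```

The resulting dict is JSON-serializable, so it can be passed through `json.dumps` before being pushed onto an agent inbox.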
|
||||
|
||||
### Redis Client Implementation
|
||||
|
||||
```python
"""
src/orchestra/queue/redis_client.py
"""
import json
from datetime import datetime, timezone
from typing import Optional

import redis.asyncio as redis


class AgentQueue:
    def __init__(self, agent_id: str, redis_url: str = "redis://localhost:6379"):
        self.agent_id = agent_id
        self.redis = redis.from_url(redis_url)
        self.inbox = f"queue:agent:{agent_id}:inbox"
        self.processing = f"queue:agent:{agent_id}:processing"
        self.outbox = f"queue:agent:{agent_id}:outbox"
        # Raw payloads currently in the processing list, keyed by message ID.
        # LREM matches whole list elements, so complete() needs the exact payload.
        self._in_flight: dict[str, bytes] = {}

    async def receive(self, timeout: int = 0) -> Optional[dict]:
        """Block until message available, move to processing atomically."""
        result = await self.redis.brpoplpush(
            self.inbox, self.processing, timeout=timeout
        )
        if result:
            message = json.loads(result)
            self._in_flight[message["id"]] = result
            return message
        return None

    async def complete(self, message_id: str, response: dict) -> None:
        """Mark message complete and publish response."""
        raw = self._in_flight.pop(message_id)
        pipe = self.redis.pipeline()
        # Remove from processing (by full payload, not by message ID)
        pipe.lrem(self.processing, 1, raw)
        # Publish response
        pipe.lpush(self.outbox, json.dumps(response))
        await pipe.execute()

    async def recover_incomplete(self) -> list[dict]:
        """On startup, recover any messages left in processing."""
        messages = []
        while True:
            msg = await self.redis.rpoplpush(self.processing, self.inbox)
            if not msg:
                break
            messages.append(json.loads(msg))
        return messages

    async def heartbeat(self) -> None:
        """Update agent status."""
        now = datetime.now(timezone.utc).isoformat()
        await self.redis.set(
            f"agent:{self.agent_id}:status",
            json.dumps({"status": "healthy", "timestamp": now}),
            ex=30,  # Expire after 30s if no heartbeat
        )
```
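
To reason about the inbox, processing, and outbox flow without a Redis server, the same list operations can be mimicked in memory. The class below is a hypothetical stand-in for illustration and for tests, not part of the planned code. Each method notes the Redis command it imitates.

```python
import json
from collections import deque
from typing import Optional


class InMemoryAgentQueue:
    """List-based stand-in for one agent's inbox/processing/outbox keys."""

    def __init__(self) -> None:
        self.inbox: deque[str] = deque()
        self.processing: deque[str] = deque()
        self.outbox: deque[str] = deque()

    def publish(self, message: dict) -> None:
        self.inbox.appendleft(json.dumps(message))  # LPUSH inbox

    def receive(self) -> Optional[str]:
        if not self.inbox:
            return None
        raw = self.inbox.pop()  # RPOP from the inbox tail...
        self.processing.appendleft(raw)  # ...LPUSH onto processing
        return raw

    def complete(self, raw: str, response: dict) -> None:
        # Like LREM with count=1: the whole element must match, not just an ID.
        self.processing.remove(raw)
        self.outbox.appendleft(json.dumps(response))  # LPUSH outbox

    def recover_incomplete(self) -> int:
        """Move everything left in processing back to the inbox (RPOPLPUSH)."""
        moved = 0
        while self.processing:
            self.inbox.appendleft(self.processing.pop())
            moved += 1
        return moved
```

One consequence visible in this model: a recovered message is pushed onto the head of the inbox, so it is handled after messages that were already waiting.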
|
||||
|
||||
---
|
||||
|
||||
## Slack Integration
|
||||
|
||||
### Event Flow
|
||||
@ -1092,12 +1390,17 @@ Slack Event (message/mention)
|
||||
│
|
||||
▼
|
||||
┌─────────────────┐
|
||||
│ Agent Queue │ ← Each agent has async message queue
|
||||
│ Redis Queue │ ← LPUSH to agent inbox (reliable delivery)
|
||||
└────────┬────────┘
|
||||
│
|
||||
▼
|
||||
┌─────────────────┐
|
||||
│ Agent Process │ ← Claude SDK processes, may invoke tools
|
||||
│ Agent Process │ ← BRPOPLPUSH, Claude SDK processes
|
||||
└────────┬────────┘
|
||||
│
|
||||
▼
|
||||
┌─────────────────┐
|
||||
│ Redis Response │ ← LPUSH to outbox
|
||||
└────────┬────────┘
|
||||
│
|
||||
▼
|
||||
@ -1123,6 +1426,30 @@ Agents post messages with a consistent format:
|
||||
|
||||
## Memory System
|
||||
|
||||
The memory system provides persistent context across stateless `query()` calls.
|
||||
Each query injects relevant memory, and agents update their memory file after completing tasks.
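
What counts as "relevant" memory is left open here. One simple policy, sketched below under stated assumptions, is to keep whole `## ` sections of memory.md in a fixed priority order until a character budget is used up. `select_memory` is a hypothetical helper, and the section names in the usage note follow the initial memory structure described later in this plan.

```python
def select_memory(memory_md: str, priority: list[str], max_chars: int) -> str:
    """Keep whole `## ` sections, in priority order, that fit within max_chars."""
    sections: dict[str, str] = {}
    current = None
    for line in memory_md.splitlines():
        if line.startswith("## "):
            current = line[3:].strip()
            sections[current] = line + "\n"
        elif current is not None:
            # Lines before the first "## " heading (e.g. the title) are skipped.
            sections[current] += line + "\n"

    selected: list[str] = []
    used = 0
    for title in priority:
        body = sections.get(title)
        if body is None or used + len(body) > max_chars:
            continue  # missing or too large: try the next section
        selected.append(body.rstrip())
        used += len(body)
    return "\n\n".join(selected)
```

For example, `select_memory(text, ["Active Context", "Team Preferences", "Project Knowledge", "Learnings"], 4000)` would favour the active task and drop the bulkiest low-priority section first. The budget value is arbitrary.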
|
||||
|
||||
### Memory Flow
|
||||
|
||||
```
|
||||
┌──────────────────────────────────────────────────────────────┐
|
||||
│ Each query() call │
|
||||
├──────────────────────────────────────────────────────────────┤
|
||||
│ 1. Load memory.md │
|
||||
│ 2. Construct prompt: │
|
||||
│ ┌────────────────────────────────────────────────────┐ │
|
||||
│ │ ## Your Memory │ │
|
||||
│ │ {contents of memory.md} │ │
|
||||
│ │ │ │
|
||||
│ │ ## Current Task │ │
|
||||
│ │ {slack message / task description} │ │
|
||||
│ └────────────────────────────────────────────────────┘ │
|
||||
│ 3. Execute query() - agent has Read/Write tools │
|
||||
│ 4. Agent updates /memory/memory.md if needed (via Write) │
|
||||
│ 5. Return response │
|
||||
└──────────────────────────────────────────────────────────────┘
|
||||
```
|
||||
|
||||
### Structure
|
||||
|
||||
```
|
||||
@ -1168,6 +1495,54 @@ Agents post messages with a consistent format:
|
||||
- Using Next.js App Router (agreed with Terry)
|
||||
```
|
||||
|
||||
### Memory Manager (`src/orchestra/memory/manager.py`)
|
||||
|
||||
```python
|
||||
"""
|
||||
Memory manager for loading and updating agent memory files.
|
||||
Agents update their own memory via the Write tool during query execution.
|
||||
"""
|
||||
|
||||
class MemoryManager:
|
||||
def __init__(self, agent_id: str, memory_path: str = "/memory"):
|
||||
self.agent_id = agent_id
|
||||
self.memory_file = f"{memory_path}/memory.md"
|
||||
|
||||
async def load(self) -> str:
|
||||
"""Load memory content for injection into query prompt."""
|
||||
try:
|
||||
with open(self.memory_file, "r") as f:
|
||||
return f.read()
|
||||
except FileNotFoundError:
|
||||
# Initialize empty memory structure
|
||||
return self._initial_memory()
|
||||
|
||||
def _initial_memory(self) -> str:
|
||||
"""Create initial memory structure for new agent."""
|
||||
return f"""# Agent Memory - {self.agent_id}
|
||||
|
||||
## Active Context
|
||||
- No active tasks
|
||||
|
||||
## Project Knowledge
|
||||
(none yet)
|
||||
|
||||
## Team Preferences
|
||||
(none yet)
|
||||
|
||||
## Learnings
|
||||
(none yet)
|
||||
"""
|
||||
|
||||
async def update_from_response(self, response: str) -> None:
|
||||
"""
|
||||
Optional: Parse response for explicit memory updates.
|
||||
In practice, agents use the Write tool to update memory.md directly.
|
||||
This method can be used for automated memory extraction if needed.
|
||||
"""
|
||||
pass # Agents self-manage memory via Write tool
|
||||
```
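
If `update_from_response` were ever implemented to write the file itself, a crash mid-write could leave a truncated memory.md. A common safeguard, sketched here as an assumption rather than as part of this plan, is to write to a temporary file in the same directory and rename it over the target. `write_memory_atomically` is a hypothetical helper.

```python
import os
import tempfile
from pathlib import Path


def write_memory_atomically(memory_file: str, content: str) -> None:
    """Write `content` to a temp file next to `memory_file`, then rename it into place."""
    target = Path(memory_file)
    target.parent.mkdir(parents=True, exist_ok=True)
    # Same directory as the target so the final rename stays on one filesystem.
    fd, tmp_name = tempfile.mkstemp(dir=target.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as tmp:
            tmp.write(content)
            tmp.flush()
            os.fsync(tmp.fileno())
        # os.replace overwrites the destination in a single step.
        os.replace(tmp_name, target)
    except BaseException:
        # Clean up the temp file if anything failed before the rename.
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)
        raise
```

Readers such as `MemoryManager.load()` then see either the old file or the new one, never a partial write.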
|
||||
|
||||
---
|
||||
|
||||
## Implementation Phases
|
||||
@ -1238,6 +1613,22 @@ Agents post messages with a consistent format:
|
||||
version: '3.8'
|
||||
|
||||
services:
|
||||
# Redis for reliable message queuing between orchestrator and agents
|
||||
redis:
|
||||
image: redis:7-alpine
|
||||
command: redis-server --appendonly yes
|
||||
volumes:
|
||||
- redis-data:/data
|
||||
networks:
|
||||
- orchestra-net
|
||||
restart: unless-stopped
|
||||
healthcheck:
|
||||
test: ["CMD", "redis-cli", "ping"]
|
||||
interval: 10s
|
||||
timeout: 5s
|
||||
retries: 5
|
||||
start_period: 10s
|
||||
|
||||
orchestrator:
|
||||
build:
|
||||
context: .
|
||||
@ -1245,15 +1636,31 @@ services:
|
||||
environment:
|
||||
- SLACK_APP_TOKEN=${SLACK_APP_TOKEN}
|
||||
- SLACK_BOT_TOKEN=${SLACK_BOT_TOKEN}
|
||||
- REDIS_URL=redis://redis:6379
|
||||
volumes:
|
||||
- ./config:/app/config:ro
|
||||
networks:
|
||||
- orchestra-net
|
||||
restart: unless-stopped
|
||||
healthcheck:
|
||||
test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
|
||||
interval: 30s
|
||||
timeout: 10s
|
||||
retries: 3
|
||||
start_period: 30s
|
||||
depends_on:
|
||||
- agent-ceo
|
||||
- agent-pm
|
||||
- agent-dev
|
||||
- agent-techlead
|
||||
redis:
|
||||
condition: service_healthy
|
||||
agent-ceo:
|
||||
condition: service_healthy
|
||||
agent-pm:
|
||||
condition: service_healthy
|
||||
agent-dev:
|
||||
condition: service_healthy
|
||||
agent-techlead:
|
||||
condition: service_healthy
|
||||
agent-designer:
|
||||
condition: service_healthy
|
||||
|
||||
# Full agent containers (SDK, Node.js, CLI)
|
||||
agent-ceo:
|
||||
@ -1266,16 +1673,26 @@ services:
|
||||
- ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
|
||||
- GITEA_URL=${GITEA_URL}
|
||||
- GITEA_API_TOKEN=${GITEA_API_TOKEN}
|
||||
- REDIS_URL=redis://redis:6379
|
||||
volumes:
|
||||
- ./config/agents/ceo.yml:/app/config/agent.yml:ro
|
||||
- ./data/workspaces/ceo:/workspace
|
||||
- ./data/repos:/repos
|
||||
- ./data/memory/ceo:/memory
|
||||
- ./data/projects:/projects
|
||||
ports:
|
||||
- "8001:8001"
|
||||
networks:
|
||||
- orchestra-net
|
||||
restart: unless-stopped
|
||||
healthcheck:
|
||||
test: ["CMD", "curl", "-f", "http://localhost:8001/health"]
|
||||
interval: 30s
|
||||
timeout: 10s
|
||||
retries: 3
|
||||
start_period: 60s
|
||||
depends_on:
|
||||
redis:
|
||||
condition: service_healthy
|
||||
|
||||
agent-pm:
|
||||
build:
|
||||
@ -1287,16 +1704,26 @@ services:
|
||||
- ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
|
||||
- GITEA_URL=${GITEA_URL}
|
||||
- GITEA_API_TOKEN=${GITEA_API_TOKEN}
|
||||
- REDIS_URL=redis://redis:6379
|
||||
volumes:
|
||||
- ./config/agents/product_manager.yml:/app/config/agent.yml:ro
|
||||
- ./data/workspaces/product_manager:/workspace
|
||||
- ./data/repos:/repos
|
||||
- ./data/memory/product_manager:/memory
|
||||
- ./data/projects:/projects
|
||||
ports:
|
||||
- "8002:8002"
|
||||
networks:
|
||||
- orchestra-net
|
||||
restart: unless-stopped
|
||||
healthcheck:
|
||||
test: ["CMD", "curl", "-f", "http://localhost:8002/health"]
|
||||
interval: 30s
|
||||
timeout: 10s
|
||||
retries: 3
|
||||
start_period: 60s
|
||||
depends_on:
|
||||
redis:
|
||||
condition: service_healthy
|
||||
|
||||
agent-dev:
|
||||
build:
|
||||
@ -1308,15 +1735,25 @@ services:
|
||||
- ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
|
||||
- GITEA_URL=${GITEA_URL}
|
||||
- GITEA_API_TOKEN=${GITEA_API_TOKEN}
|
||||
- REDIS_URL=redis://redis:6379
|
||||
volumes:
|
||||
- ./config/agents/developer.yml:/app/config/agent.yml:ro
|
||||
- ./data/workspaces/developer:/workspace
|
||||
- ./data/repos:/repos
|
||||
- ./data/memory/developer:/memory
|
||||
ports:
|
||||
- "8003:8003"
|
||||
networks:
|
||||
- orchestra-net
|
||||
restart: unless-stopped
|
||||
healthcheck:
|
||||
test: ["CMD", "curl", "-f", "http://localhost:8003/health"]
|
||||
interval: 30s
|
||||
timeout: 10s
|
||||
retries: 3
|
||||
start_period: 60s
|
||||
depends_on:
|
||||
redis:
|
||||
condition: service_healthy
|
||||
|
||||
agent-techlead:
|
||||
build:
|
||||
@ -1328,18 +1765,59 @@ services:
|
||||
- ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
|
||||
- GITEA_URL=${GITEA_URL}
|
||||
- GITEA_API_TOKEN=${GITEA_API_TOKEN}
|
||||
- REDIS_URL=redis://redis:6379
|
||||
volumes:
|
||||
- ./config/agents/tech_lead.yml:/app/config/agent.yml:ro
|
||||
- ./data/workspaces/tech_lead:/workspace
|
||||
- ./data/repos:/repos:ro # Read-only for tech lead
|
||||
- ./data/memory/tech_lead:/memory
|
||||
ports:
|
||||
- "8004:8004"
|
||||
networks:
|
||||
- orchestra-net
|
||||
restart: unless-stopped
|
||||
healthcheck:
|
||||
test: ["CMD", "curl", "-f", "http://localhost:8004/health"]
|
||||
interval: 30s
|
||||
timeout: 10s
|
||||
retries: 3
|
||||
start_period: 60s
|
||||
depends_on:
|
||||
redis:
|
||||
condition: service_healthy
|
||||
|
||||
agent-designer:
|
||||
build:
|
||||
context: .
|
||||
dockerfile: Dockerfile.agent
|
||||
environment:
|
||||
- AGENT_ID=designer
|
||||
- AGENT_PORT=8005
|
||||
- ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
|
||||
- GITEA_URL=${GITEA_URL}
|
||||
- GITEA_API_TOKEN=${GITEA_API_TOKEN}
|
||||
- REDIS_URL=redis://redis:6379
|
||||
volumes:
|
||||
- ./config/agents/designer.yml:/app/config/agent.yml:ro
|
||||
- ./data/workspaces/designer:/workspace
|
||||
- ./data/memory/designer:/memory
|
||||
ports:
|
||||
- "8005:8005"
|
||||
networks:
|
||||
- orchestra-net
|
||||
restart: unless-stopped
|
||||
healthcheck:
|
||||
test: ["CMD", "curl", "-f", "http://localhost:8005/health"]
|
||||
interval: 30s
|
||||
timeout: 10s
|
||||
retries: 3
|
||||
start_period: 60s
|
||||
depends_on:
|
||||
redis:
|
||||
condition: service_healthy
|
||||
|
||||
volumes:
|
||||
orchestra-data:
|
||||
redis-data:
|
||||
|
||||
networks:
|
||||
orchestra-net:
|
||||
@ -1394,7 +1872,7 @@ COPY src/orchestra/agent/ ./src/orchestra/agent/
|
||||
COPY src/orchestra/tools/ ./src/orchestra/tools/
|
||||
|
||||
# Create workspace structure
|
||||
RUN mkdir -p /workspace /repos /memory
|
||||
RUN mkdir -p /workspace /memory
|
||||
|
||||
# Expose HTTP API port
|
||||
EXPOSE 8001
|
||||
@ -1425,6 +1903,7 @@ dependencies = [
|
||||
"anyio>=4.0.0",
|
||||
"fastapi>=0.109.0", # HTTP API for agent containers
|
||||
"uvicorn>=0.27.0", # ASGI server for FastAPI
|
||||
"redis>=5.0.0", # Async Redis client for message queuing
|
||||
]
|
||||
|
||||
[project.optional-dependencies]
|
||||
@ -1433,6 +1912,7 @@ dev = [
|
||||
"pytest-asyncio>=0.23.0",
|
||||
"pytest-mock>=3.12.0",
|
||||
"ruff>=0.1.0",
|
||||
"fakeredis>=2.20.0", # Redis mock for testing
|
||||
]
|
||||
```
|
||||
|
||||
|
||||