This commit is contained in:
Pierre Wessman 2025-11-27 12:06:29 +01:00
parent 59ac78ca39
commit 896ff001eb

389
PLAN.md
View File

@ -250,6 +250,12 @@ agent-orchestra/
│ │ ├── events.py # Event handlers, routing │ │ ├── events.py # Event handlers, routing
│ │ └── formatter.py # Message formatting │ │ └── formatter.py # Message formatting
│ │ │ │
│ ├── queue/
│ │ ├── __init__.py
│ │ ├── redis_client.py # AgentQueue class for message handling
│ │ ├── publisher.py # Orchestrator-side message publishing
│ │ └── consumer.py # Agent-side message consumption
│ │
│ ├── tools/ │ ├── tools/
│ │ ├── __init__.py │ │ ├── __init__.py
│ │ ├── server.py # create_sdk_mcp_server setup │ │ ├── server.py # create_sdk_mcp_server setup
@ -567,6 +573,105 @@ container:
- "PYTHONPATH=/workspace" - "PYTHONPATH=/workspace"
``` ```
### Designer Config (`config/agents/designer.yml`)
```yaml
id: designer
name: "Designer Dani"
slack_user_id: "${SLACK_DESIGNER_USER_ID}"
model: "sonnet"
max_turns: 50
system_prompt: |
You are Designer Dani, responsible for UI/UX design and frontend aesthetics.
Your responsibilities:
- Create and refine UI designs based on requirements
- Implement CSS/styling changes
- Ensure consistent design system usage
- Review frontend PRs for design consistency
- Provide design feedback and mockup descriptions
When working on design tasks:
1. Understand the user experience goals
2. Reference the existing design system (shadcn/ui, Tailwind)
3. Implement pixel-perfect styling
4. Ensure responsive design across breakpoints
5. Document design decisions in comments
You work closely with developers on frontend implementation.
allowed_tools:
- Read
- Write
- Edit
- Bash
- Glob
- Grep
- WebSearch
- WebFetch
# Slack tools
- mcp__agent__slack_send_message
- mcp__agent__slack_reply_thread
# Task tools
- mcp__agent__tasks_view
- mcp__agent__tasks_update_status
- mcp__agent__tasks_add_comment
# Git tools - can create PRs for design changes
- mcp__agent__gitea_create_pr
- mcp__agent__gitea_list_prs
- mcp__agent__gitea_add_comment
disallowed_tools:
- mcp__agent__slack_create_channel
- mcp__agent__slack_delete_channel
- mcp__agent__tasks_create_project
- mcp__agent__tasks_create_board
- mcp__agent__gitea_merge_pr
permissions:
filesystem:
allowed_paths:
- "/workspace/**/*.css"
- "/workspace/**/*.scss"
- "/workspace/**/*.tsx"
- "/workspace/**/*.jsx"
- "/workspace/**/components/**"
- "/workspace/**/styles/**"
- "/workspace/**/public/**"
denied_paths:
- "/workspace/**/.git/config"
- "**/.env"
- "**/secrets/**"
- "/workspace/**/api/**"
- "/workspace/**/server/**"
git:
branch_pattern: "design/*"
can_push_to:
- "design/*"
cannot_push_to:
- "main"
- "dev"
- "feature/*"
memory:
path: "/memory/designer"
files:
- "memory.md"
can_mention:
- developer
- product_manager
- tech_lead
container:
resources:
memory: "2g"
cpus: "1.0"
```
### Product Manager Config (`config/agents/product_manager.yml`) ### Product Manager Config (`config/agents/product_manager.yml`)
```yaml ```yaml
@ -1108,6 +1213,164 @@ class GiteaClient:
--- ---
## Message Queue Architecture (Redis)
Redis provides reliable message queuing between the orchestrator and agent containers, ensuring messages aren't lost if an agent is temporarily unavailable.
### Queue Structure
```
Redis Keys:
├── queue:agent:{agent_id}:inbox # Messages waiting to be processed
├── queue:agent:{agent_id}:processing # Currently being processed (for reliability)
├── queue:agent:{agent_id}:outbox # Responses ready for orchestrator
├── agent:{agent_id}:status # Agent health status (heartbeat)
└── agent:{agent_id}:current_task # Current task ID (for recovery)
```
### Message Flow with Redis
```
┌─────────────────────────────────────────────────────────────────┐
│ Orchestrator │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────┐ │
│ │ Slack │───▶│ Router │───▶│ Redis Publisher │ │
│ │ Listener │ │ │ │ (LPUSH to inbox) │ │
│ └─────────────┘ └─────────────┘ └─────────────────────┘ │
│ │ │
│ ┌─────────────────────────────────┐ │ │
│ │ Response Handler │◀────────────┘ │
│ │ (BRPOP from outbox queues) │ │
│ └─────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
│ Redis
┌─────────────────────────────────────────────────────────────────┐
│ Agent Container │
│ ┌─────────────────────────────────┐ │
│ │ Message Consumer │ │
│ │ (BRPOPLPUSH inbox→processing) │ │
│ └───────────────┬─────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────┐ │
│ │ Claude SDK query() │ │
│ │ - Inject memory │ │
│ │ - Execute tools │ │
│ │ - Update memory │ │
│ └───────────────┬─────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────┐ │
│ │ Response Publisher │ │
│ │ (LPUSH to outbox, DEL from │ │
│ │ processing) │ │
│ └─────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
```
### Reliability Features
1. **At-least-once delivery**: Using `BRPOPLPUSH` moves message to processing queue atomically
2. **Recovery on crash**: On startup, agents check processing queue for incomplete tasks
3. **Heartbeat monitoring**: Agents publish heartbeat to `agent:{id}:status` every 10s
4. **Dead letter queue**: Messages that fail 3 times move to `queue:dead_letter`
### Message Format
```python
# Inbox message (orchestrator → agent)
{
"id": "msg-uuid-123",
"timestamp": "2025-01-15T10:00:00Z",
"type": "slack_message",
"payload": {
"text": "@developer please implement the login feature",
"channel": "proj-website",
"thread_ts": "1234567890.123456",
"user": "U123456", # Slack user who sent it
"context": {
"thread_messages": [...], # Recent thread history
"mentioned_agents": ["developer"]
}
},
"reply_to": "queue:orchestrator:responses",
"retry_count": 0
}
# Outbox message (agent → orchestrator)
{
"id": "resp-uuid-456",
"request_id": "msg-uuid-123",
"timestamp": "2025-01-15T10:00:05Z",
"agent_id": "developer",
"payload": {
"text": "I'll start working on the login feature...",
"channel": "proj-website",
"thread_ts": "1234567890.123456",
"mentions": ["tech_lead"] # Agents to notify
}
}
```
### Redis Client Implementation
```python
"""
src/orchestra/queue/redis_client.py
"""
import redis.asyncio as redis
from typing import Optional
import json
class AgentQueue:
def __init__(self, agent_id: str, redis_url: str = "redis://localhost:6379"):
self.agent_id = agent_id
self.redis = redis.from_url(redis_url)
self.inbox = f"queue:agent:{agent_id}:inbox"
self.processing = f"queue:agent:{agent_id}:processing"
self.outbox = f"queue:agent:{agent_id}:outbox"
async def receive(self, timeout: int = 0) -> Optional[dict]:
"""Block until message available, move to processing atomically."""
result = await self.redis.brpoplpush(
self.inbox, self.processing, timeout=timeout
)
if result:
return json.loads(result)
return None
async def complete(self, message_id: str, response: dict) -> None:
"""Mark message complete and publish response."""
pipe = self.redis.pipeline()
# Remove from processing
pipe.lrem(self.processing, 1, message_id)
# Publish response
pipe.lpush(self.outbox, json.dumps(response))
await pipe.execute()
async def recover_incomplete(self) -> list[dict]:
"""On startup, recover any messages left in processing."""
messages = []
while True:
msg = await self.redis.rpoplpush(self.processing, self.inbox)
if not msg:
break
messages.append(json.loads(msg))
return messages
async def heartbeat(self) -> None:
"""Update agent status."""
await self.redis.set(
f"agent:{self.agent_id}:status",
json.dumps({"status": "healthy", "timestamp": datetime.utcnow().isoformat()}),
ex=30 # Expire after 30s if no heartbeat
)
```
---
## Slack Integration ## Slack Integration
### Event Flow ### Event Flow
@ -1127,12 +1390,17 @@ Slack Event (message/mention)
┌─────────────────┐ ┌─────────────────┐
Agent Queue │ ← Each agent has async message queue Redis Queue │ ← LPUSH to agent inbox (reliable delivery)
└────────┬────────┘ └────────┬────────┘
┌─────────────────┐ ┌─────────────────┐
│ Agent Process │ ← Claude SDK processes, may invoke tools │ Agent Process │ ← BRPOPLPUSH, Claude SDK processes
└────────┬────────┘
┌─────────────────┐
│ Redis Response │ ← LPUSH to outbox
└────────┬────────┘ └────────┬────────┘
@ -1345,6 +1613,22 @@ class MemoryManager:
version: '3.8' version: '3.8'
services: services:
# Redis for reliable message queuing between orchestrator and agents
redis:
image: redis:7-alpine
command: redis-server --appendonly yes
volumes:
- redis-data:/data
networks:
- orchestra-net
restart: unless-stopped
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 10s
timeout: 5s
retries: 5
start_period: 10s
orchestrator: orchestrator:
build: build:
context: . context: .
@ -1352,15 +1636,31 @@ services:
environment: environment:
- SLACK_APP_TOKEN=${SLACK_APP_TOKEN} - SLACK_APP_TOKEN=${SLACK_APP_TOKEN}
- SLACK_BOT_TOKEN=${SLACK_BOT_TOKEN} - SLACK_BOT_TOKEN=${SLACK_BOT_TOKEN}
- REDIS_URL=redis://redis:6379
volumes: volumes:
- ./config:/app/config:ro - ./config:/app/config:ro
networks: networks:
- orchestra-net - orchestra-net
restart: unless-stopped
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
interval: 30s
timeout: 10s
retries: 3
start_period: 30s
depends_on: depends_on:
- agent-ceo redis:
- agent-pm condition: service_healthy
- agent-dev agent-ceo:
- agent-techlead condition: service_healthy
agent-pm:
condition: service_healthy
agent-dev:
condition: service_healthy
agent-techlead:
condition: service_healthy
agent-designer:
condition: service_healthy
# Full agent containers (SDK, Node.js, CLI) # Full agent containers (SDK, Node.js, CLI)
agent-ceo: agent-ceo:
@ -1373,6 +1673,7 @@ services:
- ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY} - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
- GITEA_URL=${GITEA_URL} - GITEA_URL=${GITEA_URL}
- GITEA_API_TOKEN=${GITEA_API_TOKEN} - GITEA_API_TOKEN=${GITEA_API_TOKEN}
- REDIS_URL=redis://redis:6379
volumes: volumes:
- ./config/agents/ceo.yml:/app/config/agent.yml:ro - ./config/agents/ceo.yml:/app/config/agent.yml:ro
- ./data/workspaces/ceo:/workspace - ./data/workspaces/ceo:/workspace
@ -1382,6 +1683,16 @@ services:
- "8001:8001" - "8001:8001"
networks: networks:
- orchestra-net - orchestra-net
restart: unless-stopped
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8001/health"]
interval: 30s
timeout: 10s
retries: 3
start_period: 60s
depends_on:
redis:
condition: service_healthy
agent-pm: agent-pm:
build: build:
@ -1393,6 +1704,7 @@ services:
- ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY} - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
- GITEA_URL=${GITEA_URL} - GITEA_URL=${GITEA_URL}
- GITEA_API_TOKEN=${GITEA_API_TOKEN} - GITEA_API_TOKEN=${GITEA_API_TOKEN}
- REDIS_URL=redis://redis:6379
volumes: volumes:
- ./config/agents/product_manager.yml:/app/config/agent.yml:ro - ./config/agents/product_manager.yml:/app/config/agent.yml:ro
- ./data/workspaces/product_manager:/workspace - ./data/workspaces/product_manager:/workspace
@ -1402,6 +1714,16 @@ services:
- "8002:8002" - "8002:8002"
networks: networks:
- orchestra-net - orchestra-net
restart: unless-stopped
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8002/health"]
interval: 30s
timeout: 10s
retries: 3
start_period: 60s
depends_on:
redis:
condition: service_healthy
agent-dev: agent-dev:
build: build:
@ -1413,6 +1735,7 @@ services:
- ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY} - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
- GITEA_URL=${GITEA_URL} - GITEA_URL=${GITEA_URL}
- GITEA_API_TOKEN=${GITEA_API_TOKEN} - GITEA_API_TOKEN=${GITEA_API_TOKEN}
- REDIS_URL=redis://redis:6379
volumes: volumes:
- ./config/agents/developer.yml:/app/config/agent.yml:ro - ./config/agents/developer.yml:/app/config/agent.yml:ro
- ./data/workspaces/developer:/workspace - ./data/workspaces/developer:/workspace
@ -1421,6 +1744,16 @@ services:
- "8003:8003" - "8003:8003"
networks: networks:
- orchestra-net - orchestra-net
restart: unless-stopped
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8003/health"]
interval: 30s
timeout: 10s
retries: 3
start_period: 60s
depends_on:
redis:
condition: service_healthy
agent-techlead: agent-techlead:
build: build:
@ -1432,6 +1765,7 @@ services:
- ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY} - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
- GITEA_URL=${GITEA_URL} - GITEA_URL=${GITEA_URL}
- GITEA_API_TOKEN=${GITEA_API_TOKEN} - GITEA_API_TOKEN=${GITEA_API_TOKEN}
- REDIS_URL=redis://redis:6379
volumes: volumes:
- ./config/agents/tech_lead.yml:/app/config/agent.yml:ro - ./config/agents/tech_lead.yml:/app/config/agent.yml:ro
- ./data/workspaces/tech_lead:/workspace - ./data/workspaces/tech_lead:/workspace
@ -1440,9 +1774,50 @@ services:
- "8004:8004" - "8004:8004"
networks: networks:
- orchestra-net - orchestra-net
restart: unless-stopped
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8004/health"]
interval: 30s
timeout: 10s
retries: 3
start_period: 60s
depends_on:
redis:
condition: service_healthy
agent-designer:
build:
context: .
dockerfile: Dockerfile.agent
environment:
- AGENT_ID=designer
- AGENT_PORT=8005
- ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
- GITEA_URL=${GITEA_URL}
- GITEA_API_TOKEN=${GITEA_API_TOKEN}
- REDIS_URL=redis://redis:6379
volumes:
- ./config/agents/designer.yml:/app/config/agent.yml:ro
- ./data/workspaces/designer:/workspace
- ./data/memory/designer:/memory
ports:
- "8005:8005"
networks:
- orchestra-net
restart: unless-stopped
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8005/health"]
interval: 30s
timeout: 10s
retries: 3
start_period: 60s
depends_on:
redis:
condition: service_healthy
volumes: volumes:
orchestra-data: orchestra-data:
redis-data:
networks: networks:
orchestra-net: orchestra-net:
@ -1528,6 +1903,7 @@ dependencies = [
"anyio>=4.0.0", "anyio>=4.0.0",
"fastapi>=0.109.0", # HTTP API for agent containers "fastapi>=0.109.0", # HTTP API for agent containers
"uvicorn>=0.27.0", # ASGI server for FastAPI "uvicorn>=0.27.0", # ASGI server for FastAPI
"redis>=5.0.0", # Async Redis client for message queuing
] ]
[project.optional-dependencies] [project.optional-dependencies]
@ -1536,6 +1912,7 @@ dev = [
"pytest-asyncio>=0.23.0", "pytest-asyncio>=0.23.0",
"pytest-mock>=3.12.0", "pytest-mock>=3.12.0",
"ruff>=0.1.0", "ruff>=0.1.0",
"fakeredis>=2.20.0", # Redis mock for testing
] ]
``` ```