diff --git a/PLAN.md b/PLAN.md index b9f0cc1..7fe2f23 100644 --- a/PLAN.md +++ b/PLAN.md @@ -250,6 +250,12 @@ agent-orchestra/ │ │ ├── events.py # Event handlers, routing │ │ └── formatter.py # Message formatting │ │ +│ ├── queue/ +│ │ ├── __init__.py +│ │ ├── redis_client.py # AgentQueue class for message handling +│ │ ├── publisher.py # Orchestrator-side message publishing +│ │ └── consumer.py # Agent-side message consumption +│ │ │ ├── tools/ │ │ ├── __init__.py │ │ ├── server.py # create_sdk_mcp_server setup @@ -567,6 +573,105 @@ container: - "PYTHONPATH=/workspace" ``` +### Designer Config (`config/agents/designer.yml`) + +```yaml +id: designer +name: "Designer Dani" +slack_user_id: "${SLACK_DESIGNER_USER_ID}" + +model: "sonnet" +max_turns: 50 + +system_prompt: | + You are Designer Dani, responsible for UI/UX design and frontend aesthetics. + + Your responsibilities: + - Create and refine UI designs based on requirements + - Implement CSS/styling changes + - Ensure consistent design system usage + - Review frontend PRs for design consistency + - Provide design feedback and mockup descriptions + + When working on design tasks: + 1. Understand the user experience goals + 2. Reference the existing design system (shadcn/ui, Tailwind) + 3. Implement pixel-perfect styling + 4. Ensure responsive design across breakpoints + 5. Document design decisions in comments + + You work closely with developers on frontend implementation. 
+ +allowed_tools: + - Read + - Write + - Edit + - Bash + - Glob + - Grep + - WebSearch + - WebFetch + # Slack tools + - mcp__agent__slack_send_message + - mcp__agent__slack_reply_thread + # Task tools + - mcp__agent__tasks_view + - mcp__agent__tasks_update_status + - mcp__agent__tasks_add_comment + # Git tools - can create PRs for design changes + - mcp__agent__gitea_create_pr + - mcp__agent__gitea_list_prs + - mcp__agent__gitea_add_comment + +disallowed_tools: + - mcp__agent__slack_create_channel + - mcp__agent__slack_delete_channel + - mcp__agent__tasks_create_project + - mcp__agent__tasks_create_board + - mcp__agent__gitea_merge_pr + +permissions: + filesystem: + allowed_paths: + - "/workspace/**/*.css" + - "/workspace/**/*.scss" + - "/workspace/**/*.tsx" + - "/workspace/**/*.jsx" + - "/workspace/**/components/**" + - "/workspace/**/styles/**" + - "/workspace/**/public/**" + denied_paths: + - "/workspace/**/.git/config" + - "**/.env" + - "**/secrets/**" + - "/workspace/**/api/**" + - "/workspace/**/server/**" + + git: + branch_pattern: "design/*" + can_push_to: + - "design/*" + cannot_push_to: + - "main" + - "dev" + - "feature/*" + +memory: + path: "/memory/designer" + files: + - "memory.md" + +can_mention: + - developer + - product_manager + - tech_lead + +container: + resources: + memory: "2g" + cpus: "1.0" +``` + ### Product Manager Config (`config/agents/product_manager.yml`) ```yaml @@ -1108,6 +1213,164 @@ class GiteaClient: --- +## Message Queue Architecture (Redis) + +Redis provides reliable message queuing between the orchestrator and agent containers, ensuring messages aren't lost if an agent is temporarily unavailable. 
+ +### Queue Structure + +``` +Redis Keys: +├── queue:agent:{agent_id}:inbox # Messages waiting to be processed +├── queue:agent:{agent_id}:processing # Currently being processed (for reliability) +├── queue:agent:{agent_id}:outbox # Responses ready for orchestrator +├── agent:{agent_id}:status # Agent health status (heartbeat) +└── agent:{agent_id}:current_task # Current task ID (for recovery) +``` + +### Message Flow with Redis + +``` +┌─────────────────────────────────────────────────────────────────┐ +│ Orchestrator │ +│ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────┐ │ +│ │ Slack │───▶│ Router │───▶│ Redis Publisher │ │ +│ │ Listener │ │ │ │ (LPUSH to inbox) │ │ +│ └─────────────┘ └─────────────┘ └─────────────────────┘ │ +│ │ │ +│ ┌─────────────────────────────────┐ │ │ +│ │ Response Handler │◀────────────┘ │ +│ │ (BRPOP from outbox queues) │ │ +│ └─────────────────────────────────┘ │ +└─────────────────────────────────────────────────────────────────┘ + │ + │ Redis + ▼ +┌─────────────────────────────────────────────────────────────────┐ +│ Agent Container │ +│ ┌─────────────────────────────────┐ │ +│ │ Message Consumer │ │ +│ │ (BRPOPLPUSH inbox→processing) │ │ +│ └───────────────┬─────────────────┘ │ +│ │ │ +│ ▼ │ +│ ┌─────────────────────────────────┐ │ +│ │ Claude SDK query() │ │ +│ │ - Inject memory │ │ +│ │ - Execute tools │ │ +│ │ - Update memory │ │ +│ └───────────────┬─────────────────┘ │ +│ │ │ +│ ▼ │ +│ ┌─────────────────────────────────┐ │ +│ │ Response Publisher │ │ +│ │ (LPUSH to outbox, DEL from │ │ +│ │ processing) │ │ +│ └─────────────────────────────────┘ │ +└─────────────────────────────────────────────────────────────────┘ +``` + +### Reliability Features + +1. **At-least-once delivery**: Using `BRPOPLPUSH` moves message to processing queue atomically +2. **Recovery on crash**: On startup, agents check processing queue for incomplete tasks +3. **Heartbeat monitoring**: Agents publish heartbeat to `agent:{id}:status` every 10s +4. 
class AgentQueue:
    """Reliable per-agent message queue backed by Redis lists.

    Each agent owns three lists:

    * ``inbox``      - messages waiting to be processed,
    * ``processing`` - messages received but not yet completed,
    * ``outbox``     - responses waiting for the orchestrator.

    A message is moved from ``inbox`` to ``processing`` atomically when it is
    received and is removed from ``processing`` only when ``complete`` is
    called, so a crash in between leaves the message recoverable via
    ``recover_incomplete``.
    """

    def __init__(self, agent_id: str, redis_url: str = "redis://localhost:6379"):
        """Connect to Redis and derive this agent's queue key names.

        Args:
            agent_id: Identifier used to build the per-agent key names.
            redis_url: Connection URL handed to ``redis.from_url``.
        """
        self.agent_id = agent_id
        self.redis = redis.from_url(redis_url)
        self.inbox = f"queue:agent:{agent_id}:inbox"
        self.processing = f"queue:agent:{agent_id}:processing"
        self.outbox = f"queue:agent:{agent_id}:outbox"

    async def receive(self, timeout: int = 0) -> Optional[dict]:
        """Block until a message is available and move it to ``processing``.

        Args:
            timeout: Seconds to block; ``0`` blocks indefinitely.

        Returns:
            The decoded message, or ``None`` if the timeout expired.
        """
        raw_message = await self.redis.brpoplpush(
            self.inbox, self.processing, timeout=timeout
        )
        if not raw_message:
            return None
        return json.loads(raw_message)

    async def complete(self, message_id: str, response: dict) -> None:
        """Remove a finished message from ``processing`` and publish its response.

        The processing list stores the full serialized message, not the bare
        ID, and LREM only removes elements equal to the given value. The
        stored payload whose ``"id"`` matches is therefore looked up first and
        that exact payload is removed.

        Args:
            message_id: The ``"id"`` field of the message being completed.
            response: JSON-serializable response pushed onto ``outbox``.
        """
        raw_entry = await self._find_processing_entry(message_id)

        pipe = self.redis.pipeline()
        if raw_entry is not None:
            # Remove exactly one occurrence of the stored payload.
            pipe.lrem(self.processing, 1, raw_entry)
        # The response is published even if the entry is already gone (for
        # example after a recovery requeue) so the orchestrator is not left
        # waiting for a reply.
        pipe.lpush(self.outbox, json.dumps(response))
        await pipe.execute()

    async def _find_processing_entry(self, message_id: str):
        """Return the raw ``processing`` entry whose ``"id"`` is *message_id*.

        Returns ``None`` when no entry matches. Entries that are not valid
        JSON objects are skipped rather than treated as errors, so one corrupt
        payload cannot block completion of the others.
        """
        for raw_entry in await self.redis.lrange(self.processing, 0, -1):
            try:
                entry = json.loads(raw_entry)
            except (TypeError, ValueError):
                continue
            if isinstance(entry, dict) and entry.get("id") == message_id:
                return raw_entry
        return None

    async def recover_incomplete(self) -> list[dict]:
        """Requeue every message left in ``processing`` back onto ``inbox``.

        Intended to run on startup, before ``receive`` is called.

        Returns:
            The decoded messages that were requeued.
        """
        messages = []
        while True:
            msg = await self.redis.rpoplpush(self.processing, self.inbox)
            if not msg:
                break
            messages.append(json.loads(msg))
        return messages

    async def heartbeat(self) -> None:
        """Publish a health record under ``agent:{agent_id}:status``.

        The key expires after 30 seconds, so a missing key means the agent
        has stopped sending heartbeats.
        """
        # Imported locally because the surrounding module does not import
        # datetime; the original referenced it unimported and raised NameError.
        from datetime import datetime, timezone

        status = {
            "status": "healthy",
            # Timezone-aware UTC, so consumers never have to guess the zone.
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }
        await self.redis.set(
            f"agent:{self.agent_id}:status",
            json.dumps(status),
            ex=30,  # Expire after 30s if no heartbeat
        )
@@ -1352,15 +1636,31 @@ services: environment: - SLACK_APP_TOKEN=${SLACK_APP_TOKEN} - SLACK_BOT_TOKEN=${SLACK_BOT_TOKEN} + - REDIS_URL=redis://redis:6379 volumes: - ./config:/app/config:ro networks: - orchestra-net + restart: unless-stopped + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:8000/health"] + interval: 30s + timeout: 10s + retries: 3 + start_period: 30s depends_on: - - agent-ceo - - agent-pm - - agent-dev - - agent-techlead + redis: + condition: service_healthy + agent-ceo: + condition: service_healthy + agent-pm: + condition: service_healthy + agent-dev: + condition: service_healthy + agent-techlead: + condition: service_healthy + agent-designer: + condition: service_healthy # Full agent containers (SDK, Node.js, CLI) agent-ceo: @@ -1373,6 +1673,7 @@ services: - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY} - GITEA_URL=${GITEA_URL} - GITEA_API_TOKEN=${GITEA_API_TOKEN} + - REDIS_URL=redis://redis:6379 volumes: - ./config/agents/ceo.yml:/app/config/agent.yml:ro - ./data/workspaces/ceo:/workspace @@ -1382,6 +1683,16 @@ services: - "8001:8001" networks: - orchestra-net + restart: unless-stopped + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:8001/health"] + interval: 30s + timeout: 10s + retries: 3 + start_period: 60s + depends_on: + redis: + condition: service_healthy agent-pm: build: @@ -1393,6 +1704,7 @@ services: - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY} - GITEA_URL=${GITEA_URL} - GITEA_API_TOKEN=${GITEA_API_TOKEN} + - REDIS_URL=redis://redis:6379 volumes: - ./config/agents/product_manager.yml:/app/config/agent.yml:ro - ./data/workspaces/product_manager:/workspace @@ -1402,6 +1714,16 @@ services: - "8002:8002" networks: - orchestra-net + restart: unless-stopped + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:8002/health"] + interval: 30s + timeout: 10s + retries: 3 + start_period: 60s + depends_on: + redis: + condition: service_healthy agent-dev: build: @@ -1413,6 +1735,7 @@ services: - 
ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY} - GITEA_URL=${GITEA_URL} - GITEA_API_TOKEN=${GITEA_API_TOKEN} + - REDIS_URL=redis://redis:6379 volumes: - ./config/agents/developer.yml:/app/config/agent.yml:ro - ./data/workspaces/developer:/workspace @@ -1421,6 +1744,16 @@ services: - "8003:8003" networks: - orchestra-net + restart: unless-stopped + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:8003/health"] + interval: 30s + timeout: 10s + retries: 3 + start_period: 60s + depends_on: + redis: + condition: service_healthy agent-techlead: build: @@ -1432,6 +1765,7 @@ services: - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY} - GITEA_URL=${GITEA_URL} - GITEA_API_TOKEN=${GITEA_API_TOKEN} + - REDIS_URL=redis://redis:6379 volumes: - ./config/agents/tech_lead.yml:/app/config/agent.yml:ro - ./data/workspaces/tech_lead:/workspace @@ -1440,9 +1774,50 @@ services: - "8004:8004" networks: - orchestra-net + restart: unless-stopped + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:8004/health"] + interval: 30s + timeout: 10s + retries: 3 + start_period: 60s + depends_on: + redis: + condition: service_healthy + + agent-designer: + build: + context: . 
+ dockerfile: Dockerfile.agent + environment: + - AGENT_ID=designer + - AGENT_PORT=8005 + - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY} + - GITEA_URL=${GITEA_URL} + - GITEA_API_TOKEN=${GITEA_API_TOKEN} + - REDIS_URL=redis://redis:6379 + volumes: + - ./config/agents/designer.yml:/app/config/agent.yml:ro + - ./data/workspaces/designer:/workspace + - ./data/memory/designer:/memory + ports: + - "8005:8005" + networks: + - orchestra-net + restart: unless-stopped + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:8005/health"] + interval: 30s + timeout: 10s + retries: 3 + start_period: 60s + depends_on: + redis: + condition: service_healthy volumes: orchestra-data: + redis-data: networks: orchestra-net: @@ -1528,6 +1903,7 @@ dependencies = [ "anyio>=4.0.0", "fastapi>=0.109.0", # HTTP API for agent containers "uvicorn>=0.27.0", # ASGI server for FastAPI + "redis>=5.0.0", # Async Redis client for message queuing ] [project.optional-dependencies] @@ -1536,6 +1912,7 @@ dev = [ "pytest-asyncio>=0.23.0", "pytest-mock>=3.12.0", "ruff>=0.1.0", + "fakeredis>=2.20.0", # Redis mock for testing ] ```