#!/usr/bin/env node /** * JSONL Inbox System — Structured inter-agent messaging * * Usage: * inbox.js send --from rivet --to builder --subject "Deploy fix" --body "Fix the auth bug" [--priority high] [--tag task] * inbox.js read --agent builder [--unread] [--limit 10] [--tag task] * inbox.js ack --agent builder --id * inbox.js ack-all --agent builder * inbox.js stats [--agent builder] * inbox.js list-agents * inbox.js retry --agent builder --id * inbox.js archive --agent builder [--before ] [--acked-only] * inbox.js broadcast --from rivet --subject "Fleet update" --body "..." [--exclude rivet] * * Each agent has: /home/ccuser/shared/inboxes/.jsonl * Messages are append-only. Acks are tracked in /home/ccuser/shared/inboxes/.acks.json * * Message format (one JSON object per line): * { * "id": "msg--", * "from": "rivet", * "to": "builder", * "subject": "Deploy fix", * "body": "Fix the auth bug on /api/ceo/fleet", * "priority": "normal", // low | normal | high | critical * "tag": "task", // task | info | question | alert | ack-request * "ts": "2026-02-17T21:30:00+11:00", * "epoch": 1739789400, * "replyTo": null, // msg-id if this is a reply * "ttl": 86400 // seconds until message expires (0 = never) * } */ const fs = require('fs'); const path = require('path'); const crypto = require('crypto'); const INBOX_DIR = '/home/ccuser/shared/inboxes'; const AGENTS = ['rivet', 'builder', 'susan', 'harper', 'sentinel', 'radar', 'herald', 'cog']; const ARCHIVE_DIR = path.join(INBOX_DIR, 'archive'); // Ensure directories exist if (!fs.existsSync(INBOX_DIR)) fs.mkdirSync(INBOX_DIR, { recursive: true }); if (!fs.existsSync(ARCHIVE_DIR)) fs.mkdirSync(ARCHIVE_DIR, { recursive: true }); function genId() { return `msg-${Date.now()}-${crypto.randomBytes(3).toString('hex')}`; } function inboxPath(agent) { return path.join(INBOX_DIR, `${agent}.jsonl`); } function acksPath(agent) { return path.join(INBOX_DIR, `${agent}.acks.json`); } function loadAcks(agent) { const p = acksPath(agent); if (!fs.existsSync(p)) return {}; try { return JSON.parse(fs.readFileSync(p, 'utf8')); } catch { return {}; } } function saveAcks(agent, acks) { fs.writeFileSync(acksPath(agent), JSON.stringify(acks, null, 2)); } function readInbox(agent) { const p = inboxPath(agent); if (!fs.existsSync(p)) return []; const lines = fs.readFileSync(p, 'utf8').trim().split('\n').filter(Boolean); const messages = []; for (const line of lines) { try { messages.push(JSON.parse(line)); } catch (e) { // Skip malformed lines } } return messages; } function appendMessage(agent, msg) { const p = inboxPath(agent); fs.appendFileSync(p, JSON.stringify(msg) + '\n'); } function parseArgs(argv) { const args = {}; const positional = []; for (let i = 0; i < argv.length; i++) { if (argv[i].startsWith('--')) { const key = argv[i].slice(2); const next = argv[i + 1]; if (next && !next.startsWith('--')) { args[key] = next; i++; } else { args[key] = true; } } else { positional.push(argv[i]); } } return { args, positional }; } function formatMessage(msg, acked) { const status = acked ? 'āœ…' : 'šŸ“©'; const pri = msg.priority === 'critical' ? '🚨' : msg.priority === 'high' ? 'āš ļø' : ''; return `${status} ${pri}[${msg.id}] from:${msg.from} ${msg.ts}\n ${msg.subject}\n ${msg.body ? msg.body.substring(0, 200) : '(no body)'}${msg.body && msg.body.length > 200 ? '...' : ''}`; } // Commands const commands = { send({ args }) { const { from, to, subject, body, priority, tag, ttl, replyTo } = args; if (!from || !to || !subject) { console.error('Usage: inbox.js send --from --to --subject "..." [--body "..."] [--priority low|normal|high|critical] [--tag task|info|question|alert]'); process.exit(1); } // Allow comma-separated recipients const recipients = to.split(',').map(r => r.trim()); const msgBase = { from, subject, body: body || '', priority: priority || 'normal', tag: tag || 'info', ts: new Date().toISOString(), epoch: Math.floor(Date.now() / 1000), replyTo: replyTo || null, ttl: parseInt(ttl) || 0 }; for (const recipient of recipients) { if (!AGENTS.includes(recipient)) { console.error(`Unknown agent: ${recipient}. Known: ${AGENTS.join(', ')}`); continue; } const msg = { ...msgBase, id: genId(), to: recipient }; appendMessage(recipient, msg); console.log(`āœ‰ļø Sent ${msg.id} → ${recipient}: ${subject}`); } }, read({ args }) { const { agent, unread, limit, tag, from: fromFilter } = args; if (!agent) { console.error('Usage: inbox.js read --agent [--unread] [--limit N] [--tag task] [--from rivet]'); process.exit(1); } let messages = readInbox(agent); const acks = loadAcks(agent); const now = Math.floor(Date.now() / 1000); // Filter expired messages = messages.filter(m => !m.ttl || (now - m.epoch) < m.ttl); if (unread) { messages = messages.filter(m => !acks[m.id]); } if (tag) { messages = messages.filter(m => m.tag === tag); } if (fromFilter) { messages = messages.filter(m => m.from === fromFilter); } const maxN = parseInt(limit) || messages.length; const shown = messages.slice(-maxN); if (shown.length === 0) { console.log(`šŸ“­ ${agent}: No ${unread ? 'unread ' : ''}messages${tag ? ` tagged [${tag}]` : ''}`); return; } console.log(`šŸ“¬ ${agent}: ${shown.length} message(s)${unread ? ' (unread)' : ''}:\n`); for (const msg of shown) { console.log(formatMessage(msg, !!acks[msg.id])); console.log(); } }, ack({ args }) { const { agent, id } = args; if (!agent || !id) { console.error('Usage: inbox.js ack --agent --id '); process.exit(1); } const acks = loadAcks(agent); acks[id] = { at: new Date().toISOString(), epoch: Math.floor(Date.now() / 1000) }; saveAcks(agent, acks); console.log(`āœ… Acked ${id} for ${agent}`); }, 'ack-all'({ args }) { const { agent } = args; if (!agent) { console.error('Usage: inbox.js ack-all --agent '); process.exit(1); } const messages = readInbox(agent); const acks = loadAcks(agent); let count = 0; for (const msg of messages) { if (!acks[msg.id]) { acks[msg.id] = { at: new Date().toISOString(), epoch: Math.floor(Date.now() / 1000) }; count++; } } saveAcks(agent, acks); console.log(`āœ… Acked ${count} messages for ${agent}`); }, stats({ args }) { const { agent } = args; const agents = agent ? [agent] : AGENTS; console.log('šŸ“Š Inbox Stats:\n'); console.log('Agent | Total | Unread | High/Crit | Oldest Unread'); console.log('-----------|-------|--------|-----------|------------------'); for (const a of agents) { const messages = readInbox(a); const acks = loadAcks(a); const now = Math.floor(Date.now() / 1000); const valid = messages.filter(m => !m.ttl || (now - m.epoch) < m.ttl); const unread = valid.filter(m => !acks[m.id]); const urgent = unread.filter(m => m.priority === 'high' || m.priority === 'critical'); const oldest = unread.length > 0 ? unread[0].ts.substring(0, 16) : '-'; console.log(`${a.padEnd(10)} | ${String(valid.length).padStart(5)} | ${String(unread.length).padStart(6)} | ${String(urgent.length).padStart(9)} | ${oldest}`); } }, 'list-agents'() { console.log('Known agents:', AGENTS.join(', ')); }, broadcast({ args }) { const { from, subject, body, priority, tag, exclude } = args; if (!from || !subject) { console.error('Usage: inbox.js broadcast --from --subject "..." [--body "..."] [--exclude agent1,agent2]'); process.exit(1); } const excluded = exclude ? exclude.split(',').map(e => e.trim()) : []; const recipients = AGENTS.filter(a => a !== from && !excluded.includes(a)); for (const to of recipients) { const msg = { id: genId(), from, to, subject, body: body || '', priority: priority || 'normal', tag: tag || 'info', ts: new Date().toISOString(), epoch: Math.floor(Date.now() / 1000), replyTo: null, ttl: 0 }; appendMessage(to, msg); console.log(`āœ‰ļø → ${to}`); } console.log(`\nšŸ“¢ Broadcast to ${recipients.length} agents`); }, archive({ args }) { const { agent, before, 'acked-only': ackedOnly } = args; if (!agent) { console.error('Usage: inbox.js archive --agent [--before 2026-02-17] [--acked-only]'); process.exit(1); } const messages = readInbox(agent); const acks = loadAcks(agent); const cutoff = before ? new Date(before).getTime() / 1000 : Infinity; const toArchive = []; const toKeep = []; for (const msg of messages) { const isOld = msg.epoch < cutoff; const isAcked = !!acks[msg.id]; if ((ackedOnly && isAcked && isOld) || (!ackedOnly && isOld)) { toArchive.push(msg); delete acks[msg.id]; } else { toKeep.push(msg); } } if (toArchive.length === 0) { console.log('Nothing to archive.'); return; } // Write archive const archiveFile = path.join(ARCHIVE_DIR, `${agent}-${new Date().toISOString().substring(0, 10)}.jsonl`); fs.appendFileSync(archiveFile, toArchive.map(m => JSON.stringify(m)).join('\n') + '\n'); // Rewrite inbox with kept messages fs.writeFileSync(inboxPath(agent), toKeep.map(m => JSON.stringify(m)).join('\n') + (toKeep.length ? '\n' : '')); saveAcks(agent, acks); console.log(`šŸ“¦ Archived ${toArchive.length} messages from ${agent}, ${toKeep.length} remaining`); }, retry({ args }) { const { agent, id } = args; if (!agent || !id) { console.error('Usage: inbox.js retry --agent --id '); process.exit(1); } const acks = loadAcks(agent); if (acks[id]) { delete acks[id]; saveAcks(agent, acks); console.log(`šŸ”„ Unacked ${id} for ${agent} — will show as unread again`); } else { console.log(`${id} was already unread for ${agent}`); } } }; // Main const { args, positional } = parseArgs(process.argv.slice(2)); const command = positional[0]; if (!command || !commands[command]) { console.log(` šŸ“¬ JSONL Inbox System — Inter-Agent Messaging Commands: send Send a message to an agent read Read an agent's inbox ack Acknowledge a message ack-all Acknowledge all messages for an agent stats Show inbox statistics list-agents List known agents broadcast Send to all agents archive Archive old/acked messages retry Mark a message as unread Examples: node inbox.js send --from rivet --to builder --subject "Deploy fix" --body "Fix auth" --priority high --tag task node inbox.js read --agent builder --unread node inbox.js ack --agent builder --id msg-1234567890-abc123 node inbox.js stats node inbox.js broadcast --from rivet --subject "Standup" --body "Report status" --exclude rivet node inbox.js archive --agent builder --acked-only `); process.exit(0); } commands[command]({ args, positional });