Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 50 additions & 2 deletions gateway-bridge/bridge.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ for (const warning of gatewayAliasWarnings) {
const SOCKET_DIR = path.join(homedir(), ".pi", "session-control");
const AGENT_TIMEOUT_MS = 120_000;
const API_PORT = parseInt(process.env.BRIDGE_API_PORT || "7890", 10);
const REPLY_LEDGER_PATH = path.join(homedir(), ".pi", "agent", "slack-reply-log.jsonl");
const REPLY_LEDGER_ROTATED_PATH = `${REPLY_LEDGER_PATH}.1`;
const REPLY_LEDGER_MAX_BYTES = 10 * 1024 * 1024;

// Validate required env vars
for (const key of ["SLACK_BOT_TOKEN", "SLACK_APP_TOKEN"]) {
Expand Down Expand Up @@ -116,6 +119,43 @@ function resolveAckReaction(channel, threadTs) {
});
}

/**
* Append a durable outbound-reply proof entry used by heartbeat reply detection.
* This tracks replies from both /send (with thread_ts) and /reply endpoints.
*/
function rotateReplyLedgerIfNeeded() {
try {
const stats = fs.statSync(REPLY_LEDGER_PATH);
if (stats.size < REPLY_LEDGER_MAX_BYTES) return;
fs.renameSync(REPLY_LEDGER_PATH, REPLY_LEDGER_ROTATED_PATH);
} catch (err) {
if (err && typeof err === "object" && "code" in err && err.code === "ENOENT") return;
console.warn(`⚠️ failed to rotate reply ledger: ${err instanceof Error ? err.message : String(err)}`);
}
}

function appendReplyLedgerEntry({ channel, threadTs, route }) {
if (!threadTs) return;

const entry = {
channel,
thread_ts: threadTs,
route,
replied_at: new Date().toISOString(),
};

try {
fs.mkdirSync(path.dirname(REPLY_LEDGER_PATH), { recursive: true });
rotateReplyLedgerIfNeeded();
fs.appendFileSync(REPLY_LEDGER_PATH, `${JSON.stringify(entry)}\n`, {
encoding: "utf-8",
mode: 0o600,
});
} catch (err) {
console.warn(`⚠️ failed to append reply ledger entry: ${err instanceof Error ? err.message : String(err)}`);
}
}

/**
* Evict the oldest entries when the registry exceeds MAX_THREADS.
* Maps iterate in insertion order, so the first entries are the oldest.
Expand Down Expand Up @@ -490,9 +530,11 @@ function startApiServer() {

console.log(`📤 Sent to ${channel}: ${text.slice(0, 80)}${text.length > 80 ? "..." : ""}`);

// If this is a threaded reply, check for a pending ✅ ack reaction.
// If this is a threaded reply, check for a pending ✅ ack reaction
// and append a durable reply ledger entry for heartbeat detection.
if (thread_ts) {
resolveAckReaction(channel, thread_ts);
appendReplyLedgerEntry({ channel, threadTs: thread_ts, route: "/send" });
}

res.writeHead(200, { "Content-Type": "application/json" });
Expand Down Expand Up @@ -534,8 +576,14 @@ function startApiServer() {

console.log(`📤 Reply to ${thread_id} (${thread.channel}): ${text.slice(0, 80)}${text.length > 80 ? "..." : ""}`);

// Check for a pending ✅ ack reaction on the /reply path too.
// Check for a pending ✅ ack reaction on the /reply path too,
// and append a durable reply ledger entry for heartbeat detection.
resolveAckReaction(thread.channel, thread.thread_ts);
appendReplyLedgerEntry({
channel: thread.channel,
threadTs: thread.thread_ts,
route: "/reply",
});

res.writeHead(200, { "Content-Type": "application/json" });
res.end(JSON.stringify({ ok: true, ts: result.ts, channel: result.channel }));
Expand Down
52 changes: 50 additions & 2 deletions gateway-bridge/broker-bridge.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ function clampInt(value, min, max, fallback) {
}

const API_PORT = clampInt(process.env.BRIDGE_API_PORT || "7890", 0, 65535, 7890);
const REPLY_LEDGER_PATH = path.join(homedir(), ".pi", "agent", "slack-reply-log.jsonl");
const REPLY_LEDGER_ROTATED_PATH = `${REPLY_LEDGER_PATH}.1`;
const REPLY_LEDGER_MAX_BYTES = 10 * 1024 * 1024;
const POLL_INTERVAL_MS = clampInt(process.env.SLACK_BROKER_POLL_INTERVAL_MS || "3000", 0, 60_000, 3000);
const MAX_MESSAGES = clampInt(process.env.SLACK_BROKER_MAX_MESSAGES || "10", 1, 100, 10);
const MAX_WAIT_SECONDS = 25;
Expand Down Expand Up @@ -196,6 +199,43 @@ function resolveAckReaction(channel, threadTs) {
});
}

/**
* Append a durable outbound-reply proof entry used by heartbeat reply detection.
* This tracks replies from both /send (with thread_ts) and /reply endpoints.
*/
function rotateReplyLedgerIfNeeded() {
try {
const stats = fs.statSync(REPLY_LEDGER_PATH);
if (stats.size < REPLY_LEDGER_MAX_BYTES) return;
fs.renameSync(REPLY_LEDGER_PATH, REPLY_LEDGER_ROTATED_PATH);
} catch (err) {
if (err && typeof err === "object" && "code" in err && err.code === "ENOENT") return;
logWarn(`⚠️ failed to rotate reply ledger: ${err instanceof Error ? err.message : String(err)}`);
}
}

function appendReplyLedgerEntry({ channel, threadTs, route }) {
if (!threadTs) return;

const entry = {
channel,
thread_ts: threadTs,
route,
replied_at: new Date().toISOString(),
};

try {
fs.mkdirSync(path.dirname(REPLY_LEDGER_PATH), { recursive: true });
rotateReplyLedgerIfNeeded();
fs.appendFileSync(REPLY_LEDGER_PATH, `${JSON.stringify(entry)}\n`, {
encoding: "utf-8",
mode: 0o600,
});
} catch (err) {
logWarn(`⚠️ failed to append reply ledger entry: ${err instanceof Error ? err.message : String(err)}`);
}
}

let socketPath = null;

let cryptoState = null;
Expand Down Expand Up @@ -1115,9 +1155,11 @@ function startApiServer() {
actionRequestBody: { text: safeText },
});

// If this is a threaded reply, check for a pending ✅ ack reaction.
// If this is a threaded reply, check for a pending ✅ ack reaction
// and append a durable reply ledger entry for heartbeat detection.
if (thread_ts) {
resolveAckReaction(channel, thread_ts);
appendReplyLedgerEntry({ channel, threadTs: thread_ts, route: "/send" });
}

res.writeHead(200, { "Content-Type": "application/json" });
Expand Down Expand Up @@ -1152,8 +1194,14 @@ function startApiServer() {
actionRequestBody: { text: safeText },
});

// Check for a pending ✅ ack reaction on the /reply path too.
// Check for a pending ✅ ack reaction on the /reply path too,
// and append a durable reply ledger entry for heartbeat detection.
resolveAckReaction(thread.channel, thread.thread_ts);
appendReplyLedgerEntry({
channel: thread.channel,
threadTs: thread.thread_ts,
route: "/reply",
});

res.writeHead(200, { "Content-Type": "application/json" });
res.end(JSON.stringify({ ok: true, ts: result.ts }));
Expand Down
39 changes: 35 additions & 4 deletions pi/extensions/heartbeat.test.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ function hasReplyLogEntry(replyLogContent, threadTs) {
return false;
}

function hasOutboundSendCommand(sessionJsonlContent, threadTs) {
function hasOutboundBridgeReplyCommand(sessionJsonlContent, threadTs) {
const escapedThreadTs = threadTs.replace(/[.*+?^${}()|[\]\\]/g, "\\$&");
const threadTsPattern = new RegExp(`["']thread_ts["']\\s*:\\s*["']${escapedThreadTs}["']`);

Expand All @@ -108,7 +108,7 @@ function hasOutboundSendCommand(sessionJsonlContent, threadTs) {
if (item?.name !== "bash") continue;
const command = typeof item?.arguments?.command === "string" ? item.arguments.command : "";
if (!command.includes("curl")) continue;
if (!command.includes("/send")) continue;
if (!command.includes("/send") && !command.includes("/reply")) continue;
if (!threadTsPattern.test(command)) continue;
return true;
}
Expand Down Expand Up @@ -470,6 +470,16 @@ describe("heartbeat v2: unanswered mention reply detection", () => {
assert.equal(hasReplyLogEntry(log, "9999.0000"), false);
});

it("matches reply-log entries emitted from both /send and /reply routes", () => {
const log = [
'{"channel":"C111","thread_ts":"3456.7890","route":"/send","replied_at":"2026-02-27T00:10:00Z"}',
'{"channel":"C111","thread_ts":"4567.8901","route":"/reply","replied_at":"2026-02-27T00:11:00Z"}',
].join("\n");

assert.equal(hasReplyLogEntry(log, "3456.7890"), true);
assert.equal(hasReplyLogEntry(log, "4567.8901"), true);
});

it("ignores malformed reply-log lines", () => {
const log = ['{"thread_ts":"1234.5678"}', 'not-json', '{"thread_ts":"2345.6789"}'].join("\n");
assert.equal(hasReplyLogEntry(log, "2345.6789"), true);
Expand All @@ -493,7 +503,28 @@ describe("heartbeat v2: unanswered mention reply detection", () => {
},
});

assert.equal(hasOutboundSendCommand(session, "1234.5678"), true);
assert.equal(hasOutboundBridgeReplyCommand(session, "1234.5678"), true);
});

it("detects outbound curl /reply with matching thread_ts", () => {
const session = JSON.stringify({
type: "message",
message: {
role: "assistant",
content: [
{
type: "toolCall",
name: "bash",
arguments: {
command:
"curl -s -X POST http://127.0.0.1:7890/reply -H 'Content-Type: application/json' -d '{\"thread_id\":\"thread-1\",\"text\":\"hi\",\"thread_ts\":\"4567.8901\"}'",
},
},
],
},
});

assert.equal(hasOutboundBridgeReplyCommand(session, "4567.8901"), true);
});

it("does not treat inbound text containing thread_ts as a reply", () => {
Expand All @@ -505,7 +536,7 @@ describe("heartbeat v2: unanswered mention reply detection", () => {
},
});

assert.equal(hasOutboundSendCommand(inboundOnly, "1234.5678"), false);
assert.equal(hasOutboundBridgeReplyCommand(inboundOnly, "1234.5678"), false);
});
});

Expand Down
Loading