import type { RuntimeEnv, ReplyPayload, OpenClawConfig } from "openclaw/plugin-sdk/tlon";
import { createLoggerBackedRuntime, createReplyPrefixOptions } from "openclaw/plugin-sdk/tlon";
import { getTlonRuntime } from "../runtime.js";
import { createSettingsManager, type TlonSettingsStore } from "../settings.js";
import { normalizeShip, parseChannelNest } from "../targets.js";
import { resolveTlonAccount } from "../types.js";
import { authenticate } from "../urbit/auth.js";
import { ssrfPolicyFromAllowPrivateNetwork } from "../urbit/context.js";
import type { Foreigns, DmInvite } from "../urbit/foreigns.js";
import { sendDm, sendGroupMessage } from "../urbit/send.js";
import { UrbitSSEClient } from "../urbit/sse-client.js";
import {
  type PendingApproval,
  type AdminCommand,
  createPendingApproval,
  formatApprovalRequest,
  formatApprovalConfirmation,
  parseApprovalResponse,
  isApprovalResponse,
  findPendingApproval,
  removePendingApproval,
  parseAdminCommand,
  isAdminCommand,
  formatBlockedList,
  formatPendingList,
} from "./approval.js";
import { fetchAllChannels, fetchInitData } from "./discovery.js";
import { cacheMessage, getChannelHistory, fetchThreadHistory } from "./history.js";
import { downloadMessageImages } from "./media.js";
import { createProcessedMessageTracker } from "./processed-messages.js";
import {
  extractMessageText,
  extractCites,
  formatModelName,
  isBotMentioned,
  stripBotMention,
  isDmAllowed,
  isSummarizationRequest,
  type ParsedCite,
} from "./utils.js";

export type MonitorTlonOpts = {
  runtime?: RuntimeEnv;
  abortSignal?: AbortSignal;
  accountId?: string | null;
};

type ChannelAuthorization = {
  mode?: "restricted" | "open";
  allowedShips?: string[];
};

/**
 * Resolve channel authorization by merging file config with settings store.
 * Settings store takes precedence for fields it defines.
 */
function resolveChannelAuthorization(
  cfg: OpenClawConfig,
  channelNest: string,
  settings?: TlonSettingsStore,
): { mode: "restricted" | "open"; allowedShips: string[] } {
  const tlonConfig = cfg.channels?.tlon as
    | {
        authorization?: { channelRules?: Record<string, ChannelAuthorization> };
        defaultAuthorizedShips?: string[];
      }
    | undefined;

  // Merge channel rules: settings override file config
  const fileRules = tlonConfig?.authorization?.channelRules ?? {};
  const settingsRules = settings?.channelRules ?? {};
  const rule = settingsRules[channelNest] ?? fileRules[channelNest];

  // Merge default authorized ships: settings override file config
  const defaultShips = settings?.defaultAuthorizedShips ?? tlonConfig?.defaultAuthorizedShips ?? [];

  const allowedShips = rule?.allowedShips ?? defaultShips;
  const mode = rule?.mode ?? "restricted";
  return { mode, allowedShips };
}

export async function monitorTlonProvider(opts: MonitorTlonOpts = {}): Promise<void> {
  const core = getTlonRuntime();
  const cfg = core.config.loadConfig() as OpenClawConfig;
  if (cfg.channels?.tlon?.enabled === false) {
    return;
  }

  const logger = core.logging.getChildLogger({ module: "tlon-auto-reply" });
  const runtime: RuntimeEnv =
    opts.runtime ??
    createLoggerBackedRuntime({
      logger,
    });

  const account = resolveTlonAccount(cfg, opts.accountId ?? undefined);
  if (!account.enabled) {
    return;
  }
  if (!account.configured || !account.ship || !account.url || !account.code) {
    throw new Error("Tlon account not configured (ship/url/code required)");
  }

  const botShipName = normalizeShip(account.ship);
  runtime.log?.(`[tlon] Starting monitor for ${botShipName}`);

  const ssrfPolicy = ssrfPolicyFromAllowPrivateNetwork(account.allowPrivateNetwork);

  // Store validated values for use in closures (TypeScript narrowing doesn't propagate)
  const accountUrl = account.url;
  const accountCode = account.code;

  // Helper to authenticate with retry logic
  async function authenticateWithRetry(maxAttempts = 10): Promise<string> {
    for (let attempt = 1; ; attempt++) {
      if (opts.abortSignal?.aborted) {
        throw new Error("Aborted while waiting to authenticate");
      }
      try {
        runtime.log?.(`[tlon] Attempting authentication to ${accountUrl}...`);
        return await authenticate(accountUrl, accountCode, { ssrfPolicy });
      } catch (error: any) {
        runtime.error?.(
          `[tlon] Failed to authenticate (attempt ${attempt}): ${error?.message ?? String(error)}`,
        );
        if (attempt >= maxAttempts) {
          throw error;
        }
        const delay = Math.min(30000, 1000 * Math.pow(2, attempt - 1));
        runtime.log?.(`[tlon] Retrying authentication in ${delay}ms...`);
        await new Promise<void>((resolve, reject) => {
          const timer = setTimeout(resolve, delay);
          if (opts.abortSignal) {
            const onAbort = () => {
              clearTimeout(timer);
              reject(new Error("Aborted"));
            };
            opts.abortSignal.addEventListener("abort", onAbort, { once: true });
          }
        });
      }
    }
  }

  let api: UrbitSSEClient | null = null;
  const cookie = await authenticateWithRetry();
  api = new UrbitSSEClient(account.url, cookie, {
    ship: botShipName,
    ssrfPolicy,
    logger: {
      log: (message) => runtime.log?.(message),
      error: (message) => runtime.error?.(message),
    },
    // Re-authenticate on reconnect in case the session expired
    onReconnect: async (client) => {
      runtime.log?.("[tlon] Re-authenticating on SSE reconnect...");
      const newCookie = await authenticateWithRetry(5);
      client.updateCookie(newCookie);
      runtime.log?.("[tlon] Re-authentication successful");
    },
  });

  const processedTracker = createProcessedMessageTracker(2000);
  let groupChannels: string[] = [];
  let botNickname: string | null = null;

  // Settings store manager for hot-reloading config
  const settingsManager = createSettingsManager(api, {
    log: (msg) => runtime.log?.(msg),
    error: (msg) => runtime.error?.(msg),
  });

  // Reactive state that can be updated via settings store
  let effectiveDmAllowlist: string[] = account.dmAllowlist;
  let effectiveShowModelSig: boolean = account.showModelSignature ?? false;
  let effectiveAutoAcceptDmInvites: boolean = account.autoAcceptDmInvites ?? false;
  let effectiveAutoAcceptGroupInvites: boolean = account.autoAcceptGroupInvites ?? false;
  let effectiveGroupInviteAllowlist: string[] = account.groupInviteAllowlist;
  let effectiveAutoDiscoverChannels: boolean = account.autoDiscoverChannels ?? false;
  let effectiveOwnerShip: string | null = account.ownerShip
    ? normalizeShip(account.ownerShip)
    : null;
  let pendingApprovals: PendingApproval[] = [];
  let currentSettings: TlonSettingsStore = {};

  // Track threads we've participated in (by parentId) - respond without mention requirement
  const participatedThreads = new Set<string>();

  // Track DM senders per session to detect shared sessions (security warning)
  const dmSendersBySession = new Map<string, Set<string>>();
  let sharedSessionWarningSent = false;

  // Fetch bot's nickname from contacts
  try {
    const selfProfile = await api.scry("/contacts/v1/self.json");
    if (selfProfile && typeof selfProfile === "object") {
      const profile = selfProfile as { nickname?: { value?: string } };
      botNickname = profile.nickname?.value || null;
      if (botNickname) {
        runtime.log?.(`[tlon] Bot nickname: ${botNickname}`);
      }
    }
  } catch (error: any) {
    runtime.log?.(`[tlon] Could not fetch nickname: ${error?.message ?? String(error)}`);
  }

  // Store init foreigns for processing after settings are loaded
  let initForeigns: Foreigns | null = null;

  // Migrate file config to settings store (seed on first run)
  async function migrateConfigToSettings() {
    const migrations: Array<{ key: string; fileValue: unknown; settingsValue: unknown }> = [
      {
        key: "dmAllowlist",
        fileValue: account.dmAllowlist,
        settingsValue: currentSettings.dmAllowlist,
      },
      {
        key: "groupInviteAllowlist",
        fileValue: account.groupInviteAllowlist,
        settingsValue: currentSettings.groupInviteAllowlist,
      },
      {
        key: "groupChannels",
        fileValue: account.groupChannels,
        settingsValue: currentSettings.groupChannels,
      },
      {
        key: "defaultAuthorizedShips",
        fileValue: account.defaultAuthorizedShips,
        settingsValue: currentSettings.defaultAuthorizedShips,
      },
      {
        key: "autoDiscoverChannels",
        fileValue: account.autoDiscoverChannels,
        settingsValue: currentSettings.autoDiscoverChannels,
      },
      {
        key: "autoAcceptDmInvites",
        fileValue: account.autoAcceptDmInvites,
        settingsValue: currentSettings.autoAcceptDmInvites,
      },
      {
        key: "autoAcceptGroupInvites",
        fileValue: account.autoAcceptGroupInvites,
        settingsValue: currentSettings.autoAcceptGroupInvites,
      },
      {
        key: "showModelSig",
        fileValue: account.showModelSignature,
        settingsValue: currentSettings.showModelSig,
      },
    ];

    for (const { key, fileValue, settingsValue } of migrations) {
      // Only migrate if file has a value and settings store doesn't
      const hasFileValue = Array.isArray(fileValue) ? fileValue.length > 0 : fileValue != null;
      const hasSettingsValue = Array.isArray(settingsValue)
        ? settingsValue.length > 0
        : settingsValue != null;

      if (hasFileValue && !hasSettingsValue) {
        try {
          await api!.poke({
            app: "settings",
            mark: "settings-event",
            json: {
              "put-entry": {
                "bucket-key": "tlon",
                "entry-key": key,
                value: fileValue,
                desk: "moltbot",
              },
            },
          });
          runtime.log?.(`[tlon] Migrated ${key} from config to settings store`);
        } catch (err) {
          runtime.log?.(`[tlon] Failed to migrate ${key}: ${String(err)}`);
        }
      }
    }
  }

  // Load settings from settings store (hot-reloadable config)
  try {
    currentSettings = await settingsManager.load();

    // Migrate file config to settings store if not already present
    await migrateConfigToSettings();

    // Apply settings overrides
    // Note: groupChannels from settings store are merged AFTER discovery runs (below)
    if (currentSettings.defaultAuthorizedShips?.length) {
      runtime.log?.(
        `[tlon] Using defaultAuthorizedShips from settings store: ${currentSettings.defaultAuthorizedShips.join(", ")}`,
      );
    }
    if (currentSettings.autoDiscoverChannels !== undefined) {
      effectiveAutoDiscoverChannels = currentSettings.autoDiscoverChannels;
      runtime.log?.(
        `[tlon] Using autoDiscoverChannels from settings store: ${effectiveAutoDiscoverChannels}`,
      );
    }
    if (currentSettings.dmAllowlist?.length) {
      effectiveDmAllowlist = currentSettings.dmAllowlist;
      runtime.log?.(
        `[tlon] Using dmAllowlist from settings store: ${effectiveDmAllowlist.join(", ")}`,
      );
    }
    if (currentSettings.showModelSig !== undefined) {
      effectiveShowModelSig = currentSettings.showModelSig;
    }
    if (currentSettings.autoAcceptDmInvites !== undefined) {
      effectiveAutoAcceptDmInvites = currentSettings.autoAcceptDmInvites;
      runtime.log?.(
        `[tlon] Using autoAcceptDmInvites from settings store: ${effectiveAutoAcceptDmInvites}`,
      );
    }
    if (currentSettings.autoAcceptGroupInvites !== undefined) {
      effectiveAutoAcceptGroupInvites = currentSettings.autoAcceptGroupInvites;
      runtime.log?.(
        `[tlon] Using autoAcceptGroupInvites from settings store: ${effectiveAutoAcceptGroupInvites}`,
      );
    }
    if (currentSettings.groupInviteAllowlist?.length) {
      effectiveGroupInviteAllowlist = currentSettings.groupInviteAllowlist;
      runtime.log?.(
        `[tlon] Using groupInviteAllowlist from settings store: ${effectiveGroupInviteAllowlist.join(", ")}`,
      );
    }
    if (currentSettings.ownerShip) {
      effectiveOwnerShip = normalizeShip(currentSettings.ownerShip);
      runtime.log?.(`[tlon] Using ownerShip from settings store: ${effectiveOwnerShip}`);
    }
    if (currentSettings.pendingApprovals?.length) {
      pendingApprovals = currentSettings.pendingApprovals;
      runtime.log?.(`[tlon] Loaded ${pendingApprovals.length} pending approval(s) from settings`);
    }
  } catch (err) {
    runtime.log?.(`[tlon] Settings store not available, using file config: ${String(err)}`);
  }

  // Run channel discovery AFTER settings are loaded (so settings store value is used)
  if (effectiveAutoDiscoverChannels) {
    try {
      const initData = await fetchInitData(api, runtime);
      if (initData.channels.length > 0) {
        groupChannels = initData.channels;
      }
      initForeigns = initData.foreigns;
    } catch (error: any) {
      runtime.error?.(`[tlon] Auto-discovery failed: ${error?.message ?? String(error)}`);
    }
  }

  // Merge manual config with auto-discovered channels
  if (account.groupChannels.length > 0) {
    for (const ch of account.groupChannels) {
      if (!groupChannels.includes(ch)) {
        groupChannels.push(ch);
      }
    }
    runtime.log?.(
      `[tlon] Added ${account.groupChannels.length} manual groupChannels to monitoring`,
    );
  }

  // Also merge settings store groupChannels (may have been set via tlon settings command)
  if (currentSettings.groupChannels?.length) {
    for (const ch of currentSettings.groupChannels) {
      if (!groupChannels.includes(ch)) {
        groupChannels.push(ch);
      }
    }
  }

  if (groupChannels.length > 0) {
    runtime.log?.(
      `[tlon] Monitoring ${groupChannels.length} group channel(s): ${groupChannels.join(", ")}`,
    );
  } else {
    runtime.log?.("[tlon] No group channels to monitor (DMs only)");
  }

  // Helper to resolve cited message content
  async function resolveCiteContent(cite: ParsedCite): Promise<string | null> {
    if (cite.type !== "chan" || !cite.nest || !cite.postId) {
      return null;
    }

    try {
      // Scry for the specific post: /v4/{nest}/posts/post/{postId}
      const scryPath = `/channels/v4/${cite.nest}/posts/post/${cite.postId}.json`;
      runtime.log?.(`[tlon] Fetching cited post: ${scryPath}`);

      const data: any = await api!.scry(scryPath);

      // Extract text from the post's essay content
      if (data?.essay?.content) {
        const text = extractMessageText(data.essay.content);
        return text || null;
      }

      return null;
    } catch (err) {
      runtime.log?.(`[tlon] Failed to fetch cited post: ${String(err)}`);
      return null;
    }
  }

  // Resolve all cites in message content and return quoted text
  async function resolveAllCites(content: unknown): Promise<string> {
    const cites = extractCites(content);
    if (cites.length === 0) {
      return "";
    }

    const resolved: string[] = [];
    for (const cite of cites) {
      const text = await resolveCiteContent(cite);
      if (text) {
        const author = cite.author || "unknown";
        resolved.push(`> ${author} wrote: ${text}`);
      }
    }

    return resolved.length > 0 ? resolved.join("\n") + "\n\n" : "";
  }

  // Helper to save pending approvals to settings store
  async function savePendingApprovals(): Promise<void> {
    try {
      await api!.poke({
        app: "settings",
        mark: "settings-event",
        json: {
          "put-entry": {
            desk: "moltbot",
            "bucket-key": "tlon",
            "entry-key": "pendingApprovals",
            value: JSON.stringify(pendingApprovals),
          },
        },
      });
    } catch (err) {
      runtime.error?.(`[tlon] Failed to save pending approvals: ${String(err)}`);
    }
  }

  // Helper to update dmAllowlist in settings store
  async function addToDmAllowlist(ship: string): Promise<void> {
    const normalizedShip = normalizeShip(ship);
    if (!effectiveDmAllowlist.includes(normalizedShip)) {
      effectiveDmAllowlist = [...effectiveDmAllowlist, normalizedShip];
    }
    try {
      await api!.poke({
        app: "settings",
        mark: "settings-event",
        json: {
          "put-entry": {
            desk: "moltbot",
            "bucket-key": "tlon",
            "entry-key": "dmAllowlist",
            value: effectiveDmAllowlist,
          },
        },
      });
      runtime.log?.(`[tlon] Added ${normalizedShip} to dmAllowlist`);
    } catch (err) {
      runtime.error?.(`[tlon] Failed to update dmAllowlist: ${String(err)}`);
    }
  }

  // Helper to update channelRules in settings store
  async function addToChannelAllowlist(ship: string, channelNest: string): Promise<void> {
    const normalizedShip = normalizeShip(ship);
    const channelRules = currentSettings.channelRules ?? {};
    const rule = channelRules[channelNest] ?? { mode: "restricted", allowedShips: [] };
    const allowedShips = [...(rule.allowedShips ?? [])]; // Clone to avoid mutation

    if (!allowedShips.includes(normalizedShip)) {
      allowedShips.push(normalizedShip);
    }

    const updatedRules = {
      ...channelRules,
      [channelNest]: { ...rule, allowedShips },
    };

    // Update local state immediately (don't wait for settings subscription)
    currentSettings = { ...currentSettings, channelRules: updatedRules };

    try {
      await api!.poke({
        app: "settings",
        mark: "settings-event",
        json: {
          "put-entry": {
            desk: "moltbot",
            "bucket-key": "tlon",
            "entry-key": "channelRules",
            value: JSON.stringify(updatedRules),
          },
        },
      });
      runtime.log?.(`[tlon] Added ${normalizedShip} to ${channelNest} allowlist`);
    } catch (err) {
      runtime.error?.(`[tlon] Failed to update channelRules: ${String(err)}`);
    }
  }

  // Helper to block a ship using Tlon's native blocking
  async function blockShip(ship: string): Promise<void> {
    const normalizedShip = normalizeShip(ship);
    try {
      await api!.poke({
        app: "chat",
        mark: "chat-block-ship",
        json: { ship: normalizedShip },
      });
      runtime.log?.(`[tlon] Blocked ship ${normalizedShip}`);
    } catch (err) {
      runtime.error?.(`[tlon] Failed to block ship ${normalizedShip}: ${String(err)}`);
    }
  }

  // Check if a ship is blocked using Tlon's native block list
  async function isShipBlocked(ship: string): Promise<boolean> {
    const normalizedShip = normalizeShip(ship);
    try {
      const blocked = (await api!.scry("/chat/blocked.json")) as string[] | undefined;
      return Array.isArray(blocked) && blocked.some((s) => normalizeShip(s) === normalizedShip);
    } catch (err) {
      runtime.log?.(`[tlon] Failed to check blocked list: ${String(err)}`);
      return false;
    }
  }

  // Get all blocked ships
  async function getBlockedShips(): Promise<string[]> {
    try {
      const blocked = (await api!.scry("/chat/blocked.json")) as string[] | undefined;
      return Array.isArray(blocked) ? blocked : [];
    } catch (err) {
      runtime.log?.(`[tlon] Failed to get blocked list: ${String(err)}`);
      return [];
    }
  }

  // Helper to unblock a ship using Tlon's native blocking
  async function unblockShip(ship: string): Promise<boolean> {
    const normalizedShip = normalizeShip(ship);
    try {
      await api!.poke({
        app: "chat",
        mark: "chat-unblock-ship",
        json: { ship: normalizedShip },
      });
      runtime.log?.(`[tlon] Unblocked ship ${normalizedShip}`);
      return true;
    } catch (err) {
      runtime.error?.(`[tlon] Failed to unblock ship ${normalizedShip}: ${String(err)}`);
      return false;
    }
  }

  // Helper to send DM notification to owner
  async function sendOwnerNotification(message: string): Promise<void> {
    if (!effectiveOwnerShip) {
      runtime.log?.("[tlon] No ownerShip configured, cannot send notification");
      return;
    }
    try {
      await sendDm({
        api: api!,
        fromShip: botShipName,
        toShip: effectiveOwnerShip,
        text: message,
      });
      runtime.log?.(`[tlon] Sent notification to owner ${effectiveOwnerShip}`);
    } catch (err) {
      runtime.error?.(`[tlon] Failed to send notification to owner: ${String(err)}`);
    }
  }

  // Queue a new approval request and notify the owner
  async function queueApprovalRequest(approval: PendingApproval): Promise<void> {
    // Check if ship is blocked - silently ignore
    if (await isShipBlocked(approval.requestingShip)) {
      runtime.log?.(`[tlon] Ignoring request from blocked ship ${approval.requestingShip}`);
      return;
    }

    // Check for duplicate - if found, update it with new content and re-notify
    const existingIndex = pendingApprovals.findIndex(
      (a) =>
        a.type === approval.type &&
        a.requestingShip === approval.requestingShip &&
        (approval.type !== "channel" || a.channelNest === approval.channelNest) &&
        (approval.type !== "group" || a.groupFlag === approval.groupFlag),
    );

    if (existingIndex !== -1) {
      // Update existing approval with new content (preserves the original ID)
      const existing = pendingApprovals[existingIndex];
      if (approval.originalMessage) {
        existing.originalMessage = approval.originalMessage;
        existing.messagePreview = approval.messagePreview;
      }
      runtime.log?.(
        `[tlon] Updated existing approval for ${approval.requestingShip} (${approval.type}) - re-sending notification`,
      );
      await savePendingApprovals();
      const message = formatApprovalRequest(existing);
      await sendOwnerNotification(message);
      return;
    }

    pendingApprovals.push(approval);
    await savePendingApprovals();

    const message = formatApprovalRequest(approval);
    await sendOwnerNotification(message);
    runtime.log?.(
      `[tlon] Queued approval request: ${approval.id} (${approval.type} from ${approval.requestingShip})`,
    );
  }

  // Process the owner's approval response
  async function handleApprovalResponse(text: string): Promise<boolean> {
    const parsed = parseApprovalResponse(text);
    if (!parsed) {
      return false;
    }

    const approval = findPendingApproval(pendingApprovals, parsed.id);
    if (!approval) {
      await sendOwnerNotification(
        "No pending approval found" + (parsed.id ? ` for ID: ${parsed.id}` : ""),
      );
      return true; // Still consumed the message
    }

    if (parsed.action === "approve") {
      switch (approval.type) {
        case "dm":
          await addToDmAllowlist(approval.requestingShip);
          // Process the original message if available
          if (approval.originalMessage) {
            runtime.log?.(
              `[tlon] Processing original message from ${approval.requestingShip} after approval`,
            );
            await processMessage({
              messageId: approval.originalMessage.messageId,
              senderShip: approval.requestingShip,
              messageText: approval.originalMessage.messageText,
              messageContent: approval.originalMessage.messageContent,
              isGroup: false,
              timestamp: approval.originalMessage.timestamp,
            });
          }
          break;

        case "channel":
          if (approval.channelNest) {
            await addToChannelAllowlist(approval.requestingShip, approval.channelNest);
            // Process the original message if available
            if (approval.originalMessage) {
              const parsed = parseChannelNest(approval.channelNest);
              runtime.log?.(
                `[tlon] Processing original message from ${approval.requestingShip} in ${approval.channelNest} after approval`,
              );
              await processMessage({
                messageId: approval.originalMessage.messageId,
                senderShip: approval.requestingShip,
                messageText: approval.originalMessage.messageText,
                messageContent: approval.originalMessage.messageContent,
                isGroup: true,
                channelNest: approval.channelNest,
                hostShip: parsed?.hostShip,
                channelName: parsed?.channelName,
                timestamp: approval.originalMessage.timestamp,
                parentId: approval.originalMessage.parentId,
                isThreadReply: approval.originalMessage.isThreadReply,
              });
            }
          }
          break;

        case "group":
          // Accept the group invite (don't add to allowlist - each invite requires approval)
          if (approval.groupFlag) {
            try {
              await api!.poke({
                app: "groups",
                mark: "group-join",
                json: {
                  flag: approval.groupFlag,
                  "join-all": true,
                },
              });
              runtime.log?.(`[tlon] Joined group ${approval.groupFlag} after approval`);

              // Immediately discover channels from the newly joined group
              // Small delay to allow the join to propagate
              setTimeout(async () => {
                try {
                  const discoveredChannels = await fetchAllChannels(api!, runtime);
                  let newCount = 0;
                  for (const channelNest of discoveredChannels) {
                    if (!watchedChannels.has(channelNest)) {
                      watchedChannels.add(channelNest);
                      newCount++;
                    }
                  }
                  if (newCount > 0) {
                    runtime.log?.(
                      `[tlon] Discovered ${newCount} new channel(s) after joining group`,
                    );
                  }
                } catch (err) {
                  runtime.log?.(`[tlon] Channel discovery after group join failed: ${String(err)}`);
                }
              }, 2000);
            } catch (err) {
              runtime.error?.(`[tlon] Failed to join group ${approval.groupFlag}: ${String(err)}`);
            }
          }
          break;
      }

      await sendOwnerNotification(formatApprovalConfirmation(approval, "approve"));
    } else if (parsed.action === "block") {
      // Block the ship using Tlon's native blocking
      await blockShip(approval.requestingShip);
      await sendOwnerNotification(formatApprovalConfirmation(approval, "block"));
    } else {
      // Denied - just remove from pending, no notification to requester
      await sendOwnerNotification(formatApprovalConfirmation(approval, "deny"));
    }

    // Remove from pending
    pendingApprovals = removePendingApproval(pendingApprovals, approval.id);
    await savePendingApprovals();

    return true;
  }

  // Handle admin commands from owner (unblock, blocked, pending)
  async function handleAdminCommand(text: string): Promise<boolean> {
    const command = parseAdminCommand(text);
    if (!command) {
      return false;
    }

    switch (command.type) {
      case "blocked": {
        const blockedShips = await getBlockedShips();
        await sendOwnerNotification(formatBlockedList(blockedShips));
        runtime.log?.(`[tlon] Owner requested blocked ships list (${blockedShips.length} ships)`);
        return true;
      }

      case "pending": {
        await sendOwnerNotification(formatPendingList(pendingApprovals));
        runtime.log?.(
          `[tlon] Owner requested pending approvals list (${pendingApprovals.length} pending)`,
        );
        return true;
      }

      case "unblock": {
        const shipToUnblock = command.ship;
        const isBlocked = await isShipBlocked(shipToUnblock);
        if (!isBlocked) {
          await sendOwnerNotification(`${shipToUnblock} is not blocked.`);
          return true;
        }
        const success = await unblockShip(shipToUnblock);
        if (success) {
          await sendOwnerNotification(`Unblocked ${shipToUnblock}.`);
        } else {
          await sendOwnerNotification(`Failed to unblock ${shipToUnblock}.`);
        }
        return true;
      }
    }
  }

  // Check if a ship is the owner (always allowed to DM)
  function isOwner(ship: string): boolean {
    if (!effectiveOwnerShip) {
      return false;
    }
    return normalizeShip(ship) === effectiveOwnerShip;
  }

  /**
   * Extract the DM partner ship from the 'whom' field.
   * This is the canonical source for DM routing (more reliable than essay.author).
   * Returns empty string if whom doesn't contain a valid patp-like value.
   */
  function extractDmPartnerShip(whom: unknown): string {
    const raw =
      typeof whom === "string"
        ? whom
        : whom && typeof whom === "object" && "ship" in whom && typeof whom.ship === "string"
          ? whom.ship
          : "";
    const normalized = normalizeShip(raw);
    // Keep DM routing strict: accept only patp-like values.
    return /^~?[a-z-]+$/i.test(normalized) ? normalized : "";
  }

  const processMessage = async (params: {
    messageId: string;
    senderShip: string;
    messageText: string;
    messageContent?: unknown; // Raw Tlon content for media extraction
    isGroup: boolean;
    channelNest?: string;
    hostShip?: string;
    channelName?: string;
    timestamp: number;
    parentId?: string | null;
    isThreadReply?: boolean;
  }) => {
    const {
      messageId,
      senderShip,
      isGroup,
      channelNest,
      hostShip,
      channelName,
      timestamp,
      parentId,
      isThreadReply,
      messageContent,
    } = params;
    const groupChannel = channelNest; // For compatibility
    let messageText = params.messageText;

    // Download any images from the message content
    let attachments: Array<{ path: string; contentType: string }> = [];
    if (messageContent) {
      try {
        attachments = await downloadMessageImages(messageContent);
        if (attachments.length > 0) {
          runtime.log?.(`[tlon] Downloaded ${attachments.length} image(s) from message`);
        }
      } catch (error: any) {
        runtime.log?.(`[tlon] Failed to download images: ${error?.message ?? String(error)}`);
      }
    }

    // Fetch thread context when entering a thread for the first time
    if (isThreadReply && parentId && groupChannel) {
      try {
        const threadHistory = await fetchThreadHistory(api, groupChannel, parentId, 20, runtime);
        if (threadHistory.length > 0) {
          const threadContext = threadHistory
            .slice(-10) // Last 10 messages for context
            .map((msg) => `${msg.author}: ${msg.content}`)
            .join("\n");

          // Prepend thread context to the message
          // Include note about ongoing conversation for agent judgment
          const contextNote = `[Thread conversation - ${threadHistory.length} previous replies. You are participating in this thread. Only respond if relevant or helpful - you don't need to reply to every message.]`;
          messageText = `${contextNote}\n\n[Previous messages]\n${threadContext}\n\n[Current message]\n${messageText}`;
          runtime?.log?.(
            `[tlon] Added thread context (${threadHistory.length} replies) to message`,
          );
        }
      } catch (error: any) {
        runtime?.log?.(`[tlon] Could not fetch thread context: ${error?.message ?? String(error)}`);
        // Continue without thread context - not critical
      }
    }

    if (isGroup && groupChannel && isSummarizationRequest(messageText)) {
      try {
        const history = await getChannelHistory(api, groupChannel, 50, runtime);
        if (history.length === 0) {
          const noHistoryMsg =
            "I couldn't fetch any messages for this channel. It might be empty or there might be a permissions issue.";
          if (isGroup) {
            const parsed = parseChannelNest(groupChannel);
            if (parsed) {
              await sendGroupMessage({
                api: api,
                fromShip: botShipName,
                hostShip: parsed.hostShip,
                channelName: parsed.channelName,
                text: noHistoryMsg,
              });
            }
          } else {
            await sendDm({
              api: api,
              fromShip: botShipName,
              toShip: senderShip,
              text: noHistoryMsg,
            });
          }
          return;
        }

        const historyText = history
          .map(
            (msg) => `[${new Date(msg.timestamp).toLocaleString()}] ${msg.author}: ${msg.content}`,
          )
          .join("\n");

        messageText =
          `Please summarize this channel conversation (${history.length} recent messages):\n\n${historyText}\n\n` +
          "Provide a concise summary highlighting:\n" +
          "1. Main topics discussed\n" +
          "2. Key decisions or conclusions\n" +
          "3. Action items if any\n" +
          "4. Notable participants";
      } catch (error: any) {
        const errorMsg = `Sorry, I encountered an error while fetching the channel history: ${error?.message ?? String(error)}`;
        if (isGroup && groupChannel) {
          const parsed = parseChannelNest(groupChannel);
          if (parsed) {
            await sendGroupMessage({
              api: api,
              fromShip: botShipName,
              hostShip: parsed.hostShip,
              channelName: parsed.channelName,
              text: errorMsg,
            });
          }
        } else {
          await sendDm({ api: api, fromShip: botShipName, toShip: senderShip, text: errorMsg });
        }
        return;
      }
    }

    const route = core.channel.routing.resolveAgentRoute({
      cfg,
      channel: "tlon",
      accountId: opts.accountId ?? undefined,
      peer: {
        kind: isGroup ? "group" : "direct",
        id: isGroup ? (groupChannel ?? senderShip) : senderShip,
      },
    });

    // Warn if multiple users share a DM session (insecure dmScope configuration)
    if (!isGroup) {
      const sessionKey = route.sessionKey;
      if (!dmSendersBySession.has(sessionKey)) {
        dmSendersBySession.set(sessionKey, new Set());
      }
      const senders = dmSendersBySession.get(sessionKey)!;
      if (senders.size > 0 && !senders.has(senderShip)) {
        // Log warning
        runtime.log?.(
          `[tlon] ⚠️ SECURITY: Multiple users sharing DM session. ` +
            `Configure "session.dmScope: per-channel-peer" in OpenClaw config.`,
        );

        // Notify owner via DM (once per monitor session)
        if (!sharedSessionWarningSent && effectiveOwnerShip) {
          sharedSessionWarningSent = true;
          const warningMsg =
            `⚠️ Security Warning: Multiple users are sharing a DM session with this bot. ` +
            `This can leak conversation context between users.\n\n` +
            `Fix: Add to your OpenClaw config:\n` +
            `session:\n  dmScope: "per-channel-peer"\n\n` +
            `Docs: https://docs.openclaw.ai/concepts/session#secure-dm-mode`;

          // Send async, don't block message processing
          sendDm({
            api,
            fromShip: botShipName,
            toShip: effectiveOwnerShip,
            text: warningMsg,
          }).catch((err) =>
            runtime.error?.(`[tlon] Failed to send security warning to owner: ${err}`),
          );
        }
      }
      senders.add(senderShip);
    }

    const senderRole = isOwner(senderShip) ? "owner" : "user";
    const fromLabel = isGroup
      ? `${senderShip} [${senderRole}] in ${channelNest}`
      : `${senderShip} [${senderRole}]`;

    // Compute command authorization for slash commands (owner-only)
    const shouldComputeAuth = core.channel.commands.shouldComputeCommandAuthorized(
      messageText,
      cfg,
    );
    let commandAuthorized = false;

    if (shouldComputeAuth) {
      const useAccessGroups = cfg.commands?.useAccessGroups !== false;
      const senderIsOwner = isOwner(senderShip);

      commandAuthorized = core.channel.commands.resolveCommandAuthorizedFromAuthorizers({
        useAccessGroups,
        authorizers: [{ configured: Boolean(effectiveOwnerShip), allowed: senderIsOwner }],
      });

      // Log when non-owner attempts a slash command (will be silently ignored by Gateway)
      if (!commandAuthorized) {
        console.log(
          `[tlon] Command attempt denied: ${senderShip} is not owner (owner=${effectiveOwnerShip ?? "not configured"})`,
        );
      }
    }

    // Prepend attachment annotations to message body (similar to Signal format)
    let bodyWithAttachments = messageText;
    if (attachments.length > 0) {
      const mediaLines = attachments
        .map((a) => `[media attached: ${a.path} (${a.contentType}) | ${a.path}]`)
        .join("\n");
      bodyWithAttachments = mediaLines + "\n" + messageText;
    }

    const body = core.channel.reply.formatAgentEnvelope({
      channel: "Tlon",
      from: fromLabel,
      timestamp,
      body: bodyWithAttachments,
    });

    // Strip bot ship mention for CommandBody so "/status" is recognized as command-only
    const commandBody = isGroup ? stripBotMention(messageText, botShipName) : messageText;

    const ctxPayload = core.channel.reply.finalizeInboundContext({
      Body: body,
      RawBody: messageText,
      CommandBody: commandBody,
      From: isGroup ? `tlon:group:${groupChannel}` : `tlon:${senderShip}`,
      To: `tlon:${botShipName}`,
      SessionKey: route.sessionKey,
      AccountId: route.accountId,
      ChatType: isGroup ? "group" : "direct",
      ConversationLabel: fromLabel,
      SenderName: senderShip,
      SenderId: senderShip,
      SenderRole: senderRole,
      CommandAuthorized: commandAuthorized,
      CommandSource: "text" as const,
      Provider: "tlon",
      Surface: "tlon",
      MessageSid: messageId,
      // Include downloaded media attachments
      ...(attachments.length > 0 && { Attachments: attachments }),
      OriginatingChannel: "tlon",
      OriginatingTo: `tlon:${isGroup ? groupChannel : botShipName}`,
      // Include thread context for automatic reply routing
      ...(parentId && { ThreadId: String(parentId), ReplyToId: String(parentId) }),
    });

    const dispatchStartTime = Date.now();

    const responsePrefix = core.channel.reply.resolveEffectiveMessagesConfig(
      cfg,
      route.agentId,
    ).responsePrefix;
    const humanDelay = core.channel.reply.resolveHumanDelayConfig(cfg, route.agentId);

    await core.channel.reply.dispatchReplyWithBufferedBlockDispatcher({
      ctx: ctxPayload,
      cfg,
      dispatcherOptions: {
        responsePrefix,
        humanDelay,
        deliver: async (payload: ReplyPayload) => {
          let replyText = payload.text;
          if (!replyText) {
            return;
          }

          // Use settings store value if set, otherwise fall back to file config
          const showSignature = effectiveShowModelSig;
          if (showSignature) {
            const extPayload = payload as ReplyPayload & {
              metadata?: { model?: string };
              model?: string;
            };
            const extRoute = route as typeof route & { model?: string };
            const defaultModel = cfg.agents?.defaults?.model;
            const modelInfo =
              extPayload.metadata?.model ||
              extPayload.model ||
              extRoute.model ||
              (typeof defaultModel === "string" ? defaultModel : defaultModel?.primary);
            extPayload.metadata?.model ||
              extPayload.model ||
              extRoute.model ||
              (typeof defaultModel === "string" ? defaultModel : defaultModel?.primary);
            replyText = `${replyText}\n\n_[Generated by ${formatModelName(modelInfo)}]_`;
          }

          if (isGroup && groupChannel) {
            const parsed = parseChannelNest(groupChannel);
            if (!parsed) {
              return;
            }
            await sendGroupMessage({
              api: api,
              fromShip: botShipName,
              hostShip: parsed.hostShip,
              channelName: parsed.channelName,
              text: replyText,
              replyToId: parentId ?? undefined,
            });
            // Track thread participation for future replies without mention
            if (parentId) {
              participatedThreads.add(String(parentId));
              runtime.log?.(`[tlon] Now tracking thread for future replies: ${parentId}`);
            }
          } else {
            await sendDm({ api: api, fromShip: botShipName, toShip: senderShip, text: replyText });
          }
        },
        onError: (err, info) => {
          const dispatchDuration = Date.now() - dispatchStartTime;
          runtime.error?.(
            `[tlon] ${info.kind} reply failed after ${dispatchDuration}ms: ${String(err)}`,
          );
        },
      },
    });
  };

  // Track which channels we're interested in for filtering firehose events
  const watchedChannels = new Set<string>(groupChannels);
  const _watchedDMs = new Set<string>();

  // Firehose handler for all channel messages (/v2)
  const handleChannelsFirehose = async (event: any) => {
    try {
      const nest = event?.nest;
      if (!nest) {
        return;
      }

      // Only process channels we're watching
      if (!watchedChannels.has(nest)) {
        return;
      }

      const response = event?.response;
      if (!response) {
        return;
      }

      // Handle post responses (new posts and replies)
      const essay = response?.post?.["r-post"]?.set?.essay;
      const memo = response?.post?.["r-post"]?.reply?.["r-reply"]?.set?.memo;
      if (!essay && !memo) {
        return;
      }

      const content = memo || essay;
      const isThreadReply = Boolean(memo);
      const messageId = isThreadReply ? response?.post?.["r-post"]?.reply?.id : response?.post?.id;

      if (!processedTracker.mark(messageId)) {
        return;
      }

      const senderShip = normalizeShip(content.author ?? "");
      if (!senderShip || senderShip === botShipName) {
        return;
      }

      // Resolve any cited/quoted messages first
      const citedContent = await resolveAllCites(content.content);
      const rawText = extractMessageText(content.content);
      const messageText = citedContent + rawText;
      if (!messageText.trim()) {
        return;
      }

      cacheMessage(nest, {
        author: senderShip,
        content: messageText,
        timestamp: content.sent || Date.now(),
        id: messageId,
      });

      // Get thread info early for participation check
      const seal = isThreadReply
        ? response?.post?.["r-post"]?.reply?.["r-reply"]?.set?.seal
        : response?.post?.["r-post"]?.set?.seal;
      const parentId = seal?.["parent-id"] || seal?.parent || null;

      // Check if we should respond:
      // 1. Direct mention always triggers response
      // 2. Thread replies where we've participated - respond if relevant (let agent decide)
      const mentioned = isBotMentioned(messageText, botShipName, botNickname ?? undefined);
      const inParticipatedThread =
        isThreadReply && parentId && participatedThreads.has(String(parentId));

      if (!mentioned && !inParticipatedThread) {
        return;
      }

      // Log why we're responding
      if (inParticipatedThread && !mentioned) {
        runtime.log?.(`[tlon] Responding to thread we participated in (no mention): ${parentId}`);
      }

      // Owner is always allowed
      if (isOwner(senderShip)) {
        runtime.log?.(`[tlon] Owner ${senderShip} is always allowed in channels`);
      } else {
        const { mode, allowedShips } = resolveChannelAuthorization(cfg, nest, currentSettings);
        if (mode === "restricted") {
          const normalizedAllowed = allowedShips.map(normalizeShip);
          if (!normalizedAllowed.includes(senderShip)) {
            // If owner is configured, queue approval request
            if (effectiveOwnerShip) {
              const approval = createPendingApproval({
                type: "channel",
                requestingShip: senderShip,
                channelNest: nest,
                messagePreview: messageText.substring(0, 100),
                originalMessage: {
                  messageId: messageId ?? "",
                  messageText,
                  messageContent: content.content,
                  timestamp: content.sent || Date.now(),
                  parentId: parentId ?? undefined,
                  isThreadReply,
                },
              });
              await queueApprovalRequest(approval);
            } else {
              runtime.log?.(
                `[tlon] Access denied: ${senderShip} in ${nest} (allowed: ${allowedShips.join(", ")})`,
              );
            }
            return;
          }
        }
      }

      const parsed = parseChannelNest(nest);
      await processMessage({
        messageId: messageId ?? "",
        senderShip,
        messageText,
        messageContent: content.content, // Pass raw content for media extraction
        isGroup: true,
        channelNest: nest,
        hostShip: parsed?.hostShip,
        channelName: parsed?.channelName,
        timestamp: content.sent || Date.now(),
        parentId,
        isThreadReply,
      });
    } catch (error: any) {
      runtime.error?.(
        `[tlon] Error handling channel firehose event: ${error?.message ?? String(error)}`,
      );
    }
  };

  // Firehose handler for all DM messages (/v3)
  // Track which DM invites we've already processed to avoid duplicate accepts
  const processedDmInvites = new Set<string>();

  const handleChatFirehose = async (event: any) => {
    try {
      // Handle DM invite lists (arrays)
      if (Array.isArray(event)) {
        for (const invite of event as DmInvite[]) {
          const ship = normalizeShip(invite.ship || "");
          if (!ship || processedDmInvites.has(ship)) {
            continue;
          }

          // Owner is always allowed
          if (isOwner(ship)) {
            try {
              await api.poke({
                app: "chat",
                mark: "chat-dm-rsvp",
                json: { ship, ok: true },
              });
              processedDmInvites.add(ship);
              runtime.log?.(`[tlon] Auto-accepted DM invite from owner ${ship}`);
            } catch (err) {
              runtime.error?.(`[tlon] Failed to auto-accept DM from owner: ${String(err)}`);
            }
            continue;
          }

          // Auto-accept if on allowlist and auto-accept is enabled
          if (effectiveAutoAcceptDmInvites && isDmAllowed(ship, effectiveDmAllowlist)) {
            try {
              await api.poke({
                app: "chat",
                mark: "chat-dm-rsvp",
                json: { ship, ok: true },
              });
              processedDmInvites.add(ship);
              runtime.log?.(`[tlon] Auto-accepted DM invite from ${ship}`);
            } catch (err) {
              runtime.error?.(`[tlon] Failed to auto-accept DM from ${ship}: ${String(err)}`);
            }
            continue;
          }

          // If owner is configured and ship is not on allowlist, queue approval
          if (effectiveOwnerShip && !isDmAllowed(ship, effectiveDmAllowlist)) {
            const approval = createPendingApproval({
              type: "dm",
              requestingShip: ship,
              messagePreview: "(DM invite - no message yet)",
            });
            await queueApprovalRequest(approval);
            processedDmInvites.add(ship); // Mark as processed to avoid duplicate notifications
          }
        }
        return;
      }
      if (!("whom" in event) || !("response" in event)) {
        return;
      }

      const whom = event.whom; // DM partner ship or club ID
      const messageId = event.id;
      const response = event.response;

      // Handle add events (new messages)
      const essay = response?.add?.essay;
      if (!essay) {
        return;
      }

      if (!processedTracker.mark(messageId)) {
        return;
      }

      const authorShip = normalizeShip(essay.author ?? "");
      const partnerShip = extractDmPartnerShip(whom);
      const senderShip = partnerShip || authorShip;

      // Ignore the bot's own outbound DM events.
      if (authorShip === botShipName) {
        return;
      }
      if (!senderShip || senderShip === botShipName) {
        return;
      }

      // Log mismatch between author and partner for debugging
      if (authorShip && partnerShip && authorShip !== partnerShip) {
        runtime.log?.(
          `[tlon] DM ship mismatch (author=${authorShip}, partner=${partnerShip}) - routing to partner`,
        );
      }

      // Resolve any cited/quoted messages first
      const citedContent = await resolveAllCites(essay.content);
      const rawText = extractMessageText(essay.content);
      const messageText = citedContent + rawText;
      if (!messageText.trim()) {
        return;
      }

      // Check if this is the owner sending an approval response
      if (isOwner(senderShip) && isApprovalResponse(messageText)) {
        const handled = await handleApprovalResponse(messageText);
        if (handled) {
          runtime.log?.(`[tlon] Processed approval response from owner: ${messageText}`);
          return;
        }
      }

      // Check if this is the owner sending an admin command
      if (isOwner(senderShip) && isAdminCommand(messageText)) {
        const handled = await handleAdminCommand(messageText);
        if (handled) {
          runtime.log?.(`[tlon] Processed admin command from owner: ${messageText}`);
          return;
        }
      }

      // Owner is always allowed to DM (bypass allowlist)
      if (isOwner(senderShip)) {
        runtime.log?.(`[tlon] Processing DM from owner ${senderShip}`);
        await processMessage({
          messageId: messageId ?? "",
          senderShip,
          messageText,
          messageContent: essay.content,
          isGroup: false,
          timestamp: essay.sent || Date.now(),
        });
        return;
      }

      // For DMs from others, check allowlist
      if (!isDmAllowed(senderShip, effectiveDmAllowlist)) {
        // If owner is configured, queue approval request
        if (effectiveOwnerShip) {
          const approval = createPendingApproval({
            type: "dm",
            requestingShip: senderShip,
            messagePreview: messageText.substring(0, 100),
            originalMessage: {
              messageId: messageId ?? "",
              messageText,
              messageContent: essay.content,
              timestamp: essay.sent || Date.now(),
            },
          });
          await queueApprovalRequest(approval);
        } else {
          runtime.log?.(`[tlon] Blocked DM from ${senderShip}: not in allowlist`);
        }
        return;
      }

      await processMessage({
        messageId: messageId ?? "",
        senderShip,
        messageText,
        messageContent: essay.content, // Pass raw content for media extraction
        isGroup: false,
        timestamp: essay.sent || Date.now(),
      });
    } catch (error: any) {
      runtime.error?.(
        `[tlon] Error handling chat firehose event: ${error?.message ?? String(error)}`,
      );
    }
  };

  try {
    runtime.log?.("[tlon] Subscribing to firehose updates...");

    // Subscribe to channels firehose (/v2)
    await api.subscribe({
      app: "channels",
      path: "/v2",
      event: handleChannelsFirehose,
      err: (error) => {
        runtime.error?.(`[tlon] Channels firehose error: ${String(error)}`);
      },
      quit: () => {
        runtime.log?.("[tlon] Channels firehose subscription ended");
      },
    });
    runtime.log?.("[tlon] Subscribed to channels firehose (/v2)");

    // Subscribe to chat/DM firehose (/v3)
    await api.subscribe({
      app: "chat",
      path: "/v3",
      event: handleChatFirehose,
      err: (error) => {
        runtime.error?.(`[tlon] Chat firehose error: ${String(error)}`);
      },
      quit: () => {
        runtime.log?.("[tlon] Chat firehose subscription ended");
      },
    });
    runtime.log?.("[tlon] Subscribed to chat firehose (/v3)");

    // Subscribe to contacts updates to track nickname changes
    await api.subscribe({
      app: "contacts",
      path: "/v1/news",
      event: (event: any) => {
        try {
          // Look for self profile updates
          if (event?.self) {
            const selfUpdate = event.self;
            if (selfUpdate?.contact?.nickname?.value !== undefined) {
              const newNickname = selfUpdate.contact.nickname.value || null;
              if (newNickname !== botNickname) {
                botNickname = newNickname;
                runtime.log?.(`[tlon] Nickname updated: ${botNickname}`);
              }
            }
          }
        } catch (error: any) {
          runtime.error?.(
            `[tlon] Error handling contacts event: ${error?.message ?? String(error)}`,
          );
        }
      },
      err: (error) => {
        runtime.error?.(`[tlon] Contacts subscription error: ${String(error)}`);
      },
      quit: () => {
        runtime.log?.("[tlon] Contacts subscription ended");
      },
    });
    runtime.log?.("[tlon] Subscribed to contacts updates (/v1/news)");

    // Subscribe to settings store for hot-reloading config
    settingsManager.onChange((newSettings) => {
      currentSettings = newSettings;

      // Update watched channels if settings changed
      if (newSettings.groupChannels?.length) {
        const newChannels = newSettings.groupChannels;
        for (const ch of newChannels) {
          if (!watchedChannels.has(ch)) {
            watchedChannels.add(ch);
            runtime.log?.(`[tlon] Settings: now watching channel ${ch}`);
          }
        }
        // Note: we don't remove channels from watchedChannels to avoid missing messages
        // during transitions. The authorization check handles access control.
      }

      // Update DM allowlist
      if (newSettings.dmAllowlist !== undefined) {
        effectiveDmAllowlist =
          newSettings.dmAllowlist.length > 0 ? newSettings.dmAllowlist : account.dmAllowlist;
        runtime.log?.(`[tlon] Settings: dmAllowlist updated to ${effectiveDmAllowlist.join(", ")}`);
      }

      // Update model signature setting
      if (newSettings.showModelSig !== undefined) {
        effectiveShowModelSig = newSettings.showModelSig;
        runtime.log?.(`[tlon] Settings: showModelSig = ${effectiveShowModelSig}`);
      }

      // Update auto-accept DM invites setting
      if (newSettings.autoAcceptDmInvites !== undefined) {
        effectiveAutoAcceptDmInvites = newSettings.autoAcceptDmInvites;
        runtime.log?.(`[tlon] Settings: autoAcceptDmInvites = ${effectiveAutoAcceptDmInvites}`);
      }

      // Update auto-accept group invites setting
      if (newSettings.autoAcceptGroupInvites !== undefined) {
        effectiveAutoAcceptGroupInvites = newSettings.autoAcceptGroupInvites;
        runtime.log?.(
          `[tlon] Settings: autoAcceptGroupInvites = ${effectiveAutoAcceptGroupInvites}`,
        );
      }

      // Update group invite allowlist
      if (newSettings.groupInviteAllowlist !== undefined) {
        effectiveGroupInviteAllowlist =
          newSettings.groupInviteAllowlist.length > 0
            ? newSettings.groupInviteAllowlist
            : account.groupInviteAllowlist;
        runtime.log?.(
          `[tlon] Settings: groupInviteAllowlist updated to ${effectiveGroupInviteAllowlist.join(", ")}`,
        );
      }

      if (newSettings.defaultAuthorizedShips !== undefined) {
        runtime.log?.(
          `[tlon] Settings: defaultAuthorizedShips updated to ${(newSettings.defaultAuthorizedShips || []).join(", ")}`,
        );
      }

      // Update auto-discover channels
      if (newSettings.autoDiscoverChannels !== undefined) {
        effectiveAutoDiscoverChannels = newSettings.autoDiscoverChannels;
        runtime.log?.(`[tlon] Settings: autoDiscoverChannels = ${effectiveAutoDiscoverChannels}`);
      }

      // Update owner ship
      if (newSettings.ownerShip !== undefined) {
        effectiveOwnerShip = newSettings.ownerShip
          ? normalizeShip(newSettings.ownerShip)
          : account.ownerShip
            ? normalizeShip(account.ownerShip)
            : null;
        runtime.log?.(`[tlon] Settings: ownerShip = ${effectiveOwnerShip}`);
      }

      // Update pending approvals
      if (newSettings.pendingApprovals !== undefined) {
        pendingApprovals = newSettings.pendingApprovals;
        runtime.log?.(
          `[tlon] Settings: pendingApprovals updated (${pendingApprovals.length} items)`,
        );
      }
    });

    try {
      await settingsManager.startSubscription();
    } catch (err) {
      // Settings subscription is optional - don't fail if it doesn't work
      runtime.log?.(`[tlon] Settings subscription not available: ${String(err)}`);
    }

    // Subscribe to groups-ui for real-time channel additions (when invites are accepted)
    try {
      await api.subscribe({
        app: "groups",
        path: "/groups/ui",
        event: async (event: any) => {
          try {
            // Handle group/channel join events
            // Event structure: { group: { flag: "~host/group-name", ... }, channels: { ... } }
            if (event && typeof event === "object") {
              // Check for new channels being added to groups
              if (event.channels && typeof event.channels === "object") {
                const channels = event.channels as Record<string, any>;
                for (const [channelNest, _channelData] of Object.entries(channels)) {
                  // Only monitor chat channels
                  if (!channelNest.startsWith("chat/")) {
                    continue;
                  }

                  // If this is a new channel we're not watching yet, add it
                  if (!watchedChannels.has(channelNest)) {
                    watchedChannels.add(channelNest);
                    runtime.log?.(
                      `[tlon] Auto-detected new channel (invite accepted): ${channelNest}`,
                    );

                    // Persist to settings store so it survives restarts
                    if (effectiveAutoAcceptGroupInvites) {
                      try {
                        const currentChannels = currentSettings.groupChannels || [];
                        if (!currentChannels.includes(channelNest)) {
                          const updatedChannels = [...currentChannels, channelNest];
                          // Poke settings store to persist
                          await api.poke({
                            app: "settings",
                            mark: "settings-event",
                            json: {
                              "put-entry": {
                                "bucket-key": "tlon",
                                "entry-key": "groupChannels",
                                value: updatedChannels,
                                desk: "moltbot",
                              },
                            },
                          });
                          runtime.log?.(`[tlon] Persisted ${channelNest} to settings store`);
                        }
                      } catch (err) {
                        runtime.error?.(
                          `[tlon] Failed to persist channel to settings: ${String(err)}`,
                        );
                      }
                    }
                  }
                }
              }

              // Also check for the "join" event structure
              if (event.join && typeof event.join === "object") {
                const join = event.join as { group?: string; channels?: string[] };
                if (join.channels) {
                  for (const channelNest of join.channels) {
                    if (!channelNest.startsWith("chat/")) {
                      continue;
                    }
                    if (!watchedChannels.has(channelNest)) {
                      watchedChannels.add(channelNest);
                      runtime.log?.(`[tlon] Auto-detected joined channel: ${channelNest}`);

                      // Persist to settings store
                      if (effectiveAutoAcceptGroupInvites) {
                        try {
                          const currentChannels = currentSettings.groupChannels || [];
                          if (!currentChannels.includes(channelNest)) {
                            const updatedChannels = [...currentChannels, channelNest];
                            await api.poke({
                              app: "settings",
                              mark: "settings-event",
                              json: {
                                "put-entry": {
                                  "bucket-key": "tlon",
                                  "entry-key": "groupChannels",
                                  value: updatedChannels,
                                  desk: "moltbot",
                                },
                              },
                            });
                            runtime.log?.(`[tlon] Persisted ${channelNest} to settings store`);
                          }
                        } catch (err) {
                          runtime.error?.(
                            `[tlon] Failed to persist channel to settings: ${String(err)}`,
                          );
                        }
                      }
                    }
                  }
                }
              }
            }
          } catch (error: any) {
            runtime.error?.(
              `[tlon] Error handling groups-ui event: ${error?.message ?? String(error)}`,
            );
          }
        },
        err: (error) => {
          runtime.error?.(`[tlon] Groups-ui subscription error: ${String(error)}`);
        },
        quit: () => {
          runtime.log?.("[tlon] Groups-ui subscription ended");
        },
      });
      runtime.log?.("[tlon] Subscribed to groups-ui for real-time channel detection");
    } catch (err) {
      // Groups-ui subscription is optional - channel discovery will still work via polling
      runtime.log?.(`[tlon] Groups-ui subscription failed (will rely on polling): ${String(err)}`);
    }

    // Subscribe to foreigns for auto-accepting group invites
    // Always subscribe so we can hot-reload the setting via settings store
    {
      const processedGroupInvites = new Set<string>();

      // Helper to process pending invites
      const processPendingInvites = async (foreigns: Foreigns) => {
        if (!foreigns || typeof foreigns !== "object") {
          return;
        }

        for (const [groupFlag, foreign] of Object.entries(foreigns)) {
          if (processedGroupInvites.has(groupFlag)) {
            continue;
          }
          if (!foreign.invites || foreign.invites.length === 0) {
            continue;
          }

          const validInvite = foreign.invites.find((inv) => inv.valid);
          if (!validInvite) {
            continue;
          }

          const inviterShip = validInvite.from;
          const normalizedInviter = normalizeShip(inviterShip);

          // Owner invites are always accepted
          if (isOwner(inviterShip)) {
            try {
              await api.poke({
                app: "groups",
                mark: "group-join",
                json: {
                  flag: groupFlag,
                  "join-all": true,
                },
              });
              processedGroupInvites.add(groupFlag);
              runtime.log?.(`[tlon] Auto-accepted group invite from owner: ${groupFlag}`);
            } catch (err) {
              runtime.error?.(`[tlon] Failed to accept group invite from owner: ${String(err)}`);
            }
            continue;
          }

          // Skip if auto-accept is disabled
          if (!effectiveAutoAcceptGroupInvites) {
            // If owner is configured, queue approval
            if (effectiveOwnerShip) {
              const approval = createPendingApproval({
                type: "group",
                requestingShip: inviterShip,
                groupFlag,
              });
              await queueApprovalRequest(approval);
              processedGroupInvites.add(groupFlag);
            }
            continue;
          }

          // Check if inviter is on allowlist
          const isAllowed =
            effectiveGroupInviteAllowlist.length > 0
              ? effectiveGroupInviteAllowlist
                  .map((s) => normalizeShip(s))
                  .some((s) => s === normalizedInviter)
              : false; // Fail-safe: empty allowlist means deny

          if (!isAllowed) {
            // If owner is configured, queue approval
            if (effectiveOwnerShip) {
              const approval = createPendingApproval({
                type: "group",
                requestingShip: inviterShip,
                groupFlag,
              });
              await queueApprovalRequest(approval);
              processedGroupInvites.add(groupFlag);
            } else {
              runtime.log?.(
                `[tlon] Rejected group invite from ${inviterShip} (not in groupInviteAllowlist): ${groupFlag}`,
              );
              processedGroupInvites.add(groupFlag);
            }
            continue;
          }

          // Inviter is on allowlist - accept the invite
          try {
            await api.poke({
              app: "groups",
              mark: "group-join",
              json: {
                flag: groupFlag,
                "join-all": true,
              },
            });
            processedGroupInvites.add(groupFlag);
            runtime.log?.(
              `[tlon] Auto-accepted group invite: ${groupFlag} (from ${validInvite.from})`,
            );
          } catch (err) {
            runtime.error?.(`[tlon] Failed to auto-accept group ${groupFlag}: ${String(err)}`);
          }
        }
      };

      // Process existing pending invites from init data
      if (initForeigns) {
        await processPendingInvites(initForeigns);
      }

      try {
        await api.subscribe({
          app: "groups",
          path: "/v1/foreigns",
          event: (data: unknown) => {
            void (async () => {
              try {
                await processPendingInvites(data as Foreigns);
              } catch (error: any) {
                runtime.error?.(
                  `[tlon] Error handling foreigns event: ${error?.message ?? String(error)}`,
                );
              }
            })();
          },
          err: (error) => {
            runtime.error?.(`[tlon] Foreigns subscription error: ${String(error)}`);
          },
          quit: () => {
            runtime.log?.("[tlon] Foreigns subscription ended");
          },
        });
        runtime.log?.(
          "[tlon] Subscribed to foreigns (/v1/foreigns) for auto-accepting group invites",
        );
      } catch (err) {
        runtime.log?.(`[tlon] Foreigns subscription failed: ${String(err)}`);
      }
    }

    // Discover channels to watch
    if (effectiveAutoDiscoverChannels) {
      const discoveredChannels = await fetchAllChannels(api, runtime);
      for (const channelNest of discoveredChannels) {
        watchedChannels.add(channelNest);
      }
      runtime.log?.(`[tlon] Watching ${watchedChannels.size} channel(s)`);
    }

    // Log watched channels
    for (const channelNest of watchedChannels) {
      runtime.log?.(`[tlon] Watching channel: ${channelNest}`);
    }

    runtime.log?.("[tlon] All subscriptions registered, connecting to SSE stream...");
    await api.connect();
    runtime.log?.("[tlon] Connected! Firehose subscriptions active");

    // Periodically refresh channel discovery
    const pollInterval = setInterval(
      async () => {
        if (!opts.abortSignal?.aborted) {
          try {
            if (effectiveAutoDiscoverChannels) {
              const discoveredChannels = await fetchAllChannels(api, runtime);
              for (const channelNest of discoveredChannels) {
                if (!watchedChannels.has(channelNest)) {
                  watchedChannels.add(channelNest);
                  runtime.log?.(`[tlon] Now watching new channel: ${channelNest}`);
                }
              }
            }
          } catch (error: any) {
            runtime.error?.(`[tlon] Channel refresh error: ${error?.message ?? String(error)}`);
          }
        }
      },
      2 * 60 * 1000,
    );

    if (opts.abortSignal) {
      const signal = opts.abortSignal;
      await new Promise((resolve) => {
        signal.addEventListener(
          "abort",
          () => {
            clearInterval(pollInterval);
            resolve(null);
          },
          { once: true },
        );
      });
    } else {
      await new Promise(() => {});
    }
  } finally {
    try {
      await api?.close();
    } catch (error: any) {
      runtime.error?.(`[tlon] Cleanup error: ${error?.message ?? String(error)}`);
    }
  }
}
