"""
Hermes Web UI -- Workspace and file system helpers.

Workspace lists and last-used workspace are stored per-profile so each
profile has its own workspace configuration.  State files live at
``{profile_home}/webui_state/workspaces.json`` and
``{profile_home}/webui_state/last_workspace.txt``.  The global STATE_DIR
paths are used as fallback when no profile module is available.
"""
import json
import logging
import os
import stat
import subprocess
import concurrent.futures
from pathlib import Path

logger = logging.getLogger(__name__)

from api.config import (
    WORKSPACES_FILE as _GLOBAL_WS_FILE,
    LAST_WORKSPACE_FILE as _GLOBAL_LW_FILE,
    DEFAULT_WORKSPACE as _BOOT_DEFAULT_WORKSPACE,
    MAX_FILE_BYTES, IMAGE_EXTS, MD_EXTS
)


# ── Profile-aware path resolution ───────────────────────────────────────────

def _profile_state_dir() -> Path:
    """Return the directory that holds web-UI state for the active profile.

    A named (non-``'default'``) profile gets ``{profile_home}/webui_state``,
    which is created on demand.  The default profile, an empty profile name,
    and installs where ``api.profiles`` cannot be imported all fall back to the
    directory that contains the global workspaces file.
    """
    try:
        from api.profiles import get_active_hermes_home, get_active_profile_name

        profile = get_active_profile_name()
        uses_global_state = (not profile) or profile == 'default'
        if not uses_global_state:
            state_dir = get_active_hermes_home() / 'webui_state'
            state_dir.mkdir(parents=True, exist_ok=True)
            return state_dir
    except ImportError:
        logger.debug("Failed to import profiles module, using global state dir")
    # Default profile (or no profile support): share the global state location.
    return _GLOBAL_WS_FILE.parent


def _workspaces_file() -> Path:
    """Locate ``workspaces.json`` inside the active profile's state directory."""
    state_dir = _profile_state_dir()
    return state_dir / 'workspaces.json'


def _last_workspace_file() -> Path:
    """Locate ``last_workspace.txt`` inside the active profile's state directory."""
    state_dir = _profile_state_dir()
    return state_dir / 'last_workspace.txt'


def _profile_default_workspace() -> str:
    """Work out the default workspace directory for the active profile.

    The profile configuration is consulted in this order, and the first value
    that resolves to an existing directory wins:
      1. ``workspace``         -- explicit web-UI workspace key
      2. ``default_workspace`` -- alternate explicit key
      3. ``terminal.cwd``      -- the terminal working directory (ignored when
                                  empty or ``'.'``)

    If the configuration cannot be read, or nothing in it points at an
    existing directory, the live ``DEFAULT_WORKSPACE`` from ``api.config`` is
    returned; should that fail too, the value captured at import time is used.
    The result is always an expanded, resolved path string.
    """

    def _as_existing_dir(value) -> str | None:
        # Expand and resolve *value*; only accept it if it is a real directory.
        resolved = Path(str(value)).expanduser().resolve()
        return str(resolved) if resolved.is_dir() else None

    try:
        from api.config import get_config

        config = get_config()
        for explicit_key in ('workspace', 'default_workspace'):
            configured = config.get(explicit_key)
            if not configured:
                continue
            found = _as_existing_dir(configured)
            if found is not None:
                return found
        # No usable explicit key: fall back to the configured terminal cwd.
        terminal_section = config.get('terminal', {})
        if isinstance(terminal_section, dict):
            terminal_cwd = terminal_section.get('cwd', '')
            if terminal_cwd and str(terminal_cwd) not in ('.', ''):
                found = _as_existing_dir(terminal_cwd)
                if found is not None:
                    return found
    except Exception:
        logger.debug("Failed to load profile default workspace config")

    try:
        from api.config import DEFAULT_WORKSPACE as live_default

        return str(Path(live_default).expanduser().resolve())
    except Exception:
        return str(Path(_BOOT_DEFAULT_WORKSPACE).expanduser().resolve())


# ── Public API ──────────────────────────────────────────────────────────────

def _clean_workspace_list(workspaces: list) -> list:
    """Sanitize a workspace list loaded from a state file.

    Rules applied to every entry:
    - Entries that are not dicts, or whose ``path`` is empty or not path-like,
      are dropped instead of raising, so a single malformed record in a
      hand-edited or corrupted file does not invalidate the whole list.
    - Saved paths are preserved even when they are currently missing or
      inaccessible; picker state must not be destroyed by a transient
      stat/permission failure.
    - Entries whose resolved path lives under ``~/.hermes/profiles`` are kept
      only when they are inside the ACTIVE profile's own directory.  Paths
      inside another profile (or any such path when the active profile cannot
      be determined) are removed as cross-profile leaks.
    - An entry named ``'default'`` (case-insensitive) is renamed to ``'Home'``
      to avoid confusion with the ``'default'`` profile name.  A missing or
      ``None`` name becomes ``''`` and any other non-string name is converted
      with ``str()`` so the result always carries string names.

    Returns a new list of ``{'path': str, 'name': str}`` dicts (may be empty).
    """
    hermes_profiles = (Path.home() / '.hermes' / 'profiles').resolve()
    result = []
    for w in workspaces:
        if not isinstance(w, dict):
            continue
        path = w.get('path', '')
        if not path or not isinstance(path, (str, os.PathLike)):
            continue
        name = w.get('name', '')
        if not isinstance(name, str):
            name = '' if name is None else str(name)
        try:
            expanded = Path(path).expanduser()
        except RuntimeError:
            # expanduser() raises when "~user" names an unknown user; keep the
            # entry with its literal spelling rather than failing the list.
            expanded = Path(path)
        p = _safe_resolve(expanded)
        # Skip paths inside a DIFFERENT profile's directory (cross-profile leak).
        # Allow paths inside the CURRENT profile's own directory.
        try:
            p.relative_to(hermes_profiles)
            # p is under ~/.hermes/profiles/ -- keep it only if it is ours.
            try:
                from api.profiles import get_active_hermes_home
                own_profile_dir = get_active_hermes_home().resolve()
                p.relative_to(own_profile_dir)
            except Exception:
                continue  # not our profile, or the active profile is unknown
        except ValueError:
            pass  # not under profiles/ at all -- keep it
        # Rename the confusing 'default' label to 'Home'.
        if name.lower() == 'default':
            name = 'Home'
        result.append({'path': str(p), 'name': name})
    return result


def _workspace_access_error(candidate: Path, *, missing_label: str = "Path does not exist") -> str | None:
    """Return a user-facing validation error for an unusable workspace path.

    ``Path.exists()`` can collapse permission/stat failures into a generic falsey
    result on some Python/OS combinations, which produced misleading "does not
    exist" messages for macOS/TCC-denied directories.  Probe with ``stat()`` so
    missing paths, non-directories, and permission-denied paths can be reported
    separately.
    """
    try:
        st = candidate.stat()
    except FileNotFoundError:
        return f"{missing_label}: {candidate}"
    except PermissionError as exc:
        return (
            f"Cannot access path: {candidate}. The server process could not inspect "
            f"this directory ({exc}). On macOS, grant Full Disk Access or Files and "
            f"Folders permission to the Hermes/WebUI app or server process, then try again."
        )
    except OSError as exc:
        return f"Cannot access path: {candidate}. The server process could not inspect this path ({exc})."
    if not stat.S_ISDIR(st.st_mode):
        return f"Path is not a directory: {candidate}"
    return None


def _migrate_global_workspaces() -> list:
    """Load and sanitize the legacy global ``workspaces.json``.

    Users upgrading from a pre-profile version may have a global file that
    accumulated cross-profile entries over time.  The list is passed through
    ``_clean_workspace_list``; when cleaning dropped entries, the cleaned list
    is written back so later reads start from clean data.

    Returns the cleaned list, or ``[]`` when the file is absent or anything
    goes wrong while reading, parsing, cleaning, or rewriting it.
    """
    legacy_file = _GLOBAL_WS_FILE
    if not legacy_file.exists():
        return []
    try:
        stored = json.loads(legacy_file.read_text(encoding='utf-8'))
        sanitized = _clean_workspace_list(stored)
        entries_dropped = len(sanitized) != len(stored)
        if entries_dropped:
            # Persist the cleaned list so future reads are already clean.
            serialized = json.dumps(sanitized, ensure_ascii=False, indent=2)
            legacy_file.write_text(serialized, encoding='utf-8')
        return sanitized
    except Exception:
        return []


def load_workspaces() -> list:
    """Return the saved workspace list for the active profile.

    Resolution order:
      1. The profile-local ``workspaces.json``, sanitized.  If sanitizing
         dropped entries the cleaned list is written back (best effort).  An
         empty result is replaced by a single ``'Home'`` entry.
      2. If that file is missing or unreadable and the active profile is the
         default one (or profiles are unavailable), the legacy global list.
      3. Otherwise a single ``'Home'`` entry pointing at the profile's default
         workspace.
    """

    def _home_only() -> list:
        return [{'path': _profile_default_workspace(), 'name': 'Home'}]

    profile_file = _workspaces_file()
    if profile_file.exists():
        try:
            stored = json.loads(profile_file.read_text(encoding='utf-8'))
            sanitized = _clean_workspace_list(stored)
            if len(sanitized) != len(stored):
                # Write the cleaned list back so dropped entries stay dropped.
                try:
                    serialized = json.dumps(sanitized, ensure_ascii=False, indent=2)
                    profile_file.write_text(serialized, encoding='utf-8')
                except Exception:
                    logger.debug("Failed to persist cleaned workspace list")
            return sanitized if sanitized else _home_only()
        except Exception:
            logger.debug("Failed to load workspaces from %s", profile_file)

    # No usable profile-local file.  Only the default profile inherits the
    # legacy global list; named profiles always start from a clean slate.
    try:
        from api.profiles import get_active_profile_name
        on_default_profile = get_active_profile_name() in ('default', None)
    except ImportError:
        on_default_profile = True

    migrated = _migrate_global_workspaces() if on_default_profile else []
    return migrated if migrated else _home_only()


def save_workspaces(workspaces: list) -> None:
    """Write *workspaces* as pretty-printed JSON to the active profile's list file.

    The parent state directory is created first if it does not exist.
    """
    target = _workspaces_file()
    target.parent.mkdir(parents=True, exist_ok=True)
    payload = json.dumps(workspaces, ensure_ascii=False, indent=2)
    target.write_text(payload, encoding='utf-8')


def get_last_workspace() -> str:
    """Return the most recently used workspace path.

    The profile-local ``last_workspace.txt`` is consulted first, then the
    global one.  A remembered path only counts if it is non-empty and still an
    existing directory; read errors are logged at debug level and skipped.
    When neither file yields a usable directory the profile's default
    workspace is returned.
    """

    def _remembered_dir(state_file: Path) -> str | None:
        # Stored path, but only while it still points at a directory.
        remembered = state_file.read_text(encoding='utf-8').strip()
        if remembered and Path(remembered).is_dir():
            return remembered
        return None

    profile_file = _last_workspace_file()
    if profile_file.exists():
        try:
            found = _remembered_dir(profile_file)
            if found:
                return found
        except Exception:
            logger.debug("Failed to read last workspace from %s", profile_file)

    # Fallback: the global last-workspace file.
    if _GLOBAL_LW_FILE.exists():
        try:
            found = _remembered_dir(_GLOBAL_LW_FILE)
            if found:
                return found
        except Exception:
            logger.debug("Failed to read global last workspace")

    return _profile_default_workspace()


def set_last_workspace(path: str) -> None:
    """Remember *path* as the active profile's last-used workspace.

    Best effort: any failure (locating the file, creating its directory, or
    writing) is logged at debug level and otherwise ignored.
    """
    try:
        target = _last_workspace_file()
        state_dir = target.parent
        state_dir.mkdir(parents=True, exist_ok=True)
        target.write_text(str(path), encoding='utf-8')
    except Exception:
        logger.debug("Failed to set last workspace")


def _safe_resolve(p: Path) -> Path:
    """Resolve *p*, returning *p* unchanged if resolution raises.

    ``OSError`` and ``RuntimeError`` from ``Path.resolve()`` are swallowed so
    callers can use the result without their own error handling.
    """
    try:
        resolved = p.resolve()
    except (OSError, RuntimeError):
        resolved = p
    return resolved


# Temp directories that nominally sit under a blocked "system" prefix but are
# really user-writable scratch space.  Workspaces registered here (e.g. by
# pytest's ``tmp_path_factory`` on macOS, which uses ``/var/folders/<hash>/T/``)
# must stay acceptable even though their parent (``/var``) is blocked.  The
# carve-outs are honoured by BOTH workspace registration and runtime file
# operations, so a symlink target inside a carve-out is reachable as well.
_USER_TMP_PREFIXES: tuple[Path, ...] = tuple(
    Path(prefix)
    for prefix in (
        '/var/folders',          # macOS per-user tmp (literal form)
        '/private/var/folders',  # macOS per-user tmp (resolved form)
        '/var/tmp',              # Linux/macOS system-wide tmp (user-writable)
        '/private/var/tmp',      # macOS resolved form
    )
)


def _workspace_blocked_roots() -> tuple[Path, ...]:
    """List the system roots that may never be used as a workspace.

    Each root contributes its literal spelling followed by its
    symlink-resolved form; duplicates are removed while keeping first-seen
    order.  The resolved forms matter on macOS, where ``/etc``, ``/var`` and
    ``/tmp`` are symlinks into ``/private``: a caller holding a resolved
    candidate such as ``/private/etc`` would otherwise never match the literal
    ``/etc``.

    User-tmp locations that sit under these roots (for example
    ``/var/folders/.../T/`` on macOS) are NOT excluded here; the carve-out is
    applied by :func:`_is_blocked_system_path`.
    """
    literal_roots = (
        # Linux / macOS
        '/etc',
        '/usr',
        '/var',
        '/bin',
        '/sbin',
        '/boot',
        '/proc',
        '/sys',
        '/dev',
        '/lib',
        '/lib64',
        '/opt/homebrew',
        '/System',
        '/Library',
    )
    # A dict doubles as an insertion-ordered set.
    ordered: dict[Path, None] = {}
    for root in literal_roots:
        literal = Path(root)
        ordered.setdefault(literal, None)
        ordered.setdefault(_safe_resolve(literal), None)
    return tuple(ordered)


def _is_blocked_system_path(candidate: Path) -> bool:
    """Tell whether *candidate* lies inside a blocked system root.

    Paths inside one of the :data:`_USER_TMP_PREFIXES` carve-outs
    (``/var/folders`` on macOS, ``/var/tmp`` on Linux/macOS, and their
    ``/private`` forms) are never considered blocked, even though they sit
    under ``/var``.
    """
    if any(_is_within(candidate, prefix) for prefix in _USER_TMP_PREFIXES):
        return False
    return any(_is_within(candidate, root) for root in _workspace_blocked_roots())


def _workspace_blocked_resolved_subtrees() -> tuple[Path, ...]:
    """Return the resolved forms of every blocked root, plus ``/private/etc``.

    Each root is expanded and resolved; a root that cannot be resolved is kept
    as given.  Duplicates are dropped, preserving first-seen order.
    """

    def _canonical(root: Path) -> Path:
        try:
            return root.expanduser().resolve()
        except Exception:
            return root

    sources = [*_workspace_blocked_roots(), Path('/private/etc')]
    # dict.fromkeys de-duplicates while keeping insertion order.
    return tuple(dict.fromkeys(_canonical(root) for root in sources))


def _workspace_blocked_exact_roots() -> tuple[Path, ...]:
    """Return the paths that are blocked when matched exactly.

    The tuple starts with ``/`` and ``/private/var`` and continues with the
    resolved form of every blocked root (or the root as given when it cannot
    be resolved).  Duplicates are dropped, preserving first-seen order.
    """

    def _canonical(root: Path) -> Path:
        try:
            return root.expanduser().resolve()
        except Exception:
            return root

    exact = [Path('/'), Path('/private/var')]
    exact.extend(_canonical(root) for root in _workspace_blocked_roots())
    return tuple(dict.fromkeys(exact))


def _is_blocked_workspace_path(candidate: Path, raw_path: str | Path | None = None) -> bool:
    """Return True when candidate points at a known OS/system directory.

    *candidate* is the resolved path; *raw_path* is the spelling the caller
    received (optional).  Both are checked: the raw spelling catches literal
    system paths such as ``/etc/ssh``, and the resolved path catches symlink
    aliases such as macOS ``/etc`` -> ``/private/etc``.

    User-tmp carve-outs (:data:`_USER_TMP_PREFIXES`) are applied as follows:
      * If the RESOLVED candidate lies in a carve-out, the path is allowed.
      * If only the RAW spelling lies in a carve-out, that merely exempts the
        raw spelling from the literal-root check (needed where ``/var/tmp``
        resolves outside ``/var``).  The resolved candidate is still checked.
        ``relative_to`` is purely lexical, so a raw path such as
        ``/var/tmp/../../etc/ssh`` or a symlink placed in ``/var/tmp`` must
        not be able to whitelist a system directory.
    """
    raw = None
    if raw_path not in (None, ""):
        try:
            raw = Path(raw_path).expanduser()
        except Exception:
            raw = None

    literal_roots = _workspace_blocked_roots()

    # Exact hits on a system root are always blocked.
    if candidate in _workspace_blocked_exact_roots() or (raw is not None and raw in literal_roots):
        return True

    # The real (resolved) location is user-tmp scratch space: allowed.
    if any(_is_within(candidate, tmp) for tmp in _USER_TMP_PREFIXES):
        return False

    # Raw paths under literal roots (e.g. /etc/ssh, /var/db) are blocked,
    # unless the raw spelling itself is a user-tmp path.
    raw_in_user_tmp = raw is not None and any(
        _is_within(raw, tmp) for tmp in _USER_TMP_PREFIXES
    )
    if raw is not None and not raw_in_user_tmp:
        for blocked in literal_roots:
            if _is_within(raw, blocked):
                return True

    # Resolved subtree checks catch symlink aliases such as /private/etc.  The
    # macOS temp roots under /private/var are intentionally allowed for pytest
    # and per-user temporary workspaces; other /private/var system data such
    # as /private/var/db and /private/var/log remains blocked.
    allowed_private_var = (Path('/private/var/folders'), Path('/private/var/tmp'))
    for blocked in _workspace_blocked_resolved_subtrees():
        if blocked == Path('/private/var'):
            if candidate == blocked:
                return True
            if any(_is_within(candidate, allowed) for allowed in allowed_private_var):
                continue
            if _is_within(candidate, blocked):
                return True
            continue
        if _is_within(candidate, blocked):
            return True
    return False


def _is_within(path: Path, root: Path) -> bool:
    """Return True when *path* equals *root* or lies beneath it.

    The check is the lexical one performed by ``Path.relative_to``: nothing is
    resolved and ``..`` segments are not collapsed.
    """
    try:
        path.relative_to(root)
    except ValueError:
        return False
    return True


def _trusted_workspace_roots() -> list[Path]:
    """Collect the directories that path suggestions may be drawn from.

    Candidates are the user's home directory, the boot-time default workspace,
    and every saved workspace path.  Each is expanded and resolved, and kept
    only if it is an existing directory that is not a blocked system path.
    The de-duplicated result is ordered by the length of the path string,
    shortest first.
    """
    trusted: list[Path] = []

    def _consider(candidate: str | Path | None) -> None:
        if candidate in (None, ""):
            return
        try:
            resolved = Path(candidate).expanduser().resolve()
        except Exception:
            return
        usable = resolved.exists() and resolved.is_dir()
        if not usable:
            return
        if _is_blocked_workspace_path(resolved, candidate):
            return
        if resolved not in trusted:
            trusted.append(resolved)

    for builtin in (Path.home(), _BOOT_DEFAULT_WORKSPACE):
        _consider(builtin)
    for saved in load_workspaces():
        _consider(saved.get("path"))
    return sorted(trusted, key=lambda root: len(str(root)))


def list_workspace_suggestions(prefix: str = "", limit: int = 12) -> list[str]:
    """Return workspace path suggestions under trusted roots only.

    Suggestions are limited to directories under one of:
      - Path.home()
      - the boot default workspace
      - already-saved workspace roots

    *prefix* is what the user has typed so far: ``~``-prefixed input is
    expanded, absolute input is used as-is, and relative input is taken
    relative to the home directory.  An empty prefix returns the trusted roots
    themselves.  At most *limit* paths are returned.

    Arbitrary system prefixes return an empty list rather than an error so the
    UI can safely autocomplete while the user types.  For the same reason an
    unresolvable ``~user`` prefix yields ``[]``, and directory entries that
    cannot be inspected are skipped instead of aborting the request.
    """
    roots = _trusted_workspace_roots()
    if not roots:
        return []

    raw = (prefix or "").strip()
    if not raw:
        return [str(p) for p in roots[:limit]]

    if raw.startswith("~"):
        try:
            target = Path(raw).expanduser()
        except RuntimeError:
            # "~name" for an unknown user (e.g. a half-typed name) has no home
            # directory to expand to -- nothing to suggest yet.
            return []
    elif Path(raw).is_absolute():
        target = Path(raw)
    else:
        target = Path.home() / raw

    normalized = str(target)
    normalized_lower = normalized.lower()
    suggestions: list[str] = []

    def add(path: Path) -> None:
        value = str(path)
        if value not in suggestions:
            suggestions.append(value)

    # If the user is typing a partial trusted root like /Users/xuef..., suggest
    # the matching trusted roots without scanning arbitrary system parents.
    for root in roots:
        if str(root).lower().startswith(normalized_lower):
            add(root)

    in_root = [
        root
        for root in roots
        if normalized == str(root) or normalized.startswith(str(root) + os.sep)
    ]
    if not in_root:
        return suggestions[:limit]

    # Scan inside the most specific trusted root that contains the input.
    anchor_root = max(in_root, key=lambda p: len(str(p)))
    ends_with_sep = raw.endswith(os.sep) or raw.endswith('/')
    parent = target if ends_with_sep else target.parent
    leaf = '' if ends_with_sep else target.name
    show_hidden = leaf.startswith('.')

    try:
        parent_resolved = parent.expanduser().resolve()
        parent_is_dir = parent_resolved.is_dir()
    except Exception:
        return suggestions[:limit]

    if not parent_is_dir:
        return suggestions[:limit]
    # A parent that resolves outside the anchor root (e.g. through a symlink)
    # is not scanned.
    if not _is_within(parent_resolved, anchor_root):
        return suggestions[:limit]

    leaf_lower = leaf.lower()
    try:
        children = sorted(parent_resolved.iterdir(), key=lambda p: p.name.lower())
    except OSError:
        return suggestions[:limit]

    for child in children:
        # Cheap name filters first, so rejected entries are never stat()-ed.
        if child.name.startswith('.') and not show_hidden:
            continue
        if leaf_lower and not child.name.lower().startswith(leaf_lower):
            continue
        try:
            if not child.is_dir():
                continue
        except OSError:
            # is_dir() propagates e.g. permission errors; one unreadable
            # entry must not break autocomplete for its siblings.
            continue
        add(_safe_resolve(child))
        if len(suggestions) >= limit:
            break
    return suggestions[:limit]


def resolve_trusted_workspace(path: str | Path | None = None) -> Path:
    """Resolve *path* and verify that it may be used as a workspace.

    ``None`` or ``""`` short-circuits to the boot-time default workspace
    (expanded and resolved) without further checks.

    Otherwise the expanded, resolved path must be an accessible directory, and
    is then accepted by the first rule that matches:
      (A) It is inside the user's home directory (skipped when the home
          directory resolves to ``/``).  This rule runs BEFORE the
          system-directory check so homes that live under a blocked root
          (e.g. ``/var/home``) keep working.
      (B) It is exactly one of the profile's saved workspace paths.  This
          covers deployments whose workspaces live outside the home
          directory.
      (C) It is the boot-time default workspace or lies beneath it, e.g. a
          Docker volume mount configured as the default workspace.

    Rules (B) and (C) are only reached when the path is not a blocked system
    directory as decided by ``_is_blocked_workspace_path``.

    Raises:
        ValueError: the path is missing, inaccessible, not a directory, a
            system directory, or not covered by any of the rules above.
    """
    if path in (None, ""):
        return Path(_BOOT_DEFAULT_WORKSPACE).expanduser().resolve()

    candidate = Path(path).expanduser().resolve()

    problem = _workspace_access_error(candidate)
    if problem:
        raise ValueError(problem)

    # (A) Anything under the home directory is trusted.
    home_dir = Path.home().resolve()
    if home_dir != Path("/") and _is_within(candidate, home_dir):
        return candidate

    # System roots and their descendants are rejected outright.
    if _is_blocked_workspace_path(candidate, path):
        raise ValueError(f"Path points to a system directory: {candidate}")

    # (B) Exact match against the saved workspace list.  Failures while
    # loading the list are ignored so rule (C) still gets a chance.
    try:
        saved_paths = {
            Path(entry["path"]).resolve()
            for entry in load_workspaces()
            if entry.get("path")
        }
        if candidate in saved_paths:
            return candidate
    except Exception:
        pass

    # (C) Equal to, or nested under, the boot-time default workspace.
    try:
        candidate.relative_to(Path(_BOOT_DEFAULT_WORKSPACE).expanduser().resolve())
    except ValueError:
        pass
    else:
        return candidate

    raise ValueError(
        f"Path is outside the user home directory, not in the saved workspace "
        f"list, and not under the default workspace: {candidate}. "
        f"Add it via Settings → Workspaces first."
    )




def _strip_surrounding_quotes(path: str) -> str:
    """Trim whitespace, then remove one matching pair of enclosing quotes.

    Pasted paths often arrive quoted: macOS Finder's "Copy as Pathname"
    yields ``'/Users/x/Documents/foo'``, and other tools wrap paths in double
    quotes.  Only a matching opener/closer pair (both ``'`` or both ``"``) is
    removed, and only one layer of it.  A quote on just one side is left
    alone in case it is genuinely part of the path.
    """
    trimmed = path.strip()
    for quote in ("'", '"'):
        is_wrapped = (
            len(trimmed) >= 2
            and trimmed.startswith(quote)
            and trimmed.endswith(quote)
        )
        if is_wrapped:
            return trimmed[1:-1]
    return trimmed


def validate_workspace_to_add(path: str) -> Path:
    """Validate a path that the user wants to ADD to the workspace list.

    This is deliberately more permissive than ``resolve_trusted_workspace``:
    someone explicitly registering a path is trusted to mean it, so the path
    does not have to be under home, already saved, or under the default
    workspace.  Only missing or inaccessible paths, non-directories, and
    system directories are rejected.

    One pair of surrounding single or double quotes is removed first, because
    pasted paths (e.g. macOS Finder's "Copy as Pathname") are often quoted.

    Returns the expanded, resolved path.

    Raises:
        ValueError: the path does not exist, cannot be inspected, is not a
            directory, or points at a system directory.
    """
    unquoted = _strip_surrounding_quotes(path)
    candidate = Path(unquoted).expanduser().resolve()

    problem = _workspace_access_error(candidate)
    if problem:
        raise ValueError(problem)

    # The home directory is trusted wherever it lives on disk (for example
    # /var/home/... on some Linux setups), so it bypasses the system check.
    home_dir = Path.home().resolve()
    under_home = home_dir != Path("/") and _is_within(candidate, home_dir)

    # Outside home: reject system roots and everything beneath them.
    if not under_home and _is_blocked_workspace_path(candidate, unquoted):
        raise ValueError(f"Path points to a system directory: {candidate}")

    return candidate

def safe_resolve_ws(root: Path, requested: str) -> Path:
    """Resolve *requested* relative to the workspace *root*, refusing traversal.

    Returns the fully resolved path when either
      * it lies inside the resolved workspace root, or
      * its unresolved spelling (with ``..`` collapsed lexically) lies inside
        *root* -- i.e. a symlink inside the workspace leads elsewhere -- and
        the resolved target is not a blocked system path.

    Raises:
        ValueError: *requested* escapes the workspace via ``..`` or an
            absolute path, or a symlink inside the workspace resolves into a
            system directory.
    """
    joined = root / requested
    target = joined.resolve()

    # Common case: the real location is inside the workspace.
    try:
        target.relative_to(root.resolve())
    except ValueError:
        pass
    else:
        return target

    # The real location is outside.  That is acceptable only when a symlink
    # got us there; os.path.normpath collapses '..' WITHOUT following
    # symlinks, so the lexical path tells the two cases apart.
    lexical = Path(os.path.normpath(str(joined)))
    try:
        lexical.relative_to(root)
    except ValueError:
        raise ValueError(f"Path traversal blocked: {requested}")

    # Even an intentional symlink must not expose /etc, /proc, /sys, /dev and
    # the other blocked roots: this function also guards file reads that are
    # triggered by agent tool calls, not only by human users.
    if _is_blocked_system_path(target):
        raise ValueError(f"Path traversal blocked (system dir): {requested}")
    return target


def list_dir(workspace: Path, rel: str='.'):
    """List the entries of directory *rel* inside *workspace*.

    Entries are ordered symlinks first, then directories, then files, each
    group sorted case-insensitively by name.  At most 200 entries are
    returned.

    Each entry is a dict with ``name`` and ``path`` (workspace-relative, built
    from *rel* so it stays valid below symlinked directories) plus:
      * regular entries: ``type`` (``'dir'`` or ``'file'``) and ``size``
        (bytes for files; ``None`` for directories or when the size cannot be
        read);
      * symlinks: ``type='symlink'``, ``target`` (resolved path), ``is_dir``,
        and -- for non-directory targets only -- ``size`` (``None`` when the
        target cannot be stat-ed).

    Symlinks are omitted when they cannot be resolved (including symlink
    loops), point at the listed directory, the workspace root, or an ancestor
    of the listed directory, or resolve into a blocked system directory.

    Raises:
        ValueError: *rel* escapes the workspace (from ``safe_resolve_ws``).
        FileNotFoundError: *rel* is not a directory.
    """
    target = safe_resolve_ws(workspace, rel)
    if not target.is_dir():
        raise FileNotFoundError(f"Not a directory: {rel}")
    ws_resolved = workspace.resolve()
    target_resolved = target.resolve()
    nested = bool(rel) and rel != '.'
    entries = []
    for item in sorted(target.iterdir(), key=lambda p: (not p.is_symlink(), p.is_file(), p.name.lower())):
        if item.is_symlink():
            # Resolve the link.  Before Python 3.13 a symlink loop raises
            # RuntimeError rather than OSError; either way skip the entry
            # instead of failing the whole listing.
            try:
                link_target = item.resolve()
            except (OSError, RuntimeError):
                continue
            # Cycle detection: skip links back to the current directory, the
            # workspace root, or any ancestor of the current directory.  This
            # runs REGARDLESS of whether the target is inside the workspace.
            if link_target in (target_resolved, target, ws_resolved):
                continue
            if _is_within(target_resolved, link_target):
                continue
            # Block symlinks that resolve to system directories.
            if _is_blocked_system_path(link_target):
                continue
            is_dir = link_target.is_dir()
            # Keep the display path relative to the workspace (don't follow the link).
            display_path = str(Path(item.name))
            if nested:
                display_path = rel + '/' + display_path
            entry = {
                'name': item.name,
                'path': display_path,
                'type': 'symlink',
                'target': str(link_target),
                'is_dir': is_dir,
            }
            if not is_dir:
                try:
                    entry['size'] = link_target.stat().st_size
                except OSError:
                    entry['size'] = None
            entries.append(entry)
        else:
            # Use a rel-based path so entries under symlink targets (outside
            # the workspace root) still get a valid workspace-relative path.
            entry_path = item.name
            if nested:
                entry_path = rel + '/' + item.name
            # The file may vanish or become unreadable between iterdir() and
            # stat(); report an unknown size rather than aborting the listing.
            try:
                size = item.stat().st_size if item.is_file() else None
            except OSError:
                size = None
            entries.append({
                'name': item.name,
                'path': entry_path,
                'type': 'dir' if item.is_dir() else 'file',
                'size': size,
            })
        if len(entries) >= 200:
            break
    return entries


def read_file_content(workspace: Path, rel: str) -> dict:
    """Read the text file *rel* inside *workspace*.

    The file is decoded as UTF-8 with undecodable bytes replaced.  Returns a
    dict with ``path`` (the *rel* argument), ``content``, ``size`` (bytes on
    disk) and ``lines`` (number of newline characters plus one).

    Raises:
        ValueError: *rel* escapes the workspace, or the file is larger than
            ``MAX_FILE_BYTES``.
        FileNotFoundError: *rel* is not a regular file.
    """
    target = safe_resolve_ws(workspace, rel)
    if not target.is_file():
        raise FileNotFoundError(f"Not a file: {rel}")

    byte_count = target.stat().st_size
    if byte_count > MAX_FILE_BYTES:
        raise ValueError(f"File too large ({byte_count} bytes, max {MAX_FILE_BYTES})")

    text = target.read_text(encoding='utf-8', errors='replace')
    line_count = text.count('\n') + 1
    return {'path': rel, 'content': text, 'size': byte_count, 'lines': line_count}


# ── Git detection ──────────────────────────────────────────────────────────

def _run_git(args, cwd, timeout=3):
    """Run ``git`` with *args* (a list) in *cwd* and return its stripped stdout.

    Returns ``None`` when git exits non-zero, does not finish within *timeout*
    seconds, or cannot be started at all (git missing, bad *cwd*, other OS
    errors).
    """
    try:
        completed = subprocess.run(
            ['git'] + args,
            cwd=str(cwd),
            capture_output=True,
            text=True,
            timeout=timeout,
        )
    except (subprocess.TimeoutExpired, FileNotFoundError, OSError):
        return None
    if completed.returncode != 0:
        return None
    return completed.stdout.strip()


def git_info_for_workspace(workspace: Path) -> dict:
    """Return git info for a workspace directory, or None if not a git repo."""
    if not (workspace / '.git').exists():
        return None
    branch = _run_git(['rev-parse', '--abbrev-ref', 'HEAD'], workspace)
    if branch is None:
        return None
    # Run the remaining git commands in parallel via threads — they are
    # independent subprocess calls and together can take 50-200ms when run
    # serially.  Threading is safe here because each call blocks only on the
    # subprocess pipe, not on the GIL.
    def _ahead():
        r = _run_git(['rev-list', '--count', '@{u}..HEAD'], workspace)
        return int(r) if r and r.isdigit() else 0
    def _behind():
        r = _run_git(['rev-list', '--count', 'HEAD..@{u}'], workspace)
        return int(r) if r and r.isdigit() else 0
    def _status():
        out = _run_git(['status', '--porcelain'], workspace) or ''
        lines = [l for l in out.splitlines() if l]
        modified = sum(1 for l in lines if len(l) >= 2 and (l[0] in 'MAR' or l[1] in 'MAR'))
        untracked = sum(1 for l in lines if l.startswith('??'))
        return len(lines), modified, untracked
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as pool:
        f_status = pool.submit(_status)
        f_ahead  = pool.submit(_ahead)
        f_behind = pool.submit(_behind)
        dirty, modified, untracked = f_status.result()
        ahead  = f_ahead.result()
        behind = f_behind.result()
    return {
        'branch': branch,
        'dirty': dirty,
        'modified': modified,
        'untracked': untracked,
        'ahead': ahead,
        'behind': behind,
        'is_git': True,
    }
