"""
Sprint 43 Tests: Bandit security fixes — B310, B324, B110 + QuietHTTPServer (PR #354).

Covers:
- gateway_watcher.py: MD5 uses usedforsecurity=False (B324)
- config.py: URL scheme validation before urlopen (B310)
- bootstrap.py: URL scheme validation in wait_for_health (B310)
- server.py: QuietHTTPServer class exists and extends ThreadingHTTPServer
- server.py: QuietHTTPServer.handle_error suppresses client disconnect errors
- server.py: QuietHTTPServer uses sys.exc_info() not traceback.sys.exc_info()
- Logging: at least 5 modules add a module-level logger (B110 remediation)
- routes.py: session titles redacted in /api/sessions list response
"""
import ast
import pathlib
import re
import sys
import unittest

REPO_ROOT = pathlib.Path(__file__).parent.parent
GATEWAY_WATCHER_PY = (REPO_ROOT / "api" / "gateway_watcher.py").read_text()
CONFIG_PY = (REPO_ROOT / "api" / "config.py").read_text()
BOOTSTRAP_PY = (REPO_ROOT / "bootstrap.py").read_text()
SERVER_PY = (REPO_ROOT / "server.py").read_text()
ROUTES_PY = (REPO_ROOT / "api" / "routes.py").read_text()
AUTH_PY = (REPO_ROOT / "api" / "auth.py").read_text()
PROFILES_PY = (REPO_ROOT / "api" / "profiles.py").read_text()
STREAMING_PY = (REPO_ROOT / "api" / "streaming.py").read_text()
WORKSPACE_PY = (REPO_ROOT / "api" / "workspace.py").read_text()
STATE_SYNC_PY = (REPO_ROOT / "api" / "state_sync.py").read_text()


# ── B324: MD5 usedforsecurity=False ─────────────────────────────────────────

class TestMD5SecurityFix(unittest.TestCase):
    """B324: hashlib.md5 must use usedforsecurity=False for non-crypto hashes."""

    def test_gateway_watcher_md5_usedforsecurity_false(self):
        """_snapshot_hash must pass usedforsecurity=False to hashlib.md5 (PR #354)."""
        self.assertIn(
            "usedforsecurity=False",
            GATEWAY_WATCHER_PY,
            "gateway_watcher.py: MD5 must use usedforsecurity=False (B324)",
        )

    def test_gateway_watcher_md5_pattern(self):
        """Exact pattern: hashlib.md5(..., usedforsecurity=False)."""
        # Use re.search with DOTALL since the arg may span parens internally
        import re
        self.assertIsNotNone(
            re.search(r"hashlib\.md5\(.*?usedforsecurity=False\)", GATEWAY_WATCHER_PY, re.DOTALL),
            "MD5 call must include usedforsecurity=False kwarg",
        )


# ── B310: URL scheme validation ──────────────────────────────────────────────

class TestUrlSchemeValidation(unittest.TestCase):
    """B310: urllib.request.urlopen must not be called with arbitrary schemes."""

    def test_config_scheme_validation_present(self):
        """config.py must validate URL scheme before urlopen (B310 fix)."""
        self.assertIn(
            "parsed_url.scheme",
            CONFIG_PY,
            "config.py: URL scheme validation missing (B310)",
        )
        # Must check against allowed schemes
        self.assertRegex(
            CONFIG_PY,
            r'parsed_url\.scheme\s+not\s+in\s+\(',
            "config.py: scheme check must use 'not in (...)' pattern",
        )

    def test_config_urlopen_has_nosec(self):
        """The urlopen call in config.py must have a # nosec B310 comment."""
        self.assertIn(
            "nosec B310",
            CONFIG_PY,
            "config.py: urlopen must have # nosec B310 after scheme validation",
        )

    def test_bootstrap_scheme_validation_present(self):
        """bootstrap.py wait_for_health must validate URL scheme before urlopen."""
        self.assertIn(
            "Invalid health check URL",
            BOOTSTRAP_PY,
            "bootstrap.py: URL scheme validation missing in wait_for_health (B310)",
        )
        self.assertRegex(
            BOOTSTRAP_PY,
            r'url\.startswith\([^)]+http',
            "bootstrap.py: must check url starts with http:// or https://",
        )

    def test_bootstrap_urlopen_has_nosec(self):
        """The urlopen call in bootstrap.py must have a # nosec B310 comment."""
        self.assertIn(
            "nosec B310",
            BOOTSTRAP_PY,
            "bootstrap.py: urlopen must have # nosec B310 after scheme validation",
        )

    def test_config_allows_http_and_https(self):
        """config.py scheme check must permit both http and https."""
        self.assertIn('"http"', CONFIG_PY, "config.py: http must be in allowed schemes")
        self.assertIn('"https"', CONFIG_PY, "config.py: https must be in allowed schemes")


# ── B110: Bare except/pass → logger.debug() ─────────────────────────────────

class TestBareExceptLogging(unittest.TestCase):
    """B110: bare except/pass blocks must be replaced with logger.debug()."""

    MODULES_REQUIRING_LOGGER = [
        ("api/auth.py", AUTH_PY),
        ("api/config.py", CONFIG_PY),
        ("api/gateway_watcher.py", GATEWAY_WATCHER_PY),
        ("api/profiles.py", PROFILES_PY),
        ("api/streaming.py", STREAMING_PY),
        ("api/workspace.py", WORKSPACE_PY),
        ("api/state_sync.py", STATE_SYNC_PY),
        ("api/routes.py", ROUTES_PY),
    ]

    def test_module_level_loggers_present(self):
        """All fixed modules must have a module-level logger = logging.getLogger(__name__)."""
        for name, src in self.MODULES_REQUIRING_LOGGER:
            with self.subTest(module=name):
                self.assertIn(
                    "logger = logging.getLogger(__name__)",
                    src,
                    f"{name}: module-level logger missing (B110 fix requires logger)",
                )

    def test_gateway_watcher_no_bare_pass_in_except(self):
        """gateway_watcher.py critical except blocks must not use bare pass."""
        # The poll loop except block that previously had 'pass' must now use logger
        self.assertIn(
            "logger.debug",
            GATEWAY_WATCHER_PY,
            "gateway_watcher.py: must use logger.debug not bare pass (B110)",
        )

    def test_profiles_reload_dotenv_logs_on_error(self):
        """profiles.py _reload_dotenv except must log + reset _loaded_profile_env_keys."""
        # Both the reset and the debug log should be present in the except block
        self.assertIn(
            "_loaded_profile_env_keys = set()",
            PROFILES_PY,
            "profiles.py: _reload_dotenv except must reset _loaded_profile_env_keys",
        )
        self.assertIn(
            "Failed to reload dotenv",
            PROFILES_PY,
            "profiles.py: _reload_dotenv except must log a warning",
        )


# ── QuietHTTPServer ──────────────────────────────────────────────────────────

class TestQuietHTTPServer(unittest.TestCase):
    """server.py: QuietHTTPServer suppresses client disconnect noise."""

    def test_quiet_http_server_class_exists(self):
        """QuietHTTPServer must be defined in server.py."""
        self.assertIn(
            "class QuietHTTPServer",
            SERVER_PY,
            "server.py: QuietHTTPServer class missing (PR #354)",
        )

    def test_quiet_http_server_extends_threading_http_server(self):
        """QuietHTTPServer must extend ThreadingHTTPServer."""
        self.assertRegex(
            SERVER_PY,
            r"class QuietHTTPServer\(ThreadingHTTPServer\)",
            "QuietHTTPServer must extend ThreadingHTTPServer",
        )

    def test_quiet_http_server_used_as_server(self):
        """main() must instantiate QuietHTTPServer not raw ThreadingHTTPServer."""
        # After the class is defined, the server creation should use QuietHTTPServer
        after_class = SERVER_PY[SERVER_PY.find("class QuietHTTPServer"):]
        self.assertIn(
            "QuietHTTPServer(",
            after_class,
            "main() must use QuietHTTPServer, not ThreadingHTTPServer directly",
        )

    def test_handle_error_suppresses_connection_reset(self):
        """handle_error must suppress ConnectionResetError and BrokenPipeError."""
        self.assertIn(
            "ConnectionResetError",
            SERVER_PY,
            "QuietHTTPServer.handle_error must handle ConnectionResetError",
        )
        self.assertIn(
            "BrokenPipeError",
            SERVER_PY,
            "QuietHTTPServer.handle_error must handle BrokenPipeError",
        )

    def test_uses_sys_exc_info_not_traceback_sys(self):
        """handle_error must use sys.exc_info() not traceback.sys.exc_info() (implementation detail)."""
        self.assertNotIn(
            "traceback.sys.exc_info()",
            SERVER_PY,
            "server.py: must use sys.exc_info() not traceback.sys.exc_info()",
        )
        self.assertIn(
            "sys.exc_info()",
            SERVER_PY,
            "server.py: handle_error must call sys.exc_info()",
        )

    def test_sys_imported_in_server(self):
        """server.py must import sys (needed for sys.exc_info)."""
        import re
        self.assertIsNotNone(
            re.search(r"^import sys", SERVER_PY, re.MULTILINE),
            "server.py: sys must be imported",
        )

    def test_handle_error_calls_super(self):
        """handle_error must call super().handle_error for non-client-disconnect errors."""
        self.assertIn(
            "super().handle_error(request, client_address)",
            SERVER_PY,
            "QuietHTTPServer.handle_error must delegate to super for real errors",
        )


# ── Session title redaction in /api/sessions ────────────────────────────────

class TestSessionTitleRedaction(unittest.TestCase):
    """routes.py: session titles must be redacted in the sessions list endpoint."""

    def test_redact_text_called_on_session_titles(self):
        """routes.py must call _redact_text on session titles in /api/sessions."""
        self.assertRegex(
            ROUTES_PY,
            r'_redact_text\([^)]*\btitle\b[^)]*\)',
            "routes.py: session titles must be redacted via _redact_text in /api/sessions",
        )

    def test_redact_text_imported_in_routes(self):
        """routes.py must import _redact_text from api.helpers."""
        self.assertIn(
            "_redact_text",
            ROUTES_PY,
            "routes.py: _redact_text must be imported from api.helpers",
        )
