"""Regression tests for /api/models disk cache metadata."""

import json
import time

import api.config as config


def _reset_memory_cache() -> None:
    with config._available_models_cache_lock:
        config._available_models_cache = None
        config._available_models_cache_ts = 0.0
        config._cache_build_in_progress = False
        config._cache_build_cv.notify_all()


def test_save_models_cache_to_disk_preserves_response_metadata(tmp_path, monkeypatch):
    cache_path = tmp_path / "models_cache.json"
    monkeypatch.setattr(config, "_models_cache_path", cache_path)

    payload = {
        "active_provider": "openai",
        "default_model": "gpt-5.4-mini",
        "configured_model_badges": {},
        "groups": [
            {
                "provider": "OpenAI",
                "provider_id": "openai",
                "models": [{"id": "gpt-5.4-mini", "label": "GPT 5.4 Mini"}],
            }
        ],
    }

    config._save_models_cache_to_disk(payload)

    on_disk = json.loads(cache_path.read_text(encoding="utf-8"))
    # The four response-shape fields round-trip verbatim.
    for k, v in payload.items():
        assert on_disk[k] == v, f"Field {k!r} did not round-trip"
    # Plus the disk-only metadata stamps added by #1633 — present but not part
    # of the response payload.
    assert "_schema_version" in on_disk
    # _webui_version may be absent in early-init paths where api.updates isn't
    # yet imported; in normal test runs api.updates IS imported, so assert it.
    import sys
    if "api.updates" in sys.modules:
        assert on_disk.get("_webui_version") == sys.modules["api.updates"].WEBUI_VERSION

    # Load returns ONLY the response-shape fields (stamps stripped).
    loaded = config._load_models_cache_from_disk()
    assert loaded == payload


def test_load_models_cache_from_disk_rejects_legacy_groups_only_cache(tmp_path, monkeypatch):
    cache_path = tmp_path / "models_cache.json"
    monkeypatch.setattr(config, "_models_cache_path", cache_path)
    cache_path.write_text(
        json.dumps(
            {
                "groups": [
                    {
                        "provider": "Legacy",
                        "provider_id": "legacy",
                        "models": [{"id": "legacy-model", "label": "Legacy Model"}],
                    }
                ]
            }
        ),
        encoding="utf-8",
    )

    assert config._load_models_cache_from_disk() is None


def test_load_models_cache_from_disk_rejects_partial_metadata_cache(
    tmp_path,
    monkeypatch,
):
    cache_path = tmp_path / "models_cache.json"
    monkeypatch.setattr(config, "_models_cache_path", cache_path)

    valid_payload = {
        "active_provider": "openai",
        "default_model": "gpt-5.4-mini",
        "configured_model_badges": {},
        "groups": [
            {
                "provider": "OpenAI",
                "provider_id": "openai",
                "models": [{"id": "gpt-5.4-mini", "label": "GPT 5.4 Mini"}],
            }
        ],
    }

    invalid_payloads = [
        {key: value for key, value in valid_payload.items() if key != "active_provider"},
        {key: value for key, value in valid_payload.items() if key != "default_model"},
        {key: value for key, value in valid_payload.items() if key != "groups"},
        {key: value for key, value in valid_payload.items() if key != "configured_model_badges"},
        {**valid_payload, "active_provider": 123},
        {**valid_payload, "default_model": None},
        {**valid_payload, "groups": {}},
        {**valid_payload, "configured_model_badges": []},
    ]

    for payload in invalid_payloads:
        cache_path.write_text(json.dumps(payload), encoding="utf-8")
        assert config._load_models_cache_from_disk() is None


def test_get_available_models_ignores_invalid_ttl_memory_cache(monkeypatch):
    _reset_memory_cache()

    stale_cache = {
        "groups": [
            {
                "provider": "Stale",
                "provider_id": "stale",
                "models": [{"id": "stale-model", "label": "Stale Model"}],
            }
        ]
    }

    saved_mtime = config._cfg_mtime
    try:
        with config._available_models_cache_lock:
            config._available_models_cache = stale_cache
            config._available_models_cache_ts = time.monotonic()

        try:
            config._cfg_mtime = config.Path(config._get_config_path()).stat().st_mtime
        except OSError:
            config._cfg_mtime = 0.0

        result = config.get_available_models()
    finally:
        config._cfg_mtime = saved_mtime
        _reset_memory_cache()

    assert "active_provider" in result
    assert "default_model" in result
    assert "groups" in result
    assert not any(group.get("provider") == "Stale" for group in result["groups"])


def test_get_available_models_does_not_use_disk_cache_after_config_mtime_change(
    tmp_path,
    monkeypatch,
):
    cache_path = tmp_path / "models_cache.json"
    monkeypatch.setattr(config, "_models_cache_path", cache_path)
    cache_path.write_text(
        json.dumps(
            {
                "active_provider": "stale-provider",
                "default_model": "stale-model",
                "groups": [
                    {
                        "provider": "Stale",
                        "provider_id": "stale",
                        "models": [{"id": "stale-model", "label": "Stale Model"}],
                    }
                ],
            }
        ),
        encoding="utf-8",
    )
    _reset_memory_cache()

    saved_mtime = config._cfg_mtime
    try:
        config._cfg_mtime = -1.0
        result = config.get_available_models()
    finally:
        config._cfg_mtime = saved_mtime
        _reset_memory_cache()

    assert result["active_provider"] != "stale-provider"
    assert result["default_model"] != "stale-model"
    assert not any(group.get("provider") == "Stale" for group in result["groups"])

    written = json.loads(cache_path.read_text(encoding="utf-8"))
    assert written["active_provider"] != "stale-provider"
    assert written["default_model"] != "stale-model"
    assert not any(group.get("provider") == "Stale" for group in written["groups"])


def test_get_available_models_ignores_legacy_disk_cache_and_rebuilds(
    tmp_path,
    monkeypatch,
):
    cache_path = tmp_path / "models_cache.json"
    monkeypatch.setattr(config, "_models_cache_path", cache_path)
    cache_path.write_text(
        json.dumps(
            {
                "groups": [
                    {
                        "provider": "Legacy",
                        "provider_id": "legacy",
                        "models": [{"id": "legacy-model", "label": "Legacy Model"}],
                    }
                ]
            }
        ),
        encoding="utf-8",
    )
    _reset_memory_cache()

    saved_mtime = config._cfg_mtime
    try:
        try:
            config._cfg_mtime = config.Path(config._get_config_path()).stat().st_mtime
        except OSError:
            config._cfg_mtime = 0.0

        result = config.get_available_models()
    finally:
        config._cfg_mtime = saved_mtime
        _reset_memory_cache()

    assert "active_provider" in result
    assert "default_model" in result
    assert "groups" in result
    assert not any(group.get("provider") == "Legacy" for group in result["groups"])

    written = json.loads(cache_path.read_text(encoding="utf-8"))
    assert "active_provider" in written
    assert "default_model" in written
    assert "groups" in written
