import re
import json
from copy import deepcopy
from functools import wraps
from enum import Enum, unique
from typing_extensions import Self
from typing import TypeVar, NamedTuple
from collections.abc import Callable, Awaitable

from yarl import URL
from nonebot import logger
from httpx import AsyncClient
from httpx import URL as HttpxURL
from pydantic import Field, BaseModel, ValidationError
from nonebot.compat import type_validate_json, type_validate_python

from nonebot_bison.post.post import Post
from nonebot_bison.compat import model_rebuild
from nonebot_bison.utils import text_similarity, decode_unicode_escapes
from nonebot_bison.types import Tag, Target, RawPost, ApiError, Category

from .scheduler import BilibiliSite, BililiveSite, BiliBangumiSite
from ..platform import NewMessage, StatusChange, CategoryNotSupport, CategoryNotRecognize
from .models import (
    PostAPI,
    UserAPI,
    PGCMajor,
    DrawMajor,
    LiveMajor,
    OPUSMajor,
    DynRawPost,
    VideoMajor,
    CommonMajor,
    DynamicType,
    ArticleMajor,
    CoursesMajor,
    UnknownMajor,
    LiveRecommendMajor,
)

B = TypeVar("B", bound="Bilibili")
MAX_352_RETRY_COUNT = 3


class ApiCode352Error(Exception):
    def __init__(self, url: HttpxURL) -> None:
        msg = f"api {url} error"
        super().__init__(msg)


def retry_for_352(func: Callable[[B, Target], Awaitable[list[DynRawPost]]]):
    retried_times = 0

    @wraps(func)
    async def wrapper(bls: B, *args, **kwargs):
        nonlocal retried_times
        try:
            res = await func(bls, *args, **kwargs)
        except ApiCode352Error as e:
            if retried_times < MAX_352_RETRY_COUNT:
                retried_times += 1
                logger.warning(f"获取动态列表失败,尝试刷新cookie: {retried_times}/{MAX_352_RETRY_COUNT}")
                await bls.ctx.refresh_client()
                return []  # 返回空列表
            else:
                raise ApiError(e.args[0])
        else:
            retried_times = 0
            return res

    return wrapper


class _ProcessedText(NamedTuple):
    title: str
    content: str


class _ParsedMojarPost(NamedTuple):
    title: str
    content: str
    pics: list[str]
    url: str | None = None


class Bilibili(NewMessage):
    categories = {
        1: "一般动态",
        2: "专栏文章",
        3: "视频",
        4: "纯文字",
        5: "转发",
        6: "直播推送",
        # 5: "短视频"
    }
    platform_name = "bilibili"
    enable_tag = True
    enabled = True
    is_common = True
    site = BilibiliSite
    name = "B站"
    has_target = True
    parse_target_promot = "请输入用户主页的链接"

    @classmethod
    async def get_target_name(cls, client: AsyncClient, target: Target) -> str | None:
        res = await client.get("https://api.bilibili.com/x/web-interface/card", params={"mid": target})
        res.raise_for_status()
        res_data = type_validate_json(UserAPI, res.content)
        if res_data.code != 0:
            return None
        return res_data.data.card.name if res_data.data else None

    @classmethod
    async def parse_target(cls, target_text: str) -> Target:
        if re.match(r"\d+", target_text):
            return Target(target_text)
        elif re.match(r"UID:(\d+)", target_text):
            return Target(target_text[4:])
        elif m := re.match(r"(?:https?://)?space\.bilibili\.com/(\d+)", target_text):
            return Target(m.group(1))
        else:
            raise cls.ParseTargetException(
                prompt="正确格式:\n1. 用户纯数字id\n2. UID:<用户id>\n3. 用户主页链接: https://space.bilibili.com/xxxx"
            )

    @retry_for_352
    async def get_sub_list(self, target: Target) -> list[DynRawPost]:
        client = await self.ctx.get_client(target)
        params = {"host_mid": target, "timezone_offset": -480, "offset": ""}
        res = await client.get(
            "https://api.bilibili.com/x/polymer/web-dynamic/v1/feed/space",
            params=params,
            timeout=4.0,
        )
        res.raise_for_status()
        try:
            res_obj = type_validate_json(PostAPI, res.content)
        except ValidationError as e:
            logger.exception("解析B站动态列表失败")
            logger.error(res.json())
            raise ApiError(res.request.url) from e

        # 0: 成功
        # -352: 需要cookie
        if res_obj.code == 0:
            if (data := res_obj.data) and (items := data.items):
                logger.trace(f"获取用户{target}的动态列表成功,共{len(items)}条动态")
                logger.trace(f"用户{target}的动态列表: {':'.join(x.id_str or x.basic.rid_str for x in items)}")
                return [item for item in items if item.type != "DYNAMIC_TYPE_NONE"]

            logger.trace(f"获取用户{target}的动态列表成功,但是没有动态")
            return []
        elif res_obj.code == -352:
            raise ApiCode352Error(res.request.url)
        else:
            raise ApiError(res.request.url)

    def get_id(self, post: DynRawPost) -> str:
        return post.id_str

    def get_date(self, post: DynRawPost) -> int:
        return post.modules.module_author.pub_ts

    def _do_get_category(self, post_type: DynamicType) -> Category:
        match post_type:
            case "DYNAMIC_TYPE_DRAW" | "DYNAMIC_TYPE_COMMON_VERTICAL" | "DYNAMIC_TYPE_COMMON_SQUARE":
                return Category(1)
            case "DYNAMIC_TYPE_ARTICLE":
                return Category(2)
            case "DYNAMIC_TYPE_AV":
                return Category(3)
            case "DYNAMIC_TYPE_WORD":
                return Category(4)
            case "DYNAMIC_TYPE_FORWARD":
                # 转发
                return Category(5)
            case "DYNAMIC_TYPE_LIVE_RCMD" | "DYNAMIC_TYPE_LIVE":
                return Category(6)
            case unknown_type:
                raise CategoryNotRecognize(unknown_type)

    def get_category(self, post: DynRawPost) -> Category:
        post_type = post.type
        return self._do_get_category(post_type)

    def get_tags(self, raw_post: DynRawPost) -> list[Tag]:
        tags: list[Tag] = []
        if raw_post.topic:
            tags.append(raw_post.topic.name)
        if desc := raw_post.modules.module_dynamic.desc:
            for node in desc.rich_text_nodes:
                if (node_type := node.get("type", None)) and node_type == "RICH_TEXT_NODE_TYPE_TOPIC":
                    tags.append(node["text"].strip("#"))
        return tags

    def _text_process(self, dynamic: str, desc: str, title: str) -> _ProcessedText:

        # 计算视频标题和视频描述相似度
        title_similarity = 0.0 if len(title) == 0 or len(desc) == 0 else text_similarity(title, desc[: len(title)])
        if title_similarity > 0.9:
            desc = desc[len(title) :].lstrip()
        # 计算视频描述和动态描述相似度
        content_similarity = 0.0 if len(dynamic) == 0 or len(desc) == 0 else text_similarity(dynamic, desc)
        if content_similarity > 0.8:
            return _ProcessedText(title, desc if len(dynamic) < len(desc) else dynamic)  # 选择较长的描述
        else:
            return _ProcessedText(title, f"{desc}" + (f"\n=================\n{dynamic}" if dynamic else ""))

    def pre_parse_by_mojar(self, raw_post: DynRawPost) -> _ParsedMojarPost:
        dyn = raw_post.modules.module_dynamic

        match raw_post.modules.module_dynamic.major:
            case VideoMajor(archive=archive):
                desc_text = dyn.desc.text if dyn.desc else ""
                parsed = self._text_process(desc_text, archive.desc, archive.title)
                return _ParsedMojarPost(
                    title=parsed.title,
                    content=parsed.content,
                    pics=[archive.cover],
                    url=URL(archive.jump_url).with_scheme("https").human_repr(),
                )
            case LiveRecommendMajor(live_rcmd=live_rcmd):
                live_play_info = type_validate_json(LiveRecommendMajor.Content, live_rcmd.content).live_play_info
                return _ParsedMojarPost(
                    title=live_play_info.title,
                    content=f"{live_play_info.parent_area_name} {live_play_info.area_name}",
                    pics=[live_play_info.cover],
                    url=URL(live_play_info.link).with_scheme("https").with_query(None).human_repr(),
                )
            case LiveMajor(live=live):
                return _ParsedMojarPost(
                    title=live.title,
                    content=f"{live.desc_first}\n{live.desc_second}",
                    pics=[live.cover],
                    url=URL(live.jump_url).with_scheme("https").human_repr(),
                )
            case ArticleMajor(article=article):
                return _ParsedMojarPost(
                    title=article.title,
                    content=article.desc,
                    pics=article.covers,
                    url=URL(article.jump_url).with_scheme("https").human_repr(),
                )
            case DrawMajor(draw=draw):
                return _ParsedMojarPost(
                    title="",
                    content=dyn.desc.text if dyn.desc else "",
                    pics=[item.src for item in draw.items],
                    url=f"https://t.bilibili.com/{raw_post.id_str}",
                )
            case PGCMajor(pgc=pgc):
                return _ParsedMojarPost(
                    title=pgc.title,
                    content="",
                    pics=[pgc.cover],
                    url=URL(pgc.jump_url).with_scheme("https").human_repr(),
                )
            case OPUSMajor(opus=opus):
                return _ParsedMojarPost(
                    title=opus.title,
                    content=opus.summary.text,
                    pics=[pic.url for pic in opus.pics],
                    url=URL(opus.jump_url).with_scheme("https").human_repr(),
                )
            case CommonMajor(common=common):
                return _ParsedMojarPost(
                    title=common.title,
                    content=common.desc,
                    pics=[common.cover],
                    url=URL(common.jump_url).with_scheme("https").human_repr(),
                )
            case CoursesMajor(courses=courses):
                return _ParsedMojarPost(
                    title=courses.title,
                    content=f"{courses.sub_title}\n{courses.desc}",
                    pics=[courses.cover],
                    url=URL(courses.jump_url).with_scheme("https").human_repr(),
                )
            case UnknownMajor(type=unknown_type):
                raise CategoryNotSupport(unknown_type)
            case None:  # 没有major的情况
                return _ParsedMojarPost(
                    title="",
                    content=dyn.desc.text if dyn.desc else "",
                    pics=[],
                    url=f"https://t.bilibili.com/{raw_post.id_str}",
                )
            case _:
                raise CategoryNotSupport(f"{raw_post.id_str=}")

    async def parse(self, raw_post: DynRawPost) -> Post:
        parsed_raw_post = self.pre_parse_by_mojar(raw_post)
        parsed_raw_repost = None
        if self._do_get_category(raw_post.type) == Category(5):
            if raw_post.orig:
                parsed_raw_repost = self.pre_parse_by_mojar(raw_post.orig)
            else:
                logger.warning(f"转发动态{raw_post.id_str}没有原动态")

        post = Post(
            self,
            content=decode_unicode_escapes(parsed_raw_post.content),
            title=parsed_raw_post.title,
            images=list(parsed_raw_post.pics),
            timestamp=self.get_date(raw_post),
            url=parsed_raw_post.url,
            avatar=raw_post.modules.module_author.face,
            nickname=raw_post.modules.module_author.name,
        )
        if parsed_raw_repost:
            orig = raw_post.orig
            assert orig
            post.repost = Post(
                self,
                content=decode_unicode_escapes(parsed_raw_repost.content),
                title=parsed_raw_repost.title,
                images=list(parsed_raw_repost.pics),
                timestamp=self.get_date(orig),
                url=parsed_raw_repost.url,
                avatar=orig.modules.module_author.face,
                nickname=orig.modules.module_author.name,
            )
        return post


class Bilibililive(StatusChange):
    categories = {1: "开播提醒", 2: "标题更新提醒", 3: "下播提醒"}
    platform_name = "bilibili-live"
    enable_tag = False
    enabled = True
    is_common = True
    site = BililiveSite
    name = "Bilibili直播"
    has_target = True
    use_batch = True
    default_theme = "brief"

    @unique
    class LiveStatus(Enum):
        # 直播状态
        # 0: 未开播
        # 1: 正在直播
        # 2: 轮播中
        OFF = 0
        ON = 1
        CYCLE = 2

    @unique
    class LiveAction(Enum):
        # 当前直播行为,由新旧直播状态对比决定
        # on: 正在直播
        # off: 未开播
        # turn_on: 状态变更为正在直播
        # turn_off: 状态变更为未开播
        # title_update: 标题更新
        TURN_ON = "turn_on"
        TURN_OFF = "turn_off"
        ON = "on"
        OFF = "off"
        TITLE_UPDATE = "title_update"

    class Info(BaseModel):
        title: str
        room_id: int  # 直播间号
        uid: int  # 主播uid
        live_time: int  # 开播时间
        live_status: "Bilibililive.LiveStatus"
        area_name: str = Field(alias="area_v2_name")  # 新版分区名
        uname: str  # 主播名
        face: str  # 头像url
        cover: str = Field(alias="cover_from_user")  # 封面url
        keyframe: str  # 关键帧url,可能会有延迟
        category: Category = Field(default=Category(0))

        def get_live_action(self, old_info: Self) -> "Bilibililive.LiveAction":
            status = Bilibililive.LiveStatus
            action = Bilibililive.LiveAction
            if old_info.live_status in [status.OFF, status.CYCLE] and self.live_status == status.ON:
                return action.TURN_ON
            elif old_info.live_status == status.ON and self.live_status in [
                status.OFF,
                status.CYCLE,
            ]:
                return action.TURN_OFF
            elif old_info.live_status == status.ON and self.live_status == status.ON:
                if old_info.title != self.title:
                    # 开播时通常会改标题,避免短时间推送两次
                    return action.TITLE_UPDATE
                else:
                    return action.ON
            else:
                return action.OFF

    @classmethod
    async def get_target_name(cls, client: AsyncClient, target: Target) -> str | None:
        res = await client.get("https://api.bilibili.com/x/web-interface/card", params={"mid": target})
        res_data = json.loads(res.text)
        if res_data["code"]:
            return None
        return res_data["data"]["card"]["name"]

    def _gen_empty_info(self, uid: int) -> Info:
        """返回一个空的Info,用于该用户没有直播间的情况"""
        return Bilibililive.Info(
            title="",
            room_id=0,
            uid=uid,
            live_time=0,
            live_status=Bilibililive.LiveStatus.OFF,
            area_v2_name="",
            uname="",
            face="",
            cover_from_user="",
            keyframe="",
        )

    async def batch_get_status(self, targets: list[Target]) -> list[Info]:
        client = await self.ctx.get_client()
        # https://github.com/SocialSisterYi/bilibili-API-collect/blob/master/docs/live/info.md#批量查询直播间状态
        res = await client.get(
            "https://api.live.bilibili.com/room/v1/Room/get_status_info_by_uids",
            params={"uids[]": targets},
            timeout=4.0,
        )
        res_dict = res.json()

        if res_dict["code"] != 0:
            raise self.FetchError()

        data = res_dict.get("data", {})
        infos = []
        for target in targets:
            if target in data.keys():
                infos.append(type_validate_python(self.Info, data[target]))
            else:
                infos.append(self._gen_empty_info(int(target)))
        return infos

    def compare_status(self, _: Target, old_status: Info, new_status: Info) -> list[RawPost]:
        action = Bilibililive.LiveAction
        match new_status.get_live_action(old_status):
            case action.TURN_ON:
                return self._gen_current_status(new_status, 1)
            case action.TITLE_UPDATE:
                return self._gen_current_status(new_status, 2)
            case action.TURN_OFF:
                return self._gen_current_status(new_status, 3)
            case _:
                return []

    def _gen_current_status(self, new_status: Info, category: Category):
        current_status = deepcopy(new_status)
        current_status.category = Category(category)
        return [current_status]

    def get_category(self, status: Info) -> Category:
        assert status.category != Category(0)
        return status.category

    async def parse(self, raw_post: Info) -> Post:
        url = f"https://live.bilibili.com/{raw_post.room_id}"
        pic = [raw_post.cover] if raw_post.category == Category(1) else [raw_post.keyframe]
        title = f"[{self.categories[raw_post.category].rstrip('提醒')}] {raw_post.title}"
        target_name = f"{raw_post.uname} {raw_post.area_name}"
        return Post(
            self,
            "",
            title=title,
            url=url,
            images=list(pic),
            nickname=target_name,
            compress=True,
        )


class BilibiliBangumi(StatusChange):
    categories = {}
    platform_name = "bilibili-bangumi"
    enable_tag = False
    enabled = True
    is_common = True
    site = BiliBangumiSite
    name = "Bilibili剧集"
    has_target = True
    parse_target_promot = "请输入剧集主页"
    default_theme = "brief"

    _url = "https://api.bilibili.com/pgc/review/user"

    @classmethod
    async def get_target_name(cls, client: AsyncClient, target: Target) -> str | None:
        res = await client.get(cls._url, params={"media_id": target})
        res_data = res.json()
        if res_data["code"]:
            return None
        return res_data["result"]["media"]["title"]

    @classmethod
    async def parse_target(cls, target_string: str) -> Target:
        if re.match(r"\d+", target_string):
            return Target(target_string)
        elif m := re.match(r"md(\d+)", target_string):
            return Target(m[1])
        elif m := re.match(r"(?:https?://)?www\.bilibili\.com/bangumi/media/md(\d+)", target_string):
            return Target(m[1])
        raise cls.ParseTargetException(
            prompt="正确格式:\n1. 剧集id\n2. 剧集主页链接 https://www.bilibili.com/bangumi/media/mdxxxx"
        )

    async def get_status(self, target: Target):
        client = await self.ctx.get_client()
        res = await client.get(
            self._url,
            params={"media_id": target},
            timeout=4.0,
        )
        res_dict = res.json()
        if res_dict["code"] == 0:
            return {
                "index": res_dict["result"]["media"]["new_ep"]["index"],
                "index_show": res_dict["result"]["media"]["new_ep"]["index_show"],
                "season_id": res_dict["result"]["media"]["season_id"],
            }
        else:
            raise self.FetchError

    def compare_status(self, target: Target, old_status, new_status) -> list[RawPost]:
        if new_status["index"] != old_status["index"]:
            return [new_status]
        else:
            return []

    async def parse(self, raw_post: RawPost) -> Post:
        client = await self.ctx.get_client()
        detail_res = await client.get(f'https://api.bilibili.com/pgc/view/web/season?season_id={raw_post["season_id"]}')
        detail_dict = detail_res.json()
        lastest_episode = None
        for episode in detail_dict["result"]["episodes"][::-1]:
            if episode["badge"] in ("", "会员"):
                lastest_episode = episode
                break
        if not lastest_episode:
            lastest_episode = detail_dict["result"]["episodes"]

        url = lastest_episode["link"]
        pic: list[str] = [lastest_episode["cover"]]
        target_name = detail_dict["result"]["season_title"]
        content = raw_post["index_show"]
        title = lastest_episode["share_copy"]
        return Post(
            self,
            content,
            title=title,
            url=url,
            images=list(pic),
            nickname=target_name,
            compress=True,
        )


model_rebuild(Bilibililive.Info)