import re
import json
from copy import deepcopy
from enum import Enum, unique
from typing import NamedTuple
from typing_extensions import Self

from yarl import URL
from nonebot import logger
from httpx import AsyncClient
from pydantic import Field, BaseModel, ValidationError
from nonebot.compat import type_validate_json, type_validate_python

from nonebot_bison.post.post import Post
from nonebot_bison.compat import model_rebuild
from nonebot_bison.utils import text_similarity, decode_unicode_escapes
from nonebot_bison.types import Tag, Target, RawPost, ApiError, Category

from .retry import ApiCode352Error, retry_for_352
from .scheduler import BilibiliSite, BililiveSite, BiliBangumiSite
from ..platform import NewMessage, StatusChange, CategoryNotSupport, CategoryNotRecognize
from .models import (
    PostAPI,
    UserAPI,
    PGCMajor,
    DrawMajor,
    LiveMajor,
    OPUSMajor,
    DynRawPost,
    VideoMajor,
    CommonMajor,
    DynamicType,
    ArticleMajor,
    CoursesMajor,
    UnknownMajor,
    LiveRecommendMajor,
)


class _ProcessedText(NamedTuple):
    title: str
    content: str


class _ParsedMojarPost(NamedTuple):
    title: str
    content: str
    pics: list[str]
    url: str | None = None


class Bilibili(NewMessage):
    categories = {
        1: "一般动态",
        2: "专栏文章",
        3: "视频",
        4: "纯文字",
        5: "转发",
        6: "直播推送",
        # 5: "短视频"
    }
    platform_name = "bilibili"
    enable_tag = True
    enabled = True
    is_common = True
    site = BilibiliSite
    name = "B站"
    has_target = True
    parse_target_promot = "请输入用户主页的链接"

    @classmethod
    async def get_target_name(cls, client: AsyncClient, target: Target) -> str | None:
        res = await client.get("https://api.bilibili.com/x/web-interface/card", params={"mid": target})
        res.raise_for_status()
        res_data = type_validate_json(UserAPI, res.content)
        if res_data.code != 0:
            return None
        return res_data.data.card.name if res_data.data else None

    @classmethod
    async def parse_target(cls, target_text: str) -> Target:
        if re.match(r"\d+", target_text):
            return Target(target_text)
        elif re.match(r"UID:(\d+)", target_text):
            return Target(target_text[4:])
        elif m := re.match(r"(?:https?://)?space\.bilibili\.com/(\d+)", target_text):
            return Target(m.group(1))
        else:
            raise cls.ParseTargetException(
                prompt="正确格式:\n1. 用户纯数字id\n2. UID:<用户id>\n3. 用户主页链接: https://space.bilibili.com/xxxx"
            )

    @retry_for_352
    async def get_sub_list(self, target: Target) -> list[DynRawPost]:
        client = await self.ctx.get_client(target)
        params = {"host_mid": target, "timezone_offset": -480, "offset": ""}
        res = await client.get(
            "https://api.bilibili.com/x/polymer/web-dynamic/v1/feed/space",
            params=params,
            timeout=4.0,
        )
        res.raise_for_status()
        try:
            res_obj = type_validate_json(PostAPI, res.content)
        except ValidationError as e:
            logger.exception("解析B站动态列表失败")
            logger.error(res.json())
            raise ApiError(res.request.url) from e

        # 0: 成功
        # -352: 需要cookie
        if res_obj.code == 0:
            if (data := res_obj.data) and (items := data.items):
                logger.trace(f"获取用户{target}的动态列表成功,共{len(items)}条动态")
                logger.trace(f"用户{target}的动态列表: {':'.join(x.id_str or x.basic.rid_str for x in items)}")
                return [item for item in items if item.type != "DYNAMIC_TYPE_NONE"]

            logger.trace(f"获取用户{target}的动态列表成功,但是没有动态")
            return []
        elif res_obj.code == -352:
            raise ApiCode352Error(res.request.url)
        else:
            raise ApiError(res.request.url)

    def get_id(self, post: DynRawPost) -> str:
        return post.id_str

    def get_date(self, post: DynRawPost) -> int:
        return post.modules.module_author.pub_ts

    def _do_get_category(self, post_type: DynamicType) -> Category:
        match post_type:
            case "DYNAMIC_TYPE_DRAW" | "DYNAMIC_TYPE_COMMON_VERTICAL" | "DYNAMIC_TYPE_COMMON_SQUARE":
                return Category(1)
            case "DYNAMIC_TYPE_ARTICLE":
                return Category(2)
            case "DYNAMIC_TYPE_AV":
                return Category(3)
            case "DYNAMIC_TYPE_WORD":
                return Category(4)
            case "DYNAMIC_TYPE_FORWARD":
                # 转发
                return Category(5)
            case "DYNAMIC_TYPE_LIVE_RCMD" | "DYNAMIC_TYPE_LIVE":
                return Category(6)
            case unknown_type:
                raise CategoryNotRecognize(unknown_type)

    def get_category(self, post: DynRawPost) -> Category:
        post_type = post.type
        return self._do_get_category(post_type)

    def get_tags(self, raw_post: DynRawPost) -> list[Tag]:
        tags: list[Tag] = []
        if raw_post.topic:
            tags.append(raw_post.topic.name)
        if desc := raw_post.modules.module_dynamic.desc:
            for node in desc.rich_text_nodes:
                if (node_type := node.get("type", None)) and node_type == "RICH_TEXT_NODE_TYPE_TOPIC":
                    tags.append(node["text"].strip("#"))
        return tags

    def _text_process(self, dynamic: str, desc: str, title: str) -> _ProcessedText:

        # 计算视频标题和视频描述相似度
        title_similarity = 0.0 if len(title) == 0 or len(desc) == 0 else text_similarity(title, desc[: len(title)])
        if title_similarity > 0.9:
            desc = desc[len(title) :].lstrip()
        # 计算视频描述和动态描述相似度
        content_similarity = 0.0 if len(dynamic) == 0 or len(desc) == 0 else text_similarity(dynamic, desc)
        if content_similarity > 0.8:
            return _ProcessedText(title, desc if len(dynamic) < len(desc) else dynamic)  # 选择较长的描述
        else:
            return _ProcessedText(title, f"{desc}" + (f"\n=================\n{dynamic}" if dynamic else ""))

    def pre_parse_by_mojar(self, raw_post: DynRawPost) -> _ParsedMojarPost:
        dyn = raw_post.modules.module_dynamic

        match raw_post.modules.module_dynamic.major:
            case VideoMajor(archive=archive):
                desc_text = dyn.desc.text if dyn.desc else ""
                parsed = self._text_process(desc_text, archive.desc, archive.title)
                return _ParsedMojarPost(
                    title=parsed.title,
                    content=parsed.content,
                    pics=[archive.cover],
                    url=URL(archive.jump_url).with_scheme("https").human_repr(),
                )
            case LiveRecommendMajor(live_rcmd=live_rcmd):
                live_play_info = type_validate_json(LiveRecommendMajor.Content, live_rcmd.content).live_play_info
                return _ParsedMojarPost(
                    title=live_play_info.title,
                    content=f"{live_play_info.parent_area_name} {live_play_info.area_name}",
                    pics=[live_play_info.cover],
                    url=URL(live_play_info.link).with_scheme("https").with_query(None).human_repr(),
                )
            case LiveMajor(live=live):
                return _ParsedMojarPost(
                    title=live.title,
                    content=f"{live.desc_first}\n{live.desc_second}",
                    pics=[live.cover],
                    url=URL(live.jump_url).with_scheme("https").human_repr(),
                )
            case ArticleMajor(article=article):
                return _ParsedMojarPost(
                    title=article.title,
                    content=article.desc,
                    pics=article.covers,
                    url=URL(article.jump_url).with_scheme("https").human_repr(),
                )
            case DrawMajor(draw=draw):
                return _ParsedMojarPost(
                    title="",
                    content=dyn.desc.text if dyn.desc else "",
                    pics=[item.src for item in draw.items],
                    url=f"https://t.bilibili.com/{raw_post.id_str}",
                )
            case PGCMajor(pgc=pgc):
                return _ParsedMojarPost(
                    title=pgc.title,
                    content="",
                    pics=[pgc.cover],
                    url=URL(pgc.jump_url).with_scheme("https").human_repr(),
                )
            case OPUSMajor(opus=opus):
                return _ParsedMojarPost(
                    title=opus.title,
                    content=opus.summary.text,
                    pics=[pic.url for pic in opus.pics],
                    url=URL(opus.jump_url).with_scheme("https").human_repr(),
                )
            case CommonMajor(common=common):
                return _ParsedMojarPost(
                    title=common.title,
                    content=common.desc,
                    pics=[common.cover],
                    url=URL(common.jump_url).with_scheme("https").human_repr(),
                )
            case CoursesMajor(courses=courses):
                return _ParsedMojarPost(
                    title=courses.title,
                    content=f"{courses.sub_title}\n{courses.desc}",
                    pics=[courses.cover],
                    url=URL(courses.jump_url).with_scheme("https").human_repr(),
                )
            case UnknownMajor(type=unknown_type):
                raise CategoryNotSupport(unknown_type)
            case None:  # 没有major的情况
                return _ParsedMojarPost(
                    title="",
                    content=dyn.desc.text if dyn.desc else "",
                    pics=[],
                    url=f"https://t.bilibili.com/{raw_post.id_str}",
                )
            case _:
                raise CategoryNotSupport(f"{raw_post.id_str=}")

    async def parse(self, raw_post: DynRawPost) -> Post:
        parsed_raw_post = self.pre_parse_by_mojar(raw_post)
        parsed_raw_repost = None
        if self._do_get_category(raw_post.type) == Category(5):
            if raw_post.orig:
                parsed_raw_repost = self.pre_parse_by_mojar(raw_post.orig)
            else:
                logger.warning(f"转发动态{raw_post.id_str}没有原动态")

        post = Post(
            self,
            content=decode_unicode_escapes(parsed_raw_post.content),
            title=parsed_raw_post.title,
            images=list(parsed_raw_post.pics),
            timestamp=self.get_date(raw_post),
            url=parsed_raw_post.url,
            avatar=raw_post.modules.module_author.face,
            nickname=raw_post.modules.module_author.name,
        )
        if parsed_raw_repost:
            orig = raw_post.orig
            assert orig
            post.repost = Post(
                self,
                content=decode_unicode_escapes(parsed_raw_repost.content),
                title=parsed_raw_repost.title,
                images=list(parsed_raw_repost.pics),
                timestamp=self.get_date(orig),
                url=parsed_raw_repost.url,
                avatar=orig.modules.module_author.face,
                nickname=orig.modules.module_author.name,
            )
        return post


class Bilibililive(StatusChange):
    categories = {1: "开播提醒", 2: "标题更新提醒", 3: "下播提醒"}
    platform_name = "bilibili-live"
    enable_tag = False
    enabled = True
    is_common = True
    site = BililiveSite
    name = "Bilibili直播"
    has_target = True
    use_batch = True
    default_theme = "brief"

    @unique
    class LiveStatus(Enum):
        # 直播状态
        # 0: 未开播
        # 1: 正在直播
        # 2: 轮播中
        OFF = 0
        ON = 1
        CYCLE = 2

    @unique
    class LiveAction(Enum):
        # 当前直播行为,由新旧直播状态对比决定
        # on: 正在直播
        # off: 未开播
        # turn_on: 状态变更为正在直播
        # turn_off: 状态变更为未开播
        # title_update: 标题更新
        TURN_ON = "turn_on"
        TURN_OFF = "turn_off"
        ON = "on"
        OFF = "off"
        TITLE_UPDATE = "title_update"

    class Info(BaseModel):
        title: str
        room_id: int  # 直播间号
        uid: int  # 主播uid
        live_time: int  # 开播时间
        live_status: "Bilibililive.LiveStatus"
        area_name: str = Field(alias="area_v2_name")  # 新版分区名
        uname: str  # 主播名
        face: str  # 头像url
        cover: str = Field(alias="cover_from_user")  # 封面url
        keyframe: str  # 关键帧url,可能会有延迟
        category: Category = Field(default=Category(0))

        def get_live_action(self, old_info: Self) -> "Bilibililive.LiveAction":
            status = Bilibililive.LiveStatus
            action = Bilibililive.LiveAction
            if old_info.live_status in [status.OFF, status.CYCLE] and self.live_status == status.ON:
                return action.TURN_ON
            elif old_info.live_status == status.ON and self.live_status in [
                status.OFF,
                status.CYCLE,
            ]:
                return action.TURN_OFF
            elif old_info.live_status == status.ON and self.live_status == status.ON:
                if old_info.title != self.title:
                    # 开播时通常会改标题,避免短时间推送两次
                    return action.TITLE_UPDATE
                else:
                    return action.ON
            else:
                return action.OFF

    @classmethod
    async def get_target_name(cls, client: AsyncClient, target: Target) -> str | None:
        res = await client.get("https://api.bilibili.com/x/web-interface/card", params={"mid": target})
        res_data = json.loads(res.text)
        if res_data["code"]:
            return None
        return res_data["data"]["card"]["name"]

    def _gen_empty_info(self, uid: int) -> Info:
        """返回一个空的Info,用于该用户没有直播间的情况"""
        return Bilibililive.Info(
            title="",
            room_id=0,
            uid=uid,
            live_time=0,
            live_status=Bilibililive.LiveStatus.OFF,
            area_v2_name="",
            uname="",
            face="",
            cover_from_user="",
            keyframe="",
        )

    async def batch_get_status(self, targets: list[Target]) -> list[Info]:
        client = await self.ctx.get_client()
        # https://github.com/SocialSisterYi/bilibili-API-collect/blob/master/docs/live/info.md#批量查询直播间状态
        res = await client.get(
            "https://api.live.bilibili.com/room/v1/Room/get_status_info_by_uids",
            params={"uids[]": targets},
            timeout=4.0,
        )
        res_dict = res.json()

        if res_dict["code"] != 0:
            raise self.FetchError()

        data = res_dict.get("data", {})
        infos = []
        for target in targets:
            if target in data.keys():
                infos.append(type_validate_python(self.Info, data[target]))
            else:
                infos.append(self._gen_empty_info(int(target)))
        return infos

    def compare_status(self, _: Target, old_status: Info, new_status: Info) -> list[RawPost]:
        action = Bilibililive.LiveAction
        match new_status.get_live_action(old_status):
            case action.TURN_ON:
                return self._gen_current_status(new_status, 1)
            case action.TITLE_UPDATE:
                return self._gen_current_status(new_status, 2)
            case action.TURN_OFF:
                return self._gen_current_status(new_status, 3)
            case _:
                return []

    def _gen_current_status(self, new_status: Info, category: Category):
        current_status = deepcopy(new_status)
        current_status.category = Category(category)
        return [current_status]

    def get_category(self, status: Info) -> Category:
        assert status.category != Category(0)
        return status.category

    async def parse(self, raw_post: Info) -> Post:
        url = f"https://live.bilibili.com/{raw_post.room_id}"
        pic = [raw_post.cover] if raw_post.category == Category(1) else [raw_post.keyframe]
        title = f"[{self.categories[raw_post.category].rstrip('提醒')}] {raw_post.title}"
        target_name = f"{raw_post.uname} {raw_post.area_name}"
        return Post(
            self,
            content="",
            title=title,
            url=url,
            images=list(pic),
            nickname=target_name,
            compress=True,
        )


class BilibiliBangumi(StatusChange):
    categories = {}
    platform_name = "bilibili-bangumi"
    enable_tag = False
    enabled = True
    is_common = True
    site = BiliBangumiSite
    name = "Bilibili剧集"
    has_target = True
    parse_target_promot = "请输入剧集主页"
    default_theme = "brief"

    _url = "https://api.bilibili.com/pgc/review/user"

    @classmethod
    async def get_target_name(cls, client: AsyncClient, target: Target) -> str | None:
        res = await client.get(cls._url, params={"media_id": target})
        res_data = res.json()
        if res_data["code"]:
            return None
        return res_data["result"]["media"]["title"]

    @classmethod
    async def parse_target(cls, target_string: str) -> Target:
        if re.match(r"\d+", target_string):
            return Target(target_string)
        elif m := re.match(r"md(\d+)", target_string):
            return Target(m[1])
        elif m := re.match(r"(?:https?://)?www\.bilibili\.com/bangumi/media/md(\d+)", target_string):
            return Target(m[1])
        raise cls.ParseTargetException(
            prompt="正确格式:\n1. 剧集id\n2. 剧集主页链接 https://www.bilibili.com/bangumi/media/mdxxxx"
        )

    async def get_status(self, target: Target):
        client = await self.ctx.get_client()
        res = await client.get(
            self._url,
            params={"media_id": target},
            timeout=4.0,
        )
        res_dict = res.json()
        if res_dict["code"] == 0:
            return {
                "index": res_dict["result"]["media"]["new_ep"]["index"],
                "index_show": res_dict["result"]["media"]["new_ep"]["index_show"],
                "season_id": res_dict["result"]["media"]["season_id"],
            }
        else:
            raise self.FetchError

    def compare_status(self, target: Target, old_status, new_status) -> list[RawPost]:
        if new_status["index"] != old_status["index"]:
            return [new_status]
        else:
            return []

    async def parse(self, raw_post: RawPost) -> Post:
        client = await self.ctx.get_client()
        detail_res = await client.get(f'https://api.bilibili.com/pgc/view/web/season?season_id={raw_post["season_id"]}')
        detail_dict = detail_res.json()
        lastest_episode = None
        for episode in detail_dict["result"]["episodes"][::-1]:
            if episode["badge"] in ("", "会员"):
                lastest_episode = episode
                break
        if not lastest_episode:
            lastest_episode = detail_dict["result"]["episodes"]

        url = lastest_episode["link"]
        pic: list[str] = [lastest_episode["cover"]]
        target_name = detail_dict["result"]["season_title"]
        content = raw_post["index_show"]
        title = lastest_episode["share_copy"]
        return Post(
            self,
            content=content,
            title=title,
            url=url,
            images=list(pic),
            nickname=target_name,
            compress=True,
        )


model_rebuild(Bilibililive.Info)