♻️ 为B站动态API建立model,转发动态放入repost字段 (#507)

* 🐛 替换为pyd兼容性写法

* ♻️ 优化 Bilibili 推送解析

* 🐛 为B站动态API建立Model,动态转发部分放入repost字段

* 🔥 删掉无关紧要的ttl字段

* 🐛 修理转发链接解析

* 🧑‍💻 增加可读性

* 🐛 使用pyd兼容函数

* 🐛 处理 pyd1 model rebuild 时没有递归的情况

* ♻️ 将链接生成进行统一
This commit is contained in:
Azide
2024-03-29 15:08:06 +08:00
committed by GitHub
parent 6cceb07a33
commit 9bc5be5426
7 changed files with 295 additions and 431 deletions
+196 -85
View File
@@ -1,16 +1,16 @@
import re
import json
from abc import ABC
from typing import Any
from copy import deepcopy
from enum import Enum, unique
from typing_extensions import Self
from datetime import datetime, timedelta
from typing import Any, TypeVar, TypeAlias, NamedTuple
from httpx import AsyncClient
from nonebot.log import logger
from pydantic import Field, BaseModel
from nonebot.compat import type_validate_python
from nonebot.compat import PYDANTIC_V2, ConfigDict, type_validate_json, type_validate_python
from nonebot_bison.compat import model_rebuild
@@ -19,6 +19,90 @@ from ..types import Tag, Target, RawPost, ApiError, Category
from ..utils import SchedulerConfig, http_client, text_similarity
from .platform import NewMessage, StatusChange, CategoryNotSupport, CategoryNotRecognize
TBaseModel = TypeVar("TBaseModel", bound=type[BaseModel])


# Do not use this as a decorator.
# While a decorator runs, the decorated class is not yet present in the global
# namespace, and an error is raised.
def model_rebuild_recurse(cls: TBaseModel) -> TBaseModel:
    """Rebuild ``cls`` after rebuilding every ``BaseModel`` subclass found among its members.

    The walk over the class members only happens when ``PYDANTIC_V2`` is
    false; ``cls`` itself is handed to ``model_rebuild`` in either case.

    Args:
        cls: the model class to rebuild.

    Returns:
        ``cls`` itself.
    """
    if PYDANTIC_V2:
        model_rebuild(cls)
        return cls

    from inspect import getmembers, isclass

    def _is_model_class(member: object) -> bool:
        return isclass(member) and issubclass(member, BaseModel)

    # Members are rebuilt first, the enclosing class last.
    for _name, nested_model in getmembers(cls, _is_model_class):
        model_rebuild_recurse(nested_model)
    model_rebuild(cls)
    return cls
class Base(BaseModel):
    """Common parent of the Bilibili API models in this module.

    Enables attribute-based loading under either pydantic major version:
    ``orm_mode`` when ``PYDANTIC_V2`` is false, ``from_attributes`` otherwise.
    """

    if not PYDANTIC_V2:

        class Config:
            orm_mode = True

    else:
        model_config = ConfigDict(from_attributes=True)
class APIBase(Base):
    """Base data returned by the Bilibili API."""

    # Callers in this file treat 0 as success and anything else as failure.
    code: int
    message: str
class UserAPI(APIBase):
    """Model of the response validated in ``get_target_name``."""

    class Card(Base):
        name: str

    class Data(Base):
        # Quoted: ``UserAPI`` is not bound yet while its own body executes.
        card: "UserAPI.Card"

    # May be absent; ``get_target_name`` checks for that before reading the name.
    data: Data | None = None
class PostAPI(APIBase):
    """Model of the dynamic-list response validated in ``get_sub_list``.

    Nested annotations that mention ``PostAPI`` are quoted because the outer
    class is not bound yet while its body executes.
    """

    class Info(Base):
        uname: str

    class UserProfile(Base):
        info: "PostAPI.Info"

    class Origin(Base):
        """Identifiers of the original post referenced by a repost."""

        uid: int
        dynamic_id: int
        dynamic_id_str: str
        timestamp: int
        type: int
        rid: int
        bvid: str | None = None

    class Desc(Base):
        """Metadata of one dynamic: ids, timestamp, raw type code and author."""

        dynamic_id: int
        dynamic_id_str: str
        timestamp: int
        type: int
        user_profile: "PostAPI.UserProfile"
        rid: int
        bvid: str | None = None
        origin: "PostAPI.Origin | None" = None

    class Card(Base):
        desc: "PostAPI.Desc"
        # JSON-encoded string; the parsing code decodes it with ``json.loads``.
        card: str

    class Data(Base):
        # Explicit default: under pydantic v2 a nullable field without a default
        # is still required, so a response whose ``data`` omits ``cards`` would
        # fail validation instead of yielding an empty list.
        cards: "list[PostAPI.Card] | None" = None

    data: Data | None = None
# One entry of ``PostAPI.Data.cards``; the raw-post type used by the Bilibili methods below.
DynRawPost: TypeAlias = PostAPI.Card
# Plain calls placed after the class statements, not decorators (see the comment
# above model_rebuild_recurse). Presumably this resolves the quoted forward
# references in the nested models — confirm against nonebot_bison.compat.
model_rebuild_recurse(UserAPI)
model_rebuild_recurse(PostAPI)
class BilibiliClient:
_client: AsyncClient
@@ -95,10 +179,10 @@ class Bilibili(NewMessage):
async def get_target_name(cls, client: AsyncClient, target: Target) -> str | None:
    """Look up the display name of a Bilibili user.

    Args:
        client: HTTP client used for the request.
        target: the user's id, sent as the ``mid`` query parameter.

    Returns:
        The ``name`` of the returned user card, or ``None`` when the API
        reports a non-zero ``code`` or the response carries no ``data``.

    Raises:
        Whatever ``res.raise_for_status()`` raises for an error HTTP status.
    """
    res = await client.get("https://api.bilibili.com/x/web-interface/card", params={"mid": target})
    res.raise_for_status()
    # Validate the raw body against the model instead of indexing a plain dict.
    res_data = type_validate_json(UserAPI, res.content)
    if res_data.code != 0:
        return None
    return res_data.data.card.name if res_data.data else None
@classmethod
async def parse_target(cls, target_text: str) -> Target:
@@ -109,7 +193,7 @@ class Bilibili(NewMessage):
else:
raise cls.ParseTargetException()
async def get_sub_list(self, target: Target) -> list[RawPost]:
async def get_sub_list(self, target: Target) -> list[DynRawPost]:
params = {"host_uid": target, "offset": 0, "need_top": 0}
res = await self.client.get(
"https://api.vc.bilibili.com/dynamic_svr/v1/dynamic_svr/space_history",
@@ -117,42 +201,45 @@ class Bilibili(NewMessage):
timeout=4.0,
)
res.raise_for_status()
res_dict = res.json()
if res_dict["code"] == 0:
return res_dict["data"].get("cards", [])
else:
raise ApiError(res.request.url)
res_obj = type_validate_json(PostAPI, res.content)
def get_id(self, post: RawPost) -> Any:
return post["desc"]["dynamic_id"]
if res_obj.code == 0:
if (data := res_obj.data) and (card := data.cards):
return card
return []
raise ApiError(res.request.url)
def get_date(self, post: RawPost) -> int:
return post["desc"]["timestamp"]
def get_id(self, post: DynRawPost) -> int:
    """Return the ``dynamic_id`` stored in the post's ``desc``."""
    desc = post.desc
    return desc.dynamic_id
def get_date(self, post: DynRawPost) -> int:
    """Return the ``timestamp`` stored in the post's ``desc``."""
    desc = post.desc
    return desc.timestamp
def _do_get_category(self, post_type: int) -> Category:
    """Translate a raw Bilibili dynamic ``type`` code into a ``Category``.

    Args:
        post_type: the ``type`` value taken from a dynamic's ``desc``.

    Returns:
        The matching ``Category``.

    Raises:
        CategoryNotRecognize: when ``post_type`` is not one of the known codes.
    """
    # One table instead of two parallel branch chains: raw type code -> category number.
    category_by_type = {
        2: 1,  # ordinary dynamic
        64: 2,  # column article
        8: 3,  # video
        4: 4,  # text-only
        1: 5,  # repost
    }
    if post_type not in category_by_type:
        raise CategoryNotRecognize(post_type)
    return Category(category_by_type[post_type])
def get_category(self, post: DynRawPost) -> Category:
    """Return the category of ``post``, derived from its raw ``desc.type`` code.

    Delegates to ``_do_get_category``, so an unknown code raises whatever
    that method raises.
    """
    return self._do_get_category(post.desc.type)
def get_tags(self, raw_post: DynRawPost) -> list[Tag]:
    """Extract the ``#topic#`` tags written in a dynamic's text.

    Args:
        raw_post: the dynamic; ``raw_post.card`` is a JSON string.

    Returns:
        The strings found between pairs of ``#`` in the text, in order of
        appearance; an empty list when the card has no text field.
    """
    card_content = json.loads(raw_post.card)
    item = card_content.get("item") or {}
    # Not every card type has item.content: the parsing code reads
    # item.description for ordinary dynamics and the top-level "dynamic" for
    # videos. Fall back through those instead of raising KeyError.
    text = item.get("content") or item.get("description") or card_content.get("dynamic") or ""
    return re.findall(r"#(.*?)#", text)
def _text_process(self, dynamic: str, desc: str, title: str) -> str:
similarity = 1.0 if len(dynamic) == 0 or len(desc) == 0 else text_similarity(dynamic, desc)
@@ -164,61 +251,85 @@ class Bilibili(NewMessage):
text = dynamic + "\n=================\n" + title + "\n\n" + desc
return text
def _get_info(self, post_type: Category, card) -> tuple[str, list]:
if post_type == 1:
# 一般动态
text = card["item"]["description"]
pic = [img["img_src"] for img in card["item"]["pictures"]]
elif post_type == 2:
# 专栏文章
text = "{} {}".format(card["title"], card["summary"])
pic = card["image_urls"]
elif post_type == 3:
# 视频
dynamic = card.get("dynamic", "")
title = card["title"]
desc = card.get("desc", "")
text = self._text_process(dynamic, desc, title)
pic = [card["pic"]]
elif post_type == 4:
# 纯文字
text = card["item"]["content"]
pic = []
else:
raise CategoryNotSupport(post_type)
return text, pic
def _raw_post_parse(self, raw_post: DynRawPost, in_repost: bool = False):
class ParsedPost(NamedTuple):
text: str
pics: list[str]
url: str | None
repost_owner: str | None = None
repost: "ParsedPost | None" = None
async def parse(self, raw_post: RawPost) -> Post:
card_content = json.loads(raw_post["card"])
post_type = self.get_category(raw_post)
target_name = raw_post["desc"]["user_profile"]["info"]["uname"]
if post_type >= 1 and post_type < 5:
url = ""
if post_type == 1:
card_content: dict[str, Any] = json.loads(raw_post.card)
repost_owner: str | None = ou["info"]["uname"] if (ou := card_content.get("origin_user")) else None
def extract_url_id(url_template: str, name: str) -> str | None:
if in_repost:
if origin := raw_post.desc.origin:
return url_template.format(getattr(origin, name))
return None
return url_template.format(getattr(raw_post.desc, name))
match self._do_get_category(raw_post.desc.type):
case 1:
# 一般动态
url = "https://t.bilibili.com/{}".format(raw_post["desc"]["dynamic_id_str"])
elif post_type == 2:
url = extract_url_id("https://t.bilibili.com/{}", "dynamic_id_str")
text: str = card_content["item"]["description"]
pic: list[str] = [img["img_src"] for img in card_content["item"]["pictures"]]
return ParsedPost(text, pic, url, repost_owner)
case 2:
# 专栏文章
url = "https://www.bilibili.com/read/cv{}".format(raw_post["desc"]["rid"])
elif post_type == 3:
url = extract_url_id("https://www.bilibili.com/read/cv{}", "rid")
text = "{} {}".format(card_content["title"], card_content["summary"])
pic = card_content["image_urls"]
return ParsedPost(text, pic, url, repost_owner)
case 3:
# 视频
url = "https://www.bilibili.com/video/{}".format(raw_post["desc"]["bvid"])
elif post_type == 4:
url = extract_url_id("https://www.bilibili.com/video/{}", "bvid")
dynamic = card_content.get("dynamic", "")
title = card_content["title"]
desc = card_content.get("desc", "")
text = self._text_process(dynamic, desc, title)
pic = [card_content["pic"]]
return ParsedPost(text, pic, url, repost_owner)
case 4:
# 纯文字
url = "https://t.bilibili.com/{}".format(raw_post["desc"]["dynamic_id_str"])
text, pic = self._get_info(post_type, card_content)
elif post_type == 5:
# 转发
url = "https://t.bilibili.com/{}".format(raw_post["desc"]["dynamic_id_str"])
text = card_content["item"]["content"]
orig_type = card_content["item"]["orig_type"]
orig = json.loads(card_content["origin"])
orig_text, pic = self._get_info(self._do_get_category(orig_type), orig)
text += "\n--------------\n"
text += orig_text
else:
raise CategoryNotSupport(post_type)
return Post(self, text, url=url, images=pic, nickname=target_name)
url = extract_url_id("https://t.bilibili.com/{}", "dynamic_id_str")
text = card_content["item"]["content"]
pic = []
return ParsedPost(text, pic, url, repost_owner)
case 5:
# 转发
url = extract_url_id("https://t.bilibili.com/{}", "dynamic_id_str")
text = card_content["item"]["content"]
orig_type: int = card_content["item"]["orig_type"]
orig_card: str = card_content["origin"]
orig_post = DynRawPost(desc=raw_post.desc, card=orig_card)
orig_post.desc.type = orig_type
orig_parsed_post = self._raw_post_parse(orig_post, in_repost=True)
return ParsedPost(text, [], url, repost_owner, orig_parsed_post)
case unsupported_type:
raise CategoryNotSupport(unsupported_type)
async def parse(self, raw_post: DynRawPost) -> Post:
    """Turn a raw dynamic into a ``Post``, attaching the original post for reposts.

    Args:
        raw_post: the dynamic to convert.

    Returns:
        A ``Post`` whose nickname is the dynamic author's ``uname``. When the
        parsed result carries a ``repost`` entry, ``Post.repost`` is set to a
        second ``Post`` built from that entry.
    """
    parsed = self._raw_post_parse(raw_post)
    author_name = raw_post.desc.user_profile.info.uname
    post = Post(
        self,
        parsed.text,
        url=parsed.url,
        images=list(parsed.pics),
        nickname=author_name,
    )

    origin = parsed.repost
    if not origin:
        return post

    # NOTE(review): only the nested entry's ``repost_owner`` is read here; the
    # top-level ``parsed.repost_owner`` is never used in this method. Confirm
    # which card actually carries ``origin_user``.
    post.repost = Post(
        self,
        origin.text,
        url=origin.url,
        images=list(origin.pics),
        nickname=origin.repost_owner,
    )
    return post
class Bilibililive(StatusChange):
+1 -1
View File
@@ -130,7 +130,7 @@ class Weibo(NewMessage):
def _get_text(self, raw_text: str) -> str:
text = raw_text.replace("<br/>", "\n").replace("<br />", "\n")
selector = etree.HTML(text)
selector = etree.HTML(text, parser=None)
if selector is None:
return text
url_elems = selector.xpath("//a[@href]/span[@class='surl-text']")