nonebot-bison/tests/platforms/test_bilibili.py

556 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import random
from time import time
from datetime import datetime
from typing import TYPE_CHECKING, Any
import respx
import pytest
from loguru import logger
from nonebug.app import App
from httpx import URL, Response
from freezegun import freeze_time
from pytest_mock import MockerFixture
from nonebot.compat import model_dump, type_validate_python
from .utils import get_json
@pytest.fixture
def bing_dy_list(app: App):
    """Provide the dynamic items stored in the ``bilibili-new.json`` test resource.

    The JSON payload is validated as a ``PostAPI`` model, and the ``items`` of
    its ``data`` field are handed to the requesting test.
    """
    from nonebot_bison.platform.bilibili.models import PostAPI

    raw_payload = get_json("bilibili-new.json")
    validated = type_validate_python(PostAPI, raw_payload)
    return validated.data.items  # type: ignore
# Imported for static type checkers only; the tests below refer to the class
# through the string annotation "Bilibili".
if TYPE_CHECKING:
    from nonebot_bison.platform.bilibili import Bilibili
@pytest.fixture
def bilibili(app: App) -> "Bilibili":
    """Build the platform registered under ``"bilibili"`` for use in tests.

    The platform class is looked up in ``platform_manager`` and instantiated
    with a ``ProcessContext`` wrapping a ``BilibiliClientManager`` created from
    the platform's ``site``.
    """
    from nonebot_bison.utils import ProcessContext
    from nonebot_bison.platform import platform_manager
    from nonebot_bison.platform.bilibili import BilibiliClientManager

    platform_cls = platform_manager["bilibili"]
    client_manager = BilibiliClientManager(platform_cls.site)  # type: ignore
    context = ProcessContext(client_manager)
    return platform_cls(context)  # type: ignore
@pytest.fixture
def without_dynamic(app: App):
    """Provide a dumped ``PostAPI`` payload built from a response with no dynamics.

    The source response carries ``"items": None`` in its ``data`` field.
    """
    from nonebot_bison.platform.bilibili.models import PostAPI

    empty_response = {
        "code": 0,
        "ttl": 1,
        "message": "",
        "data": {
            "items": None,
            "has_more": 0,
            "next_offset": 0,
            "_gt_": 0,
        },
    }
    # First check that a real-world "no dynamics" response passes validation,
    # then export it again as plain data.
    validated = type_validate_python(PostAPI, empty_response)
    return model_dump(validated)
@pytest.mark.asyncio
async def test_reset_on_exception(app: App):
    """Check how an FSM behaves after an action raises, with and without ``reset_on_exception``.

    Three FSM subclasses are driven through the same failing transition:

    * ``AFSM`` wraps ``emit`` with ``reset_on_exception(auto_start=True)``;
    * ``BFSM`` wraps ``emit`` with the bare ``reset_on_exception`` decorator;
    * ``CFSM`` does not wrap ``emit`` at all.
    """
    from strenum import StrEnum

    from nonebot_bison.platform.bilibili.fsm import FSM, StateGraph, Transition, ActionReturn, reset_on_exception

    class State(StrEnum):
        A = "A"
        B = "B"
        C = "C"

    class Event(StrEnum):
        A = "A"
        B = "B"
        C = "C"

    class Addon:
        pass

    async def raction(from_: State, event: Event, to: State, addon: Addon) -> ActionReturn:
        # Transition action that always fails after logging.
        logger.info(f"action: {from_} -> {to}")
        raise RuntimeError("test")

    async def action(from_: State, event: Event, to: State, addon: Addon) -> ActionReturn:
        # Transition action that only logs.
        logger.info(f"action: {from_} -> {to}")

    # Transition table:
    #   A --Event.A--> B (failing action)    A --Event.B--> C
    #   B --Event.B--> C                     C --Event.C--> A
    graph: StateGraph[State, Event, Addon] = {
        "transitions": {
            State.A: {
                Event.A: Transition(raction, State.B),
                Event.B: Transition(action, State.C),
            },
            State.B: {
                Event.B: Transition(action, State.C),
            },
            State.C: {
                Event.C: Transition(action, State.A),
            },
        },
        "initial": State.A,
    }

    addon = Addon()

    # Variant 1: decorator called with auto_start=True.
    class AFSM(FSM[State, Event, Addon]):
        @reset_on_exception(auto_start=True)
        async def emit(self, event: Event):
            return await super().emit(event)

    fsm = AFSM(graph, addon)

    await fsm.start()
    with pytest.raises(RuntimeError):
        await fsm.emit(Event.A)

    # The machine is still reported as started after the failure, and further
    # events are accepted without raising.
    assert fsm.started is True
    await fsm.emit(Event.B)
    await fsm.emit(Event.C)

    # Variant 2: bare decorator (no auto_start argument).
    class BFSM(FSM[State, Event, Addon]):
        @reset_on_exception
        async def emit(self, event: Event):
            return await super().emit(event)

    fsm = BFSM(graph, addon)
    await fsm.start()
    with pytest.raises(RuntimeError):
        await fsm.emit(Event.A)

    # Here the machine is reported as not started after the failure, and the
    # next emit is rejected.
    # NOTE(review): the expected message suggests emit sends into an async
    # generator that has not been started again -- confirm in fsm.py.
    assert fsm.started is False
    with pytest.raises(TypeError, match="can't send non-None value to a just-started async generator"):
        await fsm.emit(Event.B)

    # Variant 3: emit is not decorated at all.
    class CFSM(FSM[State, Event, Addon]): ...

    fsm = CFSM(graph, addon)
    await fsm.start()
    with pytest.raises(RuntimeError):
        await fsm.emit(Event.A)

    # Without the decorator the started flag stays True, yet the next emit
    # fails with StopAsyncIteration.
    assert fsm.started is True

    with pytest.raises(StopAsyncIteration):
        await fsm.emit(Event.B)
@pytest.mark.asyncio
async def test_retry_for_352(app: App, mocker: MockerFixture):
    """Drive ``retry_for_352`` through its retry states using a mock platform.

    A ``MockPlatform`` whose ``get_sub_list`` can be switched to raise
    ``ApiCode352Error`` is wrapped with ``retry_for_352``. The test checks the
    module-level ``_retry_fsm`` state before each call, the values returned
    while the error is being retried, and how often the mock client manager
    is asked for a client or for a refresh.
    """
    from nonebot_bison.post import Post
    from nonebot_bison.types import Target, RawPost
    from nonebot_bison.platform.platform import NewMessage
    from nonebot_bison.platform.bilibili.platforms import ApiCode352Error
    from nonebot_bison.utils import ClientManager, ProcessContext, http_client
    from nonebot_bison.platform.bilibili.retry import RetryAddon, RetryState, _retry_fsm, retry_for_352

    # Make the opportunistic refresh in the RAISE stage trigger deterministically.
    mocker.patch.object(random, "random", return_value=0.0)

    now = time()
    raw_post_1 = {"id": 1, "text": "p1", "date": now, "tags": ["tag1"], "category": 1}
    raw_post_2 = {"id": 2, "text": "p2", "date": now + 1, "tags": ["tag2"], "category": 2}

    class MockPlatform(NewMessage):
        platform_name = "fakebili"
        name = "fakebili"
        enabled = True
        is_common = True
        schedule_interval = 10
        enable_tag = False
        categories = {}
        has_target = True

        # When True, get_sub_list raises ApiCode352Error instead of returning posts.
        raise352 = False
        # Number of successful first-time fetches; 0 means only raw_post_1 is returned.
        sub_index = 0

        @classmethod
        async def get_target_name(cls, client, _: "Target"):
            return "MockPlatform"

        def get_id(self, post: RawPost) -> Any:
            return post["id"]

        def get_date(self, raw_post: RawPost) -> float:
            return raw_post["date"]

        async def parse(self, raw_post: RawPost) -> Post:
            return Post(
                self,
                raw_post["text"],
                "http://t.tt/" + str(self.get_id(raw_post)),
                nickname="Mock",
            )

        def set_raise352(self, value: bool):
            self.raise352 = value

        @retry_for_352  # type: ignore  # it only needs to take self and a target and return a list
        async def get_sub_list(self, t: Target):
            # Always request a client first so get_client calls can be counted.
            await self.ctx.get_client(t)
            if not self.raise352:
                if self.sub_index == 0:
                    self.sub_index += 1
                    return [raw_post_1]
                return [raw_post_1, raw_post_2]
            else:
                raise ApiCode352Error(URL("http://t.tt/1"))

    class MockClientManager(ClientManager):
        # Per-method call counters; each method logs and increments its own counter.
        get_client_call_count = 0
        get_client_for_static_call_count = 0
        get_query_name_client_call_count = 0
        refresh_client_call_count = 0

        async def get_client(self, target: Target | None):
            logger.debug(f"call get_client: {target}, {datetime.now()}")
            logger.debug(f"times: {self.get_client_call_count} + 1")
            self.get_client_call_count += 1
            return http_client()

        async def get_client_for_static(self):
            logger.debug(f"call get_client_for_static: {datetime.now()}")
            logger.debug(f"times: {self.get_client_for_static_call_count} + 1")
            self.get_client_for_static_call_count += 1
            return http_client()

        async def get_query_name_client(self):
            logger.debug(f"call get_query_name_client: {datetime.now()}")
            logger.debug(f"times: {self.get_query_name_client_call_count} + 1")
            self.get_query_name_client_call_count += 1
            return http_client()

        async def refresh_client(self):
            logger.debug(f"call refresh_client: {datetime.now()}")
            logger.debug(f"times: {self.refresh_client_call_count} + 1")
            self.refresh_client_call_count += 1

    fakebili = MockPlatform(ProcessContext(MockClientManager()))
    client_mgr = fakebili.ctx._client_mgr
    assert isinstance(client_mgr, MockClientManager)
    assert client_mgr.get_client_call_count == 0
    assert client_mgr.get_client_for_static_call_count == 0
    assert client_mgr.get_query_name_client_call_count == 0
    assert client_mgr.refresh_client_call_count == 0

    # No exception: posts are returned as-is and no refresh is requested.
    res: list[dict[str, Any]] = await fakebili.get_sub_list(Target("t1"))  # type: ignore
    assert len(res) == 1
    assert res[0]["id"] == 1
    assert client_mgr.get_client_call_count == 1
    assert client_mgr.refresh_client_call_count == 0

    res = await fakebili.get_sub_list(Target("t1"))  # type: ignore
    assert len(res) == 2
    assert res[0]["id"] == 1
    assert res[1]["id"] == 2
    assert client_mgr.get_client_call_count == 2
    assert client_mgr.refresh_client_call_count == 0

    addon = RetryAddon()

    # Keep failing until the error is finally raised.
    # Expected state before each call: NROMAL, a run of REFRESH, then for every
    # backoff round two BACKOFF entries followed by another run of REFRESH, and
    # finally two RAISE entries.
    test_state_list: list[RetryState] = [RetryState.NROMAL] + [RetryState.REFRESH] * addon.max_refresh_count
    for _ in range(addon.max_backoff_count):
        test_state_list += [RetryState.BACKOFF] * 2
        test_state_list += [RetryState.REFRESH] * addon.max_refresh_count
    test_state_list += [RetryState.RAISE] * 2

    freeze_start = datetime(2024, 6, 19, 0, 0, 0, 0)
    timedelta_length = addon.backoff_timedelta

    fakebili.set_raise352(True)

    # All states except the last three: the call is expected to return an
    # empty/falsy result rather than raise.
    for state in test_state_list[:-3]:
        logger.info(f"\n\nnow state should be {state}")
        assert _retry_fsm.current_state == state

        with freeze_time(freeze_start):
            res = await fakebili.get_sub_list(Target("t1"))  # type: ignore
            assert not res

        if state == RetryState.BACKOFF:
            # Move the frozen clock forward for the next call.
            # NOTE(review): presumably sized to step past the current backoff window -- confirm in retry.py.
            freeze_start += timedelta_length * (_retry_fsm.addon.backoff_count + 1) ** 2

    # The last three states: the error is expected to reach the caller.
    for state in test_state_list[-3:]:
        logger.info(f"\n\nnow state should be {state}")
        assert _retry_fsm.current_state == state

        with pytest.raises(ApiCode352Error):
            await fakebili.get_sub_list(Target("t1"))  # type: ignore

    # NOTE(review): 4 * 3 presumably equals (max_backoff_count + 1) * max_refresh_count
    # with the RetryAddon defaults -- confirm before changing those defaults.
    assert client_mgr.refresh_client_call_count == 4 * 3 + 3  # refresh + raise
    assert client_mgr.get_client_call_count == 2 + 4 * 3 + 3  # previous + refresh + raise

    # Reset back to the normal state.
    fakebili.set_raise352(False)
    res = await fakebili.get_sub_list(Target("t1"))  # type: ignore
    assert res

    # A normal return midway through the REFRESH stage.
    test_state_list2 = [RetryState.NROMAL, RetryState.REFRESH, RetryState.NROMAL]
    for idx, _ in enumerate(test_state_list2):
        if idx == len(test_state_list2) - 1:
            # Last step: stop raising and expect a non-empty result.
            fakebili.set_raise352(False)
            res = await fakebili.get_sub_list(Target("t1"))  # type: ignore
            assert res
        else:
            fakebili.set_raise352(True)
            res = await fakebili.get_sub_list(Target("t1"))  # type: ignore
            assert not res

    fakebili.set_raise352(False)
    # BACKOFF stage, while still inside the backoff window.
    test_state_list3 = [RetryState.NROMAL] + [RetryState.REFRESH] * addon.max_refresh_count + [RetryState.BACKOFF]
    for idx, _ in enumerate(test_state_list3):
        if idx == len(test_state_list3) - 1:
            # Even though the platform no longer raises, the result is still empty.
            fakebili.set_raise352(False)
            res = await fakebili.get_sub_list(Target("t1"))  # type: ignore
            assert not res
        else:
            fakebili.set_raise352(True)
            res = await fakebili.get_sub_list(Target("t1"))  # type: ignore
            assert not res

    # Test reset; the FSM is reset again at the end of the test.
    await _retry_fsm.reset()

    await fakebili.get_sub_list(Target("t1"))  # type: ignore

    await _retry_fsm.reset()
@pytest.mark.asyncio
async def test_parser(bilibili: "Bilibili"):
    """No item of ``bilibili-new.json`` may parse into an ``UnknownMajor``."""
    from nonebot_bison.platform.bilibili.models import PostAPI, UnknownMajor

    payload = get_json("bilibili-new.json")
    parsed = type_validate_python(PostAPI, payload)
    assert parsed.data is not None

    dynamics = parsed.data.items or []
    for dynamic in dynamics:
        assert dynamic.modules
        major = dynamic.modules.module_dynamic.major
        assert not isinstance(major, UnknownMajor)
@pytest.mark.asyncio
async def test_get_tag(bilibili: "Bilibili", bing_dy_list):
    """``get_tags`` must yield exactly the expected tags for the dynamic at index 6."""
    from nonebot_bison.platform.bilibili.models import DynRawPost

    tagged_post = type_validate_python(DynRawPost, bing_dy_list[6])
    found_tags = bilibili.get_tags(tagged_post)

    expected_tags = {"明日方舟", "123罗德岛"}
    assert set(found_tags) == expected_tags
@pytest.mark.asyncio
async def test_dynamic_video(bilibili: "Bilibili", bing_dy_list: list):
    """Parse the video dynamic at index 8 and check its title, content and URL.

    The test is marked with ``pytest.mark.asyncio`` like the other coroutine
    tests in this module, so it is collected as an asyncio test regardless of
    the configured pytest-asyncio mode.
    """
    from nonebot_bison.post import Post

    post: Post = await bilibili.parse(bing_dy_list[8])

    assert post.title == "《明日方舟》SideStory「巴别塔」活动宣传PV"
    # The expected description keeps its CRLF line endings.
    assert post.content == (
        "SideStory「巴别塔」限时活动即将开启\r\n\r\n"
        "追逐未来的道路上,\r\n"
        "两种同样伟大的理想对撞,几场同样壮烈的悲剧上演。\r\n\r\n"
        "———————————— \r\n"
        "详细活动内容敬请关注《明日方舟》官网及游戏内相关公告。"
    )
    assert post.url == "https://www.bilibili.com/video/BV1Jp421y72e/"
@pytest.mark.asyncio
async def test_dynamic_forward(bilibili: "Bilibili", bing_dy_list: list):
    """Parse the forwarded dynamic at index 7 and check both the post and its repost.

    The test is marked with ``pytest.mark.asyncio`` like the other coroutine
    tests in this module, so it is collected as an asyncio test regardless of
    the configured pytest-asyncio mode.
    """
    from nonebot_bison.post import Post

    post: Post = await bilibili.parse(bing_dy_list[7])

    # The forwarding post itself.
    assert post.content == (
        "「2024明日方舟音律联觉-不觅浪尘」将于12:00正式开启预售票预售票购票链接https://m.damai.cn/shows/item.html?itemId=778626949623"
    )
    assert post.url == "https://t.bilibili.com/917092495452536836"

    # The original dynamic that was forwarded.
    assert (rp := post.repost)
    assert rp.content == (
        "互动抽奖 #明日方舟##音律联觉#\n\n"
        "「2024音律联觉」票务信息公开\n\n\n\n"
        "「2024明日方舟音律联觉-不觅浪尘」将于【4月6日12:00】正式开启预售票预售票购票链接"
        "https://m.damai.cn/shows/item.html?itemId=778626949623\n\n"
        "【活动地点】\n\n"
        "上海久事体育旗忠网球中心主场馆上海市闵行区元江路5500弄\n\n"
        "【活动时间】\n\n「不觅浪尘-日场」5月1日-5月2日\u0026"
        "5月4日-5月5日 13:00\n\n"
        "「不觅浪尘-夜场」5月1日-5月2日\u00265月4日-5月5日 18:30\n\n"
        "【温馨提醒】\n\n"
        "*「2024明日方舟音律联觉-不觅浪尘」演出共计4天每天有日场和夜场各1场演出共计8场次每场演出为相同内容。\n\n"
        "*「2024明日方舟音律联觉-不觅浪尘」演出全部录播内容后续将于bilibili独家上线敬请期待。\n\n"
        "* 音律联觉将为博士们准备活动场馆往返莘庄、北桥地铁站的免费接驳车,"
        "有需要乘坐的博士请合理安排自己的出行时间。\n\n"
        "【票务相关】\n\n"
        "* 本次票务和大麦网 进行合作应相关要求本次预售仅预先开放70%的票量,"
        "剩余票量的开票时间请继续关注明日方舟官方自媒体账号的后续通知。\n\n"
        "* 预售期间,由于未正式开票,下单后无法立即为您配票,待正式开票后,您可通过订单详情页或票夹详情,"
        "查看票品信息。\n\n* 本次演出为非选座电子票,依照您的付款时间顺序出票,如果您同一订单下购买多张票,"
        "正式开票后,系统将会优先为您选择相连座位。\n\n\n"
        "更多详情可查看下方长图或查看活动详情页https://ak.hypergryph.com/special/amb-2024/ \n\n\n\n"
        "* 请各位购票前务必确认票档和相关购票须知,并提前在对应票务平台后台添加完整的个人购票信息,以方便购票。\n\n"
        "* 明日方舟嘉年华及音律联觉活动场地存在一定距离,且活动时间有部分重叠,推荐都参加的博士选择不同日期购票。\n\n"
        "\n\n关注并转发本动态我们将会在5月6日抽取20位博士各赠送【2024音律联觉主题亚克力画随机一款】一份。"
        "中奖的幸运博士请于开奖后5日内提交获奖信息逾期视为自动放弃奖励。"
    )
    assert rp.images
    assert len(rp.images) == 9
    assert rp.url == "https://t.bilibili.com/915793667264872453"
@pytest.mark.asyncio
@respx.mock
async def test_fetch_new_without_dynamic(bilibili, dummy_user_subinfo, without_dynamic):
    """Fetching from a space whose response carries no dynamics yields nothing."""
    from nonebot_bison.types import Target, SubUnit

    target = Target("161775300")
    space_url = f"https://api.bilibili.com/x/polymer/web-dynamic/v1/feed/space?host_mid={target}&timezone_offset=-480&offset="

    # Serve the empty payload for the space feed request.
    post_router = respx.get(space_url)
    empty_reply = Response(200, json=without_dynamic)
    post_router.mock(return_value=empty_reply)

    sub_unit = SubUnit(target, [dummy_user_subinfo])
    fetched = await bilibili.fetch_new_post(sub_unit)

    assert post_router.called
    assert len(fetched) == 0
@pytest.mark.asyncio
@respx.mock
async def test_fetch_new(bilibili, dummy_user_subinfo):
    """A dynamic that appears between two fetches is reported as a new post.

    The first fetch is served the fixture list without its first item and is
    expected to report nothing. The item is then re-inserted with its
    ``pub_ts`` set to the current time, and the second fetch is expected to
    report exactly that one post.

    ``model_dump`` and ``type_validate_python`` come from the module-level
    import; they are not re-imported here.
    """
    from nonebot_bison.types import Target, SubUnit
    from nonebot_bison.platform.bilibili.models import PostAPI

    target = Target("161775300")
    post_router = respx.get(
        f"https://api.bilibili.com/x/polymer/web-dynamic/v1/feed/space?host_mid={target}&timezone_offset=-480&offset="
    )
    post_list = type_validate_python(PostAPI, get_json("bilibili-new.json"))
    assert post_list.data
    assert post_list.data.items

    # Hold back the first item so it can show up later as the "new" dynamic.
    post_0 = post_list.data.items.pop(0)
    post_router.mock(return_value=Response(200, json=model_dump(post_list)))

    res = await bilibili.fetch_new_post(SubUnit(target, [dummy_user_subinfo]))
    assert post_router.called
    assert len(res) == 0

    # Stamp the held-back item with the current time and put it back in front.
    post_0.modules.module_author.pub_ts = int(datetime.now().timestamp())
    post_list.data.items.insert(0, post_0)
    post_router.mock(return_value=Response(200, json=model_dump(post_list)))

    res2 = await bilibili.fetch_new_post(SubUnit(target, [dummy_user_subinfo]))
    assert len(res2[0][1]) == 1
    post = res2[0][1][0]
    assert post.content == (
        "SideStory「巴别塔」限时活动即将开启\n\n\n\n"
        "一、全新SideStory「巴别塔」活动关卡开启\n\n"
        "二、【如死亦终】限时寻访开启\n\n"
        "三、新干员登场,信赖获取提升\n\n"
        "四、【时代】系列,新装限时上架\n\n"
        "五、复刻时装限时上架\n\n"
        "六、新增【“疤痕商场的回忆”】主题家具,限时获取\n\n"
        "七、礼包限时上架\n\n"
        "八、【前路回响】限时寻访开启\n\n"
        "九、【玛尔特】系列,限时复刻上架\n\n\n\n"
        "更多活动内容请持续关注《明日方舟》游戏内公告及官方公告。"
    )
@pytest.mark.asyncio
@respx.mock
async def test_fetch_new_live_rcmd(bilibili: "Bilibili", dummy_user_subinfo):
    """A live-recommendation dynamic that appears between two fetches is reported.

    The first fetch is served the fixture list without its first item and is
    expected to report nothing. The item is then re-inserted with its
    ``pub_ts`` set to the current time, and the second fetch is expected to
    report exactly that one post.

    ``model_dump`` and ``type_validate_python`` come from the module-level
    import; they are not re-imported here.
    """
    from nonebot_bison.types import Target, SubUnit
    from nonebot_bison.platform.bilibili.models import PostAPI

    target = Target("13164144")
    post_router = respx.get(
        f"https://api.bilibili.com/x/polymer/web-dynamic/v1/feed/space?host_mid={target}&timezone_offset=-480&offset="
    )
    post_list = type_validate_python(PostAPI, get_json("bilibili-dynamic-live-rcmd.json"))
    assert post_list.data
    assert post_list.data.items

    # Hold back the first item so it can show up later as the "new" dynamic.
    post_0 = post_list.data.items.pop(0)
    post_router.mock(return_value=Response(200, json=model_dump(post_list)))

    res = await bilibili.fetch_new_post(SubUnit(target, [dummy_user_subinfo]))
    assert post_router.called
    assert len(res) == 0

    # Stamp the held-back item with the current time and put it back in front.
    post_0.modules.module_author.pub_ts = int(datetime.now().timestamp())
    post_list.data.items.insert(0, post_0)
    post_router.mock(return_value=Response(200, json=model_dump(post_list)))

    res2 = await bilibili.fetch_new_post(SubUnit(target, [dummy_user_subinfo]))
    assert len(res2[0][1]) == 1
    post = res2[0][1][0]
    assert post.title == "【Zc】灵异地铁站深夜恐怖档"
    assert post.content == "手游 明日方舟"
    assert post.images == ["http://i0.hdslb.com/bfs/live/new_room_cover/fdada58af9fdc0068562da17298815de72ec82e0.jpg"]
    assert post.url == "https://live.bilibili.com/3044248"
@pytest.mark.asyncio
async def test_parse_target(bilibili: "Bilibili"):
    """``parse_target`` extracts the user id from the supported input forms.

    Covered inputs: a full space URL, the same URL without a scheme, a bare
    numeric id and a ``UID:``-prefixed id. A video URL must be rejected with
    ``Platform.ParseTargetException``.

    The test is marked with ``pytest.mark.asyncio`` like the other coroutine
    tests in this module, so it is collected as an asyncio test regardless of
    the configured pytest-asyncio mode.
    """
    from nonebot_bison.platform.platform import Platform

    # Space URL with scheme and query string.
    res = await bilibili.parse_target(
        "https://space.bilibili.com/161775300?from=search&seid=130517740606234234234&spm_id_from=333.337.0.0"
    )
    assert res == "161775300"

    # Same URL without the scheme.
    res2 = await bilibili.parse_target(
        "space.bilibili.com/161775300?from=search&seid=130517740606234234234&spm_id_from=333.337.0.0"
    )
    assert res2 == "161775300"

    # A video URL is not a user space and must be rejected.
    with pytest.raises(Platform.ParseTargetException):
        await bilibili.parse_target("https://www.bilibili.com/video/BV1qP4y1g738?spm_id_from=333.999.0.0")

    # Bare numeric id.
    res3 = await bilibili.parse_target("10086")
    assert res3 == "10086"

    # "UID:" prefixed id.
    res4 = await bilibili.parse_target("UID:161775300")
    assert res4 == "161775300"
@pytest.mark.asyncio
async def test_content_process(bilibili: "Bilibili"):
    """Pin the title/content produced by ``_text_process`` for two inputs.

    * When the description starts with the title, the expected content is the
      rest of the description followed by a ``=================`` separator
      line and the dynamic text.
    * For the short ``"111"`` / ``"222"`` / ``"2222"`` input, the expected
      content is just the dynamic text.

    The test is marked with ``pytest.mark.asyncio`` like the other coroutine
    tests in this module, so it is collected as an asyncio test regardless of
    the configured pytest-asyncio mode.
    """
    res = bilibili._text_process(
        title="「2024明日方舟音律联觉-不觅浪尘」先导预告公开",
        desc=(
            "「2024明日方舟音律联觉-不觅浪尘」先导预告公开\n\n"
            "“苦难往往相似,邪恶反倒驳杂。”\n"
            "“点火者远去了,但火还在燃烧。”\n"
            "“没有某种能量和激励,也没有某种责任甚至注定牺牲的命运。”\n"
            "“欢迎回家,博士。”\n\n"
            "活动详情及票务信息将于近期发布,请持续关注@明日方舟 。"
        ),
        dynamic="投稿了视频",
    )
    assert res.title == "「2024明日方舟音律联觉-不觅浪尘」先导预告公开"
    assert res.content == (
        "“苦难往往相似,邪恶反倒驳杂。”\n"
        "“点火者远去了,但火还在燃烧。”\n"
        "“没有某种能量和激励,也没有某种责任甚至注定牺牲的命运。”\n"
        "“欢迎回家,博士。”\n\n"
        "活动详情及票务信息将于近期发布,请持续关注@明日方舟 。\n"
        "=================\n"
        "投稿了视频"
    )

    res2 = bilibili._text_process(
        title="111",
        desc="222",
        dynamic="2222",
    )
    assert res2.title == "111"
    assert res2.content == "2222"