Merge pull request #72 from felinae98/feat/parse-target

增加Parse Target功能
This commit is contained in:
felinae98 2022-05-23 00:02:10 +08:00 committed by GitHub
commit 5615eaf028
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 293 additions and 18 deletions

View File

@ -2,7 +2,7 @@
## 最近更新 ## 最近更新
* No changes - No changes
## v0.5.3 ## v0.5.3

View File

@ -15,6 +15,7 @@ from nonebot.params import Depends, EventPlainText, EventToMe
from nonebot.permission import SUPERUSER from nonebot.permission import SUPERUSER
from nonebot.rule import to_me from nonebot.rule import to_me
from nonebot.typing import T_State from nonebot.typing import T_State
from nonebot_bison.platform.platform import Platform
from .config import Config from .config import Config
from .platform import check_sub_target, platform_manager from .platform import check_sub_target, platform_manager
@ -108,8 +109,13 @@ def do_add_sub(add_sub: Type[Matcher]):
"platform", _gen_prompt_template("{_prompt}"), [Depends(parse_platform)] "platform", _gen_prompt_template("{_prompt}"), [Depends(parse_platform)]
) )
async def init_id(state: T_State): async def init_id(state: T_State):
if platform_manager[state["platform"]].has_target: cur_platform = platform_manager[state["platform"]]
state["_prompt"] = "请输入订阅用户的id:\n查询id获取方法请回复:“查询”" if cur_platform.has_target:
state["_prompt"] = (
("1." + cur_platform.parse_target_promot + "\n2.")
if cur_platform.parse_target_promot
else ""
) + "请输入订阅用户的id\n查询id获取方法请回复:“查询”"
else: else:
state["id"] = "default" state["id"] = "default"
state["name"] = await platform_manager[state["platform"]].get_target_name( state["name"] = await platform_manager[state["platform"]].get_target_name(
@ -125,6 +131,8 @@ def do_add_sub(add_sub: Type[Matcher]):
raise LookupError raise LookupError
if target == "取消": if target == "取消":
raise KeyboardInterrupt raise KeyboardInterrupt
platform = platform_manager[state["platform"]]
target = await platform.parse_target(target)
name = await check_sub_target(state["platform"], target) name = await check_sub_target(state["platform"], target)
if not name: if not name:
raise ValueError raise ValueError
@ -141,6 +149,8 @@ def do_add_sub(add_sub: Type[Matcher]):
await add_sub.finish("已中止订阅") await add_sub.finish("已中止订阅")
except (ValueError): except (ValueError):
await add_sub.reject("id输入错误") await add_sub.reject("id输入错误")
except (Platform.ParseTargetException):
await add_sub.reject("不能从你的输入中提取出id请检查你输入的内容是否符合预期")
else: else:
await add_sub.send( await add_sub.send(
"即将订阅的用户为:{} {} {}\n如有错误请输入“取消”重新订阅".format( "即将订阅的用户为:{} {} {}\n如有错误请输入“取消”重新订阅".format(

View File

@ -1,11 +1,12 @@
import json import json
import re
from typing import Any, Optional from typing import Any, Optional
import httpx import httpx
from ..post import Post from ..post import Post
from ..types import Category, RawPost, Tag, Target from ..types import Category, RawPost, Tag, Target
from .platform import CategoryNotSupport, NewMessage from .platform import CategoryNotSupport, NewMessage, Platform
class Bilibili(NewMessage): class Bilibili(NewMessage):
@ -26,6 +27,7 @@ class Bilibili(NewMessage):
schedule_kw = {"seconds": 10} schedule_kw = {"seconds": 10}
name = "B站" name = "B站"
has_target = True has_target = True
parse_target_promot = "请输入用户主页的链接"
async def get_target_name(self, target: Target) -> Optional[str]: async def get_target_name(self, target: Target) -> Optional[str]:
async with httpx.AsyncClient() as client: async with httpx.AsyncClient() as client:
@ -37,6 +39,16 @@ class Bilibili(NewMessage):
return None return None
return res_data["data"]["name"] return res_data["data"]["name"]
async def parse_target(self, target_text: str) -> Target:
if re.match(r"\d+", target_text):
return Target(target_text)
elif match := re.match(
r"(?:https?://)?space\.bilibili\.com/(\d+)", target_text
):
return Target(match.group(1))
else:
raise Platform.ParseTargetException()
async def get_sub_list(self, target: Target) -> list[RawPost]: async def get_sub_list(self, target: Target) -> list[RawPost]:
async with httpx.AsyncClient() as client: async with httpx.AsyncClient() as client:
params = {"host_uid": target, "offset": 0, "need_top": 0} params = {"host_uid": target, "offset": 0, "need_top": 0}

View File

@ -1,3 +1,4 @@
import re
from typing import Any, Optional from typing import Any, Optional
import httpx import httpx
@ -18,6 +19,7 @@ class NcmArtist(NewMessage):
schedule_kw = {"minutes": 1} schedule_kw = {"minutes": 1}
name = "网易云-歌手" name = "网易云-歌手"
has_target = True has_target = True
parse_target_promot = "请输入歌手主页包含数字ID的链接"
async def get_target_name(self, target: Target) -> Optional[str]: async def get_target_name(self, target: Target) -> Optional[str]:
async with httpx.AsyncClient() as client: async with httpx.AsyncClient() as client:
@ -30,6 +32,16 @@ class NcmArtist(NewMessage):
return return
return res_data["artist"]["name"] return res_data["artist"]["name"]
async def parse_target(self, target_text: str) -> Target:
if re.match(r"^\d+$", target_text):
return Target(target_text)
elif match := re.match(
r"(?:https?://)?music\.163\.com/#/artist\?id=(\d+)", target_text
):
return Target(match.group(1))
else:
raise self.ParseTargetException()
async def get_sub_list(self, target: Target) -> list[RawPost]: async def get_sub_list(self, target: Target) -> list[RawPost]:
async with httpx.AsyncClient() as client: async with httpx.AsyncClient() as client:
res = await client.get( res = await client.get(

View File

@ -1,3 +1,4 @@
import re
from typing import Any, Optional from typing import Any, Optional
import httpx import httpx
@ -18,6 +19,7 @@ class NcmRadio(NewMessage):
schedule_kw = {"minutes": 10} schedule_kw = {"minutes": 10}
name = "网易云-电台" name = "网易云-电台"
has_target = True has_target = True
parse_target_promot = "请输入主播电台主页包含数字ID的链接"
async def get_target_name(self, target: Target) -> Optional[str]: async def get_target_name(self, target: Target) -> Optional[str]:
async with httpx.AsyncClient() as client: async with httpx.AsyncClient() as client:
@ -31,6 +33,16 @@ class NcmRadio(NewMessage):
return return
return res_data["programs"][0]["radio"]["name"] return res_data["programs"][0]["radio"]["name"]
async def parse_target(self, target_text: str) -> Target:
if re.match(r"^\d+$", target_text):
return Target(target_text)
elif match := re.match(
r"(?:https?://)?music\.163\.com/#/djradio\?id=(\d+)", target_text
):
return Target(match.group(1))
else:
raise self.ParseTargetException()
async def get_sub_list(self, target: Target) -> list[RawPost]: async def get_sub_list(self, target: Target) -> list[RawPost]:
async with httpx.AsyncClient() as client: async with httpx.AsyncClient() as client:
res = await client.post( res = await client.post(

View File

@ -47,6 +47,7 @@ class Platform(metaclass=RegistryABCMeta, base=True):
enable_tag: bool enable_tag: bool
store: dict[Target, Any] store: dict[Target, Any]
platform_name: str platform_name: str
parse_target_promot: Optional[str] = None
@abstractmethod @abstractmethod
async def get_target_name(self, target: Target) -> Optional[str]: async def get_target_name(self, target: Target) -> Optional[str]:
@ -73,6 +74,12 @@ class Platform(metaclass=RegistryABCMeta, base=True):
self.reverse_category[val] = key self.reverse_category[val] = key
self.store = dict() self.store = dict()
class ParseTargetException(Exception):
pass
async def parse_target(self, target_string: str) -> Target:
return Target(target_string)
@abstractmethod @abstractmethod
def get_tags(self, raw_post: RawPost) -> Optional[Collection[Tag]]: def get_tags(self, raw_post: RawPost) -> Optional[Collection[Tag]]:
"Return Tag list of given RawPost" "Return Tag list of given RawPost"

View File

@ -9,7 +9,7 @@ from nonebot.log import logger
from ..post import Post from ..post import Post
from ..types import * from ..types import *
from .platform import NewMessage from .platform import NewMessage, Platform
class Weibo(NewMessage): class Weibo(NewMessage):
@ -28,6 +28,7 @@ class Weibo(NewMessage):
schedule_type = "interval" schedule_type = "interval"
schedule_kw = {"seconds": 3} schedule_kw = {"seconds": 3}
has_target = True has_target = True
parse_target_promot = "请输入用户主页包含数字UID的链接"
async def get_target_name(self, target: Target) -> Optional[str]: async def get_target_name(self, target: Target) -> Optional[str]:
async with httpx.AsyncClient() as client: async with httpx.AsyncClient() as client:
@ -41,6 +42,15 @@ class Weibo(NewMessage):
else: else:
return None return None
async def parse_target(self, target_text: str) -> Target:
if re.match(r"\d+", target_text):
return Target(target_text)
elif match := re.match(r"(?:https?://)?weibo\.com/u/(\d+)", target_text):
# 都2202年了应该不会有http了吧不过还是防一手
return Target(match.group(1))
else:
raise Platform.ParseTargetException()
async def get_sub_list(self, target: Target) -> list[RawPost]: async def get_sub_list(self, target: Target) -> list[RawPost]:
async with httpx.AsyncClient() as client: async with httpx.AsyncClient() as client:
params = {"containerid": "107603" + target} params = {"containerid": "107603" + target}
@ -133,9 +143,9 @@ class Weibo(NewMessage):
"https://m.weibo.cn/detail/{}".format(info["mid"]), headers=header "https://m.weibo.cn/detail/{}".format(info["mid"]), headers=header
) )
try: try:
full_json_text = re.search( match = re.search(r'"status": ([\s\S]+),\s+"call"', res.text)
r'"status": ([\s\S]+),\s+"call"', res.text assert match
).group(1) full_json_text = match.group(1)
info = json.loads(full_json_text) info = json.loads(full_json_text)
except: except:
logger.info( logger.info(

View File

@ -12,7 +12,7 @@ class PlugConfig(BaseSettings):
bison_filter_log: bool = False bison_filter_log: bool = False
bison_to_me: bool = True bison_to_me: bool = True
bison_skip_browser_check: bool = False bison_skip_browser_check: bool = False
bison_use_pic_merge: int = 0 # 多图片时启用图片合并转发(仅限群),当bison_use_queue为False时该配置不会生效 bison_use_pic_merge: int = 0 # 多图片时启用图片合并转发(仅限群)
# 0不启用1首条消息单独发送剩余照片合并转发2以及以上所有消息全部合并转发 # 0不启用1首条消息单独发送剩余照片合并转发2以及以上所有消息全部合并转发
bison_resend_times: int = 0 bison_resend_times: int = 0

View File

@ -0,0 +1 @@
{"code":0,"message":"0","ttl":1,"data":{"mid":161775300,"name":"明日方舟","sex":"保密","face":"http://i0.hdslb.com/bfs/face/89154378c06a5ed332c40c2ca56f50cd641c0c90.jpg","face_nft":0,"sign":"重铸未来 方舟启航","rank":10000,"level":6,"jointime":0,"moral":0,"silence":0,"coins":0,"fans_badge":true,"fans_medal":{"show":false,"wear":false,"medal":null},"official":{"role":3,"title":"明日方舟官方账号","desc":"","type":1},"vip":{"type":2,"status":1,"due_date":1648828800000,"vip_pay_type":0,"theme_type":0,"label":{"path":"","text":"年度大会员","label_theme":"annual_vip","text_color":"#FFFFFF","bg_style":1,"bg_color":"#FB7299","border_color":""},"avatar_subscript":1,"nickname_color":"#FB7299","role":3,"avatar_subscript_url":"http://i0.hdslb.com/bfs/vip/icon_Certification_big_member_22_3x.png"},"pendant":{"pid":5305,"name":"明日方舟音律系列","image":"http://i0.hdslb.com/bfs/garb/item/615a1653281141ddf64cbb98c792ddaee78f7f40.png","expire":0,"image_enhance":"http://i0.hdslb.com/bfs/garb/item/516ecdf2d495a62f1bac31497c831b711823140c.webp","image_enhance_frame":"http://i0.hdslb.com/bfs/garb/item/c0751afbf950373c260254d02768eabf30ff3906.png"},"nameplate":{"nid":0,"name":"","image":"","image_small":"","level":"","condition":""},"user_honour_info":{"mid":0,"colour":null,"tags":[]},"is_followed":true,"top_photo":"http://i1.hdslb.com/bfs/space/6c6084808ec5bdff1985acc05ce0e126c49ad76e.png","theme":{},"sys_notice":{},"live_room":{"roomStatus":1,"liveStatus":0,"url":"https://live.bilibili.com/5555734?broadcast_type=0\u0026is_room_feed=1","title":"《明日方舟》2022新春前瞻特辑","cover":"http://i0.hdslb.com/bfs/live/new_room_cover/79af83a27f6001c1acfb47d1c0b879290f7c3308.jpg","roomid":5555734,"roundStatus":1,"broadcast_type":0,"watched_show":{"switch":true,"num":13033,"text_small":"1.3万","text_large":"1.3万人看过","icon":"https://i0.hdslb.com/bfs/live/a725a9e61242ef44d764ac911691a7ce07f36c1d.png","icon_location":"","icon_web":"https://i0.hdslb.com/bfs/live/8d9d0f33ef8bf6f308742752d13dd0df731df19c.png"}},"birthday":"","school":null,"profession":{"name":"","department":"","title":"","is_show":0},"tags":null,"series":{"user_upgrade_status":3,"show_upgrade_window":false},"is_senior_member":0}}

View File

@ -1,3 +1,5 @@
import typing
import pytest import pytest
from httpx import Response from httpx import Response
from nonebug.app import App from nonebug.app import App
@ -10,6 +12,10 @@ def bing_dy_list():
return get_json("bilibili_bing_list.json")["data"]["cards"] return get_json("bilibili_bing_list.json")["data"]["cards"]
if typing.TYPE_CHECKING:
from nonebot_bison.platform.bilibili import Bilibili
@pytest.fixture @pytest.fixture
def bilibili(app: App): def bilibili(app: App):
from nonebot_bison.platform import platform_manager from nonebot_bison.platform import platform_manager
@ -46,3 +52,20 @@ async def test_dynamic_forward(bilibili, bing_dy_list):
+ "\n--------------\n" + "\n--------------\n"
+ "#明日方舟#\n【新增服饰】\n//殿堂上的游禽 - 星极\n塞壬唱片偶像企划《闪耀阶梯》特供服饰/殿堂上的游禽。星极自费参加了这项企划,尝试着用大众能接受的方式演绎天空之上的故事。\n\n_____________\n谦逊留给观众,骄傲发自歌喉,此夜,唯我璀璨。 " + "#明日方舟#\n【新增服饰】\n//殿堂上的游禽 - 星极\n塞壬唱片偶像企划《闪耀阶梯》特供服饰/殿堂上的游禽。星极自费参加了这项企划,尝试着用大众能接受的方式演绎天空之上的故事。\n\n_____________\n谦逊留给观众,骄傲发自歌喉,此夜,唯我璀璨。 "
) )
async def test_parse_target(bilibili: "Bilibili"):
from nonebot_bison.platform.platform import Platform
res = await bilibili.parse_target(
"https://space.bilibili.com/161775300?from=search&seid=130517740606234234234&spm_id_from=333.337.0.0"
)
assert res == "161775300"
res2 = await bilibili.parse_target(
"space.bilibili.com/161775300?from=search&seid=130517740606234234234&spm_id_from=333.337.0.0"
)
assert res2 == "161775300"
with pytest.raises(Platform.ParseTargetException):
await bilibili.parse_target(
"https://www.bilibili.com/video/BV1qP4y1g738?spm_id_from=333.999.0.0"
)

View File

@ -1,4 +1,5 @@
import time import time
import typing
import pytest import pytest
import respx import respx
@ -7,6 +8,9 @@ from nonebug.app import App
from .utils import get_json from .utils import get_json
if typing.TYPE_CHECKING:
from nonebot_bison.platform.ncm_artist import NcmArtist
@pytest.fixture @pytest.fixture
def ncm_artist(app: App): def ncm_artist(app: App):
@ -48,3 +52,16 @@ async def test_fetch_new(ncm_artist, ncm_artist_0, ncm_artist_1, dummy_user_subi
assert post.target_type == "ncm-artist" assert post.target_type == "ncm-artist"
assert post.text == "新专辑发布Y1K" assert post.text == "新专辑发布Y1K"
assert post.url == "https://music.163.com/#/album?id=131074504" assert post.url == "https://music.163.com/#/album?id=131074504"
async def test_parse_target(ncm_artist: "NcmArtist"):
from nonebot_bison.platform.platform import Platform
res = await ncm_artist.parse_target("32540734")
assert res == "32540734"
res = await ncm_artist.parse_target("https://music.163.com/#/artist?id=32540734")
assert res == "32540734"
res = await ncm_artist.parse_target("music.163.com/#/artist?id=32540734")
assert res == "32540734"
with pytest.raises(Platform.ParseTargetException):
await ncm_artist.parse_target("music.163.com/#/rad?id=32540734")

View File

@ -1,4 +1,5 @@
import time import time
import typing
import pytest import pytest
import respx import respx
@ -7,6 +8,9 @@ from nonebug.app import App
from .utils import get_json from .utils import get_json
if typing.TYPE_CHECKING:
from nonebot_bison.platform.ncm_radio import NcmRadio
@pytest.fixture @pytest.fixture
def ncm_radio(app: App): def ncm_radio(app: App):
@ -53,3 +57,14 @@ async def test_fetch_new(ncm_radio, ncm_radio_0, ncm_radio_1, dummy_user_subinfo
"http://p1.music.126.net/H5em5xUNIYXcjJhOmeaSqQ==/109951166647436789.jpg" "http://p1.music.126.net/H5em5xUNIYXcjJhOmeaSqQ==/109951166647436789.jpg"
] ]
assert post.target_name == "《明日方舟》游戏原声OST" assert post.target_name == "《明日方舟》游戏原声OST"
async def test_parse_target(ncm_radio: "NcmRadio"):
res = await ncm_radio.parse_target("https://music.163.com/#/djradio?id=793745436")
assert res == "793745436"
res = await ncm_radio.parse_target("music.163.com/#/djradio?id=793745436")
assert res == "793745436"
res = await ncm_radio.parse_target("793745436")
assert res == "793745436"
with pytest.raises(ncm_radio.ParseTargetException):
await ncm_radio.parse_target("music.163.com/#/alm?id=793745436")

View File

@ -1,3 +1,4 @@
import typing
from datetime import datetime from datetime import datetime
import feedparser import feedparser
@ -9,6 +10,9 @@ from pytz import timezone
from .utils import get_file, get_json from .utils import get_file, get_json
if typing.TYPE_CHECKING:
from nonebot_bison.platform.weibo import Weibo
@pytest.fixture @pytest.fixture
def weibo(app: App): def weibo(app: App):
@ -132,3 +136,16 @@ def test_chaohua_tag(weibo):
tags = weibo.get_tags(test_post) tags = weibo.get_tags(test_post)
assert "刚出生的小羊驼长啥样" in tags assert "刚出生的小羊驼长啥样" in tags
assert "小羊驼三三超话" in tags assert "小羊驼三三超话" in tags
async def test_parse_target(weibo: "Weibo"):
from nonebot_bison.platform.platform import Platform
res = await weibo.parse_target("https://weibo.com/u/6441489862")
assert res == "6441489862"
res = await weibo.parse_target("weibo.com/u/6441489862")
assert res == "6441489862"
res = await weibo.parse_target("6441489862")
assert res == "6441489862"
with pytest.raises(Platform.ParseTargetException):
await weibo.parse_target("https://weibo.com/arknights")

View File

@ -67,6 +67,7 @@ async def test_abort_add_on_id(app: App):
from nonebot_bison.config import Config from nonebot_bison.config import Config
from nonebot_bison.config_manager import add_sub_matcher, common_platform from nonebot_bison.config_manager import add_sub_matcher, common_platform
from nonebot_bison.platform import platform_manager from nonebot_bison.platform import platform_manager
from nonebot_bison.platform.weibo import Weibo
config = Config() config = Config()
config.user_target.truncate() config.user_target.truncate()
@ -103,7 +104,7 @@ async def test_abort_add_on_id(app: App):
ctx.receive_event(bot, event_2) ctx.receive_event(bot, event_2)
ctx.should_call_send( ctx.should_call_send(
event_2, event_2,
Message(BotReply.add_reply_on_id), Message(BotReply.add_reply_on_id(Weibo)),
True, True,
) )
event_abort = fake_group_message_event( event_abort = fake_group_message_event(
@ -127,6 +128,7 @@ async def test_abort_add_on_cats(app: App):
from nonebot_bison.config import Config from nonebot_bison.config import Config
from nonebot_bison.config_manager import add_sub_matcher, common_platform from nonebot_bison.config_manager import add_sub_matcher, common_platform
from nonebot_bison.platform import platform_manager from nonebot_bison.platform import platform_manager
from nonebot_bison.platform.weibo import Weibo
config = Config() config = Config()
config.user_target.truncate() config.user_target.truncate()
@ -167,7 +169,7 @@ async def test_abort_add_on_cats(app: App):
ctx.receive_event(bot, event_2) ctx.receive_event(bot, event_2)
ctx.should_call_send( ctx.should_call_send(
event_2, event_2,
Message(BotReply.add_reply_on_id), Message(BotReply.add_reply_on_id(Weibo)),
True, True,
) )
event_3 = fake_group_message_event( event_3 = fake_group_message_event(
@ -207,6 +209,7 @@ async def test_abort_add_on_tag(app: App):
from nonebot_bison.config import Config from nonebot_bison.config import Config
from nonebot_bison.config_manager import add_sub_matcher, common_platform from nonebot_bison.config_manager import add_sub_matcher, common_platform
from nonebot_bison.platform import platform_manager from nonebot_bison.platform import platform_manager
from nonebot_bison.platform.weibo import Weibo
config = Config() config = Config()
config.user_target.truncate() config.user_target.truncate()
@ -247,7 +250,7 @@ async def test_abort_add_on_tag(app: App):
ctx.receive_event(bot, event_2) ctx.receive_event(bot, event_2)
ctx.should_call_send( ctx.should_call_send(
event_2, event_2,
Message(BotReply.add_reply_on_id), Message(BotReply.add_reply_on_id(Weibo)),
True, True,
) )
event_3 = fake_group_message_event( event_3 = fake_group_message_event(

View File

@ -64,6 +64,7 @@ async def test_add_with_target(app: App):
from nonebot_bison.config import Config from nonebot_bison.config import Config
from nonebot_bison.config_manager import add_sub_matcher, common_platform from nonebot_bison.config_manager import add_sub_matcher, common_platform
from nonebot_bison.platform import platform_manager from nonebot_bison.platform import platform_manager
from nonebot_bison.platform.weibo import Weibo
config = Config() config = Config()
config.user_target.truncate() config.user_target.truncate()
@ -115,7 +116,7 @@ async def test_add_with_target(app: App):
ctx.receive_event(bot, event_3) ctx.receive_event(bot, event_3)
ctx.should_call_send( ctx.should_call_send(
event_3, event_3,
Message(BotReply.add_reply_on_id), Message(BotReply.add_reply_on_id(Weibo)),
True, True,
) )
event_4_err = fake_group_message_event( event_4_err = fake_group_message_event(
@ -181,6 +182,7 @@ async def test_add_with_target_no_cat(app: App):
from nonebot_bison.config import Config from nonebot_bison.config import Config
from nonebot_bison.config_manager import add_sub_matcher, common_platform from nonebot_bison.config_manager import add_sub_matcher, common_platform
from nonebot_bison.platform import platform_manager from nonebot_bison.platform import platform_manager
from nonebot_bison.platform.ncm_artist import NcmArtist
config = Config() config = Config()
config.user_target.truncate() config.user_target.truncate()
@ -208,7 +210,7 @@ async def test_add_with_target_no_cat(app: App):
ctx.receive_event(bot, event_3) ctx.receive_event(bot, event_3)
ctx.should_call_send( ctx.should_call_send(
event_3, event_3,
Message(BotReply.add_reply_on_id), Message(BotReply.add_reply_on_id(NcmArtist)),
True, True,
) )
event_4_ok = fake_group_message_event( event_4_ok = fake_group_message_event(
@ -332,6 +334,7 @@ async def test_add_with_get_id(app: App):
from nonebot_bison.config import Config from nonebot_bison.config import Config
from nonebot_bison.config_manager import add_sub_matcher, common_platform from nonebot_bison.config_manager import add_sub_matcher, common_platform
from nonebot_bison.platform import platform_manager from nonebot_bison.platform import platform_manager
from nonebot_bison.platform.weibo import Weibo
config = Config() config = Config()
config.user_target.truncate() config.user_target.truncate()
@ -373,7 +376,7 @@ async def test_add_with_get_id(app: App):
ctx.receive_event(bot, event_3) ctx.receive_event(bot, event_3)
ctx.should_call_send( ctx.should_call_send(
event_3, event_3,
Message(BotReply.add_reply_on_id), Message(BotReply.add_reply_on_id(Weibo)),
True, True,
) )
event_4_query = fake_group_message_event( event_4_query = fake_group_message_event(
@ -387,8 +390,8 @@ async def test_add_with_get_id(app: App):
True, True,
) )
""" """
关于Message([MessageSegment(*BotReply.add_reply_on_id_input_search())]): 关于Message([MessageSegment(*BotReply.add_reply_on_id_input_search())])
异客知道为什么要在这里这样写 知道为什么要在这里这样写
没有[]的话assert不了(should_call_send使用[MessageSegment(...)]的格式进行比较) 没有[]的话assert不了(should_call_send使用[MessageSegment(...)]的格式进行比较)
不在这里MessageSegment()的话也assert不了(指不能让add_reply_on_id_input_search直接返回一个MessageSegment对象) 不在这里MessageSegment()的话也assert不了(指不能让add_reply_on_id_input_search直接返回一个MessageSegment对象)
amen amen
@ -405,3 +408,125 @@ async def test_add_with_get_id(app: App):
ctx.should_finished() ctx.should_finished()
subs = config.list_subscribe(10000, "group") subs = config.list_subscribe(10000, "group")
assert len(subs) == 0 assert len(subs) == 0
@pytest.mark.asyncio
@respx.mock
async def test_add_with_bilibili_target_parser(app: App):
from nonebot.adapters.onebot.v11.event import Sender
from nonebot.adapters.onebot.v11.message import Message
from nonebot_bison.config import Config
from nonebot_bison.config_manager import add_sub_matcher, common_platform
from nonebot_bison.platform import platform_manager
from nonebot_bison.platform.bilibili import Bilibili
config = Config()
config.user_target.truncate()
ak_list_router = respx.get(
"https://api.bilibili.com/x/space/acc/info?mid=161775300"
)
ak_list_router.mock(
return_value=Response(200, json=get_json("bilibili_arknights_profile.json"))
)
async with app.test_matcher(add_sub_matcher) as ctx:
bot = ctx.create_bot()
event_1 = fake_group_message_event(
message=Message("添加订阅"),
sender=Sender(card="", nickname="test", role="admin"),
to_me=True,
)
ctx.receive_event(bot, event_1)
ctx.should_pass_rule()
ctx.should_call_send(
event_1,
Message(
BotReply.add_reply_on_platform(
platform_manager=platform_manager, common_platform=common_platform
)
),
True,
)
event_2 = fake_group_message_event(
message=Message("全部"), sender=Sender(card="", nickname="test", role="admin")
)
ctx.receive_event(bot, event_2)
ctx.should_rejected()
ctx.should_call_send(
event_2,
BotReply.add_reply_on_platform_input_allplatform(platform_manager),
True,
)
event_3 = fake_group_message_event(
message=Message("bilibili"), sender=fake_admin_user
)
ctx.receive_event(bot, event_3)
assert Bilibili.parse_target_promot
ctx.should_call_send(
event_3,
Message(BotReply.add_reply_on_id(Bilibili)),
True,
)
event_4_err1 = fake_group_message_event(
message=Message(
"https://live.bilibili.com/5555734?broadcast_type=0&is_room_feed=1&spm_id_from=333.999.0.0"
),
sender=fake_admin_user,
)
ctx.receive_event(bot, event_4_err1)
ctx.should_call_send(
event_4_err1, BotReply.add_reply_on_target_parse_input_error, True
)
ctx.should_rejected()
event_4_err1 = fake_group_message_event(
message=Message(
"https://space.bilibili.com/ark161775300?from=search&seid=13051774060625135297&spm_id_from=333.337.0.0"
),
sender=fake_admin_user,
)
ctx.receive_event(bot, event_4_err1)
ctx.should_call_send(
event_4_err1, BotReply.add_reply_on_target_parse_input_error, True
)
ctx.should_rejected()
event_4_ok = fake_group_message_event(
message=Message(
"https://space.bilibili.com/161775300?from=search&seid=13051774060625135297&spm_id_from=333.337.0.0"
),
sender=fake_admin_user,
)
ctx.receive_event(bot, event_4_ok)
ctx.should_call_send(
event_4_ok,
BotReply.add_reply_on_target_confirm("bilibili", "明日方舟", "161775300"),
True,
)
ctx.should_call_send(
event_4_ok,
Message(BotReply.add_reply_on_cats(platform_manager, "bilibili")),
True,
)
event_5_ok = fake_group_message_event(
message=Message("视频"), sender=fake_admin_user
)
ctx.receive_event(bot, event_5_ok)
ctx.should_call_send(event_5_ok, Message(BotReply.add_reply_on_tags), True)
event_6 = fake_group_message_event(
message=Message("全部标签"), sender=fake_admin_user
)
ctx.receive_event(bot, event_6)
ctx.should_call_send(
event_6, BotReply.add_reply_subscribe_success("明日方舟"), True
)
ctx.should_finished()
subs = config.list_subscribe(10000, "group")
assert len(subs) == 1
sub = subs[0]
assert sub["target"] == "161775300"
assert sub["tags"] == []
assert sub["cats"] == [platform_manager["bilibili"].reverse_category["视频"]]
assert sub["target_type"] == "bilibili"
assert sub["target_name"] == "明日方舟"

View File

@ -1,3 +1,4 @@
from ast import Str
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from typing_extensions import Literal from typing_extensions import Literal
@ -130,8 +131,18 @@ class BotReply:
def add_reply_subscribe_success(name): def add_reply_subscribe_success(name):
return "添加 {} 成功".format(name) return "添加 {} 成功".format(name)
@staticmethod
def add_reply_on_id(platform: object) -> Str:
base_text = "请输入订阅用户的id\n查询id获取方法请回复:“查询”"
extra_text = (
("1." + platform.parse_target_promot + "\n2.")
if platform.parse_target_promot
else ""
)
return extra_text + base_text
add_reply_on_id_input_error = "id输入错误" add_reply_on_id_input_error = "id输入错误"
add_reply_on_target_parse_input_error = "不能从你的输入中提取出id请检查你输入的内容是否符合预期"
add_reply_on_platform_input_error = "平台输入错误" add_reply_on_platform_input_error = "平台输入错误"
add_reply_on_id = "请输入订阅用户的id:\n查询id获取方法请回复:“查询”"
add_reply_on_tags = '请输入要订阅的tag订阅所有tag输入"全部标签"' add_reply_on_tags = '请输入要订阅的tag订阅所有tag输入"全部标签"'
add_reply_abort = "已中止订阅" add_reply_abort = "已中止订阅"