import contextlib from nonebot.typing import T_State from nonebot.matcher import Matcher from nonebot.params import Arg, ArgPlainText from nonebot.adapters import Message, MessageTemplate from nonebot_plugin_saa import Text, PlatformTarget, SupportedAdapters from ..types import Target from ..config import config from ..apis import check_sub_target from ..platform import Platform, platform_manager from ..config.db_config import SubscribeDupException from .utils import common_platform, ensure_user_info, gen_handle_cancel def do_add_sub(add_sub: type[Matcher]): handle_cancel = gen_handle_cancel(add_sub, "已中止订阅") add_sub.handle()(ensure_user_info(add_sub)) @add_sub.handle() async def init_promote(state: T_State): state["_prompt"] = ( "请输入想要订阅的平台,目前支持,请输入冒号左边的名称:\n" + "".join( [f"{platform_name}: {platform_manager[platform_name].name}\n" for platform_name in common_platform] ) + "要查看全部平台请输入:“全部”\n中止订阅过程请输入:“取消”" ) @add_sub.got("platform", MessageTemplate("{_prompt}"), [handle_cancel]) async def parse_platform(state: T_State, platform: str = ArgPlainText()) -> None: if platform == "全部": message = "全部平台\n" + "\n".join( [f"{platform_name}: {platform.name}" for platform_name, platform in platform_manager.items()] ) await add_sub.reject(message) elif platform == "取消": await add_sub.finish("已中止订阅") elif platform in platform_manager: state["platform"] = platform else: await add_sub.reject("平台输入错误") @add_sub.handle() async def prepare_get_id(matcher: Matcher, state: T_State): cur_platform = platform_manager[state["platform"]] if cur_platform.has_target: state["_prompt"] = ( ("1." + cur_platform.parse_target_promot + "\n2.") if cur_platform.parse_target_promot else "" ) + "请输入订阅用户的id\n查询id获取方法请回复:“查询”" else: matcher.set_arg("raw_id", None) # type: ignore state["id"] = "default" state["name"] = await check_sub_target(state["platform"], Target("")) @add_sub.got("raw_id", MessageTemplate("{_prompt}"), [handle_cancel]) async def got_id(state: T_State, raw_id: Message = Arg()): raw_id_text = raw_id.extract_plain_text() try: if raw_id_text == "查询": url = "https://nonebot-bison.netlify.app/usage/#%E6%89%80%E6%94%AF%E6%8C%81%E5%B9%B3%E5%8F%B0%E7%9A%84-uid" msg = Text(url) with contextlib.suppress(ImportError): from nonebot.adapters.onebot.v11 import MessageSegment title = "Bison所支持的平台UID" content = "查询相关平台的uid格式或获取方式" image = "https://s3.bmp.ovh/imgs/2022/03/ab3cc45d83bd3dd3.jpg" msg.overwrite( SupportedAdapters.onebot_v11, MessageSegment.share(url=url, title=title, content=content, image=image), ) await msg.reject() platform = platform_manager[state["platform"]] with contextlib.suppress(ImportError): from nonebot.adapters.onebot.v11 import Message from nonebot.adapters.onebot.v11.utils import unescape if isinstance(raw_id, Message): raw_id_text = unescape(raw_id_text) raw_id_text = await platform.parse_target(raw_id_text) name = await check_sub_target(state["platform"], raw_id_text) if not name: await add_sub.reject("id输入错误") state["id"] = raw_id_text state["name"] = name except Platform.ParseTargetException as e: await add_sub.reject( "不能从你的输入中提取出id,请检查你输入的内容是否符合预期" + (f"\n{e.prompt}" if e.prompt else "") ) else: await add_sub.send( f"即将订阅的用户为:{state['platform']} {state['name']} {state['id']}\n如有错误请输入“取消”重新订阅" ) @add_sub.handle() async def prepare_get_categories(matcher: Matcher, state: T_State): if not platform_manager[state["platform"]].categories: matcher.set_arg("raw_cats", None) # type: ignore state["cats"] = [] return state["_prompt"] = "请输入要订阅的类别,以空格分隔,支持的类别有:{}".format( " ".join(list(platform_manager[state["platform"]].categories.values())) ) @add_sub.got("raw_cats", MessageTemplate("{_prompt}"), [handle_cancel]) async def parser_cats(state: T_State, raw_cats: Message = Arg()): raw_cats_text = raw_cats.extract_plain_text() res = [] if platform_manager[state["platform"]].categories: for cat in raw_cats_text.split(): if cat not in platform_manager[state["platform"]].reverse_category: await add_sub.reject(f"不支持 {cat}") res.append(platform_manager[state["platform"]].reverse_category[cat]) state["cats"] = res @add_sub.handle() async def prepare_get_tags(matcher: Matcher, state: T_State): if not platform_manager[state["platform"]].enable_tag: matcher.set_arg("raw_tags", None) # type: ignore state["tags"] = [] return state["_prompt"] = ( '请输入要订阅/屏蔽的标签(不含#号)\n多个标签请使用空格隔开\n订阅所有标签输入"全部标签"\n具体规则回复"详情"' ) @add_sub.got("raw_tags", MessageTemplate("{_prompt}"), [handle_cancel]) async def parser_tags(state: T_State, raw_tags: Message = Arg()): raw_tags_text = raw_tags.extract_plain_text() if raw_tags_text == "详情": await add_sub.reject( "订阅标签直接输入标签内容\n" "屏蔽标签请在标签名称前添加~号\n" "详见https://nonebot-bison.netlify.app/usage/#%E5%B9%B3%E5%8F%B0%E8%AE%A2%E9%98%85%E6%A0%87%E7%AD%BE-tag" ) if raw_tags_text in ["全部标签", "全部", "全标签"]: state["tags"] = [] else: state["tags"] = raw_tags_text.split() @add_sub.handle() async def add_sub_process(state: T_State, user: PlatformTarget = Arg("target_user_info")): try: await config.add_subscribe( user=user, target=state["id"], target_name=state["name"], platform_name=state["platform"], cats=state.get("cats", []), tags=state.get("tags", []), ) except SubscribeDupException: await add_sub.finish(f"添加 {state['name']} 失败: 已存在该订阅") except Exception as e: await add_sub.finish(f"添加 {state['name']} 失败: {e}") await add_sub.finish("添加 {} 成功".format(state["name"]))