Merge branch 'dev' into feat/db

This commit is contained in:
felinae98 2022-03-22 20:07:23 +08:00
commit ad80006844
No known key found for this signature in database
GPG Key ID: 00C8B010587FF610
24 changed files with 1123 additions and 261 deletions

View File

@ -14,10 +14,10 @@ workflows:
pre-steps: pre-steps:
- run: - run:
command: | command: |
if [ -z "${CIRCLE_PULL_REQUEST##*/}" ] if [[ -n "${CIRCLE_PULL_REQUEST##*/}" && ${CIRCLE_BRANCH} =~ "pull/" ]]
then then
IS_PR=false
else
IS_PR=true IS_PR=true
else
IS_PR=false
fi fi
echo '{ "is_pr": '$IS_PR' }' >> /home/circleci/params.json echo '{ "is_pr": '$IS_PR' }' >> /home/circleci/params.json

View File

@ -159,9 +159,9 @@ jobs:
# - run: sed -e '41,45d' -i pyproject.toml # - run: sed -e '41,45d' -i pyproject.toml
- python/install-packages: - python/install-packages:
pkg-manager: poetry pkg-manager: poetry
- run: # - run:
name: Install browser # name: Install browser
command: poetry run playwright install-deps && poetry run playwright install chromium # command: poetry run playwright install-deps && poetry run playwright install chromium
- run: - run:
name: Coverage test name: Coverage test
command: poetry run pytest --cov-report html --cov-report xml --cov=./src/plugins/nonebot_bison --junitxml=test-results/junit.xml command: poetry run pytest --cov-report html --cov-report xml --cov=./src/plugins/nonebot_bison --junitxml=test-results/junit.xml

2
.gitignore vendored
View File

@ -22,6 +22,7 @@
__pycache__/ __pycache__/
*.py[cod] *.py[cod]
*$py.class *$py.class
venv_test/
# C extensions # C extensions
*.so *.so
@ -274,6 +275,7 @@ dist
# vuepress build output # vuepress build output
.vuepress/dist .vuepress/dist
docs/.vuepress/.temp/
# Serverless directories # Serverless directories
.serverless/ .serverless/

View File

@ -66,5 +66,8 @@
## [0.5.1] ## [0.5.1]
- 使用了新的私聊进行群管理的方式 - 使用了新的私聊进行群管理的方式:从`管理-*`替换为`群管理`命令
- 默认关闭自动重发功能 - 默认关闭自动重发功能
- 添加了 [推送消息合并转发功能](https://nonebot-bison.vercel.app/usage/#%E9%85%8D%E7%BD%AE)
- 添加了`添加订阅`命令事件的中途取消功能
- 优化了`添加订阅`命令的聊天处理逻辑

View File

@ -64,6 +64,8 @@ yarn && yarn build
本项目使用了 Python 3.9 的语法,请将 Python 版本升级到 3.9 及以上,推荐使用 docker 部署 本项目使用了 Python 3.9 的语法,请将 Python 版本升级到 3.9 及以上,推荐使用 docker 部署
2. bot 不理我 2. bot 不理我
请确认自己是群主或者管理员,并且检查`COMMAND_START`环境变量是否设为`[""]` 请确认自己是群主或者管理员,并且检查`COMMAND_START`环境变量是否设为`[""]`
或者按照`COMMAND_START`中的设置添加命令前缀,例:
`COMMAND_START=["/"]`则应发送`/添加订阅`
3. 微博漏订阅了 3. 微博漏订阅了
微博更新了新的风控措施,某些含有某些关键词的微博会获取不到。 微博更新了新的风控措施,某些含有某些关键词的微博会获取不到。

View File

@ -2,7 +2,7 @@ module.exports = {
title: 'Nonebot Bison', title: 'Nonebot Bison',
description: 'Docs for Nonebot Bison', description: 'Docs for Nonebot Bison',
themeConfig: { themeConfig: {
nav: [ navbar: [
{ text: '主页', link: '/' }, { text: '主页', link: '/' },
{ text: '部署与使用', link: '/usage/' }, { text: '部署与使用', link: '/usage/' },
{ text: '开发', link: '/dev/' }, { text: '开发', link: '/dev/' },

Binary file not shown.

After

Width:  |  Height:  |  Size: 36 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 12 KiB

View File

@ -2,8 +2,9 @@
home: true home: true
heroText: Nonebot Bison heroText: Nonebot Bison
tagline: 本bot励志做全泰拉骑车最快的信使 tagline: 本bot励志做全泰拉骑车最快的信使
actionText: 快速部署 actions:
actionLink: /usage/ - text: 快速部署
link: /usage/
features: features:
- title: 拓展性强 - title: 拓展性强
details: 没有自己想要的网站?只要简单的爬虫知识就可以给它适配一个新的网站 details: 没有自己想要的网站?只要简单的爬虫知识就可以给它适配一个新的网站

View File

@ -2,7 +2,32 @@
sidebar: auto sidebar: auto
--- ---
# 开发指南 # 基本开发须知
## 语言以及工具
1. 本项目使用了`python3.9`的特性进行开发,所以请确保你的 Python 版本>=3.9
2. 本项目使用 poetry 进行依赖管理,请确保开发之前已经进行过`poetry install`,运行时在`poetry shell`的环境中进行运行
3. 本项目使用的 node 项目管理工具是 yarn
## 前端
本项目使用了前端,如果单独 clone 仓库本身,里面是**不包含**编译过的前端的,请使用`yarn && yarn build`进行前端的构建。
如果想要开发前端,推荐在`.env.dev`中加入`BISON_OUTER_URL="http://localhost:3000/bison/"`,然后分别运行 bot 和`yarn dev`
::: warning
请在开发前端的时候删除项目根目录中的`node_modules`,否则编译和运行的时候可能会出现奇怪的问题。
:::
## 文档
文档的相关部分在`docs`目录中,可以在项目根目录执行`yarn docs:dev`预览文件更改效果。
## 代码格式
本项目使用了 pre-commit 来进行代码美化和格式化。在`poetry shell`状态下执行`pre-commit install`来安装 git hook可自动在 commit 时
格式化代码。
# 适配新网站
本插件需要你的帮助!只需要会写简单的爬虫,就能给本插件适配新的网站。 本插件需要你的帮助!只需要会写简单的爬虫,就能给本插件适配新的网站。

View File

@ -115,13 +115,32 @@ sidebar: auto
开启,默认关 开启,默认关
- `BISON_USE_QUEUE`: 是否用队列的方式发送消息,降低发送频率,默认开 - `BISON_USE_QUEUE`: 是否用队列的方式发送消息,降低发送频率,默认开
- `BISON_RESEND_TIMES`: 最大重发次数,默认 0 - `BISON_RESEND_TIMES`: 最大重发次数,默认 0
- `BISON_USE_PIC_MERGE`: 是否启用多图片时合并转发(仅限群)
- `0`: 不启用(默认)
- `1`: 首条消息单独发送,剩余图片合并转发
- `2`: 所有消息全部合并转发
::: details 配置项示例
- 当`BISON_USE_PIC_MERGE=1`时:
![simple1](/images/forward-msg-simple1.png)
- 当`BISON_USE_PIC_MERGE=2`时:
![simple1](/images/forward-msg-simple2.png)
:::
::: warning
启用此功能时,可能会因为待推送图片过大/过多而导致文字消息与合并转发图片消息推送间隔过大(选择模式`1`时),请谨慎考虑开启。或者选择模式`2`,使图文消息一同合并转发(可能会使消息推送延迟过长)
:::
## 使用 ## 使用
::: warning ::: warning
本节假设`COMMAND_START`设置中包含`''`,如果出现 bot 不响应的问题,请先 本节假设`COMMAND_START`设置中包含`''`
排查这个设置
::: - 如果出现 bot 不响应的问题,请先排查这个设置
- 尝试在命令前添加设置的命令前缀,如`COMMAND_START=['/']`,则尝试使用`/添加订阅`
:::
### 命令 ### 命令

View File

@ -13,9 +13,10 @@ from nonebot.log import logger
from nonebot.matcher import Matcher from nonebot.matcher import Matcher
from nonebot.params import Depends, EventPlainText, EventToMe from nonebot.params import Depends, EventPlainText, EventToMe
from nonebot.permission import SUPERUSER from nonebot.permission import SUPERUSER
from nonebot.rule import to_me
from nonebot.typing import T_State from nonebot.typing import T_State
from .config import config from .config import Config
from .platform import check_sub_target, platform_manager from .platform import check_sub_target, platform_manager
from .plugin_config import plugin_config from .plugin_config import plugin_config
from .types import Category, Target, User from .types import Category, Target, User
@ -81,7 +82,7 @@ def do_add_sub(add_sub: Type[Matcher]):
for platform_name in common_platform for platform_name in common_platform
] ]
) )
+ "要查看全部平台请输入:“全部”" + "要查看全部平台请输入:“全部”\n中止订阅过程请输入:“取消”"
) )
async def parse_platform(event: MessageEvent, state: T_State) -> None: async def parse_platform(event: MessageEvent, state: T_State) -> None:
@ -96,6 +97,8 @@ def do_add_sub(add_sub: Type[Matcher]):
] ]
) )
await add_sub.reject(message) await add_sub.reject(message)
elif platform == "取消":
await add_sub.finish("已中止订阅")
elif platform in platform_manager: elif platform in platform_manager:
state["platform"] = platform state["platform"] = platform
else: else:
@ -106,9 +109,7 @@ def do_add_sub(add_sub: Type[Matcher]):
) )
async def init_id(state: T_State): async def init_id(state: T_State):
if platform_manager[state["platform"]].has_target: if platform_manager[state["platform"]].has_target:
state[ state["_prompt"] = "请输入订阅用户的id:\n查询id获取方法请回复:“查询”"
"_prompt"
] = "请输入订阅用户的id详情查阅https://nonebot-bison.vercel.app/usage/#%E6%89%80%E6%94%AF%E6%8C%81%E5%B9%B3%E5%8F%B0%E7%9A%84uid"
else: else:
state["id"] = "default" state["id"] = "default"
state["name"] = await platform_manager[state["platform"]].get_target_name( state["name"] = await platform_manager[state["platform"]].get_target_name(
@ -120,13 +121,32 @@ def do_add_sub(add_sub: Type[Matcher]):
return return
target = str(event.get_message()).strip() target = str(event.get_message()).strip()
try: try:
if target == "查询":
raise LookupError
if target == "取消":
raise KeyboardInterrupt
name = await check_sub_target(state["platform"], target) name = await check_sub_target(state["platform"], target)
if not name: if not name:
raise ValueError raise ValueError
state["id"] = target state["id"] = target
state["name"] = name state["name"] = name
except: except (LookupError):
url = "https://nonebot-bison.vercel.app/usage/#%E6%89%80%E6%94%AF%E6%8C%81%E5%B9%B3%E5%8F%B0%E7%9A%84-uid"
title = "Bison所支持的平台UID"
content = "查询相关平台的uid格式或获取方式"
image = "https://s3.bmp.ovh/imgs/2022/03/ab3cc45d83bd3dd3.jpg"
getId_share = f"[CQ:share,url={url},title={title},content={content},image={image}]" # 缩短字符串格式长度,以及方便后续修改为消息段格式
await add_sub.reject(Message(getId_share))
except (KeyboardInterrupt):
await add_sub.finish("已中止订阅")
except (ValueError):
await add_sub.reject("id输入错误") await add_sub.reject("id输入错误")
else:
await add_sub.send(
"即将订阅的用户为:{} {} {}\n如有错误请输入“取消”重新订阅".format(
state["platform"], state["name"], state["id"]
)
)
@add_sub.got("id", _gen_prompt_template("{_prompt}"), [Depends(parse_id)]) @add_sub.got("id", _gen_prompt_template("{_prompt}"), [Depends(parse_id)])
async def init_cat(state: T_State): async def init_cat(state: T_State):
@ -142,7 +162,9 @@ def do_add_sub(add_sub: Type[Matcher]):
return return
res = [] res = []
for cat in str(event.get_message()).strip().split(): for cat in str(event.get_message()).strip().split():
if cat not in platform_manager[state["platform"]].reverse_category: if cat == "取消":
await add_sub.finish("已中止订阅")
elif cat not in platform_manager[state["platform"]].reverse_category:
await add_sub.reject("不支持 {}".format(cat)) await add_sub.reject("不支持 {}".format(cat))
res.append(platform_manager[state["platform"]].reverse_category[cat]) res.append(platform_manager[state["platform"]].reverse_category[cat])
state["cats"] = res state["cats"] = res
@ -157,6 +179,8 @@ def do_add_sub(add_sub: Type[Matcher]):
async def parser_tags(event: MessageEvent, state: T_State): async def parser_tags(event: MessageEvent, state: T_State):
if not isinstance(state["tags"], Message): if not isinstance(state["tags"], Message):
return return
if str(event.get_message()).strip() == "取消": # 一般不会有叫 取消 的tag吧
await add_sub.finish("已中止订阅")
if str(event.get_message()).strip() == "全部标签": if str(event.get_message()).strip() == "全部标签":
state["tags"] = [] state["tags"] = []
else: else:
@ -164,6 +188,7 @@ def do_add_sub(add_sub: Type[Matcher]):
@add_sub.got("tags", _gen_prompt_template("{_prompt}"), [Depends(parser_tags)]) @add_sub.got("tags", _gen_prompt_template("{_prompt}"), [Depends(parser_tags)])
async def add_sub_process(event: Event, state: T_State): async def add_sub_process(event: Event, state: T_State):
config = Config()
user = state.get("target_user_info") user = state.get("target_user_info")
assert isinstance(user, User) assert isinstance(user, User)
config.add_subscribe( config.add_subscribe(
@ -185,6 +210,7 @@ def do_query_sub(query_sub: Type[Matcher]):
@query_sub.handle() @query_sub.handle()
async def _(state: T_State): async def _(state: T_State):
config: Config = Config()
user_info = state["target_user_info"] user_info = state["target_user_info"]
assert isinstance(user_info, User) assert isinstance(user_info, User)
sub_list = config.list_subscribe( sub_list = config.list_subscribe(
@ -215,6 +241,7 @@ def do_del_sub(del_sub: Type[Matcher]):
@del_sub.handle() @del_sub.handle()
async def send_list(bot: Bot, event: Event, state: T_State): async def send_list(bot: Bot, event: Event, state: T_State):
config: Config = Config()
user_info = state["target_user_info"] user_info = state["target_user_info"]
assert isinstance(user_info, User) assert isinstance(user_info, User)
sub_list = config.list_subscribe( sub_list = config.list_subscribe(
@ -249,6 +276,7 @@ def do_del_sub(del_sub: Type[Matcher]):
async def do_del(event: Event, state: T_State): async def do_del(event: Event, state: T_State):
try: try:
index = int(str(event.get_message()).strip()) index = int(str(event.get_message()).strip())
config = Config()
user_info = state["target_user_info"] user_info = state["target_user_info"]
assert isinstance(user_info, User) assert isinstance(user_info, User)
config.del_subscribe( config.del_subscribe(
@ -288,11 +316,16 @@ del_sub_matcher = on_command(
del_sub_matcher.handle()(set_target_user_info) del_sub_matcher.handle()(set_target_user_info)
do_del_sub(del_sub_matcher) do_del_sub(del_sub_matcher)
group_manage_matcher = on_command("群管理") group_manage_matcher = on_command("群管理", rule=to_me(), permission=SUPERUSER, priority=4)
@group_manage_matcher.handle() @group_manage_matcher.handle()
async def send_group_list(bot: Bot, state: T_State): async def send_group_list_private(bot: Bot, event: GroupMessageEvent, state: T_State):
await group_manage_matcher.finish(Message("该功能只支持私聊使用请私聊Bot"))
@group_manage_matcher.handle()
async def send_group_list(bot: Bot, event: PrivateMessageEvent, state: T_State):
groups = await bot.call_api("get_group_list") groups = await bot.call_api("get_group_list")
res_text = "请选择需要管理的群:\n" res_text = "请选择需要管理的群:\n"
group_number_idx = {} group_number_idx = {}
@ -349,13 +382,13 @@ async def do_dispatch_command(
"message", "message",
Rule(), Rule(),
permission, permission,
None, handlers=None,
True, temp=True,
priority=0, priority=0,
block=True, block=True,
plugin=matcher.plugin, plugin=matcher.plugin,
module=matcher.module, module=matcher.module,
expire_time=datetime.now() + bot.config.session_expire_timeout, expire_time=datetime.now(),
default_state=matcher.state, default_state=matcher.state,
default_type_updater=matcher.__class__._default_type_updater, default_type_updater=matcher.__class__._default_type_updater,
default_permission_updater=matcher.__class__._default_permission_updater, default_permission_updater=matcher.__class__._default_permission_updater,
@ -368,34 +401,3 @@ async def do_dispatch_command(
do_del_sub(new_matcher) do_del_sub(new_matcher)
new_matcher_ins = new_matcher() new_matcher_ins = new_matcher()
asyncio.create_task(new_matcher_ins.run(bot, event, state)) asyncio.create_task(new_matcher_ins.run(bot, event, state))
test_matcher = on_command("testtt")
@test_matcher.handle()
async def _handler(bot: Bot, event: Event, matcher: Matcher, state: T_State):
permission = await matcher.update_permission(bot, event)
new_matcher = Matcher.new(
"message",
Rule(),
permission,
None,
True,
priority=0,
block=True,
plugin=matcher.plugin,
module=matcher.module,
expire_time=datetime.now() + bot.config.session_expire_timeout,
default_state=matcher.state,
default_type_updater=matcher.__class__._default_type_updater,
default_permission_updater=matcher.__class__._default_permission_updater,
)
async def h():
logger.warning("yes")
await new_matcher.send("666")
new_matcher.handle()(h)
new_matcher_ins = new_matcher()
await new_matcher_ins.run(bot, event, state)

View File

@ -12,6 +12,8 @@ class PlugConfig(BaseSettings):
bison_filter_log: bool = False bison_filter_log: bool = False
bison_to_me: bool = True bison_to_me: bool = True
bison_skip_browser_check: bool = False bison_skip_browser_check: bool = False
bison_use_pic_merge: int = 0 # 多图片时启用图片合并转发(仅限群),当bison_use_queue为False时该配置不会生效
# 0不启用1首条消息单独发送剩余照片合并转发2以及以上所有消息全部合并转发
bison_resend_times: int = 0 bison_resend_times: int = 0
class Config: class Config:

View File

@ -24,7 +24,7 @@ class Post:
pics: list[Union[str, bytes]] = field(default_factory=list) pics: list[Union[str, bytes]] = field(default_factory=list)
extra_msg: list[Message] = field(default_factory=list) extra_msg: list[Message] = field(default_factory=list)
_message: Optional[list] = None _message: Optional[list[Message]] = None
def _use_pic(self): def _use_pic(self):
if not self.override_use_pic is None: if not self.override_use_pic is None:
@ -107,10 +107,10 @@ class Post:
self.pics = self.pics[matrix[0] * matrix[1] :] self.pics = self.pics[matrix[0] * matrix[1] :]
self.pics.insert(0, target_io.getvalue()) self.pics.insert(0, target_io.getvalue())
async def generate_messages(self): async def generate_messages(self) -> list[Message]:
if self._message is None: if self._message is None:
await self._pic_merge() await self._pic_merge()
msgs = [] msg_segments: list[MessageSegment] = []
text = "" text = ""
if self.text: if self.text:
if self._use_pic(): if self._use_pic():
@ -123,22 +123,24 @@ class Post:
if self.target_name: if self.target_name:
text += " {}".format(self.target_name) text += " {}".format(self.target_name)
if self._use_pic(): if self._use_pic():
msgs.append(await parse_text(text)) msg_segments.append(await parse_text(text))
if not self.target_type == "rss" and self.url: if not self.target_type == "rss" and self.url:
msgs.append(MessageSegment.text(self.url)) msg_segments.append(MessageSegment.text(self.url))
else: else:
if self.url: if self.url:
text += " \n详情: {}".format(self.url) text += " \n详情: {}".format(self.url)
msgs.append(MessageSegment.text(text)) msg_segments.append(MessageSegment.text(text))
for pic in self.pics: for pic in self.pics:
# if isinstance(pic, bytes): msg_segments.append(MessageSegment.image(pic))
# pic = 'base64://' + base64.b64encode(pic).decode()
# msgs.append(Message("[CQ:image,file={url}]".format(url=pic)))
msgs.append(MessageSegment.image(pic))
if self.compress: if self.compress:
msgs = [reduce(lambda x, y: x.append(y), msgs, Message())] msgs = [reduce(lambda x, y: x.append(y), msg_segments, Message())]
else:
msgs = list(
map(lambda msg_segment: Message([msg_segment]), msg_segments)
)
msgs.extend(self.extra_msg) msgs.extend(self.extra_msg)
self._message = msgs self._message = msgs
assert len(self._message) > 0, f"message list empty, {self}"
return self._message return self._message
def __str__(self): def __str__(self):

View File

@ -1,23 +1,36 @@
import time import time
from typing import Literal, Union from typing import Literal, Union
from nonebot.adapters import Message, MessageSegment
from nonebot.adapters.onebot.v11.bot import Bot from nonebot.adapters.onebot.v11.bot import Bot
from nonebot.adapters.onebot.v11.message import Message, MessageSegment
from nonebot.log import logger from nonebot.log import logger
from .plugin_config import plugin_config from .plugin_config import plugin_config
QUEUE = [] QUEUE: list[
tuple[
Bot,
int,
Literal["private", "group", "group-forward"],
Union[str, Message],
int,
]
] = []
LAST_SEND_TIME = time.time() LAST_SEND_TIME = time.time()
async def _do_send( async def _do_send(
bot: "Bot", user: str, user_type: str, msg: Union[str, Message, MessageSegment] bot: "Bot",
user: int,
user_type: Literal["group", "private", "group-forward"],
msg: Union[str, Message],
): ):
if user_type == "group": if user_type == "group":
await bot.call_api("send_group_msg", group_id=user, message=msg) await bot.send_group_msg(group_id=user, message=msg)
elif user_type == "private": elif user_type == "private":
await bot.call_api("send_private_msg", user_id=user, message=msg) await bot.send_private_msg(user_id=user, message=msg)
elif user_type == "group-forward":
await bot.send_group_forward_msg(group_id=user, messages=msg)
async def do_send_msgs(): async def do_send_msgs():
@ -39,10 +52,57 @@ async def do_send_msgs():
LAST_SEND_TIME = time.time() LAST_SEND_TIME = time.time()
async def send_msgs(bot: Bot, user, user_type: Literal["private", "group"], msgs: list): async def _send_msgs_dispatch(
bot: Bot,
user,
user_type: Literal["private", "group", "group-forward"],
msg: Union[str, Message],
):
if plugin_config.bison_use_queue: if plugin_config.bison_use_queue:
for msg in msgs: QUEUE.append((bot, user, user_type, msg, plugin_config.bison_resend_times))
QUEUE.append((bot, user, user_type, msg, plugin_config.bison_resend_times))
else: else:
await _do_send(bot, user, user_type, msg)
async def send_msgs(
bot: Bot, user, user_type: Literal["private", "group"], msgs: list[Message]
):
if not plugin_config.bison_use_pic_merge or user_type == "private":
for msg in msgs: for msg in msgs:
await _do_send(bot, user, user_type, msg) await _send_msgs_dispatch(bot, user, user_type, msg)
return
msgs = msgs.copy()
if plugin_config.bison_use_pic_merge == 1:
await _send_msgs_dispatch(bot, user, "group", msgs.pop(0))
if msgs:
if len(msgs) == 1: # 只有一条消息序列就不合并转发
await _send_msgs_dispatch(bot, user, "group", msgs.pop(0))
else:
group_bot_info = await bot.get_group_member_info(
group_id=user, user_id=int(bot.self_id), no_cache=True
) # 调用api获取群内bot的相关参数
# forward_msg = Message(
# [
# MessageSegment.node_custom(
# group_bot_info["user_id"],
# nickname=group_bot_info["card"] or group_bot_info["nickname"],
# content=msg,
# )
# for msg in msgs
# ]
# )
# FIXME: Because of https://github.com/nonebot/adapter-onebot/issues/9
forward_msg = [
{
"type": "node",
"data": {
"name": group_bot_info["card"] or group_bot_info["nickname"],
"uin": group_bot_info["user_id"],
"content": msg,
},
}
for msg in msgs
]
await _send_msgs_dispatch(bot, user, "group-forward", forward_msg)

View File

@ -0,0 +1,24 @@
<!doctype html>
<html>
<head>
<meta charset="utf-8" />
<meta http-equiv="X-UA-Compatible" content="IE=edge" />
<meta name="viewport" content="width=device-width, initial-scale=1, minimum-scale=1" />
<meta name="keywords" content="明日方舟,明日方舟官网,明日方舟手游,二次元,明日方舟Arknights,魔物娘,战棋,策略,塔防,塔防RPG,Arknights,人外,Monster" />
<meta name="description" content="《明日方舟》是一款魔物主题的策略手游。在游戏中,玩家将管理一艘满载“ 魔物干员”的方舟,为调查来源神秘的矿石灾难而踏上旅途。在这个宽广而危机四伏的世界中,你或许会看到废土中的城市废墟,或许会看到仿若幻境的亚人国度,或许会遭遇无法解读的神秘,或许参与无比残酷的战争。在有关幻想与异种生命的世界中,体验史诗与想象,情感与牵绊!" />
<link rel="icon" href="data:;base64,=" />
<title>公告</title>
<link rel="stylesheet" href="../../assets/css/announcement.v_0_1_2.css" />
</head>
<body>
<div class="main">
<div class="container">
<div class="banner-image-container cover">
<a class="cover-jumper" href="uniwebview://move?target=recruit&amp;param1=NORM_24_0_1">
<img class="banner-image" src="https://ak.hycdn.cn/announce/images/20211209/ff886833a69b48e513ff911f64f6d881.JPG" />
</a>
</div>
</div>
</div>
</body>
</html>

View File

@ -0,0 +1,91 @@
{
"focusAnnounceId": "816",
"announceList": [
{
"announceId": "809",
"title": "冰原信使系列\n新装限时上架",
"isWebUrl": true,
"webUrl": "https://ak.hycdn.cn/announce/IOS/announcement/809_1640060505.html",
"day": 21,
"month": 12,
"group": "ACTIVITY"
},
{
"announceId": "810",
"title": "寒武纪系列\n限时复刻上架",
"isWebUrl": true,
"webUrl": "https://ak.hycdn.cn/announce/IOS/announcement/810_1640060511.html",
"day": 21,
"month": 12,
"group": "ACTIVITY"
},
{
"announceId": "806",
"title": "跨年欢庆·回首\n限时寻访说明",
"isWebUrl": true,
"webUrl": "https://ak.hycdn.cn/announce/IOS/announcement/806_1639379808.html",
"day": 14,
"month": 12,
"group": "ACTIVITY"
},
{
"announceId": "802",
"title": "「制作组通讯」\n#15期",
"isWebUrl": true,
"webUrl": "https://ak.hycdn.cn/announce/IOS/announcement/802_1638871766.html",
"day": 8,
"month": 12,
"group": "SYSTEM"
},
{
"announceId": "97",
"title": "新人寻访特惠\n必得六星干员",
"isWebUrl": true,
"webUrl": "https://ak.hycdn.cn/announce/IOS/announcement/97_1606379786.html",
"day": 30,
"month": 4,
"group": "ACTIVITY"
},
{
"announceId": "95",
"title": "通关特定关卡\n赠送专属时装",
"isWebUrl": true,
"webUrl": "https://ak.hycdn.cn/announce/IOS/announcement/95_1606379781.html",
"day": 30,
"month": 4,
"group": "ACTIVITY"
},
{
"announceId": "192",
"title": "《明日方舟》\n公测开启说明",
"isWebUrl": true,
"webUrl": "https://ak.hycdn.cn/announce/IOS/announcement/192_1606379744.html",
"day": 30,
"month": 4,
"group": "SYSTEM"
},
{
"announceId": "98",
"title": "《明日方舟》\n公平运营申明",
"isWebUrl": true,
"webUrl": "https://ak.hycdn.cn/announce/IOS/announcement/98_1638970453.html",
"day": 30,
"month": 4,
"group": "SYSTEM"
},
{
"announceId": "94",
"title": "常驻活动介绍",
"isWebUrl": true,
"webUrl": "https://ak.hycdn.cn/announce/IOS/announcement/94_1606379757.html",
"day": 30,
"month": 4,
"group": "ACTIVITY"
}
],
"extra": {
"enable": false,
"name": "额外活动"
}
}

View File

@ -18,6 +18,11 @@ def arknights_list_0():
return get_json("arknights_list_0.json") return get_json("arknights_list_0.json")
@pytest.fixture(scope="module")
def arknights_list__1():
return get_json("arknights_list_-1.json")
@pytest.fixture(scope="module") @pytest.fixture(scope="module")
def arknights_list_1(): def arknights_list_1():
return get_json("arknights_list_1.json") return get_json("arknights_list_1.json")
@ -36,6 +41,61 @@ def monster_siren_list_1():
@pytest.mark.asyncio @pytest.mark.asyncio
@respx.mock @respx.mock
async def test_fetch_new( async def test_fetch_new(
arknights,
dummy_user_subinfo,
arknights_list_0,
arknights_list__1,
monster_siren_list_0,
monster_siren_list_1,
):
ak_list_router = respx.get(
"https://ak-conf.hypergryph.com/config/prod/announce_meta/IOS/announcement.meta.json"
)
detail_router = respx.get(
"https://ak.hycdn.cn/announce/IOS/announcement/807_1640060583.html"
)
version_router = respx.get(
"https://ak-conf.hypergryph.com/config/prod/official/IOS/version"
)
preannouncement_router = respx.get(
"https://ak-conf.hypergryph.com/config/prod/announce_meta/IOS/preannouncement.meta.json"
)
monster_siren_router = respx.get("https://monster-siren.hypergryph.com/api/news")
ak_list_router.mock(return_value=Response(200, json=arknights_list__1))
detail_router.mock(
return_value=Response(200, text=get_file("arknights-detail-807"))
)
version_router.mock(
return_value=Response(200, json=get_json("arknights-version-0.json"))
)
preannouncement_router.mock(
return_value=Response(200, json=get_json("arknights-pre-0.json"))
)
monster_siren_router.mock(return_value=Response(200, json=monster_siren_list_0))
target = ""
res = await arknights.fetch_new_post(target, [dummy_user_subinfo])
assert ak_list_router.called
assert len(res) == 0
assert not detail_router.called
mock_data = arknights_list_0
ak_list_router.mock(return_value=Response(200, json=mock_data))
res3 = await arknights.fetch_new_post(target, [dummy_user_subinfo])
assert len(res3[0][1]) == 1
assert detail_router.called
post = res3[0][1][0]
assert post.target_type == "arknights"
assert post.text == ""
assert post.url == ""
assert post.target_name == "明日方舟游戏内公告"
assert len(post.pics) == 1
# assert(post.pics == ['https://ak-fs.hypergryph.com/announce/images/20210623/e6f49aeb9547a2278678368a43b95b07.jpg'])
print(res3[0][1])
r = await post.generate_messages()
@pytest.mark.render
@respx.mock
async def test_send_with_render(
arknights, arknights,
dummy_user_subinfo, dummy_user_subinfo,
arknights_list_0, arknights_list_0,

View File

@ -0,0 +1,283 @@
import pytest
import respx
from httpx import Response
from nonebug.app import App
from .platforms.utils import get_json
from .utils import BotReply, fake_admin_user, fake_group_message_event
# 选择platform阶段中止
@pytest.mark.asyncio
@respx.mock
async def test_abort_add_on_platform(app: App):
from nonebot.adapters.onebot.v11.event import Sender
from nonebot.adapters.onebot.v11.message import Message
from nonebot_bison.config import Config
from nonebot_bison.config_manager import add_sub_matcher, common_platform
from nonebot_bison.platform import platform_manager
config = Config()
config.user_target.truncate()
ak_list_router = respx.get(
"https://m.weibo.cn/api/container/getIndex?containerid=1005056279793937"
)
ak_list_router.mock(
return_value=Response(200, json=get_json("weibo_ak_profile.json"))
)
ak_list_bad_router = respx.get(
"https://m.weibo.cn/api/container/getIndex?containerid=100505000"
)
ak_list_bad_router.mock(
return_value=Response(200, json=get_json("weibo_err_profile.json"))
)
async with app.test_matcher(add_sub_matcher) as ctx:
bot = ctx.create_bot()
event_1 = fake_group_message_event(
message=Message("添加订阅"),
sender=Sender(card="", nickname="test", role="admin"),
to_me=True,
)
ctx.receive_event(bot, event_1)
ctx.should_pass_rule()
ctx.should_call_send(
event_1,
Message(BotReply.add_reply_on_platform(platform_manager, common_platform)),
True,
)
event_abort = fake_group_message_event(
message=Message("取消"), sender=Sender(card="", nickname="test", role="admin")
)
ctx.receive_event(bot, event_abort)
ctx.should_call_send(
event_abort,
BotReply.add_reply_abort,
True,
)
ctx.should_finished()
# 输入id阶段中止
@pytest.mark.asyncio
@respx.mock
async def test_abort_add_on_id(app: App):
from nonebot.adapters.onebot.v11.event import Sender
from nonebot.adapters.onebot.v11.message import Message
from nonebot_bison.config import Config
from nonebot_bison.config_manager import add_sub_matcher, common_platform
from nonebot_bison.platform import platform_manager
config = Config()
config.user_target.truncate()
ak_list_router = respx.get(
"https://m.weibo.cn/api/container/getIndex?containerid=1005056279793937"
)
ak_list_router.mock(
return_value=Response(200, json=get_json("weibo_ak_profile.json"))
)
ak_list_bad_router = respx.get(
"https://m.weibo.cn/api/container/getIndex?containerid=100505000"
)
ak_list_bad_router.mock(
return_value=Response(200, json=get_json("weibo_err_profile.json"))
)
async with app.test_matcher(add_sub_matcher) as ctx:
bot = ctx.create_bot()
event_1 = fake_group_message_event(
message=Message("添加订阅"),
sender=Sender(card="", nickname="test", role="admin"),
to_me=True,
)
ctx.receive_event(bot, event_1)
ctx.should_pass_rule()
ctx.should_call_send(
event_1,
Message(BotReply.add_reply_on_platform(platform_manager, common_platform)),
True,
)
event_2 = fake_group_message_event(
message=Message("weibo"), sender=fake_admin_user
)
ctx.receive_event(bot, event_2)
ctx.should_call_send(
event_2,
Message(BotReply.add_reply_on_id),
True,
)
event_abort = fake_group_message_event(
message=Message("取消"), sender=Sender(card="", nickname="test", role="admin")
)
ctx.receive_event(bot, event_abort)
ctx.should_call_send(
event_abort,
BotReply.add_reply_abort,
True,
)
ctx.should_finished()
# 输入订阅类别阶段中止
@pytest.mark.asyncio
@respx.mock
async def test_abort_add_on_cats(app: App):
from nonebot.adapters.onebot.v11.event import Sender
from nonebot.adapters.onebot.v11.message import Message
from nonebot_bison.config import Config
from nonebot_bison.config_manager import add_sub_matcher, common_platform
from nonebot_bison.platform import platform_manager
config = Config()
config.user_target.truncate()
ak_list_router = respx.get(
"https://m.weibo.cn/api/container/getIndex?containerid=1005056279793937"
)
ak_list_router.mock(
return_value=Response(200, json=get_json("weibo_ak_profile.json"))
)
ak_list_bad_router = respx.get(
"https://m.weibo.cn/api/container/getIndex?containerid=100505000"
)
ak_list_bad_router.mock(
return_value=Response(200, json=get_json("weibo_err_profile.json"))
)
async with app.test_matcher(add_sub_matcher) as ctx:
bot = ctx.create_bot()
event_1 = fake_group_message_event(
message=Message("添加订阅"),
sender=Sender(card="", nickname="test", role="admin"),
to_me=True,
)
ctx.receive_event(bot, event_1)
ctx.should_pass_rule()
ctx.should_call_send(
event_1,
Message(
BotReply.add_reply_on_platform(
platform_manager=platform_manager, common_platform=common_platform
)
),
True,
)
event_2 = fake_group_message_event(
message=Message("weibo"), sender=fake_admin_user
)
ctx.receive_event(bot, event_2)
ctx.should_call_send(
event_2,
Message(BotReply.add_reply_on_id),
True,
)
event_3 = fake_group_message_event(
message=Message("6279793937"), sender=fake_admin_user
)
ctx.receive_event(bot, event_3)
ctx.should_call_send(
event_3,
BotReply.add_reply_on_target_confirm(
"weibo", "明日方舟Arknights", "6279793937"
),
True,
)
ctx.should_call_send(
event_3,
Message(BotReply.add_reply_on_cats(platform_manager, "weibo")),
True,
)
event_abort = fake_group_message_event(
message=Message("取消"), sender=Sender(card="", nickname="test", role="admin")
)
ctx.receive_event(bot, event_abort)
ctx.should_call_send(
event_abort,
BotReply.add_reply_abort,
True,
)
ctx.should_finished()
# 输入标签阶段中止
@pytest.mark.asyncio
@respx.mock
async def test_abort_add_on_tag(app: App):
from nonebot.adapters.onebot.v11.event import Sender
from nonebot.adapters.onebot.v11.message import Message
from nonebot_bison.config import Config
from nonebot_bison.config_manager import add_sub_matcher, common_platform
from nonebot_bison.platform import platform_manager
config = Config()
config.user_target.truncate()
ak_list_router = respx.get(
"https://m.weibo.cn/api/container/getIndex?containerid=1005056279793937"
)
ak_list_router.mock(
return_value=Response(200, json=get_json("weibo_ak_profile.json"))
)
ak_list_bad_router = respx.get(
"https://m.weibo.cn/api/container/getIndex?containerid=100505000"
)
ak_list_bad_router.mock(
return_value=Response(200, json=get_json("weibo_err_profile.json"))
)
async with app.test_matcher(add_sub_matcher) as ctx:
bot = ctx.create_bot()
event_1 = fake_group_message_event(
message=Message("添加订阅"),
sender=Sender(card="", nickname="test", role="admin"),
to_me=True,
)
ctx.receive_event(bot, event_1)
ctx.should_pass_rule()
ctx.should_call_send(
event_1,
Message(
BotReply.add_reply_on_platform(
platform_manager=platform_manager, common_platform=common_platform
)
),
True,
)
event_2 = fake_group_message_event(
message=Message("weibo"), sender=fake_admin_user
)
ctx.receive_event(bot, event_2)
ctx.should_call_send(
event_2,
Message(BotReply.add_reply_on_id),
True,
)
event_3 = fake_group_message_event(
message=Message("6279793937"), sender=fake_admin_user
)
ctx.receive_event(bot, event_3)
ctx.should_call_send(
event_3,
BotReply.add_reply_on_target_confirm(
"weibo", "明日方舟Arknights", "6279793937"
),
True,
)
ctx.should_call_send(
event_3,
Message(BotReply.add_reply_on_cats(platform_manager, "weibo")),
True,
)
event_4 = fake_group_message_event(
message=Message("图文 文字"), sender=fake_admin_user
)
ctx.receive_event(bot, event_4)
ctx.should_call_send(event_4, Message(BotReply.add_reply_on_tags), True)
event_abort = fake_group_message_event(
message=Message("取消"), sender=Sender(card="", nickname="test", role="admin")
)
ctx.receive_event(bot, event_abort)
ctx.should_call_send(
event_abort,
BotReply.add_reply_abort,
True,
)
ctx.should_finished()

View File

@ -4,7 +4,7 @@ from httpx import Response
from nonebug.app import App from nonebug.app import App
from .platforms.utils import get_json from .platforms.utils import get_json
from .utils import fake_admin_user, fake_group_message_event from .utils import BotReply, fake_admin_user, fake_group_message_event
@pytest.mark.asyncio @pytest.mark.asyncio
@ -49,18 +49,7 @@ async def test_configurable_at_me_false(app: App):
ctx.receive_event(bot, event) ctx.receive_event(bot, event)
ctx.should_call_send( ctx.should_call_send(
event, event,
Message( Message(BotReply.add_reply_on_platform(platform_manager, common_platform)),
"请输入想要订阅的平台,目前支持,请输入冒号左边的名称:\n"
+ "".join(
[
"{}{}\n".format(
platform_name, platform_manager[platform_name].name
)
for platform_name in common_platform
]
)
+ "要查看全部平台请输入:“全部”"
),
True, True,
) )
ctx.should_pass_rule() ctx.should_pass_rule()
@ -72,10 +61,11 @@ async def test_configurable_at_me_false(app: App):
async def test_add_with_target(app: App): async def test_add_with_target(app: App):
from nonebot.adapters.onebot.v11.event import Sender from nonebot.adapters.onebot.v11.event import Sender
from nonebot.adapters.onebot.v11.message import Message from nonebot.adapters.onebot.v11.message import Message
from nonebot_bison.config import config from nonebot_bison.config import Config
from nonebot_bison.config_manager import add_sub_matcher, common_platform from nonebot_bison.config_manager import add_sub_matcher, common_platform
from nonebot_bison.platform import platform_manager from nonebot_bison.platform import platform_manager
config = Config()
config.user_target.truncate() config.user_target.truncate()
ak_list_router = respx.get( ak_list_router = respx.get(
@ -103,16 +93,9 @@ async def test_add_with_target(app: App):
ctx.should_call_send( ctx.should_call_send(
event_1, event_1,
Message( Message(
"请输入想要订阅的平台,目前支持,请输入冒号左边的名称:\n" BotReply.add_reply_on_platform(
+ "".join( platform_manager=platform_manager, common_platform=common_platform
[
"{}{}\n".format(
platform_name, platform_manager[platform_name].name
)
for platform_name in common_platform
]
) )
+ "要查看全部平台请输入:“全部”"
), ),
True, True,
) )
@ -123,15 +106,7 @@ async def test_add_with_target(app: App):
ctx.should_rejected() ctx.should_rejected()
ctx.should_call_send( ctx.should_call_send(
event_2, event_2,
( BotReply.add_reply_on_platform_input_allplatform(platform_manager),
"全部平台\n"
+ "\n".join(
[
"{}{}".format(platform_name, platform.name)
for platform_name, platform in platform_manager.items()
]
)
),
True, True,
) )
event_3 = fake_group_message_event( event_3 = fake_group_message_event(
@ -140,16 +115,14 @@ async def test_add_with_target(app: App):
ctx.receive_event(bot, event_3) ctx.receive_event(bot, event_3)
ctx.should_call_send( ctx.should_call_send(
event_3, event_3,
Message( Message(BotReply.add_reply_on_id),
"请输入订阅用户的id详情查阅https://nonebot-bison.vercel.app/usage/#%E6%89%80%E6%94%AF%E6%8C%81%E5%B9%B3%E5%8F%B0%E7%9A%84uid"
),
True, True,
) )
event_4_err = fake_group_message_event( event_4_err = fake_group_message_event(
message=Message("000"), sender=fake_admin_user message=Message("000"), sender=fake_admin_user
) )
ctx.receive_event(bot, event_4_err) ctx.receive_event(bot, event_4_err)
ctx.should_call_send(event_4_err, "id输入错误", True) ctx.should_call_send(event_4_err, BotReply.add_reply_on_id_input_error, True)
ctx.should_rejected() ctx.should_rejected()
event_4_ok = fake_group_message_event( event_4_ok = fake_group_message_event(
message=Message("6279793937"), sender=fake_admin_user message=Message("6279793937"), sender=fake_admin_user
@ -157,29 +130,36 @@ async def test_add_with_target(app: App):
ctx.receive_event(bot, event_4_ok) ctx.receive_event(bot, event_4_ok)
ctx.should_call_send( ctx.should_call_send(
event_4_ok, event_4_ok,
Message( BotReply.add_reply_on_target_confirm(
"请输入要订阅的类别,以空格分隔,支持的类别有:{}".format( "weibo", "明日方舟Arknights", "6279793937"
" ".join(list(platform_manager["weibo"].categories.values()))
)
), ),
True, True,
) )
ctx.should_call_send(
event_4_ok,
Message(BotReply.add_reply_on_cats(platform_manager, "weibo")),
True,
)
event_5_err = fake_group_message_event( event_5_err = fake_group_message_event(
message=Message("图文 文字 err"), sender=fake_admin_user message=Message("图文 文字 err"), sender=fake_admin_user
) )
ctx.receive_event(bot, event_5_err) ctx.receive_event(bot, event_5_err)
ctx.should_call_send(event_5_err, "不支持 err", True) ctx.should_call_send(
event_5_err, BotReply.add_reply_on_cats_input_error("err"), True
)
ctx.should_rejected() ctx.should_rejected()
event_5_ok = fake_group_message_event( event_5_ok = fake_group_message_event(
message=Message("图文 文字"), sender=fake_admin_user message=Message("图文 文字"), sender=fake_admin_user
) )
ctx.receive_event(bot, event_5_ok) ctx.receive_event(bot, event_5_ok)
ctx.should_call_send(event_5_ok, Message('请输入要订阅的tag订阅所有tag输入"全部标签"'), True) ctx.should_call_send(event_5_ok, Message(BotReply.add_reply_on_tags), True)
event_6 = fake_group_message_event( event_6 = fake_group_message_event(
message=Message("全部标签"), sender=fake_admin_user message=Message("全部标签"), sender=fake_admin_user
) )
ctx.receive_event(bot, event_6) ctx.receive_event(bot, event_6)
ctx.should_call_send(event_6, ("添加 明日方舟Arknights 成功"), True) ctx.should_call_send(
event_6, BotReply.add_reply_subscribe_success("明日方舟Arknights"), True
)
ctx.should_finished() ctx.should_finished()
subs = config.list_subscribe(10000, "group") subs = config.list_subscribe(10000, "group")
assert len(subs) == 1 assert len(subs) == 1
@ -198,10 +178,11 @@ async def test_add_with_target(app: App):
async def test_add_with_target_no_cat(app: App): async def test_add_with_target_no_cat(app: App):
from nonebot.adapters.onebot.v11.event import Sender from nonebot.adapters.onebot.v11.event import Sender
from nonebot.adapters.onebot.v11.message import Message from nonebot.adapters.onebot.v11.message import Message
from nonebot_bison.config import config from nonebot_bison.config import Config
from nonebot_bison.config_manager import add_sub_matcher, common_platform from nonebot_bison.config_manager import add_sub_matcher, common_platform
from nonebot_bison.platform import platform_manager from nonebot_bison.platform import platform_manager
config = Config()
config.user_target.truncate() config.user_target.truncate()
ncm_router = respx.get("https://music.163.com/api/artist/albums/32540734") ncm_router = respx.get("https://music.163.com/api/artist/albums/32540734")
@ -218,18 +199,7 @@ async def test_add_with_target_no_cat(app: App):
ctx.should_pass_rule() ctx.should_pass_rule()
ctx.should_call_send( ctx.should_call_send(
event_1, event_1,
Message( Message(BotReply.add_reply_on_platform(platform_manager, common_platform)),
"请输入想要订阅的平台,目前支持,请输入冒号左边的名称:\n"
+ "".join(
[
"{}{}\n".format(
platform_name, platform_manager[platform_name].name
)
for platform_name in common_platform
]
)
+ "要查看全部平台请输入:“全部”"
),
True, True,
) )
event_3 = fake_group_message_event( event_3 = fake_group_message_event(
@ -238,16 +208,21 @@ async def test_add_with_target_no_cat(app: App):
ctx.receive_event(bot, event_3) ctx.receive_event(bot, event_3)
ctx.should_call_send( ctx.should_call_send(
event_3, event_3,
Message( Message(BotReply.add_reply_on_id),
"请输入订阅用户的id详情查阅https://nonebot-bison.vercel.app/usage/#%E6%89%80%E6%94%AF%E6%8C%81%E5%B9%B3%E5%8F%B0%E7%9A%84uid"
),
True, True,
) )
event_4_ok = fake_group_message_event( event_4_ok = fake_group_message_event(
message=Message("32540734"), sender=fake_admin_user message=Message("32540734"), sender=fake_admin_user
) )
ctx.receive_event(bot, event_4_ok) ctx.receive_event(bot, event_4_ok)
ctx.should_call_send(event_4_ok, ("添加 塞壬唱片-MSR 成功"), True) ctx.should_call_send(
event_4_ok,
BotReply.add_reply_on_target_confirm("ncm-artist", "塞壬唱片-MSR", "32540734"),
True,
)
ctx.should_call_send(
event_4_ok, BotReply.add_reply_subscribe_success("塞壬唱片-MSR"), True
)
ctx.should_finished() ctx.should_finished()
subs = config.list_subscribe(10000, "group") subs = config.list_subscribe(10000, "group")
assert len(subs) == 1 assert len(subs) == 1
@ -264,10 +239,11 @@ async def test_add_with_target_no_cat(app: App):
async def test_add_no_target(app: App): async def test_add_no_target(app: App):
from nonebot.adapters.onebot.v11.event import Sender from nonebot.adapters.onebot.v11.event import Sender
from nonebot.adapters.onebot.v11.message import Message from nonebot.adapters.onebot.v11.message import Message
from nonebot_bison.config import config from nonebot_bison.config import Config
from nonebot_bison.config_manager import add_sub_matcher, common_platform from nonebot_bison.config_manager import add_sub_matcher, common_platform
from nonebot_bison.platform import platform_manager from nonebot_bison.platform import platform_manager
config = Config()
config.user_target.truncate() config.user_target.truncate()
async with app.test_matcher(add_sub_matcher) as ctx: async with app.test_matcher(add_sub_matcher) as ctx:
@ -281,18 +257,7 @@ async def test_add_no_target(app: App):
ctx.should_pass_rule() ctx.should_pass_rule()
ctx.should_call_send( ctx.should_call_send(
event_1, event_1,
Message( Message(BotReply.add_reply_on_platform(platform_manager, common_platform)),
"请输入想要订阅的平台,目前支持,请输入冒号左边的名称:\n"
+ "".join(
[
"{}{}\n".format(
platform_name, platform_manager[platform_name].name
)
for platform_name in common_platform
]
)
+ "要查看全部平台请输入:“全部”"
),
True, True,
) )
event_3 = fake_group_message_event( event_3 = fake_group_message_event(
@ -301,18 +266,16 @@ async def test_add_no_target(app: App):
ctx.receive_event(bot, event_3) ctx.receive_event(bot, event_3)
ctx.should_call_send( ctx.should_call_send(
event_3, event_3,
Message( Message(BotReply.add_reply_on_cats(platform_manager, "arknights")),
"请输入要订阅的类别,以空格分隔,支持的类别有:{}".format(
" ".join(list(platform_manager["arknights"].categories.values()))
)
),
True, True,
) )
event_4 = fake_group_message_event( event_4 = fake_group_message_event(
message=Message("游戏公告"), sender=fake_admin_user message=Message("游戏公告"), sender=fake_admin_user
) )
ctx.receive_event(bot, event_4) ctx.receive_event(bot, event_4)
ctx.should_call_send(event_4, ("添加 明日方舟游戏信息 成功"), True) ctx.should_call_send(
event_4, BotReply.add_reply_subscribe_success("明日方舟游戏信息"), True
)
ctx.should_finished() ctx.should_finished()
subs = config.list_subscribe(10000, "group") subs = config.list_subscribe(10000, "group")
assert len(subs) == 1 assert len(subs) == 1
@ -328,10 +291,11 @@ async def test_add_no_target(app: App):
async def test_platform_name_err(app: App): async def test_platform_name_err(app: App):
from nonebot.adapters.onebot.v11.event import Sender from nonebot.adapters.onebot.v11.event import Sender
from nonebot.adapters.onebot.v11.message import Message from nonebot.adapters.onebot.v11.message import Message
from nonebot_bison.config import config from nonebot_bison.config import Config
from nonebot_bison.config_manager import add_sub_matcher, common_platform from nonebot_bison.config_manager import add_sub_matcher, common_platform
from nonebot_bison.platform import platform_manager from nonebot_bison.platform import platform_manager
config = Config()
config.user_target.truncate() config.user_target.truncate()
async with app.test_matcher(add_sub_matcher) as ctx: async with app.test_matcher(add_sub_matcher) as ctx:
bot = ctx.create_bot() bot = ctx.create_bot()
@ -344,18 +308,7 @@ async def test_platform_name_err(app: App):
ctx.should_pass_rule() ctx.should_pass_rule()
ctx.should_call_send( ctx.should_call_send(
event_1, event_1,
Message( Message(BotReply.add_reply_on_platform(platform_manager, common_platform)),
"请输入想要订阅的平台,目前支持,请输入冒号左边的名称:\n"
+ "".join(
[
"{}{}\n".format(
platform_name, platform_manager[platform_name].name
)
for platform_name in common_platform
]
)
+ "要查看全部平台请输入:“全部”"
),
True, True,
) )
event_2 = fake_group_message_event( event_2 = fake_group_message_event(
@ -366,98 +319,89 @@ async def test_platform_name_err(app: App):
ctx.should_rejected() ctx.should_rejected()
ctx.should_call_send( ctx.should_call_send(
event_2, event_2,
"平台输入错误", BotReply.add_reply_on_platform_input_error,
True, True,
) )
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_query_sub(app: App): @respx.mock
from nonebot.adapters.onebot.v11.message import Message async def test_add_with_get_id(app: App):
from nonebot_bison.config import config from nonebot.adapters.onebot.v11.event import Sender
from nonebot_bison.config_manager import query_sub_matcher from nonebot.adapters.onebot.v11.message import Message, MessageSegment
from nonebot_bison.config import Config
from nonebot_bison.config_manager import add_sub_matcher, common_platform
from nonebot_bison.platform import platform_manager from nonebot_bison.platform import platform_manager
config = Config()
config.user_target.truncate() config.user_target.truncate()
config.add_subscribe(
10000, ak_list_router = respx.get(
"group", "https://m.weibo.cn/api/container/getIndex?containerid=1005056279793937"
"6279793937",
"明日方舟Arknights",
"weibo",
[platform_manager["weibo"].reverse_category["图文"]],
["明日方舟"],
) )
async with app.test_matcher(query_sub_matcher) as ctx: ak_list_router.mock(
return_value=Response(200, json=get_json("weibo_ak_profile.json"))
)
ak_list_bad_router = respx.get(
"https://m.weibo.cn/api/container/getIndex?containerid=100505000"
)
ak_list_bad_router.mock(
return_value=Response(200, json=get_json("weibo_err_profile.json"))
)
async with app.test_matcher(add_sub_matcher) as ctx:
bot = ctx.create_bot() bot = ctx.create_bot()
event = fake_group_message_event(message=Message("查询订阅"), to_me=True) event_1 = fake_group_message_event(
ctx.receive_event(bot, event) message=Message("添加订阅"),
ctx.should_pass_rule() sender=Sender(card="", nickname="test", role="admin"),
ctx.should_pass_permission() to_me=True,
ctx.should_call_send(
event, Message("订阅的帐号为:\nweibo 明日方舟Arknights 6279793937 [图文] 明日方舟\n"), True
) )
ctx.receive_event(bot, event_1)
@pytest.mark.asyncio
async def test_del_sub(app: App):
from nonebot.adapters.onebot.v11.bot import Bot
from nonebot.adapters.onebot.v11.message import Message
from nonebot_bison.config import config
from nonebot_bison.config_manager import del_sub_matcher
from nonebot_bison.platform import platform_manager
config.user_target.truncate()
config.add_subscribe(
10000,
"group",
"6279793937",
"明日方舟Arknights",
"weibo",
[platform_manager["weibo"].reverse_category["图文"]],
["明日方舟"],
)
async with app.test_matcher(del_sub_matcher) as ctx:
bot = ctx.create_bot(base=Bot)
assert isinstance(bot, Bot)
event = fake_group_message_event(
message=Message("删除订阅"), to_me=True, sender=fake_admin_user
)
ctx.receive_event(bot, event)
ctx.should_pass_rule() ctx.should_pass_rule()
ctx.should_pass_permission()
ctx.should_call_send( ctx.should_call_send(
event, event_1,
Message( Message(
"订阅的帐号为:\n1 weibo 明日方舟Arknights 6279793937\n [图文] 明日方舟\n请输入要删除的订阅的序号" BotReply.add_reply_on_platform(
platform_manager=platform_manager, common_platform=common_platform
)
), ),
True, True,
) )
event_1_err = fake_group_message_event( event_3 = fake_group_message_event(
message=Message("2"), sender=fake_admin_user message=Message("weibo"), sender=fake_admin_user
) )
ctx.receive_event(bot, event_1_err) ctx.receive_event(bot, event_3)
ctx.should_call_send(event_1_err, "删除错误", True) ctx.should_call_send(
event_3,
Message(BotReply.add_reply_on_id),
True,
)
event_4_query = fake_group_message_event(
message=Message("查询"), sender=fake_admin_user
)
ctx.receive_event(bot, event_4_query)
ctx.should_rejected() ctx.should_rejected()
event_1_ok = fake_group_message_event( ctx.should_call_send(
message=Message("1"), sender=fake_admin_user event_4_query,
Message([MessageSegment(*BotReply.add_reply_on_id_input_search())]),
True,
)
"""
line 362:
鬼知道为什么要在这里这样写
没有[]的话assert不了(should_call_send使用[MessageSegment(...)]的格式进行比较)
不在这里MessageSegment()的话也assert不了(指不能让add_reply_on_id_input_search直接返回一个MessageSegment对象)
amen
"""
event_abort = fake_group_message_event(
message=Message("取消"), sender=Sender(card="", nickname="test", role="admin")
)
ctx.receive_event(bot, event_abort)
ctx.should_call_send(
event_abort,
BotReply.add_reply_abort,
True,
) )
ctx.receive_event(bot, event_1_ok)
ctx.should_call_send(event_1_ok, "删除成功", True)
ctx.should_finished() ctx.should_finished()
subs = config.list_subscribe(10000, "group") subs = config.list_subscribe(10000, "group")
assert len(subs) == 0 assert len(subs) == 0
async def test_test(app: App):
from nonebot.adapters.onebot.v11.bot import Bot
from nonebot.adapters.onebot.v11.message import Message
from nonebot_bison.config_manager import test_matcher
async with app.test_matcher(test_matcher) as ctx:
bot = ctx.create_bot(base=Bot)
event = fake_group_message_event(message=Message("testtt"))
ctx.receive_event(bot, event)
ctx.should_pass_permission()
ctx.should_pass_rule()
ctx.should_call_send(event, "666", True)

View File

@ -1,9 +1,11 @@
import pytest
from nonebug import App from nonebug import App
from .utils import fake_admin_user, fake_private_message_event, fake_superuser from .utils import fake_group_message_event, fake_private_message_event, fake_superuser
async def test_query(app: App): @pytest.mark.asyncio
async def test_query_with_superuser_private(app: App):
from nonebot.adapters.onebot.v11.bot import Bot from nonebot.adapters.onebot.v11.bot import Bot
from nonebot.adapters.onebot.v11.message import Message from nonebot.adapters.onebot.v11.message import Message
from nonebot_bison.config_manager import group_manage_matcher from nonebot_bison.config_manager import group_manage_matcher
@ -11,7 +13,10 @@ async def test_query(app: App):
async with app.test_matcher(group_manage_matcher) as ctx: async with app.test_matcher(group_manage_matcher) as ctx:
bot = ctx.create_bot(base=Bot) bot = ctx.create_bot(base=Bot)
event = fake_private_message_event( event = fake_private_message_event(
message=Message("群管理"), sender=fake_superuser message=Message("群管理"),
sender=fake_superuser,
to_me=True,
user_id=fake_superuser.user_id,
) )
ctx.receive_event(bot, event) ctx.receive_event(bot, event)
ctx.should_pass_rule() ctx.should_pass_rule()
@ -23,23 +28,57 @@ async def test_query(app: App):
event, Message("请选择需要管理的群:\n1. 101 - test group\n请输入左侧序号"), True event, Message("请选择需要管理的群:\n1. 101 - test group\n请输入左侧序号"), True
) )
event_1_err = fake_private_message_event( event_1_err = fake_private_message_event(
message=Message("0"), sender=fake_superuser message=Message("0"),
sender=fake_superuser,
to_me=True,
user_id=fake_superuser.user_id,
) )
ctx.receive_event(bot, event_1_err) ctx.receive_event(bot, event_1_err)
ctx.should_rejected()
ctx.should_call_send(event_1_err, "请输入正确序号", True) ctx.should_call_send(event_1_err, "请输入正确序号", True)
ctx.should_rejected()
event_1_ok = fake_private_message_event( event_1_ok = fake_private_message_event(
message=Message("1"), sender=fake_superuser message=Message("1"),
sender=fake_superuser,
to_me=True,
user_id=fake_superuser.user_id,
) )
ctx.receive_event(bot, event_1_ok) ctx.receive_event(bot, event_1_ok)
ctx.should_call_send(event_1_ok, "请输入需要使用的命令:添加订阅,查询订阅,删除订阅", True) ctx.should_call_send(event_1_ok, "请输入需要使用的命令:添加订阅,查询订阅,删除订阅", True)
event_2_err = fake_private_message_event( event_2_err = fake_private_message_event(
message=Message("222"), sender=fake_superuser message=Message("222"),
sender=fake_superuser,
to_me=True,
user_id=fake_superuser.user_id,
) )
ctx.receive_event(bot, event_2_err) ctx.receive_event(bot, event_2_err)
ctx.should_rejected()
ctx.should_call_send(event_2_err, "请输入正确的命令", True) ctx.should_call_send(event_2_err, "请输入正确的命令", True)
ctx.should_rejected()
event_2_ok = fake_private_message_event( event_2_ok = fake_private_message_event(
message=Message("查询订阅"), sender=fake_superuser message=Message("查询订阅"),
sender=fake_superuser,
to_me=True,
user_id=fake_superuser.user_id,
) )
ctx.receive_event(bot, event_2_ok) ctx.receive_event(bot, event_2_ok)
ctx.should_pass_rule()
ctx.should_pass_permission()
@pytest.mark.asyncio
async def test_query_with_superuser_group_tome(app: App):
from nonebot.adapters.onebot.v11.bot import Bot
from nonebot.adapters.onebot.v11.message import Message
from nonebot_bison.config_manager import group_manage_matcher
async with app.test_matcher(group_manage_matcher) as ctx:
bot = ctx.create_bot(base=Bot)
event = fake_group_message_event(
message=Message("群管理"),
sender=fake_superuser,
to_me=True,
user_id=fake_superuser.user_id,
)
ctx.receive_event(bot, event)
ctx.should_pass_rule()
ctx.should_pass_permission()
ctx.should_call_send(event, Message("该功能只支持私聊使用请私聊Bot"), True)

View File

@ -0,0 +1,87 @@
import pytest
import respx
from httpx import Response
from nonebug.app import App
from .platforms.utils import get_json
from .utils import fake_admin_user, fake_group_message_event
@pytest.mark.asyncio
async def test_query_sub(app: App):
from nonebot.adapters.onebot.v11.message import Message
from nonebot_bison.config import Config
from nonebot_bison.config_manager import query_sub_matcher
from nonebot_bison.platform import platform_manager
config = Config()
config.user_target.truncate()
config.add_subscribe(
10000,
"group",
"6279793937",
"明日方舟Arknights",
"weibo",
[platform_manager["weibo"].reverse_category["图文"]],
["明日方舟"],
)
async with app.test_matcher(query_sub_matcher) as ctx:
bot = ctx.create_bot()
event = fake_group_message_event(message=Message("查询订阅"), to_me=True)
ctx.receive_event(bot, event)
ctx.should_pass_rule()
ctx.should_pass_permission()
ctx.should_call_send(
event, Message("订阅的帐号为:\nweibo 明日方舟Arknights 6279793937 [图文] 明日方舟\n"), True
)
@pytest.mark.asyncio
async def test_del_sub(app: App):
from nonebot.adapters.onebot.v11.bot import Bot
from nonebot.adapters.onebot.v11.message import Message
from nonebot_bison.config import Config
from nonebot_bison.config_manager import del_sub_matcher
from nonebot_bison.platform import platform_manager
config = Config()
config.user_target.truncate()
config.add_subscribe(
10000,
"group",
"6279793937",
"明日方舟Arknights",
"weibo",
[platform_manager["weibo"].reverse_category["图文"]],
["明日方舟"],
)
async with app.test_matcher(del_sub_matcher) as ctx:
bot = ctx.create_bot(base=Bot)
assert isinstance(bot, Bot)
event = fake_group_message_event(
message=Message("删除订阅"), to_me=True, sender=fake_admin_user
)
ctx.receive_event(bot, event)
ctx.should_pass_rule()
ctx.should_pass_permission()
ctx.should_call_send(
event,
Message(
"订阅的帐号为:\n1 weibo 明日方舟Arknights 6279793937\n [图文] 明日方舟\n请输入要删除的订阅的序号"
),
True,
)
event_1_err = fake_group_message_event(
message=Message("2"), sender=fake_admin_user
)
ctx.receive_event(bot, event_1_err)
ctx.should_call_send(event_1_err, "删除错误", True)
ctx.should_rejected()
event_1_ok = fake_group_message_event(
message=Message("1"), sender=fake_admin_user
)
ctx.receive_event(bot, event_1_ok)
ctx.should_call_send(event_1_ok, "删除成功", True)
ctx.should_finished()
subs = config.list_subscribe(10000, "group")
assert len(subs) == 0

View File

@ -1,13 +1,12 @@
import asyncio
import pytest import pytest
from nonebot.adapters.onebot.v11.bot import Bot from nonebot.adapters.onebot.v11.message import Message
from nonebug import App from nonebug import App
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_send_no_queue(app: App): async def test_send_no_queue(app: App):
import nonebot from nonebot.adapters.onebot.v11.bot import Bot
from nonebot.adapters.onebot.v11.message import Message
from nonebot_bison.plugin_config import plugin_config from nonebot_bison.plugin_config import plugin_config
from nonebot_bison.send import send_msgs from nonebot_bison.send import send_msgs
@ -16,25 +15,27 @@ async def test_send_no_queue(app: App):
bot = ctx.create_bot(base=Bot) bot = ctx.create_bot(base=Bot)
assert isinstance(bot, Bot) assert isinstance(bot, Bot)
ctx.should_call_api( ctx.should_call_api(
"send_group_msg", {"group_id": "1233", "message": "msg1"}, True "send_group_msg", {"group_id": "1233", "message": Message("msg1")}, True
) )
ctx.should_call_api( ctx.should_call_api(
"send_group_msg", {"group_id": "1233", "message": "msg2"}, True "send_group_msg", {"group_id": "1233", "message": Message("msg2")}, True
) )
ctx.should_call_api( ctx.should_call_api(
"send_private_msg", {"user_id": "666", "message": "priv"}, True "send_private_msg", {"user_id": "666", "message": Message("priv")}, True
) )
await send_msgs(bot, "1233", "group", ["msg1", "msg2"]) await send_msgs(bot, "1233", "group", [Message("msg1"), Message("msg2")])
await send_msgs(bot, "666", "private", ["priv"]) await send_msgs(bot, "666", "private", [Message("priv")])
assert ctx.wait_list.empty() assert ctx.wait_list.empty()
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_send_queue(app: App): async def test_send_queue(app: App):
import nonebot import nonebot
from nonebot.adapters.onebot.v11.bot import Bot
from nonebot.adapters.onebot.v11.message import Message
from nonebot_bison import send from nonebot_bison import send
from nonebot_bison.plugin_config import plugin_config from nonebot_bison.plugin_config import plugin_config
from nonebot_bison.send import LAST_SEND_TIME, do_send_msgs, send_msgs from nonebot_bison.send import do_send_msgs, send_msgs
async with app.test_api() as ctx: async with app.test_api() as ctx:
new_bot = ctx.create_bot(base=Bot) new_bot = ctx.create_bot(base=Bot)
@ -47,7 +48,7 @@ async def test_send_queue(app: App):
"send_group_msg", {"group_id": "1233", "message": "test msg"}, True "send_group_msg", {"group_id": "1233", "message": "test msg"}, True
) )
await bot.call_api("send_group_msg", group_id="1233", message="test msg") await bot.call_api("send_group_msg", group_id="1233", message="test msg")
await send_msgs(bot, "1233", "group", ["msg"]) await send_msgs(bot, "1233", "group", [Message("msg")])
ctx.should_call_api( ctx.should_call_api(
"send_group_msg", {"group_id": "1233", "message": "msg"}, True "send_group_msg", {"group_id": "1233", "message": "msg"}, True
) )
@ -56,3 +57,152 @@ async def test_send_queue(app: App):
app.monkeypatch.setattr(send, "LAST_SEND_TIME", 0, True) app.monkeypatch.setattr(send, "LAST_SEND_TIME", 0, True)
await do_send_msgs() await do_send_msgs()
assert ctx.wait_list.empty() assert ctx.wait_list.empty()
def gen_node(id, name, content: Message):
return {"type": "node", "data": {"name": name, "uin": id, "content": content}}
def _merge_messge(nodes):
return nodes
@pytest.mark.asyncio
async def test_send_merge_no_queue(app: App):
from nonebot.adapters.onebot.v11.bot import Bot
from nonebot.adapters.onebot.v11.message import Message, MessageSegment
from nonebot_bison.plugin_config import plugin_config
from nonebot_bison.send import send_msgs
plugin_config.bison_use_pic_merge = 1
plugin_config.bison_use_queue = False
async with app.test_api() as ctx:
bot = ctx.create_bot(base=Bot, self_id="8888")
assert isinstance(bot, Bot)
message = [
Message(MessageSegment.text("test msg")),
Message(MessageSegment.image("https://picsum.photos/200/300")),
]
ctx.should_call_api(
"send_group_msg",
{"group_id": 633, "message": Message(MessageSegment.text("test msg"))},
None,
)
ctx.should_call_api(
"send_group_msg",
{"group_id": 633, "message": message[1]},
None,
)
await send_msgs(bot, 633, "group", message)
message = [
Message(MessageSegment.text("test msg")),
Message(MessageSegment.image("https://picsum.photos/200/300")),
Message(MessageSegment.image("https://picsum.photos/200/300")),
]
ctx.should_call_api(
"send_group_msg",
{"group_id": 633, "message": Message(MessageSegment.text("test msg"))},
None,
)
ctx.should_call_api(
"get_group_member_info",
{"group_id": 633, "user_id": 8888, "no_cache": True},
{"user_id": 8888, "card": "admin", "nickname": "adminuser"},
)
merged_message = _merge_messge(
[gen_node(8888, "admin", message[1]), gen_node(8888, "admin", message[2])]
)
ctx.should_call_api(
"send_group_forward_msg",
{"group_id": 633, "messages": merged_message},
None,
)
await send_msgs(bot, 633, "group", message)
message = [
Message(MessageSegment.text("test msg")),
Message(MessageSegment.image("https://picsum.photos/200/300")),
Message(MessageSegment.image("https://picsum.photos/200/300")),
Message(MessageSegment.image("https://picsum.photos/200/300")),
]
ctx.should_call_api(
"send_group_msg",
{"group_id": 633, "message": Message(MessageSegment.text("test msg"))},
None,
)
ctx.should_call_api(
"get_group_member_info",
{"group_id": 633, "user_id": 8888, "no_cache": True},
{"user_id": 8888, "card": None, "nickname": "adminuser"},
)
merged_message = _merge_messge(
[
gen_node(8888, "adminuser", message[1]),
gen_node(8888, "adminuser", message[2]),
gen_node(8888, "adminuser", message[3]),
]
)
ctx.should_call_api(
"send_group_forward_msg",
{"group_id": 633, "messages": merged_message},
None,
)
await send_msgs(bot, 633, "group", message)
# private user should not send in forward
message = [
Message(MessageSegment.text("test msg")),
Message(MessageSegment.image("https://picsum.photos/200/300")),
Message(MessageSegment.image("https://picsum.photos/200/300")),
]
ctx.should_call_api(
"send_private_msg",
{"user_id": 633, "message": Message(MessageSegment.text("test msg"))},
None,
)
ctx.should_call_api(
"send_private_msg", {"user_id": 633, "message": message[1]}, None
)
ctx.should_call_api(
"send_private_msg", {"user_id": 633, "message": message[2]}, None
)
await send_msgs(bot, 633, "private", message)
async def test_send_merge2_no_queue(app: App):
from nonebot.adapters.onebot.v11.bot import Bot
from nonebot.adapters.onebot.v11.message import Message, MessageSegment
from nonebot_bison.plugin_config import plugin_config
from nonebot_bison.send import send_msgs
plugin_config.bison_use_pic_merge = 2
plugin_config.bison_use_queue = False
async with app.test_api() as ctx:
bot = ctx.create_bot(base=Bot, self_id="8888")
assert isinstance(bot, Bot)
message = [
Message(MessageSegment.text("test msg")),
Message(MessageSegment.image("https://picsum.photos/200/300")),
Message(MessageSegment.image("https://picsum.photos/200/300")),
]
ctx.should_call_api(
"get_group_member_info",
{"group_id": 633, "user_id": 8888, "no_cache": True},
{"user_id": 8888, "card": "admin", "nickname": "adminuser"},
)
merged_message = _merge_messge(
[
gen_node(8888, "admin", message[0]),
gen_node(8888, "admin", message[1]),
gen_node(8888, "admin", message[2]),
]
)
ctx.should_call_api(
"send_group_forward_msg",
{"group_id": 633, "messages": merged_message},
None,
)
await send_msgs(bot, 633, "group", message)

View File

@ -69,3 +69,69 @@ from nonebot.adapters.onebot.v11.event import Sender
fake_admin_user = Sender(nickname="test", role="admin") fake_admin_user = Sender(nickname="test", role="admin")
fake_superuser = Sender(user_id=10001, nickname="superuser") fake_superuser = Sender(user_id=10001, nickname="superuser")
class BotReply:
@staticmethod
def add_reply_on_platform(platform_manager, common_platform):
return (
"请输入想要订阅的平台,目前支持,请输入冒号左边的名称:\n"
+ "".join(
[
"{}{}\n".format(
platform_name, platform_manager[platform_name].name
)
for platform_name in common_platform
]
)
+ "要查看全部平台请输入:“全部”\n中止订阅过程请输入:“取消”"
)
@staticmethod
def add_reply_on_platform_input_allplatform(platform_manager):
return "全部平台\n" + "\n".join(
[
"{}{}".format(platform_name, platform.name)
for platform_name, platform in platform_manager.items()
]
)
@staticmethod
def add_reply_on_id_input_search():
search_url = "https://nonebot-bison.vercel.app/usage/#%E6%89%80%E6%94%AF%E6%8C%81%E5%B9%B3%E5%8F%B0%E7%9A%84-uid"
search_title = "Bison所支持的平台UID"
search_content = "查询相关平台的uid格式或获取方式"
search_image = "https://s3.bmp.ovh/imgs/2022/03/ab3cc45d83bd3dd3.jpg"
type = "share"
data = {
"url": search_url,
"title": search_title,
"content": search_content,
"image": search_image,
}
msg = [type, data]
return msg
@staticmethod
def add_reply_on_target_confirm(platform, name, id):
return f"即将订阅的用户为:{platform} {name} {id}\n如有错误请输入“取消”重新订阅"
@staticmethod
def add_reply_on_cats(platform_manager, platform: str):
return "请输入要订阅的类别,以空格分隔,支持的类别有:{}".format(
" ".join(list(platform_manager[platform].categories.values()))
)
@staticmethod
def add_reply_on_cats_input_error(cat: str):
return "不支持 {}".format(cat)
@staticmethod
def add_reply_subscribe_success(name):
return "添加 {} 成功".format(name)
add_reply_on_id_input_error = "id输入错误"
add_reply_on_platform_input_error = "平台输入错误"
add_reply_on_id = "请输入订阅用户的id:\n查询id获取方法请回复:“查询”"
add_reply_on_tags = '请输入要订阅的tag订阅所有tag输入"全部标签"'
add_reply_abort = "已中止订阅"