Merge pull request #35 from felinae98/AzideCupric-recover

Azide cupric recover
This commit is contained in:
felinae98 2022-03-17 11:52:02 +08:00 committed by GitHub
commit 491d39a961
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 814 additions and 172 deletions

1
.gitignore vendored
View File

@ -275,6 +275,7 @@ dist
# vuepress build output
.vuepress/dist
docs/.vuepress/.temp/
# Serverless directories
.serverless/

View File

@ -66,5 +66,8 @@
## [0.5.1]
- 使用了新的私聊进行群管理的方式
- 使用了新的私聊进行群管理的方式:从`管理-*`替换为`群管理`命令
- 默认关闭自动重发功能
- 添加了 [推送消息合并转发功能](https://nonebot-bison.vercel.app/usage/#%E9%85%8D%E7%BD%AE)
- 添加了`添加订阅`命令事件的中途取消功能
- 优化了`添加订阅`命令的聊天处理逻辑

View File

@ -64,6 +64,8 @@ yarn && yarn build
本项目使用了 Python 3.9 的语法,请将 Python 版本升级到 3.9 及以上,推荐使用 docker 部署
2. bot 不理我
请确认自己是群主或者管理员,并且检查`COMMAND_START`环境变量是否设为`[""]`
或者按照`COMMAND_START`中的设置添加命令前缀,例:
`COMMAND_START=["/"]`则应发送`/添加订阅`
3. 微博漏订阅了
微博更新了新的风控措施,某些含有某些关键词的微博会获取不到。

View File

@ -2,7 +2,7 @@ module.exports = {
title: 'Nonebot Bison',
description: 'Docs for Nonebot Bison',
themeConfig: {
nav: [
navbar: [
{ text: '主页', link: '/' },
{ text: '部署与使用', link: '/usage/' },
{ text: '开发', link: '/dev/' },

Binary file not shown.

After

Width:  |  Height:  |  Size: 36 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 12 KiB

View File

@ -2,8 +2,9 @@
home: true
heroText: Nonebot Bison
tagline: 本bot励志做全泰拉骑车最快的信使
actionText: 快速部署
actionLink: /usage/
actions:
- text: 快速部署
link: /usage/
features:
- title: 拓展性强
details: 没有自己想要的网站?只要简单的爬虫知识就可以给它适配一个新的网站

View File

@ -115,13 +115,32 @@ sidebar: auto
开启,默认关
- `BISON_USE_QUEUE`: 是否用队列的方式发送消息,降低发送频率,默认开
- `BISON_RESEND_TIMES`: 最大重发次数,默认 0
- `BISON_USE_PIC_MERGE`: 是否启用多图片时合并转发(仅限群)
- `0`: 不启用(默认)
- `1`: 首条消息单独发送,剩余图片合并转发
- `2`: 所有消息全部合并转发
::: details 配置项示例
- 当`BISON_USE_PIC_MERGE=1`时:
![simple1](/images/forward-msg-simple1.png)
- 当`BISON_USE_PIC_MERGE=2`时:
![simple1](/images/forward-msg-simple2.png)
:::
::: warning
启用此功能时,可能会因为待推送图片过大/过多而导致文字消息与合并转发图片消息推送间隔过大(选择模式`1`时),请谨慎考虑开启。或者选择模式`2`,使图文消息一同合并转发(可能会使消息推送延迟过长)
:::
## 使用
::: warning
本节假设`COMMAND_START`设置中包含`''`,如果出现 bot 不响应的问题,请先
排查这个设置
:::
本节假设`COMMAND_START`设置中包含`''`
- 如果出现 bot 不响应的问题,请先排查这个设置
- 尝试在命令前添加设置的命令前缀,如`COMMAND_START=['/']`,则尝试使用`/添加订阅`
:::
### 命令

View File

@ -1,5 +1,4 @@
import asyncio
from asyncio.tasks import Task
from datetime import datetime
from typing import Optional, Type
@ -12,7 +11,7 @@ from nonebot.internal.params import ArgStr
from nonebot.internal.rule import Rule
from nonebot.log import logger
from nonebot.matcher import Matcher
from nonebot.params import Depends, EventMessage, EventPlainText, EventToMe, EventType
from nonebot.params import Depends, EventPlainText, EventToMe
from nonebot.permission import SUPERUSER
from nonebot.rule import to_me
from nonebot.typing import T_State
@ -83,7 +82,7 @@ def do_add_sub(add_sub: Type[Matcher]):
for platform_name in common_platform
]
)
+ "要查看全部平台请输入:“全部”"
+ "要查看全部平台请输入:“全部”\n中止订阅过程请输入:“取消”"
)
async def parse_platform(event: MessageEvent, state: T_State) -> None:
@ -98,6 +97,8 @@ def do_add_sub(add_sub: Type[Matcher]):
]
)
await add_sub.reject(message)
elif platform == "取消":
await add_sub.finish("已中止订阅")
elif platform in platform_manager:
state["platform"] = platform
else:
@ -108,9 +109,7 @@ def do_add_sub(add_sub: Type[Matcher]):
)
async def init_id(state: T_State):
if platform_manager[state["platform"]].has_target:
state[
"_prompt"
] = "请输入订阅用户的id详情查阅https://nonebot-bison.vercel.app/usage/#%E6%89%80%E6%94%AF%E6%8C%81%E5%B9%B3%E5%8F%B0%E7%9A%84uid"
state["_prompt"] = "请输入订阅用户的id:\n查询id获取方法请回复:“查询”"
else:
state["id"] = "default"
state["name"] = await platform_manager[state["platform"]].get_target_name(
@ -122,13 +121,32 @@ def do_add_sub(add_sub: Type[Matcher]):
return
target = str(event.get_message()).strip()
try:
if target == "查询":
raise LookupError
if target == "取消":
raise KeyboardInterrupt
name = await check_sub_target(state["platform"], target)
if not name:
raise ValueError
state["id"] = target
state["name"] = name
except:
except (LookupError):
url = "https://nonebot-bison.vercel.app/usage/#%E6%89%80%E6%94%AF%E6%8C%81%E5%B9%B3%E5%8F%B0%E7%9A%84-uid"
title = "Bison所支持的平台UID"
content = "查询相关平台的uid格式或获取方式"
image = "https://s3.bmp.ovh/imgs/2022/03/ab3cc45d83bd3dd3.jpg"
getId_share = f"[CQ:share,url={url},title={title},content={content},image={image}]" # 缩短字符串格式长度,以及方便后续修改为消息段格式
await add_sub.reject(Message(getId_share))
except (KeyboardInterrupt):
await add_sub.finish("已中止订阅")
except (ValueError):
await add_sub.reject("id输入错误")
else:
await add_sub.send(
"即将订阅的用户为:{} {} {}\n如有错误请输入“取消”重新订阅".format(
state["platform"], state["name"], state["id"]
)
)
@add_sub.got("id", _gen_prompt_template("{_prompt}"), [Depends(parse_id)])
async def init_cat(state: T_State):
@ -144,7 +162,9 @@ def do_add_sub(add_sub: Type[Matcher]):
return
res = []
for cat in str(event.get_message()).strip().split():
if cat not in platform_manager[state["platform"]].reverse_category:
if cat == "取消":
await add_sub.finish("已中止订阅")
elif cat not in platform_manager[state["platform"]].reverse_category:
await add_sub.reject("不支持 {}".format(cat))
res.append(platform_manager[state["platform"]].reverse_category[cat])
state["cats"] = res
@ -159,6 +179,8 @@ def do_add_sub(add_sub: Type[Matcher]):
async def parser_tags(event: MessageEvent, state: T_State):
if not isinstance(state["tags"], Message):
return
if str(event.get_message()).strip() == "取消": # 一般不会有叫 取消 的tag吧
await add_sub.finish("已中止订阅")
if str(event.get_message()).strip() == "全部标签":
state["tags"] = []
else:
@ -298,7 +320,7 @@ group_manage_matcher = on_command("群管理", rule=to_me(), permission=SUPERUSE
@group_manage_matcher.handle()
async def send_group_list(bot: Bot, event: GroupMessageEvent, state: T_State):
async def send_group_list_private(bot: Bot, event: GroupMessageEvent, state: T_State):
await group_manage_matcher.finish(Message("该功能只支持私聊使用请私聊Bot"))

View File

@ -12,6 +12,8 @@ class PlugConfig(BaseSettings):
bison_filter_log: bool = False
bison_to_me: bool = True
bison_skip_browser_check: bool = False
bison_use_pic_merge: int = 0 # 多图片时启用图片合并转发(仅限群),当bison_use_queue为False时该配置不会生效
# 0不启用1首条消息单独发送剩余照片合并转发2以及以上所有消息全部合并转发
bison_resend_times: int = 0
class Config:

View File

@ -1,23 +1,36 @@
import time
from typing import Literal, Union
from nonebot.adapters import Message, MessageSegment
from nonebot.adapters.onebot.v11.bot import Bot
from nonebot.adapters.onebot.v11.message import Message, MessageSegment
from nonebot.log import logger
from .plugin_config import plugin_config
QUEUE = []
QUEUE: list[
tuple[
Bot,
int,
Literal["private", "group", "group-forward"],
Union[str, Message],
int,
]
] = []
LAST_SEND_TIME = time.time()
async def _do_send(
bot: "Bot", user: str, user_type: str, msg: Union[str, Message, MessageSegment]
bot: "Bot",
user: int,
user_type: Literal["group", "private", "group-forward"],
msg: Union[str, Message],
):
if user_type == "group":
await bot.call_api("send_group_msg", group_id=user, message=msg)
await bot.send_group_msg(group_id=user, message=msg)
elif user_type == "private":
await bot.call_api("send_private_msg", user_id=user, message=msg)
await bot.send_private_msg(user_id=user, message=msg)
elif user_type == "group-forward":
await bot.send_group_forward_msg(group_id=user, messages=msg)
async def do_send_msgs():
@ -39,10 +52,40 @@ async def do_send_msgs():
LAST_SEND_TIME = time.time()
async def send_msgs(bot: Bot, user, user_type: Literal["private", "group"], msgs: list):
async def _send_msgs_dispatch(
bot: Bot,
user,
user_type: Literal["private", "group", "group-forward"],
msg: Union[str, Message],
):
if plugin_config.bison_use_queue:
for msg in msgs:
QUEUE.append((bot, user, user_type, msg, plugin_config.bison_resend_times))
QUEUE.append((bot, user, user_type, msg, plugin_config.bison_resend_times))
else:
await _do_send(bot, user, user_type, msg)
async def send_msgs(bot: Bot, user, user_type: Literal["private", "group"], msgs: list):
if not plugin_config.bison_use_pic_merge or user_type == "private":
for msg in msgs:
await _do_send(bot, user, user_type, msg)
await _send_msgs_dispatch(bot, user, user_type, msg)
return
if plugin_config.bison_use_pic_merge == 1:
await _send_msgs_dispatch(bot, user, "group", msgs.pop(0))
if msgs:
if len(msgs) == 1: # 只有一条消息序列就不合并转发
await _send_msgs_dispatch(bot, user, "group", msgs.pop(0))
else:
group_bot_info = await bot.get_group_member_info(
group_id=user, user_id=int(bot.self_id), no_cache=True
) # 调用api获取群内bot的相关参数
forward_msg = Message(
[
MessageSegment.node_custom(
group_bot_info["user_id"],
nickname=group_bot_info["card"] or group_bot_info["nickname"],
content=msg,
)
for msg in msgs
]
)
await _send_msgs_dispatch(bot, user, "group-forward", forward_msg)

View File

@ -0,0 +1,283 @@
import pytest
import respx
from httpx import Response
from nonebug.app import App
from .platforms.utils import get_json
from .utils import BotReply, fake_admin_user, fake_group_message_event
# 选择platform阶段中止
@pytest.mark.asyncio
@respx.mock
async def test_abort_add_on_platform(app: App):
from nonebot.adapters.onebot.v11.event import Sender
from nonebot.adapters.onebot.v11.message import Message
from nonebot_bison.config import Config
from nonebot_bison.config_manager import add_sub_matcher, common_platform
from nonebot_bison.platform import platform_manager
config = Config()
config.user_target.truncate()
ak_list_router = respx.get(
"https://m.weibo.cn/api/container/getIndex?containerid=1005056279793937"
)
ak_list_router.mock(
return_value=Response(200, json=get_json("weibo_ak_profile.json"))
)
ak_list_bad_router = respx.get(
"https://m.weibo.cn/api/container/getIndex?containerid=100505000"
)
ak_list_bad_router.mock(
return_value=Response(200, json=get_json("weibo_err_profile.json"))
)
async with app.test_matcher(add_sub_matcher) as ctx:
bot = ctx.create_bot()
event_1 = fake_group_message_event(
message=Message("添加订阅"),
sender=Sender(card="", nickname="test", role="admin"),
to_me=True,
)
ctx.receive_event(bot, event_1)
ctx.should_pass_rule()
ctx.should_call_send(
event_1,
Message(BotReply.add_reply_on_platform(platform_manager, common_platform)),
True,
)
event_abort = fake_group_message_event(
message=Message("取消"), sender=Sender(card="", nickname="test", role="admin")
)
ctx.receive_event(bot, event_abort)
ctx.should_call_send(
event_abort,
BotReply.add_reply_abort,
True,
)
ctx.should_finished()
# 输入id阶段中止
@pytest.mark.asyncio
@respx.mock
async def test_abort_add_on_id(app: App):
from nonebot.adapters.onebot.v11.event import Sender
from nonebot.adapters.onebot.v11.message import Message
from nonebot_bison.config import Config
from nonebot_bison.config_manager import add_sub_matcher, common_platform
from nonebot_bison.platform import platform_manager
config = Config()
config.user_target.truncate()
ak_list_router = respx.get(
"https://m.weibo.cn/api/container/getIndex?containerid=1005056279793937"
)
ak_list_router.mock(
return_value=Response(200, json=get_json("weibo_ak_profile.json"))
)
ak_list_bad_router = respx.get(
"https://m.weibo.cn/api/container/getIndex?containerid=100505000"
)
ak_list_bad_router.mock(
return_value=Response(200, json=get_json("weibo_err_profile.json"))
)
async with app.test_matcher(add_sub_matcher) as ctx:
bot = ctx.create_bot()
event_1 = fake_group_message_event(
message=Message("添加订阅"),
sender=Sender(card="", nickname="test", role="admin"),
to_me=True,
)
ctx.receive_event(bot, event_1)
ctx.should_pass_rule()
ctx.should_call_send(
event_1,
Message(BotReply.add_reply_on_platform(platform_manager, common_platform)),
True,
)
event_2 = fake_group_message_event(
message=Message("weibo"), sender=fake_admin_user
)
ctx.receive_event(bot, event_2)
ctx.should_call_send(
event_2,
Message(BotReply.add_reply_on_id),
True,
)
event_abort = fake_group_message_event(
message=Message("取消"), sender=Sender(card="", nickname="test", role="admin")
)
ctx.receive_event(bot, event_abort)
ctx.should_call_send(
event_abort,
BotReply.add_reply_abort,
True,
)
ctx.should_finished()
# 输入订阅类别阶段中止
@pytest.mark.asyncio
@respx.mock
async def test_abort_add_on_cats(app: App):
from nonebot.adapters.onebot.v11.event import Sender
from nonebot.adapters.onebot.v11.message import Message
from nonebot_bison.config import Config
from nonebot_bison.config_manager import add_sub_matcher, common_platform
from nonebot_bison.platform import platform_manager
config = Config()
config.user_target.truncate()
ak_list_router = respx.get(
"https://m.weibo.cn/api/container/getIndex?containerid=1005056279793937"
)
ak_list_router.mock(
return_value=Response(200, json=get_json("weibo_ak_profile.json"))
)
ak_list_bad_router = respx.get(
"https://m.weibo.cn/api/container/getIndex?containerid=100505000"
)
ak_list_bad_router.mock(
return_value=Response(200, json=get_json("weibo_err_profile.json"))
)
async with app.test_matcher(add_sub_matcher) as ctx:
bot = ctx.create_bot()
event_1 = fake_group_message_event(
message=Message("添加订阅"),
sender=Sender(card="", nickname="test", role="admin"),
to_me=True,
)
ctx.receive_event(bot, event_1)
ctx.should_pass_rule()
ctx.should_call_send(
event_1,
Message(
BotReply.add_reply_on_platform(
platform_manager=platform_manager, common_platform=common_platform
)
),
True,
)
event_2 = fake_group_message_event(
message=Message("weibo"), sender=fake_admin_user
)
ctx.receive_event(bot, event_2)
ctx.should_call_send(
event_2,
Message(BotReply.add_reply_on_id),
True,
)
event_3 = fake_group_message_event(
message=Message("6279793937"), sender=fake_admin_user
)
ctx.receive_event(bot, event_3)
ctx.should_call_send(
event_3,
BotReply.add_reply_on_target_confirm(
"weibo", "明日方舟Arknights", "6279793937"
),
True,
)
ctx.should_call_send(
event_3,
Message(BotReply.add_reply_on_cats(platform_manager, "weibo")),
True,
)
event_abort = fake_group_message_event(
message=Message("取消"), sender=Sender(card="", nickname="test", role="admin")
)
ctx.receive_event(bot, event_abort)
ctx.should_call_send(
event_abort,
BotReply.add_reply_abort,
True,
)
ctx.should_finished()
# 输入标签阶段中止
@pytest.mark.asyncio
@respx.mock
async def test_abort_add_on_tag(app: App):
from nonebot.adapters.onebot.v11.event import Sender
from nonebot.adapters.onebot.v11.message import Message
from nonebot_bison.config import Config
from nonebot_bison.config_manager import add_sub_matcher, common_platform
from nonebot_bison.platform import platform_manager
config = Config()
config.user_target.truncate()
ak_list_router = respx.get(
"https://m.weibo.cn/api/container/getIndex?containerid=1005056279793937"
)
ak_list_router.mock(
return_value=Response(200, json=get_json("weibo_ak_profile.json"))
)
ak_list_bad_router = respx.get(
"https://m.weibo.cn/api/container/getIndex?containerid=100505000"
)
ak_list_bad_router.mock(
return_value=Response(200, json=get_json("weibo_err_profile.json"))
)
async with app.test_matcher(add_sub_matcher) as ctx:
bot = ctx.create_bot()
event_1 = fake_group_message_event(
message=Message("添加订阅"),
sender=Sender(card="", nickname="test", role="admin"),
to_me=True,
)
ctx.receive_event(bot, event_1)
ctx.should_pass_rule()
ctx.should_call_send(
event_1,
Message(
BotReply.add_reply_on_platform(
platform_manager=platform_manager, common_platform=common_platform
)
),
True,
)
event_2 = fake_group_message_event(
message=Message("weibo"), sender=fake_admin_user
)
ctx.receive_event(bot, event_2)
ctx.should_call_send(
event_2,
Message(BotReply.add_reply_on_id),
True,
)
event_3 = fake_group_message_event(
message=Message("6279793937"), sender=fake_admin_user
)
ctx.receive_event(bot, event_3)
ctx.should_call_send(
event_3,
BotReply.add_reply_on_target_confirm(
"weibo", "明日方舟Arknights", "6279793937"
),
True,
)
ctx.should_call_send(
event_3,
Message(BotReply.add_reply_on_cats(platform_manager, "weibo")),
True,
)
event_4 = fake_group_message_event(
message=Message("图文 文字"), sender=fake_admin_user
)
ctx.receive_event(bot, event_4)
ctx.should_call_send(event_4, Message(BotReply.add_reply_on_tags), True)
event_abort = fake_group_message_event(
message=Message("取消"), sender=Sender(card="", nickname="test", role="admin")
)
ctx.receive_event(bot, event_abort)
ctx.should_call_send(
event_abort,
BotReply.add_reply_abort,
True,
)
ctx.should_finished()

View File

@ -4,7 +4,7 @@ from httpx import Response
from nonebug.app import App
from .platforms.utils import get_json
from .utils import fake_admin_user, fake_group_message_event
from .utils import BotReply, fake_admin_user, fake_group_message_event
@pytest.mark.asyncio
@ -49,18 +49,7 @@ async def test_configurable_at_me_false(app: App):
ctx.receive_event(bot, event)
ctx.should_call_send(
event,
Message(
"请输入想要订阅的平台,目前支持,请输入冒号左边的名称:\n"
+ "".join(
[
"{}{}\n".format(
platform_name, platform_manager[platform_name].name
)
for platform_name in common_platform
]
)
+ "要查看全部平台请输入:“全部”"
),
Message(BotReply.add_reply_on_platform(platform_manager, common_platform)),
True,
)
ctx.should_pass_rule()
@ -104,16 +93,9 @@ async def test_add_with_target(app: App):
ctx.should_call_send(
event_1,
Message(
"请输入想要订阅的平台,目前支持,请输入冒号左边的名称:\n"
+ "".join(
[
"{}{}\n".format(
platform_name, platform_manager[platform_name].name
)
for platform_name in common_platform
]
BotReply.add_reply_on_platform(
platform_manager=platform_manager, common_platform=common_platform
)
+ "要查看全部平台请输入:“全部”"
),
True,
)
@ -124,15 +106,7 @@ async def test_add_with_target(app: App):
ctx.should_rejected()
ctx.should_call_send(
event_2,
(
"全部平台\n"
+ "\n".join(
[
"{}{}".format(platform_name, platform.name)
for platform_name, platform in platform_manager.items()
]
)
),
BotReply.add_reply_on_platform_input_allplatform(platform_manager),
True,
)
event_3 = fake_group_message_event(
@ -141,16 +115,14 @@ async def test_add_with_target(app: App):
ctx.receive_event(bot, event_3)
ctx.should_call_send(
event_3,
Message(
"请输入订阅用户的id详情查阅https://nonebot-bison.vercel.app/usage/#%E6%89%80%E6%94%AF%E6%8C%81%E5%B9%B3%E5%8F%B0%E7%9A%84uid"
),
Message(BotReply.add_reply_on_id),
True,
)
event_4_err = fake_group_message_event(
message=Message("000"), sender=fake_admin_user
)
ctx.receive_event(bot, event_4_err)
ctx.should_call_send(event_4_err, "id输入错误", True)
ctx.should_call_send(event_4_err, BotReply.add_reply_on_id_input_error, True)
ctx.should_rejected()
event_4_ok = fake_group_message_event(
message=Message("6279793937"), sender=fake_admin_user
@ -158,29 +130,36 @@ async def test_add_with_target(app: App):
ctx.receive_event(bot, event_4_ok)
ctx.should_call_send(
event_4_ok,
Message(
"请输入要订阅的类别,以空格分隔,支持的类别有:{}".format(
" ".join(list(platform_manager["weibo"].categories.values()))
)
BotReply.add_reply_on_target_confirm(
"weibo", "明日方舟Arknights", "6279793937"
),
True,
)
ctx.should_call_send(
event_4_ok,
Message(BotReply.add_reply_on_cats(platform_manager, "weibo")),
True,
)
event_5_err = fake_group_message_event(
message=Message("图文 文字 err"), sender=fake_admin_user
)
ctx.receive_event(bot, event_5_err)
ctx.should_call_send(event_5_err, "不支持 err", True)
ctx.should_call_send(
event_5_err, BotReply.add_reply_on_cats_input_error("err"), True
)
ctx.should_rejected()
event_5_ok = fake_group_message_event(
message=Message("图文 文字"), sender=fake_admin_user
)
ctx.receive_event(bot, event_5_ok)
ctx.should_call_send(event_5_ok, Message('请输入要订阅的tag订阅所有tag输入"全部标签"'), True)
ctx.should_call_send(event_5_ok, Message(BotReply.add_reply_on_tags), True)
event_6 = fake_group_message_event(
message=Message("全部标签"), sender=fake_admin_user
)
ctx.receive_event(bot, event_6)
ctx.should_call_send(event_6, ("添加 明日方舟Arknights 成功"), True)
ctx.should_call_send(
event_6, BotReply.add_reply_subscribe_success("明日方舟Arknights"), True
)
ctx.should_finished()
subs = config.list_subscribe(10000, "group")
assert len(subs) == 1
@ -220,18 +199,7 @@ async def test_add_with_target_no_cat(app: App):
ctx.should_pass_rule()
ctx.should_call_send(
event_1,
Message(
"请输入想要订阅的平台,目前支持,请输入冒号左边的名称:\n"
+ "".join(
[
"{}{}\n".format(
platform_name, platform_manager[platform_name].name
)
for platform_name in common_platform
]
)
+ "要查看全部平台请输入:“全部”"
),
Message(BotReply.add_reply_on_platform(platform_manager, common_platform)),
True,
)
event_3 = fake_group_message_event(
@ -240,16 +208,21 @@ async def test_add_with_target_no_cat(app: App):
ctx.receive_event(bot, event_3)
ctx.should_call_send(
event_3,
Message(
"请输入订阅用户的id详情查阅https://nonebot-bison.vercel.app/usage/#%E6%89%80%E6%94%AF%E6%8C%81%E5%B9%B3%E5%8F%B0%E7%9A%84uid"
),
Message(BotReply.add_reply_on_id),
True,
)
event_4_ok = fake_group_message_event(
message=Message("32540734"), sender=fake_admin_user
)
ctx.receive_event(bot, event_4_ok)
ctx.should_call_send(event_4_ok, ("添加 塞壬唱片-MSR 成功"), True)
ctx.should_call_send(
event_4_ok,
BotReply.add_reply_on_target_confirm("ncm-artist", "塞壬唱片-MSR", "32540734"),
True,
)
ctx.should_call_send(
event_4_ok, BotReply.add_reply_subscribe_success("塞壬唱片-MSR"), True
)
ctx.should_finished()
subs = config.list_subscribe(10000, "group")
assert len(subs) == 1
@ -284,18 +257,7 @@ async def test_add_no_target(app: App):
ctx.should_pass_rule()
ctx.should_call_send(
event_1,
Message(
"请输入想要订阅的平台,目前支持,请输入冒号左边的名称:\n"
+ "".join(
[
"{}{}\n".format(
platform_name, platform_manager[platform_name].name
)
for platform_name in common_platform
]
)
+ "要查看全部平台请输入:“全部”"
),
Message(BotReply.add_reply_on_platform(platform_manager, common_platform)),
True,
)
event_3 = fake_group_message_event(
@ -304,18 +266,16 @@ async def test_add_no_target(app: App):
ctx.receive_event(bot, event_3)
ctx.should_call_send(
event_3,
Message(
"请输入要订阅的类别,以空格分隔,支持的类别有:{}".format(
" ".join(list(platform_manager["arknights"].categories.values()))
)
),
Message(BotReply.add_reply_on_cats(platform_manager, "arknights")),
True,
)
event_4 = fake_group_message_event(
message=Message("游戏公告"), sender=fake_admin_user
)
ctx.receive_event(bot, event_4)
ctx.should_call_send(event_4, ("添加 明日方舟游戏信息 成功"), True)
ctx.should_call_send(
event_4, BotReply.add_reply_subscribe_success("明日方舟游戏信息"), True
)
ctx.should_finished()
subs = config.list_subscribe(10000, "group")
assert len(subs) == 1
@ -348,18 +308,7 @@ async def test_platform_name_err(app: App):
ctx.should_pass_rule()
ctx.should_call_send(
event_1,
Message(
"请输入想要订阅的平台,目前支持,请输入冒号左边的名称:\n"
+ "".join(
[
"{}{}\n".format(
platform_name, platform_manager[platform_name].name
)
for platform_name in common_platform
]
)
+ "要查看全部平台请输入:“全部”"
),
Message(BotReply.add_reply_on_platform(platform_manager, common_platform)),
True,
)
event_2 = fake_group_message_event(
@ -370,86 +319,89 @@ async def test_platform_name_err(app: App):
ctx.should_rejected()
ctx.should_call_send(
event_2,
"平台输入错误",
BotReply.add_reply_on_platform_input_error,
True,
)
@pytest.mark.asyncio
async def test_query_sub(app: App):
from nonebot.adapters.onebot.v11.message import Message
@respx.mock
async def test_add_with_get_id(app: App):
from nonebot.adapters.onebot.v11.event import Sender
from nonebot.adapters.onebot.v11.message import Message, MessageSegment
from nonebot_bison.config import Config
from nonebot_bison.config_manager import query_sub_matcher
from nonebot_bison.config_manager import add_sub_matcher, common_platform
from nonebot_bison.platform import platform_manager
config = Config()
config.user_target.truncate()
config.add_subscribe(
10000,
"group",
"6279793937",
"明日方舟Arknights",
"weibo",
[platform_manager["weibo"].reverse_category["图文"]],
["明日方舟"],
ak_list_router = respx.get(
"https://m.weibo.cn/api/container/getIndex?containerid=1005056279793937"
)
async with app.test_matcher(query_sub_matcher) as ctx:
ak_list_router.mock(
return_value=Response(200, json=get_json("weibo_ak_profile.json"))
)
ak_list_bad_router = respx.get(
"https://m.weibo.cn/api/container/getIndex?containerid=100505000"
)
ak_list_bad_router.mock(
return_value=Response(200, json=get_json("weibo_err_profile.json"))
)
async with app.test_matcher(add_sub_matcher) as ctx:
bot = ctx.create_bot()
event = fake_group_message_event(message=Message("查询订阅"), to_me=True)
ctx.receive_event(bot, event)
ctx.should_pass_rule()
ctx.should_pass_permission()
ctx.should_call_send(
event, Message("订阅的帐号为:\nweibo 明日方舟Arknights 6279793937 [图文] 明日方舟\n"), True
event_1 = fake_group_message_event(
message=Message("添加订阅"),
sender=Sender(card="", nickname="test", role="admin"),
to_me=True,
)
@pytest.mark.asyncio
async def test_del_sub(app: App):
from nonebot.adapters.onebot.v11.bot import Bot
from nonebot.adapters.onebot.v11.message import Message
from nonebot_bison.config import Config
from nonebot_bison.config_manager import del_sub_matcher
from nonebot_bison.platform import platform_manager
config = Config()
config.user_target.truncate()
config.add_subscribe(
10000,
"group",
"6279793937",
"明日方舟Arknights",
"weibo",
[platform_manager["weibo"].reverse_category["图文"]],
["明日方舟"],
)
async with app.test_matcher(del_sub_matcher) as ctx:
bot = ctx.create_bot(base=Bot)
assert isinstance(bot, Bot)
event = fake_group_message_event(
message=Message("删除订阅"), to_me=True, sender=fake_admin_user
)
ctx.receive_event(bot, event)
ctx.receive_event(bot, event_1)
ctx.should_pass_rule()
ctx.should_pass_permission()
ctx.should_call_send(
event,
event_1,
Message(
"订阅的帐号为:\n1 weibo 明日方舟Arknights 6279793937\n [图文] 明日方舟\n请输入要删除的订阅的序号"
BotReply.add_reply_on_platform(
platform_manager=platform_manager, common_platform=common_platform
)
),
True,
)
event_1_err = fake_group_message_event(
message=Message("2"), sender=fake_admin_user
event_3 = fake_group_message_event(
message=Message("weibo"), sender=fake_admin_user
)
ctx.receive_event(bot, event_1_err)
ctx.should_call_send(event_1_err, "删除错误", True)
ctx.receive_event(bot, event_3)
ctx.should_call_send(
event_3,
Message(BotReply.add_reply_on_id),
True,
)
event_4_query = fake_group_message_event(
message=Message("查询"), sender=fake_admin_user
)
ctx.receive_event(bot, event_4_query)
ctx.should_rejected()
event_1_ok = fake_group_message_event(
message=Message("1"), sender=fake_admin_user
ctx.should_call_send(
event_4_query,
Message([MessageSegment(*BotReply.add_reply_on_id_input_search())]),
True,
)
"""
line 362:
鬼知道为什么要在这里这样写
没有[]的话assert不了(should_call_send使用[MessageSegment(...)]的格式进行比较)
不在这里MessageSegment()的话也assert不了(指不能让add_reply_on_id_input_search直接返回一个MessageSegment对象)
amen
"""
event_abort = fake_group_message_event(
message=Message("取消"), sender=Sender(card="", nickname="test", role="admin")
)
ctx.receive_event(bot, event_abort)
ctx.should_call_send(
event_abort,
BotReply.add_reply_abort,
True,
)
ctx.receive_event(bot, event_1_ok)
ctx.should_call_send(event_1_ok, "删除成功", True)
ctx.should_finished()
subs = config.list_subscribe(10000, "group")
assert len(subs) == 0

View File

@ -0,0 +1,87 @@
import pytest
import respx
from httpx import Response
from nonebug.app import App
from .platforms.utils import get_json
from .utils import fake_admin_user, fake_group_message_event
@pytest.mark.asyncio
async def test_query_sub(app: App):
from nonebot.adapters.onebot.v11.message import Message
from nonebot_bison.config import Config
from nonebot_bison.config_manager import query_sub_matcher
from nonebot_bison.platform import platform_manager
config = Config()
config.user_target.truncate()
config.add_subscribe(
10000,
"group",
"6279793937",
"明日方舟Arknights",
"weibo",
[platform_manager["weibo"].reverse_category["图文"]],
["明日方舟"],
)
async with app.test_matcher(query_sub_matcher) as ctx:
bot = ctx.create_bot()
event = fake_group_message_event(message=Message("查询订阅"), to_me=True)
ctx.receive_event(bot, event)
ctx.should_pass_rule()
ctx.should_pass_permission()
ctx.should_call_send(
event, Message("订阅的帐号为:\nweibo 明日方舟Arknights 6279793937 [图文] 明日方舟\n"), True
)
@pytest.mark.asyncio
async def test_del_sub(app: App):
from nonebot.adapters.onebot.v11.bot import Bot
from nonebot.adapters.onebot.v11.message import Message
from nonebot_bison.config import Config
from nonebot_bison.config_manager import del_sub_matcher
from nonebot_bison.platform import platform_manager
config = Config()
config.user_target.truncate()
config.add_subscribe(
10000,
"group",
"6279793937",
"明日方舟Arknights",
"weibo",
[platform_manager["weibo"].reverse_category["图文"]],
["明日方舟"],
)
async with app.test_matcher(del_sub_matcher) as ctx:
bot = ctx.create_bot(base=Bot)
assert isinstance(bot, Bot)
event = fake_group_message_event(
message=Message("删除订阅"), to_me=True, sender=fake_admin_user
)
ctx.receive_event(bot, event)
ctx.should_pass_rule()
ctx.should_pass_permission()
ctx.should_call_send(
event,
Message(
"订阅的帐号为:\n1 weibo 明日方舟Arknights 6279793937\n [图文] 明日方舟\n请输入要删除的订阅的序号"
),
True,
)
event_1_err = fake_group_message_event(
message=Message("2"), sender=fake_admin_user
)
ctx.receive_event(bot, event_1_err)
ctx.should_call_send(event_1_err, "删除错误", True)
ctx.should_rejected()
event_1_ok = fake_group_message_event(
message=Message("1"), sender=fake_admin_user
)
ctx.receive_event(bot, event_1_ok)
ctx.should_call_send(event_1_ok, "删除成功", True)
ctx.should_finished()
subs = config.list_subscribe(10000, "group")
assert len(subs) == 0

View File

@ -1,13 +1,13 @@
import asyncio
import pytest
from nonebot.adapters.onebot.v11.bot import Bot
from nonebug import App
@pytest.mark.asyncio
async def test_send_no_queue(app: App):
import nonebot
from nonebot.adapters.onebot.v11.bot import Bot
from nonebot_bison.plugin_config import plugin_config
from nonebot_bison.send import send_msgs
@ -32,6 +32,7 @@ async def test_send_no_queue(app: App):
@pytest.mark.asyncio
async def test_send_queue(app: App):
import nonebot
from nonebot.adapters.onebot.v11.bot import Bot
from nonebot_bison import send
from nonebot_bison.plugin_config import plugin_config
from nonebot_bison.send import LAST_SEND_TIME, do_send_msgs, send_msgs
@ -56,3 +57,163 @@ async def test_send_queue(app: App):
app.monkeypatch.setattr(send, "LAST_SEND_TIME", 0, True)
await do_send_msgs()
assert ctx.wait_list.empty()
@pytest.mark.asyncio
async def test_send_merge_no_queue(app: App):
from nonebot.adapters.onebot.v11.bot import Bot
from nonebot.adapters.onebot.v11.message import Message, MessageSegment
from nonebot_bison.plugin_config import plugin_config
from nonebot_bison.send import send_msgs
plugin_config.bison_use_pic_merge = 1
plugin_config.bison_use_queue = False
async with app.test_api() as ctx:
bot = ctx.create_bot(base=Bot, self_id="8888")
assert isinstance(bot, Bot)
message = [
Message(MessageSegment.text("test msg")),
Message(MessageSegment.image("https://picsum.photos/200/300")),
]
ctx.should_call_api(
"send_group_msg",
{"group_id": 633, "message": Message(MessageSegment.text("test msg"))},
None,
)
ctx.should_call_api(
"send_group_msg",
{"group_id": 633, "message": message[1]},
None,
)
await send_msgs(bot, 633, "group", message)
message = [
Message(MessageSegment.text("test msg")),
Message(MessageSegment.image("https://picsum.photos/200/300")),
Message(MessageSegment.image("https://picsum.photos/200/300")),
]
ctx.should_call_api(
"send_group_msg",
{"group_id": 633, "message": Message(MessageSegment.text("test msg"))},
None,
)
ctx.should_call_api(
"get_group_member_info",
{"group_id": 633, "user_id": 8888, "no_cache": True},
{"user_id": 8888, "card": "admin", "nickname": "adminuser"},
)
merged_message = Message(
[
MessageSegment.node_custom(
user_id=8888, nickname="admin", content=message[1]
),
MessageSegment.node_custom(
user_id=8888, nickname="admin", content=message[2]
),
]
)
ctx.should_call_api(
"send_group_forward_msg",
{"group_id": 633, "messages": merged_message},
None,
)
await send_msgs(bot, 633, "group", message)
message = [
Message(MessageSegment.text("test msg")),
Message(MessageSegment.image("https://picsum.photos/200/300")),
Message(MessageSegment.image("https://picsum.photos/200/300")),
Message(MessageSegment.image("https://picsum.photos/200/300")),
]
ctx.should_call_api(
"send_group_msg",
{"group_id": 633, "message": Message(MessageSegment.text("test msg"))},
None,
)
ctx.should_call_api(
"get_group_member_info",
{"group_id": 633, "user_id": 8888, "no_cache": True},
{"user_id": 8888, "card": None, "nickname": "adminuser"},
)
merged_message = Message(
[
MessageSegment.node_custom(
user_id=8888, nickname="adminuser", content=message[1]
),
MessageSegment.node_custom(
user_id=8888, nickname="adminuser", content=message[2]
),
MessageSegment.node_custom(
user_id=8888, nickname="adminuser", content=message[3]
),
]
)
ctx.should_call_api(
"send_group_forward_msg",
{"group_id": 633, "messages": merged_message},
None,
)
await send_msgs(bot, 633, "group", message)
# private user should not send in forward
message = [
Message(MessageSegment.text("test msg")),
Message(MessageSegment.image("https://picsum.photos/200/300")),
Message(MessageSegment.image("https://picsum.photos/200/300")),
]
ctx.should_call_api(
"send_private_msg",
{"user_id": 633, "message": Message(MessageSegment.text("test msg"))},
None,
)
ctx.should_call_api(
"send_private_msg", {"user_id": 633, "message": message[1]}, None
)
ctx.should_call_api(
"send_private_msg", {"user_id": 633, "message": message[2]}, None
)
await send_msgs(bot, 633, "private", message)
async def test_send_merge2_no_queue(app: App):
from nonebot.adapters.onebot.v11.bot import Bot
from nonebot.adapters.onebot.v11.message import Message, MessageSegment
from nonebot_bison.plugin_config import plugin_config
from nonebot_bison.send import send_msgs
plugin_config.bison_use_pic_merge = 2
plugin_config.bison_use_queue = False
async with app.test_api() as ctx:
bot = ctx.create_bot(base=Bot, self_id="8888")
assert isinstance(bot, Bot)
message = [
Message(MessageSegment.text("test msg")),
Message(MessageSegment.image("https://picsum.photos/200/300")),
Message(MessageSegment.image("https://picsum.photos/200/300")),
]
ctx.should_call_api(
"get_group_member_info",
{"group_id": 633, "user_id": 8888, "no_cache": True},
{"user_id": 8888, "card": "admin", "nickname": "adminuser"},
)
merged_message = Message(
[
MessageSegment.node_custom(
user_id=8888, nickname="admin", content=message[0]
),
MessageSegment.node_custom(
user_id=8888, nickname="admin", content=message[1]
),
MessageSegment.node_custom(
user_id=8888, nickname="admin", content=message[2]
),
]
)
ctx.should_call_api(
"send_group_forward_msg",
{"group_id": 633, "messages": merged_message},
None,
)
await send_msgs(bot, 633, "group", message)

View File

@ -69,3 +69,69 @@ from nonebot.adapters.onebot.v11.event import Sender
fake_admin_user = Sender(nickname="test", role="admin")
fake_superuser = Sender(user_id=10001, nickname="superuser")
class BotReply:
@staticmethod
def add_reply_on_platform(platform_manager, common_platform):
return (
"请输入想要订阅的平台,目前支持,请输入冒号左边的名称:\n"
+ "".join(
[
"{}{}\n".format(
platform_name, platform_manager[platform_name].name
)
for platform_name in common_platform
]
)
+ "要查看全部平台请输入:“全部”\n中止订阅过程请输入:“取消”"
)
@staticmethod
def add_reply_on_platform_input_allplatform(platform_manager):
return "全部平台\n" + "\n".join(
[
"{}{}".format(platform_name, platform.name)
for platform_name, platform in platform_manager.items()
]
)
@staticmethod
def add_reply_on_id_input_search():
search_url = "https://nonebot-bison.vercel.app/usage/#%E6%89%80%E6%94%AF%E6%8C%81%E5%B9%B3%E5%8F%B0%E7%9A%84-uid"
search_title = "Bison所支持的平台UID"
search_content = "查询相关平台的uid格式或获取方式"
search_image = "https://s3.bmp.ovh/imgs/2022/03/ab3cc45d83bd3dd3.jpg"
type = "share"
data = {
"url": search_url,
"title": search_title,
"content": search_content,
"image": search_image,
}
msg = [type, data]
return msg
@staticmethod
def add_reply_on_target_confirm(platform, name, id):
return f"即将订阅的用户为:{platform} {name} {id}\n如有错误请输入“取消”重新订阅"
@staticmethod
def add_reply_on_cats(platform_manager, platform: str):
return "请输入要订阅的类别,以空格分隔,支持的类别有:{}".format(
" ".join(list(platform_manager[platform].categories.values()))
)
@staticmethod
def add_reply_on_cats_input_error(cat: str):
return "不支持 {}".format(cat)
@staticmethod
def add_reply_subscribe_success(name):
return "添加 {} 成功".format(name)
add_reply_on_id_input_error = "id输入错误"
add_reply_on_platform_input_error = "平台输入错误"
add_reply_on_id = "请输入订阅用户的id:\n查询id获取方法请回复:“查询”"
add_reply_on_tags = '请输入要订阅的tag订阅所有tag输入"全部标签"'
add_reply_abort = "已中止订阅"