mirror of
https://github.com/suyiiyii/nonebot-bison.git
synced 2025-06-08 04:43:00 +08:00
* 🎨 修改 nonebot_bison 目录位置 * auto fix by pre-commit hooks * 🚚 fix frontend build target * 🚚 use soft link * Revert "🚚 use soft link" This reverts commit de21f79d5ae1bd5515b04f42a4138cb25ddf3e62. * 🚚 modify dockerfile --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> Co-authored-by: felinae98 <731499577@qq.com>
226 lines
8.0 KiB
Python
226 lines
8.0 KiB
Python
import asyncio
|
|
import typing
|
|
|
|
import pytest
|
|
from flaky import flaky
|
|
from nonebug import App
|
|
from pytest_mock.plugin import MockerFixture
|
|
|
|
if typing.TYPE_CHECKING:
|
|
from nonebot.adapters.onebot.v11.message import Message, MessageSegment
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_send_no_queue(app: App, mocker: MockerFixture):
|
|
from nonebot.adapters.onebot.v11.bot import Bot
|
|
from nonebot.adapters.onebot.v11.message import Message
|
|
|
|
from nonebot_bison.plugin_config import plugin_config
|
|
from nonebot_bison.send import send_msgs
|
|
|
|
mocker.patch.object(plugin_config, "bison_use_queue", False)
|
|
async with app.test_api() as ctx:
|
|
bot = ctx.create_bot(base=Bot)
|
|
assert isinstance(bot, Bot)
|
|
ctx.should_call_api(
|
|
"send_group_msg", {"group_id": "1233", "message": Message("msg1")}, True
|
|
)
|
|
ctx.should_call_api(
|
|
"send_group_msg", {"group_id": "1233", "message": Message("msg2")}, True
|
|
)
|
|
ctx.should_call_api(
|
|
"send_private_msg", {"user_id": "666", "message": Message("priv")}, True
|
|
)
|
|
await send_msgs(bot, "1233", "group", [Message("msg1"), Message("msg2")])
|
|
await send_msgs(bot, "666", "private", [Message("priv")])
|
|
assert ctx.wait_list.empty()
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_send_queue(app: App, mocker: MockerFixture):
|
|
import nonebot
|
|
from nonebot.adapters.onebot.v11.bot import Bot
|
|
from nonebot.adapters.onebot.v11.message import Message, MessageSegment
|
|
|
|
from nonebot_bison import send
|
|
from nonebot_bison.plugin_config import plugin_config
|
|
from nonebot_bison.send import MESSGE_SEND_INTERVAL, do_send_msgs, send_msgs
|
|
|
|
mocker.patch.object(plugin_config, "bison_use_queue", True)
|
|
async with app.test_api() as ctx:
|
|
new_bot = ctx.create_bot(base=Bot)
|
|
mocker.patch.object(nonebot, "get_bot", lambda: new_bot)
|
|
bot = nonebot.get_bot()
|
|
assert isinstance(bot, Bot)
|
|
assert bot == new_bot
|
|
ctx.should_call_api(
|
|
"send_group_msg",
|
|
{"group_id": "1233", "message": [MessageSegment.text("msg")]},
|
|
True,
|
|
)
|
|
ctx.should_call_api(
|
|
"send_group_msg",
|
|
{"group_id": "1233", "message": [MessageSegment.text("msg2")]},
|
|
True,
|
|
)
|
|
await send_msgs(bot, "1233", "group", [Message("msg")])
|
|
await send_msgs(bot, "1233", "group", [Message("msg2")])
|
|
assert not ctx.wait_list.empty()
|
|
await asyncio.sleep(2 * MESSGE_SEND_INTERVAL)
|
|
assert ctx.wait_list.empty()
|
|
|
|
|
|
def gen_node(id, name, content: "Message"):
|
|
from nonebot.adapters.onebot.v11.message import MessageSegment
|
|
|
|
return MessageSegment.node_custom(id, name, content)
|
|
|
|
|
|
def _merge_messge(nodes):
|
|
from nonebot.adapters.onebot.v11.message import Message
|
|
|
|
return Message(nodes)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_send_merge_no_queue(app: App):
|
|
from nonebot.adapters.onebot.v11.bot import Bot
|
|
from nonebot.adapters.onebot.v11.message import Message, MessageSegment
|
|
|
|
from nonebot_bison.plugin_config import plugin_config
|
|
from nonebot_bison.send import send_msgs
|
|
|
|
plugin_config.bison_use_pic_merge = 1
|
|
plugin_config.bison_use_queue = False
|
|
|
|
async with app.test_api() as ctx:
|
|
bot = ctx.create_bot(base=Bot, self_id="8888")
|
|
assert isinstance(bot, Bot)
|
|
message = [
|
|
Message(MessageSegment.text("test msg")),
|
|
Message(MessageSegment.image("https://picsum.photos/200/300")),
|
|
]
|
|
ctx.should_call_api(
|
|
"send_group_msg",
|
|
{"group_id": 633, "message": Message(MessageSegment.text("test msg"))},
|
|
None,
|
|
)
|
|
ctx.should_call_api(
|
|
"send_group_msg",
|
|
{"group_id": 633, "message": message[1]},
|
|
None,
|
|
)
|
|
await send_msgs(bot, 633, "group", message)
|
|
|
|
message = [
|
|
Message(MessageSegment.text("test msg")),
|
|
Message(MessageSegment.image("https://picsum.photos/200/300")),
|
|
Message(MessageSegment.image("https://picsum.photos/200/300")),
|
|
]
|
|
ctx.should_call_api(
|
|
"send_group_msg",
|
|
{"group_id": 633, "message": Message(MessageSegment.text("test msg"))},
|
|
None,
|
|
)
|
|
ctx.should_call_api(
|
|
"get_group_member_info",
|
|
{"group_id": 633, "user_id": 8888, "no_cache": True},
|
|
{"user_id": 8888, "card": "admin", "nickname": "adminuser"},
|
|
)
|
|
merged_message = _merge_messge(
|
|
[gen_node(8888, "admin", message[1]), gen_node(8888, "admin", message[2])]
|
|
)
|
|
ctx.should_call_api(
|
|
"send_group_forward_msg",
|
|
{"group_id": 633, "messages": merged_message},
|
|
None,
|
|
)
|
|
await send_msgs(bot, 633, "group", message)
|
|
|
|
message = [
|
|
Message(MessageSegment.text("test msg")),
|
|
Message(MessageSegment.image("https://picsum.photos/200/300")),
|
|
Message(MessageSegment.image("https://picsum.photos/200/300")),
|
|
Message(MessageSegment.image("https://picsum.photos/200/300")),
|
|
]
|
|
ctx.should_call_api(
|
|
"send_group_msg",
|
|
{"group_id": 633, "message": Message(MessageSegment.text("test msg"))},
|
|
None,
|
|
)
|
|
ctx.should_call_api(
|
|
"get_group_member_info",
|
|
{"group_id": 633, "user_id": 8888, "no_cache": True},
|
|
{"user_id": 8888, "card": None, "nickname": "adminuser"},
|
|
)
|
|
merged_message = _merge_messge(
|
|
[
|
|
gen_node(8888, "adminuser", message[1]),
|
|
gen_node(8888, "adminuser", message[2]),
|
|
gen_node(8888, "adminuser", message[3]),
|
|
]
|
|
)
|
|
ctx.should_call_api(
|
|
"send_group_forward_msg",
|
|
{"group_id": 633, "messages": merged_message},
|
|
None,
|
|
)
|
|
await send_msgs(bot, 633, "group", message)
|
|
|
|
# private user should not send in forward
|
|
message = [
|
|
Message(MessageSegment.text("test msg")),
|
|
Message(MessageSegment.image("https://picsum.photos/200/300")),
|
|
Message(MessageSegment.image("https://picsum.photos/200/300")),
|
|
]
|
|
ctx.should_call_api(
|
|
"send_private_msg",
|
|
{"user_id": 633, "message": Message(MessageSegment.text("test msg"))},
|
|
None,
|
|
)
|
|
ctx.should_call_api(
|
|
"send_private_msg", {"user_id": 633, "message": message[1]}, None
|
|
)
|
|
ctx.should_call_api(
|
|
"send_private_msg", {"user_id": 633, "message": message[2]}, None
|
|
)
|
|
await send_msgs(bot, 633, "private", message)
|
|
|
|
|
|
async def test_send_merge2_no_queue(app: App):
|
|
from nonebot.adapters.onebot.v11.bot import Bot
|
|
from nonebot.adapters.onebot.v11.message import Message, MessageSegment
|
|
|
|
from nonebot_bison.plugin_config import plugin_config
|
|
from nonebot_bison.send import send_msgs
|
|
|
|
plugin_config.bison_use_pic_merge = 2
|
|
plugin_config.bison_use_queue = False
|
|
|
|
async with app.test_api() as ctx:
|
|
bot = ctx.create_bot(base=Bot, self_id="8888")
|
|
assert isinstance(bot, Bot)
|
|
message = [
|
|
Message(MessageSegment.text("test msg")),
|
|
Message(MessageSegment.image("https://picsum.photos/200/300")),
|
|
Message(MessageSegment.image("https://picsum.photos/200/300")),
|
|
]
|
|
ctx.should_call_api(
|
|
"get_group_member_info",
|
|
{"group_id": 633, "user_id": 8888, "no_cache": True},
|
|
{"user_id": 8888, "card": "admin", "nickname": "adminuser"},
|
|
)
|
|
merged_message = _merge_messge(
|
|
[
|
|
gen_node(8888, "admin", message[0]),
|
|
gen_node(8888, "admin", message[1]),
|
|
gen_node(8888, "admin", message[2]),
|
|
]
|
|
)
|
|
ctx.should_call_api(
|
|
"send_group_forward_msg",
|
|
{"group_id": 633, "messages": merged_message},
|
|
None,
|
|
)
|
|
await send_msgs(bot, 633, "group", message)
|