test for send.py

This commit is contained in:
felinae98 2022-02-13 20:31:33 +08:00
parent 5a492aceb3
commit 6a8b2e2d15
No known key found for this signature in database
GPG Key ID: 00C8B010587FF610
4 changed files with 63 additions and 6 deletions

View File

@ -3,6 +3,7 @@ import logging
import nonebot import nonebot
from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.schedulers.asyncio import AsyncIOScheduler
from nonebot import get_driver, logger from nonebot import get_driver, logger
from nonebot.adapters.onebot.v11.bot import Bot
from nonebot.log import LoguruHandler from nonebot.log import LoguruHandler
from .config import Config from .config import Config
@ -46,8 +47,8 @@ async def fetch_and_send(target_type: str):
send_user_list, send_user_list,
) )
) )
bot_list = list(nonebot.get_bots().values()) bot = nonebot.get_bot()
bot = bot_list[0] if bot_list else None assert isinstance(bot, Bot)
to_send = await platform_manager[target_type].fetch_new_post( to_send = await platform_manager[target_type].fetch_new_post(
target, send_userinfo_list target, send_userinfo_list
) )

View File

@ -1,6 +1,8 @@
import time import time
from typing import Literal, Union
from nonebot import logger from nonebot import logger
from nonebot.adapters import Message, MessageSegment
from nonebot.adapters.onebot.v11.bot import Bot from nonebot.adapters.onebot.v11.bot import Bot
from .plugin_config import plugin_config from .plugin_config import plugin_config
@ -9,7 +11,9 @@ QUEUE = []
LAST_SEND_TIME = time.time() LAST_SEND_TIME = time.time()
async def _do_send(bot: "Bot", user: str, user_type: str, msg): async def _do_send(
bot: "Bot", user: str, user_type: str, msg: Union[str, Message, MessageSegment]
):
if user_type == "group": if user_type == "group":
await bot.call_api("send_group_msg", group_id=user, message=msg) await bot.call_api("send_group_msg", group_id=user, message=msg)
elif user_type == "private": elif user_type == "private":
@ -21,6 +25,9 @@ async def do_send_msgs():
if time.time() - LAST_SEND_TIME < 1.5: if time.time() - LAST_SEND_TIME < 1.5:
return return
if QUEUE: if QUEUE:
import ipdb
ipdb.set_trace()
bot, user, user_type, msg, retry_time = QUEUE.pop(0) bot, user, user_type, msg, retry_time = QUEUE.pop(0)
try: try:
await _do_send(bot, user, user_type, msg) await _do_send(bot, user, user_type, msg)
@ -35,7 +42,7 @@ async def do_send_msgs():
LAST_SEND_TIME = time.time() LAST_SEND_TIME = time.time()
async def send_msgs(bot, user, user_type, msgs): async def send_msgs(bot: Bot, user, user_type: Literal["private", "group"], msgs: list):
if plugin_config.bison_use_queue: if plugin_config.bison_use_queue:
for msg in msgs: for msg in msgs:
QUEUE.append((bot, user, user_type, msg, 2)) QUEUE.append((bot, user, user_type, msg, 2))

View File

@ -1,5 +1,5 @@
from dataclasses import dataclass from dataclasses import dataclass
from typing import Any, Callable, NamedTuple, NewType from typing import Any, Callable, Literal, NamedTuple, NewType
RawPost = NewType("RawPost", Any) RawPost = NewType("RawPost", Any)
Target = NewType("Target", str) Target = NewType("Target", str)
@ -10,7 +10,7 @@ Tag = NewType("Tag", str)
@dataclass(eq=True, frozen=True) @dataclass(eq=True, frozen=True)
class User: class User:
user: str user: str
user_type: str user_type: Literal["group", "private"]
class UserSubInfo(NamedTuple): class UserSubInfo(NamedTuple):

49
tests/test_send.py Normal file
View File

@ -0,0 +1,49 @@
import asyncio
import pytest
from nonebot.adapters.onebot.v11.bot import Bot
from nonebug import App
@pytest.mark.asyncio
async def test_send_no_queue(app: App):
import nonebot
from nonebot_bison.plugin_config import plugin_config
from nonebot_bison.send import send_msgs
async with app.test_api() as ctx:
app.monkeypatch.setattr(plugin_config, "bison_use_queue", False, True)
bot = ctx.create_bot(base=Bot)
assert isinstance(bot, Bot)
ctx.should_call_api(
"send_group_msg", {"group_id": "1233", "message": "msg1"}, True
)
ctx.should_call_api(
"send_group_msg", {"group_id": "1233", "message": "msg2"}, True
)
await send_msgs(bot, "1233", "group", ["msg1", "msg2"])
@pytest.mark.asyncio
async def test_send_queue(app: App):
import nonebot
from nonebot_bison.plugin_config import plugin_config
from nonebot_bison.send import LAST_SEND_TIME, do_send_msgs, send_msgs
async with app.test_api() as ctx:
new_bot = ctx.create_bot(base=Bot)
app.monkeypatch.setattr(nonebot, "get_bot", lambda: new_bot, True)
app.monkeypatch.setattr(plugin_config, "bison_use_queue", True, True)
bot = nonebot.get_bot()
assert isinstance(bot, Bot)
assert bot == new_bot
ctx.should_call_api(
"send_group_msg", {"group_id": "1233", "message": "test msg"}, True
)
await bot.call_api("send_group_msg", group_id="1233", message="test msg")
await send_msgs(bot, "1233", "group", ["msg"])
ctx.should_call_api(
"send_group_msg", {"group_id": "1233", "message": "msg"}, True
)
LAST_SEND_TIME = 0
await do_send_msgs()