♻️ refactor send logic (#185)

This commit is contained in:
felinae98 2023-02-05 23:36:21 +08:00 committed by GitHub
parent 2e9ca5636e
commit 64eb427c51
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 63 additions and 74 deletions

View File

@ -1,31 +1,8 @@
import logging
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from nonebot.log import LoguruHandler
from ..plugin_config import plugin_config
from ..send import do_send_msgs
aps = AsyncIOScheduler(timezone="Asia/Shanghai")
class CustomLogHandler(LoguruHandler):
def filter(self, record: logging.LogRecord):
return record.msg != (
'Execution of job "%s" '
"skipped: maximum number of running instances reached (%d)"
)
if plugin_config.bison_use_queue:
aps.add_job(do_send_msgs, "interval", seconds=0.3, coalesce=True)
aps_logger = logging.getLogger("apscheduler")
aps_logger.setLevel(30)
aps_logger.handlers.clear()
aps_logger.addHandler(CustomLogHandler())
def start_scheduler():
aps.configure({"apscheduler.timezone": "Asia/Shanghai"})
aps.start()

View File

@ -1,15 +1,17 @@
import time
from typing import Literal, Union
import asyncio
from asyncio.tasks import sleep
from collections import deque
from typing import Deque, Literal, Union
from nonebot.adapters.onebot.v11.bot import Bot
from nonebot.adapters.onebot.v11.exception import ActionFailed
from nonebot.adapters.onebot.v11.message import Message
from nonebot.adapters.onebot.v11.message import Message, MessageSegment
from nonebot.log import logger
from .plugin_config import plugin_config
from .utils.get_bot import refresh_bots
QUEUE: list[
QUEUE: Deque[
tuple[
Bot,
int,
@ -17,8 +19,9 @@ QUEUE: list[
Union[str, Message],
int,
]
] = []
LAST_SEND_TIME = time.time()
] = deque()
MESSGE_SEND_INTERVAL = 1.5
async def _do_send(
@ -40,22 +43,34 @@ async def _do_send(
async def do_send_msgs():
global LAST_SEND_TIME
if time.time() - LAST_SEND_TIME < 1.5:
if not QUEUE:
return
if QUEUE:
bot, user, user_type, msg, retry_time = QUEUE.pop(0)
while True:
# why read from queue then pop item from queue?
# if there is only 1 item in queue, pop it and await send
# the length of queue will be 0.
# At that time, adding items to queue will trigger a new execution of this func, which is wrong.
# So, read from queue first then pop from it
bot, user, user_type, msg, retry_time = QUEUE[0]
try:
await _do_send(bot, user, user_type, msg)
except Exception as e:
await asyncio.sleep(MESSGE_SEND_INTERVAL)
QUEUE.popleft()
if retry_time > 0:
QUEUE.insert(0, (bot, user, user_type, msg, retry_time - 1))
QUEUE.appendleft((bot, user, user_type, msg, retry_time - 1))
else:
msg_str = str(msg)
if len(msg_str) > 50:
msg_str = msg_str[:50] + "..."
logger.warning(f"send msg err {e} {msg_str}")
LAST_SEND_TIME = time.time()
else:
# sleeping after popping may also cause re-execution error like above mentioned
await asyncio.sleep(MESSGE_SEND_INTERVAL)
QUEUE.popleft()
finally:
if not QUEUE:
return
async def _send_msgs_dispatch(
@ -66,6 +81,9 @@ async def _send_msgs_dispatch(
):
if plugin_config.bison_use_queue:
QUEUE.append((bot, user, user_type, msg, plugin_config.bison_resend_times))
# len(QUEUE) before append was 0
if len(QUEUE) == 1:
asyncio.create_task(do_send_msgs())
else:
await _do_send(bot, user, user_type, msg)
@ -87,28 +105,15 @@ async def send_msgs(
group_bot_info = await bot.get_group_member_info(
group_id=user, user_id=int(bot.self_id), no_cache=True
) # 调用api获取群内bot的相关参数
# forward_msg = Message(
# [
# MessageSegment.node_custom(
# group_bot_info["user_id"],
# nickname=group_bot_info["card"] or group_bot_info["nickname"],
# content=msg,
# )
# for msg in msgs
# ]
# )
# FIXME: Because of https://github.com/nonebot/adapter-onebot/issues/9
forward_msg = [
{
"type": "node",
"data": {
"name": group_bot_info["card"] or group_bot_info["nickname"],
"uin": group_bot_info["user_id"],
"content": msg,
},
}
for msg in msgs
]
forward_msg = Message(
[
MessageSegment.node_custom(
group_bot_info["user_id"],
nickname=group_bot_info["card"] or group_bot_info["nickname"],
content=msg,
)
for msg in msgs
]
)
await _send_msgs_dispatch(bot, user, "group-forward", forward_msg)

View File

@ -1,8 +1,13 @@
import asyncio
import typing
import pytest
from flaky import flaky
from nonebot.adapters.onebot.v11.message import Message
from nonebug import App
if typing.TYPE_CHECKING:
from nonebot.adapters.onebot.v11.message import Message, MessageSegment
@pytest.mark.asyncio
async def test_send_no_queue(app: App):
@ -29,46 +34,48 @@ async def test_send_no_queue(app: App):
assert ctx.wait_list.empty()
@flaky
@pytest.mark.asyncio
@pytest.mark.parametrize("nonebug_init", [{"bison_use_queue": True}], indirect=True)
async def test_send_queue(app: App):
import nonebot
from nonebot.adapters.onebot.v11.bot import Bot
from nonebot.adapters.onebot.v11.message import Message, MessageSegment
from nonebot_bison import send
from nonebot_bison.plugin_config import plugin_config
from nonebot_bison.send import do_send_msgs, send_msgs
from nonebot_bison.send import MESSGE_SEND_INTERVAL, do_send_msgs, send_msgs
async with app.test_api() as ctx:
new_bot = ctx.create_bot(base=Bot)
app.monkeypatch.setattr(nonebot, "get_bot", lambda: new_bot, True)
app.monkeypatch.setattr(plugin_config, "bison_use_queue", True, True)
bot = nonebot.get_bot()
assert isinstance(bot, Bot)
assert bot == new_bot
ctx.should_call_api(
"send_group_msg", {"group_id": "1233", "message": "test msg"}, True
)
await bot.call_api("send_group_msg", group_id="1233", message="test msg")
await send_msgs(bot, "1233", "group", [Message("msg")])
ctx.should_call_api(
"send_group_msg",
{"group_id": "1233", "message": [MessageSegment.text("msg")]},
True,
)
await do_send_msgs()
ctx.should_call_api(
"send_group_msg",
{"group_id": "1233", "message": [MessageSegment.text("msg2")]},
True,
)
await send_msgs(bot, "1233", "group", [Message("msg")])
await send_msgs(bot, "1233", "group", [Message("msg2")])
assert not ctx.wait_list.empty()
app.monkeypatch.setattr(send, "LAST_SEND_TIME", 0, True)
await do_send_msgs()
await asyncio.sleep(2 * MESSGE_SEND_INTERVAL)
assert ctx.wait_list.empty()
def gen_node(id, name, content: Message):
return {"type": "node", "data": {"name": name, "uin": id, "content": content}}
def gen_node(id, name, content: "Message"):
from nonebot.adapters.onebot.v11.message import MessageSegment
return MessageSegment.node_custom(id, name, content)
def _merge_messge(nodes):
return nodes
from nonebot.adapters.onebot.v11.message import Message
return Message(nodes)
@pytest.mark.asyncio