Merge branch 'main' into dev

This commit is contained in:
AzideCupric
2022-03-13 20:20:18 +08:00
committed by GitHub
34 changed files with 2575 additions and 8278 deletions
-2
View File
@@ -1,5 +1,3 @@
import nonebot
from . import (
admin_page,
config,
@@ -45,10 +45,10 @@ class SinglePageApplication(StaticFiles):
self.index = index
super().__init__(directory=directory, packages=None, html=True, check_dir=True)
async def lookup_path(self, path: str) -> tuple[str, Union[os.stat_result, None]]:
full_path, stat_res = await super().lookup_path(path)
def lookup_path(self, path: str) -> tuple[str, Union[os.stat_result, None]]:
full_path, stat_res = super().lookup_path(path)
if stat_res is None:
return await super().lookup_path(self.index)
return super().lookup_path(self.index)
return (full_path, stat_res)
+202 -51
View File
@@ -1,13 +1,18 @@
from typing import Type
import asyncio
from asyncio.tasks import Task
from datetime import datetime
from typing import Optional, Type
from nonebot import on_command
from nonebot.adapters import Event as AbstractEvent
from nonebot.adapters.onebot.v11 import Bot, Event
from nonebot.adapters.onebot.v11 import Bot, Event, MessageEvent
from nonebot.adapters.onebot.v11.event import GroupMessageEvent, PrivateMessageEvent
from nonebot.adapters.onebot.v11.message import Message
from nonebot.adapters.onebot.v11.permission import GROUP_ADMIN, GROUP_OWNER
from nonebot.internal.params import ArgStr
from nonebot.internal.rule import Rule
from nonebot.log import logger
from nonebot.matcher import Matcher
from nonebot.params import Depends, EventToMe
from nonebot.params import Depends, EventMessage, EventPlainText, EventToMe
from nonebot.permission import SUPERUSER
from nonebot.rule import to_me
from nonebot.typing import T_State
@@ -15,7 +20,7 @@ from nonebot.typing import T_State
from .config import Config
from .platform import check_sub_target, platform_manager
from .plugin_config import plugin_config
from .types import Category, Target
from .types import Category, Target, User
from .utils import parse_text
@@ -44,7 +49,28 @@ common_platform = [
]
def ensure_user_info(matcher: Type[Matcher]):
async def _check_user_info(state: T_State):
if not state.get("target_user_info"):
await matcher.finish(
"No target_user_info set, this shouldn't happen, please issue"
)
return _check_user_info
async def set_target_user_info(event: MessageEvent, state: T_State):
if isinstance(event, GroupMessageEvent):
user = User(event.group_id, "group")
state["target_user_info"] = user
elif isinstance(event, PrivateMessageEvent):
user = User(event.user_id, "private")
state["target_user_info"] = user
def do_add_sub(add_sub: Type[Matcher]):
add_sub.handle()(ensure_user_info(add_sub))
@add_sub.handle()
async def init_promote(state: T_State):
state["_prompt"] = (
@@ -57,10 +83,10 @@ def do_add_sub(add_sub: Type[Matcher]):
for platform_name in common_platform
]
)
+ "要查看全部平台请输入:“全部”"
+ "要查看全部平台请输入:“全部”\n中止订阅过程请输入:“取消”"
)
async def parse_platform(event: AbstractEvent, state: T_State) -> None:
async def parse_platform(event: MessageEvent, state: T_State) -> None:
if not isinstance(state["platform"], Message):
return
platform = str(event.get_message()).strip()
@@ -72,6 +98,8 @@ def do_add_sub(add_sub: Type[Matcher]):
]
)
await add_sub.reject(message)
elif platform == "取消":
await add_sub.finish("已中止订阅")
elif platform in platform_manager:
state["platform"] = platform
else:
@@ -82,27 +110,44 @@ def do_add_sub(add_sub: Type[Matcher]):
)
async def init_id(state: T_State):
if platform_manager[state["platform"]].has_target:
state[
"_prompt"
] = "请输入订阅用户的id,详情查阅https://nonebot-bison.vercel.app/usage/#%E6%89%80%E6%94%AF%E6%8C%81%E5%B9%B3%E5%8F%B0%E7%9A%84uid"
state["_prompt"] = "请输入订阅用户的id:\n查询id获取方法请回复:“查询”"
else:
state["id"] = "default"
state["name"] = await platform_manager[state["platform"]].get_target_name(
Target("")
)
async def parse_id(event: AbstractEvent, state: T_State):
async def parse_id(event: MessageEvent, state: T_State):
if not isinstance(state["id"], Message):
return
target = str(event.get_message()).strip()
try:
if target == "查询":
raise LookupError
if target == "取消":
raise KeyboardInterrupt
name = await check_sub_target(state["platform"], target)
if not name:
raise ValueError
state["id"] = target
state["name"] = name
except:
except (LookupError):
url = "https://nonebot-bison.vercel.app/usage/#%E6%89%80%E6%94%AF%E6%8C%81%E5%B9%B3%E5%8F%B0%E7%9A%84-uid"
title = "Bison所支持的平台UID"
content = "查询相关平台的uid格式或获取方式"
image = "https://s3.bmp.ovh/imgs/2022/03/ab3cc45d83bd3dd3.jpg"
getId_share = f"[CQ:share,url={url},title={title},content={content},image={image}]" # 缩短字符串格式长度,以及方便后续修改为消息段格式
await add_sub.reject(Message(getId_share))
except (KeyboardInterrupt):
await add_sub.finish("已中止订阅")
except (ValueError):
await add_sub.reject("id输入错误")
else:
await add_sub.send(
"即将订阅的用户为:{} {} {}\n如有错误请输入“取消”重新订阅".format(
state["platform"], state["name"], state["id"]
)
)
@add_sub.got("id", _gen_prompt_template("{_prompt}"), [Depends(parse_id)])
async def init_cat(state: T_State):
@@ -113,12 +158,14 @@ def do_add_sub(add_sub: Type[Matcher]):
" ".join(list(platform_manager[state["platform"]].categories.values()))
)
async def parser_cats(event: AbstractEvent, state: T_State):
async def parser_cats(event: MessageEvent, state: T_State):
if not isinstance(state["cats"], Message):
return
res = []
for cat in str(event.get_message()).strip().split():
if cat not in platform_manager[state["platform"]].reverse_category:
if cat == "取消":
await add_sub.finish("已中止订阅")
elif cat not in platform_manager[state["platform"]].reverse_category:
await add_sub.reject("不支持 {}".format(cat))
res.append(platform_manager[state["platform"]].reverse_category[cat])
state["cats"] = res
@@ -130,9 +177,11 @@ def do_add_sub(add_sub: Type[Matcher]):
return
state["_prompt"] = '请输入要订阅的tag,订阅所有tag输入"全部标签"'
async def parser_tags(event: AbstractEvent, state: T_State):
async def parser_tags(event: MessageEvent, state: T_State):
if not isinstance(state["tags"], Message):
return
if str(event.get_message()).strip() == "取消": # 一般不会有叫 取消 的tag吧
await add_sub.finish("已中止订阅")
if str(event.get_message()).strip() == "全部标签":
state["tags"] = []
else:
@@ -141,9 +190,13 @@ def do_add_sub(add_sub: Type[Matcher]):
@add_sub.got("tags", _gen_prompt_template("{_prompt}"), [Depends(parser_tags)])
async def add_sub_process(event: Event, state: T_State):
config = Config()
user = state.get("target_user_info")
assert isinstance(user, User)
config.add_subscribe(
state.get("_user_id") or event.group_id,
user_type="group",
# state.get("_user_id") or event.group_id,
# user_type="group",
user=user.user,
user_type=user.user_type,
target=state["id"],
target_name=state["name"],
target_type=state["platform"],
@@ -154,11 +207,17 @@ def do_add_sub(add_sub: Type[Matcher]):
def do_query_sub(query_sub: Type[Matcher]):
query_sub.handle()(ensure_user_info(query_sub))
@query_sub.handle()
async def _(event: Event, state: T_State):
async def _(state: T_State):
config: Config = Config()
user_info = state["target_user_info"]
assert isinstance(user_info, User)
sub_list = config.list_subscribe(
state.get("_user_id") or event.group_id, "group"
# state.get("_user_id") or event.group_id, "group"
user_info.user,
user_info.user_type,
)
res = "订阅的帐号为:\n"
for sub in sub_list:
@@ -179,11 +238,17 @@ def do_query_sub(query_sub: Type[Matcher]):
def do_del_sub(del_sub: Type[Matcher]):
del_sub.handle()(ensure_user_info(del_sub))
@del_sub.handle()
async def send_list(bot: Bot, event: Event, state: T_State):
config: Config = Config()
user_info = state["target_user_info"]
assert isinstance(user_info, User)
sub_list = config.list_subscribe(
state.get("_user_id") or event.group_id, "group"
# state.get("_user_id") or event.group_id, "group"
user_info.user,
user_info.user_type,
)
res = "订阅的帐号为:\n"
state["sub_table"] = {}
@@ -213,9 +278,13 @@ def do_del_sub(del_sub: Type[Matcher]):
try:
index = int(str(event.get_message()).strip())
config = Config()
user_info = state["target_user_info"]
assert isinstance(user_info, User)
config.del_subscribe(
state.get("_user_id") or event.group_id,
"group",
# state.get("_user_id") or event.group_id,
# "group",
user_info.user,
user_info.user_type,
**state["sub_table"][index],
)
except Exception as e:
@@ -224,41 +293,19 @@ def do_del_sub(del_sub: Type[Matcher]):
await del_sub.finish("删除成功")
async def parse_group_number(event: AbstractEvent, state: T_State):
if not isinstance(state["_user_id"], Message):
return
state["_user_id"] = int(str(event.get_message()))
add_sub_matcher = on_command(
"添加订阅",
rule=configurable_to_me,
permission=GROUP_ADMIN | GROUP_OWNER | SUPERUSER,
priority=5,
)
add_sub_matcher.handle()(set_target_user_info)
do_add_sub(add_sub_matcher)
manage_add_sub_matcher = on_command("管理-添加订阅", permission=SUPERUSER, priority=5)
@manage_add_sub_matcher.got("_user_id", "群号", [Depends(parse_group_number)])
async def add_sub_handle():
pass
do_add_sub(manage_add_sub_matcher)
query_sub_matcher = on_command("查询订阅", rule=configurable_to_me, priority=5)
query_sub_matcher.handle()(set_target_user_info)
do_query_sub(query_sub_matcher)
manage_query_sub_matcher = on_command("管理-查询订阅", permission=SUPERUSER, priority=5)
@manage_query_sub_matcher.got("_user_id", "群号", [Depends(parse_group_number)])
async def query_sub_handle():
pass
do_query_sub(manage_query_sub_matcher)
del_sub_matcher = on_command(
@@ -267,13 +314,117 @@ del_sub_matcher = on_command(
permission=GROUP_ADMIN | GROUP_OWNER | SUPERUSER,
priority=5,
)
del_sub_matcher.handle()(set_target_user_info)
do_del_sub(del_sub_matcher)
manage_del_sub_matcher = on_command("管理-删除订阅", permission=SUPERUSER, priority=5)
group_manage_matcher = on_command("群管理")
@manage_del_sub_matcher.got("_user_id", "群号", [Depends(parse_group_number)])
async def del_sub_handle():
pass
@group_manage_matcher.handle()
async def send_group_list(bot: Bot, state: T_State):
groups = await bot.call_api("get_group_list")
res_text = "请选择需要管理的群:\n"
group_number_idx = {}
for idx, group in enumerate(groups, 1):
group_number_idx[idx] = group["group_id"]
res_text += f'{idx}. {group["group_id"]} - {group["group_name"]}\n'
res_text += "请输入左侧序号"
# await group_manage_matcher.send(res_text)
state["_prompt"] = res_text
state["group_number_idx"] = group_number_idx
do_del_sub(manage_del_sub_matcher)
async def _parse_group_idx(state: T_State, event_msg: str = EventPlainText()):
if not isinstance(state["group_idx"], Message):
return
group_number_idx: Optional[dict[int, int]] = state.get("group_number_idx")
assert group_number_idx
try:
idx = int(event_msg)
assert idx in group_number_idx.keys()
state["group_idx"] = idx
except:
await group_manage_matcher.reject("请输入正确序号")
@group_manage_matcher.got(
"group_idx", _gen_prompt_template("{_prompt}"), [Depends(_parse_group_idx)]
)
async def do_choose_group_number(state: T_State):
group_number_idx: dict[int, int] = state["group_number_idx"]
idx: int = state["group_idx"]
group_id = group_number_idx[idx]
state["target_user_info"] = User(user=group_id, user_type="group")
async def _check_command(event_msg: str = EventPlainText()):
if event_msg not in {"添加订阅", "查询订阅", "删除订阅"}:
await group_manage_matcher.reject("请输入正确的命令")
return
@group_manage_matcher.got(
"command", "请输入需要使用的命令:添加订阅,查询订阅,删除订阅", [Depends(_check_command)]
)
async def do_dispatch_command(
bot: Bot,
event: MessageEvent,
state: T_State,
matcher: Matcher,
command: str = ArgStr(),
):
permission = await matcher.update_permission(bot, event)
new_matcher = Matcher.new(
"message",
Rule(),
permission,
None,
True,
priority=0,
block=True,
plugin=matcher.plugin,
module=matcher.module,
expire_time=datetime.now() + bot.config.session_expire_timeout,
default_state=matcher.state,
default_type_updater=matcher.__class__._default_type_updater,
default_permission_updater=matcher.__class__._default_permission_updater,
)
if command == "查询订阅":
do_query_sub(new_matcher)
elif command == "添加订阅":
do_add_sub(new_matcher)
else:
do_del_sub(new_matcher)
new_matcher_ins = new_matcher()
asyncio.create_task(new_matcher_ins.run(bot, event, state))
test_matcher = on_command("testtt")
@test_matcher.handle()
async def _handler(bot: Bot, event: Event, matcher: Matcher, state: T_State):
permission = await matcher.update_permission(bot, event)
new_matcher = Matcher.new(
"message",
Rule(),
permission,
None,
True,
priority=0,
block=True,
plugin=matcher.plugin,
module=matcher.module,
expire_time=datetime.now() + bot.config.session_expire_timeout,
default_state=matcher.state,
default_type_updater=matcher.__class__._default_type_updater,
default_permission_updater=matcher.__class__._default_permission_updater,
)
async def h():
logger.warning("yes")
await new_matcher.send("666")
new_matcher.handle()(h)
new_matcher_ins = new_matcher()
await new_matcher_ins.run(bot, event, state)
@@ -3,10 +3,10 @@ from typing import Any
import httpx
from bs4 import BeautifulSoup as bs
from nonebot.plugin import require
from ..post import Post
from ..types import Category, RawPost, Target
from ..utils import Render
from .platform import CategoryNotSupport, NewMessage, StatusChange
@@ -46,21 +46,30 @@ class Arknights(NewMessage):
text = ""
async with httpx.AsyncClient() as client:
raw_html = await client.get(announce_url)
soup = bs(raw_html, "html.parser")
soup = bs(raw_html.text, "html.parser")
pics = []
if soup.find("div", class_="standerd-container"):
# 图文
render = Render()
viewport = {"width": 320, "height": 6400, "deviceScaleFactor": 3}
pic_data = await render.render(
announce_url, viewport=viewport, target="div.main"
require("nonebot_plugin_htmlrender")
from nonebot_plugin_htmlrender import capture_element
pic_data = await capture_element(
announce_url,
"div.main",
viewport={"width": 320, "height": 6400},
device_scale_factor=3,
)
# render = Render()
# viewport = {"width": 320, "height": 6400, "deviceScaleFactor": 3}
# pic_data = await render.render(
# announce_url, viewport=viewport, target="div.main"
# )
if pic_data:
pics.append(pic_data)
else:
text = "图片渲染失败"
elif pic := soup.find("img", class_="banner-image"):
pics.append(pic["src"])
pics.append(pic["src"]) # type: ignore
else:
raise CategoryNotSupport()
return Post(
@@ -1,77 +0,0 @@
import hashlib
import json
import re
from datetime import datetime
from typing import Any, Optional
import httpx
from bs4 import BeautifulSoup as bs
from ..types import *
# from .platform import Platform
# class Wechat(Platform):
# categories = {}
# enable_tag = False
# platform_name = 'wechat'
# enabled = False
# is_common = False
# name = '微信公众号'
# @classmethod
# def _get_query_url(cls, target: Target):
# return 'https://weixin.sogou.com/weixin?type=1&s_from=input&query={}&ie=utf8&_sug_=n&_sug_type_='.format(target)
# @classmethod
# async def _get_target_soup(cls, target: Target) -> Optional[bs]:
# target_url = cls._get_query_url(target)
# async with httpx.AsyncClient() as client:
# res = await client.get(target_url)
# soup = bs(res.text, 'html.parser')
# blocks = soup.find(class_='news-list2').find_all('li',recursive=False)
# for block in blocks:
# if block.find(string=[target]):
# return block
# @classmethod
# async def get_account_name(cls, target: Target) -> Optional[str]:
# if not (block := await cls._get_target_soup(target)):
# return None
# return block.find('p', class_='tit').find('a').text
# async def get_sub_list(self, target: Target) -> list[RawPost]:
# block = await self._get_target_soup(target)
# if (last_post_dt := block.find('dt', string='最近文章:')):
# post = {
# 'title': last_post_dt.find_parent().find('a').text,
# 'target': target,
# 'page_url': self._get_query_url(target),
# 'name': block.find('p', class_='tit').find('a').text
# }
# return [post]
# else:
# return []
# def get_id(self, post: RawPost) -> Any:
# return post['title']
# def get_date(self, post: RawPost):
# return None
# def get_tags(self, post: RawPost):
# return None
# def get_category(self, post: RawPost):
# return None
# async def parse(self, raw_post: RawPost) -> Post:
# # TODO get content of post
# return Post(target_type='wechat',
# text='{}\n详细内容请自行查看公众号'.format(raw_post['title']),
# target_name=raw_post['name'],
# pics=[],
# url=''
# )
+4 -8
View File
@@ -1,5 +1,3 @@
import warnings
import nonebot
from pydantic import BaseSettings
@@ -8,15 +6,15 @@ class PlugConfig(BaseSettings):
bison_config_path: str = ""
bison_use_pic: bool = False
bison_use_local: bool = False
bison_use_forward_pic = True#当图片超过1张时改为合并消息发送
#bison_browser: str = "local:C:\Program Files (x86)\Microsoft\Edge\Application\msedge.exe"
bison_browser: str = ""
bison_init_filter: bool = True
bison_use_queue: bool = True
bison_outer_url: str = "http://localhost:15556/bison/"
bison_filter_log: bool = False
bison_to_me: bool = True
bison_skip_browser_check: bool = False
bison_use_pic_merge: int = 0 # 多图片时启用图片合并转发(仅限群),当bison_use_queue为False时该配置不会生效
# 0:不启用;1:首条消息单独发送,剩余照片合并转发;2以及以上:所有消息全部合并转发
bison_resend_times: int = 0
class Config:
extra = "ignore"
@@ -24,5 +22,3 @@ class PlugConfig(BaseSettings):
global_config = nonebot.get_driver().config
plugin_config = PlugConfig(**global_config.dict())
if plugin_config.bison_use_local:
warnings.warn("BISON_USE_LOCAL is deprecated, please use BISON_BROWSER")
+53 -13
View File
@@ -2,23 +2,36 @@ from email import message
import time
from typing import List, Literal, Union
from nonebot.adapters import Message, MessageSegment
from nonebot.adapters.onebot.v11.bot import Bot
from nonebot.adapters.onebot.v11.message import Message, MessageSegment
from nonebot.log import logger
from .plugin_config import plugin_config
QUEUE = []
QUEUE: list[
tuple[
Bot,
int,
Literal["private", "group", "group-forward"],
Union[str, Message],
int,
]
] = []
LAST_SEND_TIME = time.time()
async def _do_send(
bot: "Bot", user: str, user_type: str, msg: Union[str, Message, MessageSegment]
bot: "Bot",
user: int,
user_type: Literal["group", "private", "group-forward"],
msg: Union[str, Message],
):
if user_type == "group":
await bot.call_api("send_group_msg", group_id=user, message=msg)
await bot.send_group_msg(group_id=user, message=msg)
elif user_type == "private":
await bot.call_api("send_private_msg", user_id=user, message=msg)
await bot.send_private_msg(user_id=user, message=msg)
elif user_type == "group-forward":
await bot.send_group_forward_msg(group_id=user, messages=msg)
async def _do_send_forward(
bot: "Bot", user: str, msgs: list
@@ -56,13 +69,40 @@ async def do_send_msgs():
LAST_SEND_TIME = time.time()
async def send_msgs(bot: Bot, user, user_type: Literal["private", "group"], msgs: list):
if plugin_config.bison_use_forward_pic and user_type!="private":
if msgs:
_do_send_forward(bot, user, msgs)
elif plugin_config.bison_use_queue:
for msg in msgs:
QUEUE.append((bot, user, user_type, msg, 2))
async def _send_msgs_dispatch(
bot: Bot,
user,
user_type: Literal["private", "group", "group-forward"],
msg: Union[str, Message],
):
if plugin_config.bison_use_queue:
QUEUE.append((bot, user, user_type, msg, plugin_config.bison_resend_times))
else:
await _do_send(bot, user, user_type, msg)
async def send_msgs(bot: Bot, user, user_type: Literal["private", "group"], msgs: list):
if not plugin_config.bison_use_pic_merge or user_type == "private":
for msg in msgs:
await _do_send(bot, user, user_type, msg)
await _send_msgs_dispatch(bot, user, user_type, msg)
return
if plugin_config.bison_use_pic_merge == 1:
await _send_msgs_dispatch(bot, user, "group", msgs.pop(0))
if msgs:
if len(msgs) == 1: # 只有一条消息序列就不合并转发
await _send_msgs_dispatch(bot, user, "group", msgs.pop(0))
else:
group_bot_info = await bot.get_group_member_info(
group_id=user, user_id=int(bot.self_id), no_cache=True
) # 调用api获取群内bot的相关参数
forward_msg = Message(
[
MessageSegment.node_custom(
group_bot_info["user_id"],
nickname=group_bot_info["card"] or group_bot_info["nickname"],
content=msg,
)
for msg in msgs
]
)
await _send_msgs_dispatch(bot, user, "group-forward", forward_msg)
+1 -1
View File
@@ -9,7 +9,7 @@ Tag = str
@dataclass(eq=True, frozen=True)
class User:
user: str
user: int
user_type: Literal["group", "private"]
+10 -174
View File
@@ -1,23 +1,12 @@
import asyncio
import base64
import os
import platform
import re
import subprocess
import sys
from html import escape
from pathlib import Path
from time import asctime
from typing import Awaitable, Callable, Optional, Union
from typing import Union
import nonebot
from bs4 import BeautifulSoup as bs
from nonebot.adapters.onebot.v11.message import MessageSegment
from nonebot.log import default_format, logger
from playwright._impl._driver import compute_driver_executable
from playwright.async_api import Browser, Page, Playwright, async_playwright
from uvicorn import config
from uvicorn.loops import asyncio as _asyncio
from nonebot.plugin import require
from .plugin_config import plugin_config
@@ -31,158 +20,21 @@ class Singleton(type):
return cls._instances[cls]
@nonebot.get_driver().on_startup
def download_browser():
if not plugin_config.bison_browser and not plugin_config.bison_use_local:
system = platform.system()
if system == "Linux":
browser_path = Path.home() / ".cache" / "ms-playwright"
elif system == "Windows":
browser_path = Path.home() / "AppData" / "Local" / "ms-playwright"
else:
raise RuntimeError("platform not supported")
if browser_path.exists() and os.listdir(str(browser_path)):
logger.warning("Browser Exists, skip")
return
env = os.environ.copy()
driver_executable = compute_driver_executable()
env["PW_CLI_TARGET_LANG"] = "python"
subprocess.run([str(driver_executable), "install", "chromium"], env=env)
class Render(metaclass=Singleton):
def __init__(self):
self.lock = asyncio.Lock()
self.browser: Browser
self.interval_log = ""
self.remote_browser = False
async def get_browser(self, playwright: Playwright) -> Browser:
if plugin_config.bison_browser:
if plugin_config.bison_browser.startswith("local:"):
path = plugin_config.bison_browser.split("local:", 1)[1]
return await playwright.chromium.launch(
executable_path=path, args=["--no-sandbox"]
)
if plugin_config.bison_browser.startswith("ws:"):
self.remote_browser = True
return await playwright.chromium.connect(plugin_config.bison_browser)
if plugin_config.bison_browser.startswith("wsc:"):
self.remote_browser = True
return await playwright.chromium.connect_over_cdp(
"ws:" + plugin_config.bison_browser[4:]
)
raise RuntimeError("bison_BROWSER error")
if plugin_config.bison_use_local:
return await playwright.chromium.launch(
executable_path="/usr/bin/chromium", args=["--no-sandbox"]
)
return await playwright.chromium.launch(args=["--no-sandbox"])
async def close_browser(self):
if not self.remote_browser:
await self.browser.close()
async def render(
self,
url: str,
viewport: Optional[dict] = None,
target: Optional[str] = None,
operation: Optional[Callable[[Page], Awaitable[None]]] = None,
) -> Optional[bytes]:
retry_times = 0
self.interval_log = ""
while retry_times < 3:
try:
return await asyncio.wait_for(
self.do_render(url, viewport, target, operation), 20
)
except asyncio.TimeoutError:
retry_times += 1
logger.warning(
"render error {}\n".format(retry_times) + self.interval_log
)
self.interval_log = ""
# if self.browser:
# await self.browser.close()
# self.lock.release()
def _inter_log(self, message: str) -> None:
self.interval_log += asctime() + "" + message + "\n"
async def do_render(
self,
url: str,
viewport: Optional[dict] = None,
target: Optional[str] = None,
operation: Optional[Callable[[Page], Awaitable[None]]] = None,
) -> Optional[bytes]:
async with self.lock:
async with async_playwright() as playwright:
self.browser = await self.get_browser(playwright)
self._inter_log("open browser")
if viewport:
constext = await self.browser.new_context(
viewport={
"width": viewport["width"],
"height": viewport["height"],
},
device_scale_factor=viewport.get("deviceScaleFactor", 1),
)
page = await constext.new_page()
else:
page = await self.browser.new_page()
if operation:
await operation(page)
else:
await page.goto(url)
self._inter_log("open page")
if target:
target_ele = page.locator(target)
if not target_ele:
return None
data = await target_ele.screenshot(type="jpeg")
else:
data = await page.screenshot(type="jpeg")
self._inter_log("screenshot")
await page.close()
self._inter_log("close page")
await self.close_browser()
self._inter_log("close browser")
assert isinstance(data, bytes)
return data
async def text_to_pic(self, text: str) -> Optional[bytes]:
lines = text.split("\n")
parsed_lines = list(map(lambda x: "<p>{}</p>".format(escape(x)), lines))
html_text = '<div style="width:17em;padding:1em">{}</div>'.format(
"".join(parsed_lines)
)
url = "data:text/html;charset=UTF-8;base64,{}".format(
base64.b64encode(html_text.encode()).decode()
)
data = await self.render(url, target="div")
return data
async def text_to_pic_cqcode(self, text: str) -> MessageSegment:
data = await self.text_to_pic(text)
# logger.debug('file size: {}'.format(len(data)))
if data:
# logger.debug(code)
return MessageSegment.image(data)
else:
return MessageSegment.text("生成图片错误")
async def parse_text(text: str) -> MessageSegment:
"return raw text if don't use pic, otherwise return rendered opcode"
if plugin_config.bison_use_pic:
render = Render()
return await render.text_to_pic_cqcode(text)
require("nonebot_plugin_htmlrender")
from nonebot_plugin_htmlrender import text_to_pic as _text_to_pic
return MessageSegment.image(await _text_to_pic(text))
else:
return MessageSegment.text(text)
if not plugin_config.bison_skip_browser_check:
require("nonebot_plugin_htmlrender")
def html_to_text(html: str, query_dict: dict = {}) -> str:
html = re.sub(r"<br\s*/?>", "<br>\n", html)
html = html.replace("</p>", "</p>\n")
@@ -233,19 +85,3 @@ if plugin_config.bison_filter_log:
if config.log_level is None
else config.log_level
)
# monkey patch
def asyncio_setup():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
@property
def should_reload(self):
return False
if platform.system() == "Windows":
_asyncio.asyncio_setup = asyncio_setup
config.Config.should_reload = should_reload # type:ignore
logger.warning("检测到当前为 Windows 系统,已自动注入猴子补丁")