mirror of
https://github.com/suyiiyii/nonebot-bison.git
synced 2025-06-04 02:26:11 +08:00
* ✨ 使用新接口 * ♻️ 调整刷新逻辑 * 🐛 调整刷新逻辑 * ♻️ 将单个哔哩哔哩文件拆开 * 🐛 修修补补边界情况 * ✨ 添加UID:xxx匹配 * ✅ 调整测试中的导入 * ✅ 调整测试的断言 * 🐛 添加unicode字符的escape * ✨ 不再主动刷新cookies * 🔀 适配新版Site * 🐛 解析live_rcmd中的json string * 🚨 make ruff happy * 🐛 调整并测试bilibili retry函数 * ✅ 修正测试 * ♻️ 按review意见调整 * ♻️ 清理一些遗留的复杂写法 * ♻️ 移出函数内的NameTuple * 🔇 删除不必要的日志输出 Co-authored-by: felinae98 <731499577@qq.com> * Update nonebot_bison/platform/bilibili/scheduler.py * Update scheduler.py --------- Co-authored-by: felinae98 <731499577@qq.com>
111 lines
3.6 KiB
Python
111 lines
3.6 KiB
Python
import re
|
||
import sys
|
||
import difflib
|
||
|
||
import nonebot
|
||
from nonebot.plugin import require
|
||
from bs4 import BeautifulSoup as bs
|
||
from nonebot.log import logger, default_format
|
||
from nonebot_plugin_saa import Text, Image, MessageSegmentFactory
|
||
|
||
from .site import Site as Site
|
||
from ..plugin_config import plugin_config
|
||
from .image import pic_merge as pic_merge
|
||
from .http import http_client as http_client
|
||
from .site import ClientManager as ClientManager
|
||
from .image import text_to_image as text_to_image
|
||
from .site import anonymous_site as anonymous_site
|
||
from .context import ProcessContext as ProcessContext
|
||
from .image import is_pics_mergable as is_pics_mergable
|
||
from .image import pic_url_to_image as pic_url_to_image
|
||
from .site import DefaultClientManager as DefaultClientManager
|
||
|
||
|
||
class Singleton(type):
    """Metaclass that hands out one shared instance per class using it.

    The first call to a class constructs the instance with the given
    arguments; every later call returns that same object and ignores
    whatever arguments are passed.
    """

    # Maps each class created with this metaclass to its single instance.
    _instances = {}

    def __call__(cls, *args, **kwargs):
        """Return the cached instance of ``cls``, creating it on first use."""
        registry = cls._instances
        if cls in registry:
            return registry[cls]
        registry[cls] = super().__call__(*args, **kwargs)
        return registry[cls]
|
||
|
||
|
||
async def parse_text(text: str) -> MessageSegmentFactory:
    """Wrap ``text`` in a message segment.

    Returns a ``Text`` segment when ``plugin_config.bison_use_pic`` is falsy;
    otherwise returns an ``Image`` segment built from the output of
    ``nonebot_plugin_htmlrender.text_to_pic``.
    """
    if not plugin_config.bison_use_pic:
        return Text(text)

    # The renderer plugin is only required/imported when pictures are wanted.
    require("nonebot_plugin_htmlrender")
    from nonebot_plugin_htmlrender import text_to_pic as _text_to_pic

    rendered = await _text_to_pic(text)
    return Image(rendered)
|
||
|
||
|
||
# Require the html renderer plugin at import time unless the browser check
# has been explicitly disabled in the plugin configuration.
if not plugin_config.bison_skip_browser_check:
    require("nonebot_plugin_htmlrender")
|
||
|
||
|
||
def html_to_text(html: str, query_dict: dict | None = None) -> str:
    """Convert an HTML fragment to stripped plain text.

    Line breaks are inserted after ``<br>`` tags and closing ``</p>`` tags
    before parsing so that they survive as newlines in the extracted text.

    Args:
        html: The HTML source to convert.
        query_dict: Optional keyword arguments forwarded to
            ``BeautifulSoup.find`` to select the node whose text is
            extracted. When omitted or empty, the whole document is used.

    Returns:
        The text of the selected node with surrounding whitespace removed.

    Raises:
        AssertionError: If ``query_dict`` is given but no node matches it.
    """
    # Normalise every <br>, <br/> or <br /> variant to "<br>" plus a newline.
    html = re.sub(r"<br\s*/?>", "<br>\n", html)
    html = html.replace("</p>", "</p>\n")
    soup = bs(html, "html.parser")
    # A None default (instead of a shared mutable ``{}``) behaves the same
    # here because both are falsy and select the whole document.
    node = soup.find(**query_dict) if query_dict else soup
    assert node is not None
    return node.text.strip()
|
||
|
||
|
||
class Filter:
    """Log record filter with a configurable minimum level.

    Records whose top-level module name is ``nonebot`` are only accepted at
    WARNING level or above; all other records are accepted when they reach
    ``self.level``.
    """

    def __init__(self) -> None:
        # Either a level name understood by the logger or a numeric level.
        self.level: int | str = "DEBUG"

    def __call__(self, record):
        """Rewrite ``record["name"]`` to its top-level package and decide
        whether the record should be emitted."""
        name: str = record["name"]
        loaded = sys.modules.get(name)
        if loaded:
            # Prefer the module's own ``__module_name__`` when it defines one.
            name = getattr(loaded, "__module_name__", name)
        record["name"] = name.split(".")[0]

        if isinstance(self.level, str):
            threshold = logger.level(self.level).no
        else:
            threshold = self.level
        nonebot_warning_level = logger.level("WARNING").no

        severity = record["level"].no
        if record["name"] == "nonebot":
            return severity >= nonebot_warning_level
        return severity >= threshold
|
||
|
||
|
||
if plugin_config.bison_filter_log:
    # Swap the logger's handlers for a single stdout sink guarded by Filter.
    logger.remove()
    default_filter = Filter()
    logger.add(sys.stdout, colorize=True, diagnose=False, filter=default_filter, format=default_format)
    config = nonebot.get_driver().config
    # Emitted before the level is narrowed below, while the filter still
    # uses its initial "DEBUG" level.
    logger.success("Muted info & success from nonebot")
    # An explicit log_level wins; otherwise fall back on the debug flag.
    if config.log_level is not None:
        default_filter.level = config.log_level
    elif config.debug:
        default_filter.level = "DEBUG"
    else:
        default_filter.level = "INFO"
|
||
|
||
|
||
def text_similarity(str1: str, str2: str) -> float:
    """Return a similarity score for two non-empty strings.

    The score is the total size of the matching blocks reported by
    ``difflib.SequenceMatcher`` divided by the length of the shorter string.

    Raises:
        ValueError: If either string is empty.
    """
    if not len(str1) or not len(str2):
        raise ValueError("The length of string can not be 0")

    blocks = difflib.SequenceMatcher(None, str1, str2).get_matching_blocks()
    matched = 0
    for block in blocks:
        matched += block.size

    shorter = min(len(str1), len(str2))
    return matched / shorter
|
||
|
||
|
||
def decode_unicode_escapes(s: str) -> str:
    """Decode literal \\r, \\n, \\t and \\uXXXX escape sequences found in ``s``.

    Only those escape forms are touched; all other text is returned
    unchanged. A high surrogate escape immediately followed by a low
    surrogate escape (for example the two escapes that encode an emoji) is
    combined into the single code point it represents, so the result can be
    encoded to UTF-8. An unpaired surrogate escape is decoded as-is.

    Args:
        s: Text that may contain backslash escape sequences as literal
            characters.

    Returns:
        The text with the recognised escape sequences replaced by the
        characters they denote.
    """

    def decode_match(match: re.Match[str]) -> str:
        high, low = match.group("high", "low")
        if high is not None:
            # UTF-16 surrogate pair: rebuild the supplementary-plane code point.
            code_point = 0x10000 + ((int(high, 16) - 0xD800) << 10) + (int(low, 16) - 0xDC00)
            return chr(code_point)
        return bytes(match.group(0), "utf-8").decode("unicode_escape")

    # The surrogate-pair alternative must come first so that a pair is
    # consumed as one match rather than as two independent \uXXXX escapes.
    regex = re.compile(
        r"\\u(?P<high>[dD][89abAB][0-9a-fA-F]{2})\\u(?P<low>[dD][c-fC-F][0-9a-fA-F]{2})"
        r"|\\[rnt]|\\u[0-9a-fA-F]{4}"
    )
    return regex.sub(decode_match, s)
|