From 8e3de419d37cd24a8988b11846eec19e0a47b760 Mon Sep 17 00:00:00 2001 From: felinae98 <731499577@qq.com> Date: Thu, 10 Feb 2022 16:17:19 +0800 Subject: [PATCH] disable update browser --- src/plugins/nonebot_bison/utils.py | 171 ++++++++++++++++++----------- 1 file changed, 109 insertions(+), 62 deletions(-) diff --git a/src/plugins/nonebot_bison/utils.py b/src/plugins/nonebot_bison/utils.py index 073c62e..1570f5e 100644 --- a/src/plugins/nonebot_bison/utils.py +++ b/src/plugins/nonebot_bison/utils.py @@ -1,36 +1,48 @@ import asyncio import base64 -from html import escape import os import platform import re import subprocess import sys +from html import escape +from pathlib import Path from time import asctime from typing import Awaitable, Callable, Optional, Union -from bs4 import BeautifulSoup as bs import nonebot +from bs4 import BeautifulSoup as bs from nonebot.adapters.cqhttp.message import MessageSegment -from nonebot.log import logger -from nonebot.log import default_format +from nonebot.log import default_format, logger from playwright._impl._driver import compute_driver_executable from playwright.async_api import Browser, Page, Playwright, async_playwright -from uvicorn.loops import asyncio as _asyncio from uvicorn import config +from uvicorn.loops import asyncio as _asyncio from .plugin_config import plugin_config + class Singleton(type): _instances = {} + def __call__(cls, *args, **kwargs): if cls not in cls._instances: cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs) return cls._instances[cls] + @nonebot.get_driver().on_startup def download_browser(): if not plugin_config.bison_browser and not plugin_config.bison_use_local: + system = platform.system() + if system == "Linux": + browser_path = Path.home() / ".cache" / "ms-playwright" + elif system == "Windows": + browser_path = Path.home() / "AppData" / "Local" / "ms-playwright" + else: + raise RuntimeError("platform not supported") + if browser_path.exists() and os.listdir(str(browser_path)): + logger.warning("Browser Exists, skip") env = os.environ.copy() driver_executable = compute_driver_executable() env["PW_CLI_TARGET_LANG"] = "python" @@ -38,65 +50,84 @@ def download_browser(): class Render(metaclass=Singleton): - def __init__(self): self.lock = asyncio.Lock() self.browser: Browser - self.interval_log = '' + self.interval_log = "" self.remote_browser = False async def get_browser(self, playwright: Playwright) -> Browser: if plugin_config.bison_browser: - if plugin_config.bison_browser.startswith('local:'): - path = plugin_config.bison_browser.split('local:', 1)[1] + if plugin_config.bison_browser.startswith("local:"): + path = plugin_config.bison_browser.split("local:", 1)[1] return await playwright.chromium.launch( - executable_path=path, args=['--no-sandbox']) - if plugin_config.bison_browser.startswith('ws:'): + executable_path=path, args=["--no-sandbox"] + ) + if plugin_config.bison_browser.startswith("ws:"): self.remote_browser = True return await playwright.chromium.connect(plugin_config.bison_browser) - if plugin_config.bison_browser.startswith('wsc:'): + if plugin_config.bison_browser.startswith("wsc:"): self.remote_browser = True return await playwright.chromium.connect_over_cdp( - 'ws:' + plugin_config.bison_browser[4:] - ) - raise RuntimeError('bison_BROWSER error') + "ws:" + plugin_config.bison_browser[4:] + ) + raise RuntimeError("bison_BROWSER error") if plugin_config.bison_use_local: return await playwright.chromium.launch( - executable_path='/usr/bin/chromium', args=['--no-sandbox']) - return await playwright.chromium.launch(args=['--no-sandbox']) + executable_path="/usr/bin/chromium", args=["--no-sandbox"] + ) + return await playwright.chromium.launch(args=["--no-sandbox"]) async def close_browser(self): if not self.remote_browser: await self.browser.close() - async def render(self, url: str, viewport: Optional[dict] = None, target: Optional[str] = None, - operation: Optional[Callable[[Page], Awaitable[None]]] = None) -> Optional[bytes]: + async def render( + self, + url: str, + viewport: Optional[dict] = None, + target: Optional[str] = None, + operation: Optional[Callable[[Page], Awaitable[None]]] = None, + ) -> Optional[bytes]: retry_times = 0 - self.interval_log = '' + self.interval_log = "" while retry_times < 3: try: - return await asyncio.wait_for(self.do_render(url, viewport, target, operation), 20) + return await asyncio.wait_for( + self.do_render(url, viewport, target, operation), 20 + ) except asyncio.TimeoutError: retry_times += 1 - logger.warning("render error {}\n".format(retry_times) + self.interval_log) - self.interval_log = '' + logger.warning( + "render error {}\n".format(retry_times) + self.interval_log + ) + self.interval_log = "" # if self.browser: # await self.browser.close() # self.lock.release() def _inter_log(self, message: str) -> None: - self.interval_log += asctime() + '' + message + '\n' + self.interval_log += asctime() + "" + message + "\n" - async def do_render(self, url: str, viewport: Optional[dict] = None, target: Optional[str] = None, - operation: Optional[Callable[[Page], Awaitable[None]]] = None) -> Optional[bytes]: + async def do_render( + self, + url: str, + viewport: Optional[dict] = None, + target: Optional[str] = None, + operation: Optional[Callable[[Page], Awaitable[None]]] = None, + ) -> Optional[bytes]: async with self.lock: async with async_playwright() as playwright: self.browser = await self.get_browser(playwright) - self._inter_log('open browser') + self._inter_log("open browser") if viewport: constext = await self.browser.new_context( - viewport={'width': viewport['width'], 'height': viewport['height']}, - device_scale_factor=viewport.get('deviceScaleFactor', 1)) + viewport={ + "width": viewport["width"], + "height": viewport["height"], + }, + device_scale_factor=viewport.get("deviceScaleFactor", 1), + ) page = await constext.new_page() else: page = await self.browser.new_page() @@ -104,51 +135,57 @@ class Render(metaclass=Singleton): await operation(page) else: await page.goto(url) - self._inter_log('open page') + self._inter_log("open page") if target: target_ele = page.locator(target) if not target_ele: return None - data = await target_ele.screenshot(type='jpeg') + data = await target_ele.screenshot(type="jpeg") else: - data = await page.screenshot(type='jpeg') - self._inter_log('screenshot') + data = await page.screenshot(type="jpeg") + self._inter_log("screenshot") await page.close() - self._inter_log('close page') + self._inter_log("close page") await self.close_browser() - self._inter_log('close browser') - assert(isinstance(data, bytes)) + self._inter_log("close browser") + assert isinstance(data, bytes) return data async def text_to_pic(self, text: str) -> Optional[bytes]: - lines = text.split('\n') - parsed_lines = list(map(lambda x: '

{}

'.format(escape(x)), lines)) - html_text = '
{}
'.format(''.join(parsed_lines)) - url = 'data:text/html;charset=UTF-8;base64,{}'.format(base64.b64encode(html_text.encode()).decode()) - data = await self.render(url, target='div') + lines = text.split("\n") + parsed_lines = list(map(lambda x: "

{}

".format(escape(x)), lines)) + html_text = '
{}
'.format( + "".join(parsed_lines) + ) + url = "data:text/html;charset=UTF-8;base64,{}".format( + base64.b64encode(html_text.encode()).decode() + ) + data = await self.render(url, target="div") return data - async def text_to_pic_cqcode(self, text:str) -> MessageSegment: + async def text_to_pic_cqcode(self, text: str) -> MessageSegment: data = await self.text_to_pic(text) # logger.debug('file size: {}'.format(len(data))) if data: # logger.debug(code) return MessageSegment.image(data) else: - return MessageSegment.text('生成图片错误') + return MessageSegment.text("生成图片错误") + async def parse_text(text: str) -> MessageSegment: - 'return raw text if don\'t use pic, otherwise return rendered opcode' + "return raw text if don't use pic, otherwise return rendered opcode" if plugin_config.bison_use_pic: render = Render() return await render.text_to_pic_cqcode(text) else: return MessageSegment.text(text) + def html_to_text(html: str, query_dict: dict = {}) -> str: - html = re.sub(r'', '
\n', html) - html = html.replace('

', '

\n') - soup = bs(html, 'html.parser') + html = re.sub(r"", "
\n", html) + html = html.replace("

", "

\n") + soup = bs(html, "html.parser") if query_dict: node = soup.find(**query_dict) else: @@ -158,7 +195,6 @@ def html_to_text(html: str, query_dict: dict = {}) -> str: class Filter: - def __init__(self) -> None: self.level: Union[int, str] = "DEBUG" @@ -168,36 +204,47 @@ class Filter: if module: module_name = getattr(module, "__module_name__", module_name) record["name"] = module_name.split(".")[0] - levelno = logger.level(self.level).no if isinstance(self.level, - str) else self.level + levelno = ( + logger.level(self.level).no if isinstance(self.level, str) else self.level + ) nonebot_warning_level = logger.level("WARNING").no - return record["level"].no >= levelno if record["name"] != "nonebot" \ - else record["level"].no >= nonebot_warning_level + return ( + record["level"].no >= levelno + if record["name"] != "nonebot" + else record["level"].no >= nonebot_warning_level + ) + if plugin_config.bison_filter_log: logger.remove() default_filter = Filter() - logger.add(sys.stdout, - colorize=True, - diagnose=False, - filter=default_filter, - format=default_format) + logger.add( + sys.stdout, + colorize=True, + diagnose=False, + filter=default_filter, + format=default_format, + ) config = nonebot.get_driver().config - logger.success("Muted info & success from nonebot") + logger.success("Muted info & success from nonebot") default_filter.level = ( - "DEBUG" if config.debug else - "INFO") if config.log_level is None else config.log_level + ("DEBUG" if config.debug else "INFO") + if config.log_level is None + else config.log_level + ) # monkey patch def asyncio_setup(): loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) + @property def should_reload(self): return False + if platform.system() == "Windows": _asyncio.asyncio_setup = asyncio_setup - config.Config.should_reload = should_reload # type:ignore - logger.warning('检测到当前为 Windows 系统,已自动注入猴子补丁') + config.Config.should_reload = should_reload # type:ignore + logger.warning("检测到当前为 Windows 系统,已自动注入猴子补丁")