from io import BytesIO
from functools import partial
from typing import Literal, TypeGuard

from yarl import URL
from PIL import Image
from httpx import AsyncClient
from nonebot import logger, require
from PIL.Image import Image as PILImage
from nonebot_plugin_saa import Text as SaaText
from nonebot_plugin_saa import Image as SaaImage

from ..plugin_config import plugin_config


async def pic_url_to_image(data: str | bytes, http_client: AsyncClient) -> PILImage:
    """Open ``data`` as a PIL image, downloading it first when it is a URL string."""
    pic_buffer = BytesIO()
    if isinstance(data, str):
        res = await http_client.get(data)
        pic_buffer.write(res.content)
    else:
        pic_buffer.write(data)
    return Image.open(pic_buffer)


def _check_image_square(size: tuple[int, int]) -> bool:
    # consider the image square if width and height differ by less than 5%
    return abs(size[0] - size[1]) / size[0] < 0.05


async def pic_merge(pics: list[str | bytes], http_client: AsyncClient) -> list[str | bytes]:
    """Stitch the leading square tiles of ``pics`` into one grid image.

    If the first 3 (or 6, or 9) pictures are near-square tiles whose sizes line up
    row by row, they are merged into a single JPEG that replaces them at the head
    of the list; otherwise ``pics`` is returned unchanged.
    """
    if len(pics) < 3:
        return pics

    _pic_url_to_image = partial(pic_url_to_image, http_client=http_client)

    first_image = await _pic_url_to_image(pics[0])
    if not _check_image_square(first_image.size):
        return pics
    images: list[PILImage] = [first_image]
    # first row
    for i in range(1, 3):
        cur_img = await _pic_url_to_image(pics[i])
        if not _check_image_square(cur_img.size):
            return pics
        if cur_img.size[1] != images[0].size[1]:  # height not equal
            return pics
        images.append(cur_img)

    # column boundaries of the grid, accumulated from the first row's widths
    _tmp = 0
    x_coord = [0]
    for i in range(3):
        _tmp += images[i].size[0]
        x_coord.append(_tmp)
    y_coord = [0, first_image.size[1]]

    async def process_row(row: int) -> bool:
        """Try to accept pics[row*3 : row*3+3] as the next grid row; return False if they don't fit."""
        if len(pics) < (row + 1) * 3:
            return False
        row_first_img = await _pic_url_to_image(pics[row * 3])
        if not _check_image_square(row_first_img.size):
            return False
        if row_first_img.size[0] != images[0].size[0]:
            return False
        image_row: list[PILImage] = [row_first_img]
        for i in range(row * 3 + 1, row * 3 + 3):
            cur_img = await _pic_url_to_image(pics[i])
            if not _check_image_square(cur_img.size):
                return False
            if cur_img.size[1] != row_first_img.size[1]:
                return False
            if cur_img.size[0] != images[i % 3].size[0]:
                return False
            image_row.append(cur_img)
        images.extend(image_row)
        y_coord.append(y_coord[-1] + row_first_img.size[1])
        return True

    # a 3x3 merge is only possible once the second row has been accepted
    if await process_row(1):
        matrix = (3, 2)
        if await process_row(2):
            matrix = (3, 3)
    else:
        matrix = (3, 1)
    logger.info("trigger merge image")
    target = Image.new("RGB", (x_coord[-1], y_coord[-1]))
    for y in range(matrix[1]):
        for x in range(matrix[0]):
            target.paste(
                images[y * matrix[0] + x],
                (x_coord[x], y_coord[y], x_coord[x + 1], y_coord[y + 1]),
            )
    target_io = BytesIO()
    target.save(target_io, "JPEG")
    # drop the merged tiles and put the composite image at the head of the list
    pics = pics[matrix[0] * matrix[1] :]
    pics.insert(0, target_io.getvalue())

    return pics


def is_pics_mergable(imgs: list) -> TypeGuard[list[str | bytes]]:
    """Return True only when every item is a str or bytes and every str is an http(s) URL."""
    if any(not isinstance(img, str | bytes) for img in imgs):
        return False

    url = [URL(img) for img in imgs if isinstance(img, str)]
    return all(u.scheme in ("http", "https") for u in url)


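# Usage sketch (not a call site from this module): the two helpers above are meant to be
# used together, with ``is_pics_mergable`` narrowing the list type before ``pic_merge``
# stitches the leading square tiles. The function and variable names below are
# illustrative only; the plugin normally supplies its own ``AsyncClient``.
#
#     async def prepare_post_images(raw_pics: list) -> list[str | bytes]:
#         async with AsyncClient() as client:
#             if is_pics_mergable(raw_pics):
#                 return await pic_merge(raw_pics, client)
#             return raw_pics

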
async def text_to_image(saa_text: SaaText) -> SaaImage:
    """Render a saa.Text into a saa.Image via nonebot_plugin_htmlrender."""
    if not plugin_config.bison_use_pic:
        raise ValueError("please enable bison_use_pic")
    require("nonebot_plugin_htmlrender")
    from nonebot_plugin_htmlrender import text_to_pic

    render_data = await text_to_pic(str(saa_text))
    return SaaImage(render_data)


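# Hypothetical example for ``text_to_image`` (requires ``bison_use_pic`` to be enabled in
# the plugin config, otherwise it raises ValueError); the message text is a placeholder:
#
#     pic = await text_to_image(SaaText("Bison: new post from your subscription"))
#     # ``pic`` is a nonebot_plugin_saa Image segment ready to be put into a message

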
async def capture_html(
    url: str,
    selector: str,
    timeout: float = 0,
    type: Literal["jpeg", "png"] = "png",
    quality: int | None = None,
    wait_until: Literal["commit", "domcontentloaded", "load", "networkidle"] | None = None,
    viewport: dict = {"width": 1024, "height": 990},
    device_scale_factor: int = 2,
    **page_kwargs,
) -> bytes | None:
    """
    Render the element matched by ``selector`` on the page at ``url`` into an image.

    timeout: timeout in milliseconds
    """
    require("nonebot_plugin_htmlrender")
    from nonebot_plugin_htmlrender import get_new_page

    assert url
    async with get_new_page(device_scale_factor=device_scale_factor, viewport=viewport, **page_kwargs) as page:
        await page.goto(url, timeout=timeout, wait_until=wait_until)
        pic_data = await page.locator(selector).screenshot(
            type=type,
            quality=quality,
        )
        return pic_data
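

# A hedged usage sketch for ``capture_html``: the URL and CSS selector below are
# placeholders, not pages this plugin actually screenshots; timeouts follow the
# Playwright convention of milliseconds.
#
#     data = await capture_html(
#         "https://example.com/announcement/1",
#         "div.article",
#         timeout=30_000,
#         wait_until="networkidle",
#     )
#     if data is not None:
#         screenshot = SaaImage(data)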