Azide e2a97a9e56
适配小刻食堂平台 (#379)
* 🐛 插入新的Schedulable时应传入use_batch参数

*  适配ceobecanteen平台

Co-authored-by: phidiaLam <2957035701@qq.com>

*   明日方舟公告与官网采用截图分享 (#480)

*  明日方舟公告与官网采用截图分享

* 💄 auto fix by pre-commit hooks

* 🐛 修复缺少的导入,优化逻辑

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Azide <rukuy@qq.com>

* 🐛 优化截图图片效果

* 🐛 修复错误将转发内图片视作头图的问题

* 🍱 使用正式 Bison Logo

* 💄 auto fix by pre-commit hooks

* 🐛 请求小刻API时不在headers里添加过多字段

* 🐛 get_comb_id方法删除无用的targets参数

* 💡 get_comb_id方法更新注释

* 🔥 移除发送部分的更改

*  在命名中明确表示cond_func意图

* ♻️ 拆分get_comb_id功能

* ♻️ 调整缓存逻辑

*  使用uri在theme中调用platform截图

* ♻️ 重构截图逻辑

*  添加模糊匹配提示

*  适配新版Site

* 💄 auto fix by pre-commit hooks

* 🐛 去掉不必要的排序

* 🐛 修正不应出现的驼峰变量名

* ♻️ 按review意见修改

* ♻️ 调整截图函数逻辑

* 🔊 调低日志等级

* ✏️ 修复一些拼写和格式

---------

Co-authored-by: phidiaLam <2957035701@qq.com>
Co-authored-by: 洛梧藤 <67498817+phidiaLam@users.noreply.github.com>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
2024-07-13 01:06:42 +08:00

145 lines
4.6 KiB
Python

from io import BytesIO
from functools import partial
from typing import Literal, TypeGuard
from yarl import URL
from PIL import Image
from httpx import AsyncClient
from nonebot import logger, require
from PIL.Image import Image as PILImage
from nonebot_plugin_saa import Text as SaaText
from nonebot_plugin_saa import Image as SaaImage
from ..plugin_config import plugin_config
async def pic_url_to_image(data: str | bytes, http_client: AsyncClient) -> PILImage:
pic_buffer = BytesIO()
if isinstance(data, str):
res = await http_client.get(data)
pic_buffer.write(res.content)
else:
pic_buffer.write(data)
return Image.open(pic_buffer)
def _check_image_square(size: tuple[int, int]) -> bool:
return abs(size[0] - size[1]) / size[0] < 0.05
async def pic_merge(pics: list[str | bytes], http_client: AsyncClient) -> list[str | bytes]:
if len(pics) < 3:
return pics
_pic_url_to_image = partial(pic_url_to_image, http_client=http_client)
first_image = await _pic_url_to_image(pics[0])
if not _check_image_square(first_image.size):
return pics
images: list[PILImage] = [first_image]
# first row
for i in range(1, 3):
cur_img = await _pic_url_to_image(pics[i])
if not _check_image_square(cur_img.size):
return pics
if cur_img.size[1] != images[0].size[1]: # height not equal
return pics
images.append(cur_img)
_tmp = 0
x_coord = [0]
for i in range(3):
_tmp += images[i].size[0]
x_coord.append(_tmp)
y_coord = [0, first_image.size[1]]
async def process_row(row: int) -> bool:
if len(pics) < (row + 1) * 3:
return False
row_first_img = await _pic_url_to_image(pics[row * 3])
if not _check_image_square(row_first_img.size):
return False
if row_first_img.size[0] != images[0].size[0]:
return False
image_row: list[PILImage] = [row_first_img]
for i in range(row * 3 + 1, row * 3 + 3):
cur_img = await _pic_url_to_image(pics[i])
if not _check_image_square(cur_img.size):
return False
if cur_img.size[1] != row_first_img.size[1]:
return False
if cur_img.size[0] != images[i % 3].size[0]:
return False
image_row.append(cur_img)
images.extend(image_row)
y_coord.append(y_coord[-1] + row_first_img.size[1])
return True
if await process_row(1):
matrix = (3, 2)
else:
matrix = (3, 1)
if await process_row(2):
matrix = (3, 3)
logger.info("trigger merge image")
target = Image.new("RGB", (x_coord[-1], y_coord[-1]))
for y in range(matrix[1]):
for x in range(matrix[0]):
target.paste(
images[y * matrix[0] + x],
(x_coord[x], y_coord[y], x_coord[x + 1], y_coord[y + 1]),
)
target_io = BytesIO()
target.save(target_io, "JPEG")
pics = pics[matrix[0] * matrix[1] :]
pics.insert(0, target_io.getvalue())
return pics
def is_pics_mergable(imgs: list) -> TypeGuard[list[str | bytes]]:
if any(not isinstance(img, str | bytes) for img in imgs):
return False
url = [URL(img) for img in imgs if isinstance(img, str)]
return all(u.scheme in ("http", "https") for u in url)
async def text_to_image(saa_text: SaaText) -> SaaImage:
"""使用 htmlrender 将 saa.Text 渲染为 saa.Image"""
if not plugin_config.bison_use_pic:
raise ValueError("请启用 bison_use_pic")
require("nonebot_plugin_htmlrender")
from nonebot_plugin_htmlrender import text_to_pic
render_data = await text_to_pic(str(saa_text))
return SaaImage(render_data)
async def capture_html(
url: str,
selector: str,
timeout: float = 0,
type: Literal["jpeg", "png"] = "png",
quality: int | None = None,
wait_until: Literal["commit", "domcontentloaded", "load", "networkidle"] | None = None,
viewport: dict = {"width": 1024, "height": 990},
device_scale_factor: int = 2,
**page_kwargs,
) -> bytes | None:
"""
将给定的url网页的指定CSS选择器部分渲染成图片
timeout: 超时时间,单位毫秒
"""
require("nonebot_plugin_htmlrender")
from nonebot_plugin_htmlrender import get_new_page
assert url
async with get_new_page(device_scale_factor=device_scale_factor, viewport=viewport, **page_kwargs) as page:
await page.goto(url, timeout=timeout, wait_until=wait_until)
pic_data = await page.locator(selector).screenshot(
type=type,
quality=quality,
)
return pic_data