Cateon Huo 9e5dcb3912
实现 Post.content 相关扩展协议 (#553)
*  `post` 新增 `get_content()` 方法

* 🚨 make linter happy

* 💄 auto fix by pre-commit hooks

* 🐛 fix: 调整函数使用

* 💄 auto fix by pre-commit hooks

*  转用函数处理文本

* 💄 auto fix by pre-commit hooks

* 🔨 使用`Dict`存储`content_handlers`

* 💄 auto fix by pre-commit hooks

* 🎨 fix

* :arts: 简化函数使用

* 🐛 移除`Theme`的过时参数

* 🗑️ 复用 `self.plain_content`

* 💄 auto fix by pre-commit hooks

*  注册式装饰器写法

* 💄 auto fix by pre-commit hooks

* 🐛 fix

* 💄 auto fix by pre-commit hooks

* :feat: 通用纯文本处理函数

* 💄 auto fix by pre-commit hooks

* :downgrade: 复用`==`处理标题

* 🎨 简化(?)写法

*  测试修复

* ♻️ via ContentSupport extensions

* 🐛 fix test

* 💄 auto fix by pre-commit hooks

* 🐛 for clean text

* 🐛 fix

* 💄 auto fix by pre-commit hooks

* fix: for xml

* 💄 auto fix by pre-commit hooks

* chore: art

* 💄 auto fix by pre-commit hooks

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
2024-08-04 18:39:12 +08:00

240 lines
7.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from io import BytesIO
from pathlib import Path
from datetime import datetime
from typing import TYPE_CHECKING, Literal
import jinja2
from yarl import URL
from httpx import AsyncClient
from pydantic import BaseModel
from PIL import Image as PILImage
from nonebot_plugin_saa import Text, Image, MessageSegmentFactory
from nonebot_bison.compat import model_validator
from nonebot_bison.utils import pic_merge, is_pics_mergable
from nonebot_bison.theme.utils import convert_to_qr, web_embed_image
from nonebot_bison.theme import Theme, ThemeRenderError, ThemeRenderUnsupportError
# Imported for type checkers only; the annotations below refer to it as the string "Post".
if TYPE_CHECKING:
    from nonebot_bison.post import Post
class CeobeInfo(BaseModel):
    """Info section of the card.

    Attributes:
        datasource: Source of the data.
        time: Time, as a string.
    """

    datasource: str
    time: str
class CeoboContent(BaseModel):
    """Content section of the card.

    Attributes:
        image: Image link; optional, defaults to ``None``.
        text: Text content.
    """

    image: str | None = None
    text: str
class CeoboRetweet(BaseModel):
    """Repost ("retweet") section of the card.

    Attributes:
        author: Original author.
        image: Image link.
        content: Text content.

    ``image`` and ``content`` may each be ``None``, but not both at once.
    """

    image: str | None
    content: str | None
    author: str

    @model_validator(mode="before")
    def check(cls, values):
        """Reject input in which both ``image`` and ``content`` are ``None``.

        Registered with ``mode="before"``, so ``values`` is the input as it was
        handed to the model rather than validated field values.

        Raises:
            ValueError: If neither ``image`` nor ``content`` carries a value.
        """
        # Use .get() and a mapping check instead of indexing: a missing key or a
        # non-dict input must not escape as KeyError/TypeError from this hook.
        # Anything this check cannot inspect is passed on unchanged so the
        # regular field validation reports the problem.
        if isinstance(values, dict) and values.get("image") is None and values.get("content") is None:
            raise ValueError("image and content cannot be both None")
        return values
class CeobeCard(BaseModel):
    """Full card model: info, content, an optional QR string and an optional repost."""

    info: CeobeInfo
    content: CeoboContent
    qr: str | None
    retweet: CeoboRetweet | None
class CeobeCanteenTheme(Theme):
    """Ceobe Canteen share-card style theme.

    Requires the `nonebot_plugin_htmlrender` plugin to be installed.
    """

    name: Literal["ceobecanteen"] = "ceobecanteen"
    need_browser: bool = True

    # Directory holding the jinja template, and the template file inside it.
    template_path: Path = Path(__file__).parent / "templates"
    template_name: str = "ceobe_canteen.html.jinja"

    async def parse(self, post: "Post") -> tuple[CeobeCard, list[str | bytes | Path | BytesIO]]:
        """Parse a Post into a CeobeCard and the list of processed images.

        Returns:
            The card model, and a list holding the post's images (after
            ``merge_pics``) followed by the repost's images (after ``merge_pics``).

        Raises:
            ThemeRenderUnsupportError: If ``post.nickname`` or ``post.timestamp`` is falsy.
        """
        if not post.nickname:
            raise ThemeRenderUnsupportError("post.nickname is None")
        if not post.timestamp:
            raise ThemeRenderUnsupportError("post.timestamp is None")

        formatted_time = datetime.fromtimestamp(post.timestamp).strftime("%Y-%m-%d %H:%M:%S")
        info = CeobeInfo(datasource=post.nickname, time=formatted_time)

        http_client = await post.platform.ctx.get_client_for_static()

        images: list[str | bytes | Path | BytesIO] = []
        if post.images:
            images = await self.merge_pics(post.images, http_client)

        content = CeoboContent(text=await post.get_content())

        retweet: CeoboRetweet | None = None
        repost = post.repost
        if repost:
            repost_head_pic: str | None = None
            if repost.images:
                repost_images = await self.merge_pics(repost.images, http_client)
                repost_head_pic = self.extract_head_pic(repost_images)
                # Repost pictures are appended after the post's own pictures.
                images.extend(repost_images)

            repost_nickname = f"@{repost.nickname}:" if repost.nickname else ""
            repost_text = await repost.get_content()
            retweet = CeoboRetweet(image=repost_head_pic, content=repost_text, author=repost_nickname)

        # The QR code encodes the post URL, or the placeholder text when there is none.
        qr_source = post.url or "No URL"
        qr = web_embed_image(convert_to_qr(qr_source, back_color=(240, 236, 233)))

        card = CeobeCard(info=info, content=content, qr=qr, retweet=retweet)
        return card, images

    @staticmethod
    async def merge_pics(
        images: list[str | bytes | Path | BytesIO],
        client: AsyncClient,
    ) -> list[str | bytes | Path | BytesIO]:
        """Merge the pictures with ``pic_merge`` when ``is_pics_mergable`` allows it.

        Otherwise the pictures are returned as they are. Either way the result
        is a new list, so callers may extend it without touching ``images``.
        """
        if not is_pics_mergable(images):
            return list(images)
        merged = await pic_merge(images, client)
        return list(merged)

    @staticmethod
    def extract_head_pic(pics: list[str | bytes | Path | BytesIO]) -> str:
        """Return the first picture as a string.

        A string is returned unchanged; any other type goes through ``web_embed_image``.
        """
        first = pics[0]
        if isinstance(first, str):
            return first
        return web_embed_image(first)

    @staticmethod
    def card_link(head_pic: PILImage.Image, card_body: PILImage.Image) -> PILImage.Image:
        """Stack the head picture on top of the card body and return the combined image."""
        head_w, head_h = head_pic.size
        body_w, body_h = card_body.size

        # Unify the widths: the head picture is scaled to the card width when it
        # is the wider one, otherwise the card is scaled to the head picture's
        # width. Heights are scaled proportionally.
        if head_w > body_w:
            head_pic = head_pic.resize((body_w, int(head_h * body_w / head_w)))
        else:
            card_body = card_body.resize((head_w, int(body_h * head_w / body_w)))

        # Paste both images onto a canvas tall enough for the two of them.
        combined = PILImage.new("RGBA", (head_pic.width, head_pic.height + card_body.height))
        combined.paste(head_pic, (0, 0))
        combined.paste(card_body, (0, head_pic.height))
        return combined

    async def render(self, post: "Post") -> list[MessageSegmentFactory]:
        """Render the post into message segments.

        The result holds, in order: the card image (with the head picture
        stacked on top when one is available as raw bytes), a text segment with
        the source and optional detail link, then the post's and the repost's
        pictures.

        Raises:
            ThemeRenderError: If the first image has an unsupported type, or if
                rendering the page / taking the screenshot fails.
        """
        ceobe_card, merged_images = await self.parse(post)

        need_card_link: bool = True
        head_pic = None

        # Without post.images every merged image comes from the repost, so no head picture is needed.
        if post.images:
            first_image = merged_images[0]
            if isinstance(first_image, bytes):
                head_pic = first_image
                merged_images = merged_images[1:]
            elif isinstance(first_image, BytesIO):
                head_pic = first_image.getvalue()
                merged_images = merged_images[1:]
            elif isinstance(first_image, str) and URL(first_image).scheme in ("http", "https"):
                # A remote image is referenced from the template instead of being stitched on.
                ceobe_card.content.image = first_image
                need_card_link = False
            elif isinstance(first_image, Path):
                ceobe_card.content.image = first_image.as_uri()
                need_card_link = False
            else:
                raise ThemeRenderError(f"Unknown image type: {type(first_image)}")

        # Imported lazily, inside the method, as in the original code.
        from nonebot_plugin_htmlrender import get_new_page

        # NOTE(review): autoescape is left at jinja2's default (disabled); confirm the
        # template escapes post text itself.
        template_env = jinja2.Environment(
            loader=jinja2.FileSystemLoader(self.template_path),
            enable_async=True,
        )
        template = template_env.get_template(self.template_name)
        html = await template.render_async(card=ceobe_card)

        template_uri = self.template_path.as_uri()
        page_kwargs = {
            "device_scale_factor": 2,
            "viewport": {"width": 512, "height": 455},
            "base_url": template_uri,
        }
        try:
            async with get_new_page(**page_kwargs) as page:
                await page.goto(template_uri)
                await page.set_content(html)
                # Short pause before the screenshot (presumably milliseconds -- TODO confirm).
                await page.wait_for_timeout(1)
                card_body = await page.locator("#ceobecanteen-card").screenshot(
                    type="jpeg",
                    quality=90,
                )
        except Exception as e:
            raise ThemeRenderError(f"Render error: {e}") from e

        msgs: list[MessageSegmentFactory] = []
        if need_card_link and head_pic:
            # Stitch the head picture above the rendered card and send the result as PNG.
            card_pil = self.card_link(
                head_pic=PILImage.open(BytesIO(head_pic)),
                card_body=PILImage.open(BytesIO(card_body)),
            )
            buffer = BytesIO()
            card_pil.save(buffer, format="PNG")
            msgs.append(Image(buffer.getvalue()))
        else:
            msgs.append(Image(card_body))

        text = f"来源: {post.platform.name} {post.nickname or ''}\n"
        if post.url:
            text += f"详情: {post.url}"
        msgs.append(Text(text))

        # NOTE(review): pictures are merged again from post.images / post.repost.images
        # here; the list returned by parse() is not reused.
        pics_group: list[list[str | bytes | Path | BytesIO]] = []
        if post.images:
            pics_group.append(post.images)
        if post.repost and post.repost.images:
            pics_group.append(post.repost.images)

        client = await post.platform.ctx.get_client_for_static()

        for pics in pics_group:
            if is_pics_mergable(pics):
                pics = await pic_merge(list(pics), client)
            msgs.extend(Image(pic) for pic in pics)

        return msgs