diff --git a/nonebot_bison/admin_page/token_manager.py b/nonebot_bison/admin_page/token_manager.py
index bb62d0a..365ee64 100644
--- a/nonebot_bison/admin_page/token_manager.py
+++ b/nonebot_bison/admin_page/token_manager.py
@@ -1,12 +1,13 @@
import random
import string
+from datetime import timedelta
-from expiringdict import ExpiringDict
+from expiringdictx import ExpiringDict
class TokenManager:
def __init__(self):
- self.token_manager = ExpiringDict(max_len=100, max_age_seconds=60 * 10)
+ self.token_manager = ExpiringDict[str, tuple](capacity=100, default_age=timedelta(minutes=10))
def get_user(self, token: str) -> tuple | None:
res = self.token_manager.get(token)
diff --git a/nonebot_bison/platform/ceobecanteen/__init__.py b/nonebot_bison/platform/ceobecanteen/__init__.py
new file mode 100644
index 0000000..3b8c65e
--- /dev/null
+++ b/nonebot_bison/platform/ceobecanteen/__init__.py
@@ -0,0 +1 @@
+from .platform import CeobeCanteen as CeobeCanteen
diff --git a/nonebot_bison/platform/ceobecanteen/cache.py b/nonebot_bison/platform/ceobecanteen/cache.py
new file mode 100644
index 0000000..b157d56
--- /dev/null
+++ b/nonebot_bison/platform/ceobecanteen/cache.py
@@ -0,0 +1,119 @@
+from typing import TypeAlias
+from functools import partial
+from datetime import timedelta
+from types import MappingProxyType
+from collections.abc import Callable
+
+from httpx import AsyncClient, AsyncHTTPTransport
+from expiringdictx import SimpleCache, ExpiringDict
+from hishel import Controller, AsyncCacheTransport, AsyncInMemoryStorage
+
+from .const import DATASOURCE_URL
+from .utils import process_response
+from .models import CeobeSource, CeobeTarget, DataSourceResponse
+
+cache_transport = AsyncCacheTransport(
+ AsyncHTTPTransport(),
+ storage=AsyncInMemoryStorage(),
+ controller=Controller(
+ always_revalidate=True,
+ ),
+)
+
+CeobeClient = partial(
+ AsyncClient,
+ transport=cache_transport,
+)
+
+UniqueId: TypeAlias = str
+
+
+class CeobeCache:
+ # 不在 __init__ 中初始化,让多个实例共享一个缓存
+ _cache = SimpleCache()
+
+ def __init__(self, lifetime: timedelta, store_key: str | None = None):
+ self.store_key = store_key
+ self.lifetime = lifetime
+
+ def __set_name__(self, owner, name: str):
+ self.key = self.store_key or name
+
+ def __get__(self, instance, owner):
+ return self._cache.get(self.key)
+
+ def __set__(self, instance, value):
+ self._cache[self.key, self.lifetime] = value
+
+
+class CeobeDataSourceCache:
+ """数据源缓存, 以unique_id为key存储数据源"""
+
+ def __init__(self):
+ self._cache = ExpiringDict[UniqueId, CeobeTarget](capacity=100, default_age=timedelta(days=1))
+ self.client = CeobeClient()
+ self.url = DATASOURCE_URL
+
+ @property
+ def cache(self) -> MappingProxyType[str, CeobeTarget]:
+ return MappingProxyType(self._cache)
+
+ async def refresh_data_sources(self) -> MappingProxyType[UniqueId, CeobeTarget]:
+ """请求数据源API刷新缓存"""
+ data_sources_resp = await self.client.get(self.url)
+ data_sources = process_response(data_sources_resp, DataSourceResponse).data
+ for ds in data_sources:
+ self._cache[ds.unique_id] = ds
+ return self.cache
+
+ async def get_all(self, force_refresh: bool = False) -> MappingProxyType[UniqueId, CeobeTarget]:
+ """获取所有数据源, 如果缓存为空则尝试刷新缓存"""
+ if not self.cache or force_refresh:
+ await self.refresh_data_sources()
+ return self.cache
+
+ def select_one(self, cond_func: Callable[[CeobeTarget], bool]) -> CeobeTarget | None:
+ """根据条件获取数据源
+
+ 不会刷新缓存
+ """
+ cache = self._cache.values()
+ return next(filter(cond_func, cache), None)
+
+ async def get_by_unique_id(self, unique_id: str) -> CeobeTarget | None:
+ """根据unique_id获取数据源
+
+ 如果在缓存中找不到,会刷新缓存
+ """
+ if target := self._cache.get(unique_id):
+ return target
+ await self.refresh_data_sources()
+ return self._cache.get(unique_id)
+
+ async def get_by_nickname(self, nickname: str) -> CeobeTarget | None:
+ """根据nickname获取数据源
+
+ 如果在缓存中找不到,会刷新缓存
+ """
+
+ def select_by_nickname(target: CeobeTarget):
+ return target.nickname == nickname
+
+ if target := self.select_one(select_by_nickname):
+ return target
+ await self.refresh_data_sources()
+ return self.select_one(select_by_nickname)
+
+ async def get_by_source(self, source: CeobeSource) -> CeobeTarget | None:
+ """根据source获取数据源
+
+ 如果在缓存中找不到,会刷新缓存
+ """
+
+ def select_by_source(target: CeobeTarget):
+ return target.db_unique_key == source.data and target.datasource == source.type
+
+ if target := self.select_one(select_by_source):
+ return target
+ await self.refresh_data_sources()
+ return self.select_one(select_by_source)
diff --git a/nonebot_bison/platform/ceobecanteen/const.py b/nonebot_bison/platform/ceobecanteen/const.py
new file mode 100644
index 0000000..ef970a4
--- /dev/null
+++ b/nonebot_bison/platform/ceobecanteen/const.py
@@ -0,0 +1,4 @@
+DATASOURCE_URL = "https://server.ceobecanteen.top/api/v1/canteen/config/datasource/list"
+COMB_ID_URL = "https://server.ceobecanteen.top/api/v1/canteen/user/getDatasourceComb"
+COOKIE_ID_URL = "http://cdn.ceobecanteen.top/datasource-comb"
+COOKIES_URL = "https://server-cdn.ceobecanteen.top/api/v1/cdn/cookie/mainList/cookieList"
diff --git a/nonebot_bison/platform/ceobecanteen/exception.py b/nonebot_bison/platform/ceobecanteen/exception.py
new file mode 100644
index 0000000..7dfdccb
--- /dev/null
+++ b/nonebot_bison/platform/ceobecanteen/exception.py
@@ -0,0 +1,10 @@
+class CeobeResponseError(Exception): ...
+
+
+class CeobeSnapshotException(Exception): ...
+
+
+class CeobeSnapshotSkip(CeobeSnapshotException): ...
+
+
+class CeobeSnapshotFailed(CeobeSnapshotException): ...
diff --git a/nonebot_bison/platform/ceobecanteen/models.py b/nonebot_bison/platform/ceobecanteen/models.py
new file mode 100644
index 0000000..1987948
--- /dev/null
+++ b/nonebot_bison/platform/ceobecanteen/models.py
@@ -0,0 +1,127 @@
+from typing import Literal, TypeVar, NamedTuple
+
+from pydantic import BaseModel
+
+
+class CeobeTextPic(NamedTuple):
+ text: str
+ pics: list[bytes | str]
+
+
+class CeobeTarget(BaseModel):
+ """账户结构"""
+
+ avatar: str
+ """数据源头像"""
+ datasource: str
+ """数据源类型"""
+ db_unique_key: str
+ """数据源相关唯一id"""
+ nickname: str
+ """数据源昵称"""
+ platform: str
+ """平台代码"""
+ unique_id: str
+ """数据源唯一标识(用于前后端交互标识)"""
+ jump_url: str | None = None
+ """跳转url(null就是没办法跳转)"""
+
+
+class DataSourceResponse(BaseModel):
+ code: int
+ message: str
+ data: list[CeobeTarget]
+
+
+class CeobeImage(BaseModel):
+ origin_url: str
+ """原图"""
+ compress_url: str | None = None
+ """压缩图,为null就是没有原图对应压缩图"""
+
+
+class CeobeDefaultCookie(BaseModel):
+ text: str
+ images: list[CeobeImage] | None
+
+
+class CeobeRetweeted(BaseModel):
+ author_name: str
+ author_avatar: str
+ text: str
+ images: list[CeobeImage] | None = None
+
+
+class CeobeItem(BaseModel):
+ id: str
+ """单条id"""
+ url: str
+ """跳转链接"""
+ type: str | None = None
+ """类型"""
+ is_long_text: bool | None = None
+ """是否长文"""
+ is_retweeted: bool = False
+ """是否转发"""
+ retweeted: CeobeRetweeted | None = None
+ """展示类型,公告类型的数据源有这个字段"""
+ display_type: int | None = None
+
+ class Config:
+ extra = "allow"
+
+
+class CeobeSource(BaseModel):
+ data: str
+ """数据源id"""
+ type: str
+ """数据源类型"""
+
+
+class CeobeTimestamp(BaseModel):
+ fetcher: int
+ """蹲饼时间,毫秒"""
+ platform_precision: Literal["none", "day", "hour", "minute", "second", "ms"]
+ """平台时间精度,不足的长度补0"""
+ platform: int | None = None
+ """平台时间戳,毫秒"""
+
+
+class CeobeCookie(BaseModel):
+ datasource: str
+ """数据源名字"""
+ icon: str
+ """数据源头像"""
+ timestamp: CeobeTimestamp
+ """时间戳"""
+ default_cookie: CeobeDefaultCookie
+ """原始饼"""
+ item: CeobeItem
+ """数据源信息,有平台的特殊字段"""
+ source: CeobeSource
+ """数据源"""
+
+
+class CeobeData(BaseModel):
+ cookies: list[CeobeCookie]
+ next_page_id: str | None = None
+
+
+class CookiesResponse(BaseModel):
+ code: int
+ message: str
+ data: CeobeData
+
+
+class CombIdResponse(BaseModel):
+ code: int
+ message: str
+ data: dict[Literal["datasource_comb_id"], str]
+
+
+class CookieIdResponse(BaseModel):
+ cookie_id: str
+ update_cookie_id: str
+
+
+ResponseModel = TypeVar("ResponseModel", bound=CookiesResponse | CombIdResponse | CookieIdResponse | DataSourceResponse)
diff --git a/nonebot_bison/platform/ceobecanteen/platform.py b/nonebot_bison/platform/ceobecanteen/platform.py
new file mode 100644
index 0000000..a3674be
--- /dev/null
+++ b/nonebot_bison/platform/ceobecanteen/platform.py
@@ -0,0 +1,324 @@
+from typing import ParamSpec
+from functools import partial
+from datetime import timedelta
+from collections import defaultdict
+
+from httpx import AsyncClient
+from nonebot import logger, require
+from rapidfuzz import fuzz, process
+
+from nonebot_bison.post import Post
+from nonebot_bison.plugin_config import plugin_config
+from nonebot_bison.types import Target, RawPost, Category
+from nonebot_bison.utils import Site, ClientManager, capture_html
+
+from ..platform import NewMessage
+from .utils import process_response
+from .const import COMB_ID_URL, COOKIES_URL, COOKIE_ID_URL
+from .exception import CeobeSnapshotSkip, CeobeSnapshotFailed
+from .cache import CeobeCache, CeobeClient, CeobeDataSourceCache
+from .models import CeobeImage, CeobeCookie, CeobeTextPic, CombIdResponse, CookiesResponse, CookieIdResponse
+
+P = ParamSpec("P")
+
+
+class CeobeCanteenClientManager(ClientManager):
+ _client: AsyncClient
+
+ def __init__(self):
+ self._client = CeobeClient(
+ headers={
+ "User-Agent": "MountainDash/Nonebot-Bison",
+ }
+ )
+
+ async def get_client(self, target: Target | None) -> AsyncClient:
+ return self._client
+
+ async def get_client_for_static(self) -> AsyncClient:
+ return self._client
+
+ async def get_query_name_client(self) -> AsyncClient:
+ return self._client
+
+ async def refresh_client(self):
+ raise NotImplementedError("refresh_client is not implemented")
+
+
+class CeobeCanteenSite(Site):
+ name = "ceobe_canteen"
+ schedule_type = "interval"
+ # lwt の 推荐间隔
+ schedule_setting = {"seconds": 15}
+ client_mgr = CeobeCanteenClientManager
+
+
+class CeobeCanteen(NewMessage):
+ enable_tag: bool = False
+ platform_name: str = "ceobecanteen"
+ name: str = "小刻食堂"
+ enabled: bool = True
+ is_common: bool = False
+ site = CeobeCanteenSite
+ has_target: bool = True
+ use_batch: bool = True
+ default_theme: str = "ceobecanteen"
+
+ categories: dict[Category, str] = {1: "普通", 2: "转发"}
+
+ data_source_cache = CeobeDataSourceCache()
+
+ comb_id = CeobeCache(timedelta(hours=12))
+ cookie_id = CeobeCache(timedelta(hours=1))
+ cookies = CeobeCache(timedelta(hours=1))
+
+ async def get_comb_id(self, target_uuids: list[str]):
+ """获取数据源的组合id"""
+ payload = {"datasource_push": target_uuids}
+ logger.trace(payload)
+ client = await self.ctx.get_client()
+ resp = await client.post(
+ COMB_ID_URL,
+ json=payload,
+ )
+ comb_id = process_response(resp, CombIdResponse).data["datasource_comb_id"]
+ logger.trace(f"get comb_id: {comb_id}")
+ return comb_id
+
+ async def get_comb_id_of_all(self):
+ """获取 "全部数据源" 的组合id,获取到的comb_id会缓存12小时"""
+ logger.trace("no comb_id, request")
+ target_uuids = (await self.data_source_cache.get_all()).keys()
+ comb_id = await self.get_comb_id(list(target_uuids))
+
+ logger.trace(f"use comb_id: {comb_id}")
+ return comb_id
+
+ async def get_latest_cookie_id(self, comb_id: str):
+ """根据comb_id获取最新cookie_id"""
+ client = await self.ctx.get_client()
+ resp = await client.get(f"{COOKIE_ID_URL}/{comb_id}")
+ cookie_id = process_response(resp, CookieIdResponse).cookie_id
+ logger.trace(f"get cookie_id: {cookie_id}")
+ return cookie_id
+
+ async def get_cookies(self, cookie_id: str, comb_id: str | None = None):
+ """根据cookie_id、comb_id组合获取cookies"""
+ client = await self.ctx.get_client()
+ parmas = {
+ "datasource_comb_id": comb_id,
+ "cookie_id": cookie_id,
+ }
+ logger.trace(f"will reuquest: {parmas}")
+ resp = await client.get(COOKIES_URL, params=parmas)
+ return process_response(resp, CookiesResponse).data.cookies
+
+ async def fetch_ceobe_cookies(self) -> list[CeobeCookie]:
+ if not self.comb_id:
+ self.comb_id = await self.get_comb_id_of_all()
+
+ latest_cookie_id = await self.get_latest_cookie_id(self.comb_id)
+ if not latest_cookie_id:
+ return []
+
+ if latest_cookie_id != self.cookie_id:
+ self.cookie_id = latest_cookie_id
+ self.cookies = await self.get_cookies(latest_cookie_id, self.comb_id)
+
+ return self.cookies or []
+
+ async def batch_get_sub_list(self, targets: list[Target]) -> list[list[CeobeCookie]]:
+ cookies = await self.fetch_ceobe_cookies()
+
+ dispatched_cookies: defaultdict[Target, list[CeobeCookie]] = defaultdict(list)
+ for cookie in cookies:
+ if ceobe_target := await self.data_source_cache.get_by_source(cookie.source):
+ dispatched_cookies[Target(ceobe_target.unique_id)].append(cookie)
+
+ return [dispatched_cookies[target] for target in targets]
+
+ @classmethod
+ async def get_target_name(cls, _, uuid_target: Target) -> str:
+ ceobe_target = await cls.data_source_cache.get_by_unique_id(uuid_target)
+ return ceobe_target.nickname if ceobe_target else "UNKNOWN"
+
+ @classmethod
+ async def parse_target(cls, nickname: str) -> Target:
+ ceobe_target = await cls.data_source_cache.get_by_nickname(nickname)
+ if not ceobe_target:
+ all_targets_name = [target.nickname for target in (await cls.data_source_cache.get_all()).values()]
+ matched_targets_name = process.extract(nickname, all_targets_name, scorer=fuzz.token_sort_ratio, limit=3)
+ logger.debug(f"possible targets: {matched_targets_name}")
+ raise cls.ParseTargetException(
+ prompt="未能匹配到对应的小刻食堂数据源,可能的选择有: \n"
+ + "\n".join([name for name, *_ in matched_targets_name])
+ + f"\n\n请检查原输入是否正确: {nickname}"
+ )
+ return Target(ceobe_target.unique_id)
+
+ def get_tags(self, _: RawPost) -> None:
+ return
+
+ def get_category(self, post: CeobeCookie) -> Category:
+ if post.item.is_retweeted:
+ return Category(2)
+ return Category(1)
+
+ def get_id(self, post: CeobeCookie) -> str:
+ return post.item.id
+
+ def get_date(self, post: CeobeCookie) -> int:
+ return post.timestamp.fetcher
+
+ async def parse(self, raw_post: CeobeCookie) -> Post:
+ target = await self.data_source_cache.get_by_source(raw_post.source)
+ assert target, "target not found"
+
+ content, pics = await self.take_snapshot(raw_post)
+
+ timestamp = raw_post.timestamp.platform or raw_post.timestamp.fetcher
+ if timestamp:
+ timestamp /= 1000 # 从毫秒级转换到秒级
+
+ retweet: Post | None = None
+ if raw_post.item.is_retweeted and raw_post.item.retweeted:
+ raw_retweet_pics = raw_post.item.retweeted.images or []
+ retweet_pics = await self.parse_retweet_images(raw_retweet_pics, raw_post.source.type)
+
+ retweet = Post(
+ self,
+ nickname=raw_post.item.retweeted.author_name,
+ avatar=raw_post.item.retweeted.author_avatar,
+ images=list(retweet_pics),
+ content=raw_post.item.retweeted.text,
+ )
+
+ return Post(
+ self,
+ content,
+ url=raw_post.item.url,
+ nickname=raw_post.datasource,
+ images=list(pics),
+ timestamp=timestamp,
+ avatar=target.avatar,
+ description=target.platform,
+ repost=retweet,
+ )
+
+ async def snapshot_official_website(self, url: str) -> bytes:
+ """截取小刻官网的截图"""
+ require("nonebot_plugin_htmlrender")
+ from nonebot_plugin_htmlrender import get_new_page
+
+ logger.debug(f"snapshot official website url: {url}")
+
+ # /html/body/div[1]/div[1]/div/div[1]/div[1]/div
+ snapshot_selector = "html > body > div:nth-child(1) > div:nth-child(1) > div > div:nth-child(1) > div:nth-child(1) > div" # noqa: E501
+ # /html/body/div[1]/div[1]/div/div[1]/div[1]/div/div[4]/div/div/div
+ calculate_selector = "html > body > div:nth-child(1) > div:nth-child(1) > div > div:nth-child(1) > div:nth-child(1) > div > div:nth-child(4) > div > div > div" # noqa: E501
+ viewport = {"width": 1024, "height": 19990}
+
+ try:
+ async with get_new_page(viewport=viewport) as page:
+ await page.goto(url, wait_until="networkidle")
+ element_width = await page.evaluate(
+ "(selector) => document.querySelector(selector).offsetWidth", calculate_selector
+ )
+ logger.debug(f"element width: {element_width}")
+ element_height = await page.evaluate(
+ "(selector) => document.querySelector(selector).offsetHeight", calculate_selector
+ )
+ logger.debug(f"element height: {element_height}")
+ element_height += 1000
+
+ await page.set_viewport_size({"width": 1024, "height": element_height})
+
+ element = await page.locator(snapshot_selector).element_handle()
+ # add padding to make the screenshot more beautiful
+ await element.evaluate("(element) => {element.style.padding = '20px';}", element)
+
+ pic_data = await element.screenshot(
+ type="png",
+ )
+ except Exception as e:
+ raise CeobeSnapshotFailed("渲染错误") from e
+ else:
+ return pic_data
+
+ async def snapshot_bulletin_list(self, url: str) -> bytes:
+ """截取小刻公告列表的截图"""
+ selector = "body > div.main > div.container"
+ viewport = {"width": 1024, "height": 19990}
+
+ try:
+ pic_data = await capture_html(
+ url,
+ selector,
+ timeout=30000,
+ wait_until="networkidle",
+ viewport=viewport,
+ )
+ assert pic_data
+ except Exception:
+ raise CeobeSnapshotFailed("渲染错误")
+ else:
+ return pic_data
+
+ async def take_snapshot(
+ self,
+ raw_post: CeobeCookie,
+ ) -> CeobeTextPic:
+ """判断数据源类型,判断是否需要截图"""
+
+ match raw_post.source.type:
+ case "arknights-website:official-website":
+
+ async def owss(url: str) -> CeobeTextPic:
+ return CeobeTextPic(text="", pics=[await self.snapshot_official_website(url)])
+
+ snapshot_func = partial(owss, raw_post.item.url)
+ case "arknights-game:bulletin-list" if raw_post.item.display_type != 2:
+
+ async def blss(url: str) -> CeobeTextPic:
+ return CeobeTextPic(text="", pics=[await self.snapshot_bulletin_list(url)])
+
+ snapshot_func = partial(blss, raw_post.item.url)
+ case _:
+
+ async def npss() -> CeobeTextPic:
+ raise CeobeSnapshotSkip("无需截图的数据源")
+
+ snapshot_func = partial(npss)
+
+ raw_pics = raw_post.default_cookie.images or []
+ try:
+ if not plugin_config.bison_use_browser:
+ raise CeobeSnapshotSkip("未启用浏览器")
+ res = await snapshot_func()
+ except CeobeSnapshotSkip as e:
+ logger.info(f"skip snapshot: {e}")
+ pics = await self.parse_retweet_images(raw_pics, raw_post.source.type)
+ res = CeobeTextPic(text=raw_post.default_cookie.text, pics=list(pics))
+ except CeobeSnapshotFailed:
+ logger.exception("snapshot failed")
+ pics = await self.parse_retweet_images(raw_pics, raw_post.source.type)
+ res = CeobeTextPic(text=raw_post.default_cookie.text, pics=list(pics))
+
+ return res
+
+ async def parse_retweet_images(self, images: list[CeobeImage], source_type: str) -> list[bytes] | list[str]:
+ if source_type.startswith("weibo"):
+ retweet_pics = await self.download_weibo_image([image.origin_url for image in images])
+ else:
+ retweet_pics = [image.origin_url for image in images]
+ return retweet_pics
+
+ async def download_weibo_image(self, image_urls: list[str]) -> list[bytes]:
+ headers = {"referer": "https://weibo.cn/"}
+ pics = []
+ async with CeobeClient(headers=headers) as client:
+ for url in image_urls:
+ resp = await client.get(url)
+ resp.raise_for_status()
+ pics.append(resp.content)
+ return pics
diff --git a/nonebot_bison/platform/ceobecanteen/utils.py b/nonebot_bison/platform/ceobecanteen/utils.py
new file mode 100644
index 0000000..83667d5
--- /dev/null
+++ b/nonebot_bison/platform/ceobecanteen/utils.py
@@ -0,0 +1,20 @@
+from httpx import Response
+from nonebot import logger
+from nonebot.compat import type_validate_python
+
+from .exception import CeobeResponseError
+from .models import ResponseModel, CookieIdResponse
+
+
+def process_response(response: Response, parse_model: type[ResponseModel]) -> ResponseModel:
+ response.raise_for_status()
+ logger.trace(f"小刻食堂请求结果: {response.json().get('message')} {parse_model=}")
+
+ try:
+ data = type_validate_python(parse_model, response.json())
+ except Exception as e:
+ raise CeobeResponseError(f"解析小刻食堂响应失败: {e}")
+
+ if not isinstance(data, CookieIdResponse) and data.code != 0:
+ raise CeobeResponseError(f"获取饼数据失败: {data.message}")
+ return data
diff --git a/nonebot_bison/post/post.py b/nonebot_bison/post/post.py
index 5e68925..e7bbd07 100644
--- a/nonebot_bison/post/post.py
+++ b/nonebot_bison/post/post.py
@@ -31,8 +31,8 @@ class Post(AbstractPost):
"""标题"""
images: list[str | bytes | Path | BytesIO] | None = None
"""图片列表"""
- timestamp: int | None = None
- """发布/获取时间戳"""
+ timestamp: float | None = None
+ """发布/获取时间戳, 秒"""
url: str | None = None
"""来源链接"""
avatar: str | bytes | Path | BytesIO | None = None
diff --git a/nonebot_bison/theme/themes/arknights/build.py b/nonebot_bison/theme/themes/arknights/build.py
index f126859..01c932e 100644
--- a/nonebot_bison/theme/themes/arknights/build.py
+++ b/nonebot_bison/theme/themes/arknights/build.py
@@ -1,9 +1,12 @@
+from io import BytesIO
from pathlib import Path
from dataclasses import dataclass
from typing import TYPE_CHECKING, Literal
from nonebot_plugin_saa import Text, Image, MessageSegmentFactory
+from nonebot_bison.utils import text_fletten
+from nonebot_bison.theme.utils import web_embed_image
from nonebot_bison.theme import Theme, ThemeRenderError, ThemeRenderUnsupportError
if TYPE_CHECKING:
@@ -34,16 +37,21 @@ class ArknightsTheme(Theme):
if not post.title:
raise ThemeRenderUnsupportError("标题为空")
- if post.images and len(post.images) > 1:
- raise ThemeRenderUnsupportError("图片数量大于1")
banner = post.images[0] if post.images else None
- if banner is not None and not isinstance(banner, str | Path):
- raise ThemeRenderUnsupportError(f"图片类型错误, 期望 str 或 Path, 实际为 {type(banner)}")
+ match banner:
+ case bytes() | BytesIO():
+ banner = web_embed_image(banner)
+ case str() | Path() | None:
+ pass
+ case _:
+ raise ThemeRenderUnsupportError(
+ f"图片类型错误, 期望 str | Path | bytes | BytesIO | None, 实际为 {type(banner)}"
+ )
ark_data = ArkData(
- announce_title=post.title,
+ announce_title=text_fletten(post.title),
content=post.content,
banner_image_url=banner,
)
@@ -64,6 +72,10 @@ class ArknightsTheme(Theme):
raise ThemeRenderError(f"渲染文本失败: {e}")
msgs: list[MessageSegmentFactory] = []
msgs.append(Image(announce_pic))
+
if post.url:
msgs.append(Text(f"前往:{post.url}"))
- return [Image(announce_pic)]
+ if post.images:
+ msgs.extend(Image(img) for img in post.images[1:])
+
+ return msgs
diff --git a/nonebot_bison/theme/themes/ceobe_canteen/README.md b/nonebot_bison/theme/themes/ceobe_canteen/README.md
index d40fb1a..62c1501 100644
--- a/nonebot_bison/theme/themes/ceobe_canteen/README.md
+++ b/nonebot_bison/theme/themes/ceobe_canteen/README.md
@@ -3,8 +3,12 @@
## LOGO图片
- `templates/ceobecanteen_logo.png`
+- `templates/bison_logo.png`
### 版权声明
-
logo图片采用知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议进行许可。
-本项目使用已经过 [Ceobe Canteen](https://github.com/Enraged-Dun-Cookie-Development-Team) 授权许可使用。
+
所声明的 LOGO 图片采用知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议进行许可。
+
+本项目使用的 小刻食堂 LOGO 已经过 [小刻食堂](https://github.com/Enraged-Dun-Cookie-Development-Team) 授权许可使用。
+
+本项目使用的 Nonebot-Bison LOGO
由画师 不画涩图の企鹅 倾情贡献,非常感谢!
diff --git a/nonebot_bison/theme/themes/ceobe_canteen/build.py b/nonebot_bison/theme/themes/ceobe_canteen/build.py
index 8b24464..862b111 100644
--- a/nonebot_bison/theme/themes/ceobe_canteen/build.py
+++ b/nonebot_bison/theme/themes/ceobe_canteen/build.py
@@ -4,12 +4,15 @@ from datetime import datetime
from typing import TYPE_CHECKING, Literal
import jinja2
+from yarl import URL
+from httpx import AsyncClient
from pydantic import BaseModel
+from PIL import Image as PILImage
from nonebot_plugin_saa import Text, Image, MessageSegmentFactory
from nonebot_bison.compat import model_validator
-from nonebot_bison.theme.utils import convert_to_qr
-from nonebot_bison.utils.image import pic_merge, is_pics_mergable
+from nonebot_bison.utils import pic_merge, is_pics_mergable
+from nonebot_bison.theme.utils import convert_to_qr, web_embed_image
from nonebot_bison.theme import Theme, ThemeRenderError, ThemeRenderUnsupportError
if TYPE_CHECKING:
@@ -35,13 +38,26 @@ class CeoboContent(BaseModel):
text: 文字内容
"""
+ image: str | None = None
+ text: str
+
+
+class CeoboRetweet(BaseModel):
+ """卡片的转发部分
+
+ author: 原作者
+ image: 图片链接
+ content: 文字内容
+ """
+
image: str | None
- text: str | None
+ content: str | None
+ author: str
@model_validator(mode="before")
def check(cls, values):
- if values["image"] is None and values["text"] is None:
- raise ValueError("image and text cannot be both None")
+ if values["image"] is None and values["content"] is None:
+ raise ValueError("image and content cannot be both None")
return values
@@ -49,6 +65,7 @@ class CeobeCard(BaseModel):
info: CeobeInfo
content: CeoboContent
qr: str | None
+ retweet: CeoboRetweet | None
class CeobeCanteenTheme(Theme):
@@ -63,8 +80,8 @@ class CeobeCanteenTheme(Theme):
template_path: Path = Path(__file__).parent / "templates"
template_name: str = "ceobe_canteen.html.jinja"
- def parse(self, post: "Post") -> CeobeCard:
- """解析 Post 为 CeobeCard"""
+ async def parse(self, post: "Post") -> tuple[CeobeCard, list[str | bytes | Path | BytesIO]]:
+ """解析 Post 为 CeobeCard与处理好的图片列表"""
if not post.nickname:
raise ThemeRenderUnsupportError("post.nickname is None")
if not post.timestamp:
@@ -73,15 +90,96 @@ class CeobeCanteenTheme(Theme):
datasource=post.nickname, time=datetime.fromtimestamp(post.timestamp).strftime("%Y-%m-%d %H:%M:%S")
)
- head_pic = post.images[0] if post.images else None
- if head_pic is not None and not isinstance(head_pic, str):
- raise ThemeRenderUnsupportError("post.images[0] is not str")
+ http_client = await post.platform.ctx.get_client_for_static()
+ images: list[str | bytes | Path | BytesIO] = []
+ if post.images:
+ images = await self.merge_pics(post.images, http_client)
- content = CeoboContent(image=head_pic, text=post.content)
- return CeobeCard(info=info, content=content, qr=convert_to_qr(post.url or "No URL"))
+ content = CeoboContent(text=post.content)
+
+ retweet: CeoboRetweet | None = None
+ if post.repost:
+ repost_head_pic: str | None = None
+ if post.repost.images:
+ repost_images = await self.merge_pics(post.repost.images, http_client)
+ repost_head_pic = self.extract_head_pic(repost_images)
+ images.extend(repost_images)
+
+ repost_nickname = f"@{post.repost.nickname}:" if post.repost.nickname else ""
+ retweet = CeoboRetweet(image=repost_head_pic, content=post.repost.content, author=repost_nickname)
+
+ return (
+ CeobeCard(
+ info=info,
+ content=content,
+ qr=web_embed_image(convert_to_qr(post.url or "No URL", back_color=(240, 236, 233))),
+ retweet=retweet,
+ ),
+ images,
+ )
+
+ @staticmethod
+ async def merge_pics(
+ images: list[str | bytes | Path | BytesIO],
+ client: AsyncClient,
+ ) -> list[str | bytes | Path | BytesIO]:
+ if is_pics_mergable(images):
+ pics = await pic_merge(images, client)
+ else:
+ pics = images
+ return list(pics)
+
+ @staticmethod
+ def extract_head_pic(pics: list[str | bytes | Path | BytesIO]) -> str:
+ head_pic = web_embed_image(pics[0]) if not isinstance(pics[0], str) else pics[0]
+ return head_pic
+
+ @staticmethod
+ def card_link(head_pic: PILImage.Image, card_body: PILImage.Image) -> PILImage.Image:
+ """将头像与卡片合并"""
+
+ def resize_image(img: PILImage.Image, size: tuple[int, int]) -> PILImage.Image:
+ return img.resize(size)
+
+ # 统一图片宽度
+ head_pic_w, head_pic_h = head_pic.size
+ card_body_w, card_body_h = card_body.size
+
+ if head_pic_w > card_body_w:
+ head_pic = resize_image(head_pic, (card_body_w, int(head_pic_h * card_body_w / head_pic_w)))
+ else:
+ card_body = resize_image(card_body, (head_pic_w, int(card_body_h * head_pic_w / card_body_w)))
+
+ # 合并图片
+ card = PILImage.new("RGBA", (head_pic.width, head_pic.height + card_body.height))
+ card.paste(head_pic, (0, 0))
+ card.paste(card_body, (0, head_pic.height))
+ return card
async def render(self, post: "Post") -> list[MessageSegmentFactory]:
- ceobe_card = self.parse(post)
+ ceobe_card, merged_images = await self.parse(post)
+
+ need_card_link: bool = True
+ head_pic = None
+
+ # 如果没有 post.images,则全部都是转发里的图片,不需要头图
+ if post.images:
+ match merged_images[0]:
+ case bytes():
+ head_pic = merged_images[0]
+ merged_images = merged_images[1:]
+ case BytesIO():
+ head_pic = merged_images[0].getvalue()
+ merged_images = merged_images[1:]
+ case str(s) if URL(s).scheme in ("http", "https"):
+ ceobe_card.content.image = merged_images[0]
+ need_card_link = False
+ case Path():
+ ceobe_card.content.image = merged_images[0].as_uri()
+ need_card_link = False
+ case _:
+ raise ThemeRenderError(f"Unknown image type: {type(merged_images[0])}")
+
from nonebot_plugin_htmlrender import get_new_page
template_env = jinja2.Environment(
@@ -91,7 +189,8 @@ class CeobeCanteenTheme(Theme):
template = template_env.get_template(self.template_name)
html = await template.render_async(card=ceobe_card)
pages = {
- "viewport": {"width": 1000, "height": 3000},
+ "device_scale_factor": 2,
+ "viewport": {"width": 512, "height": 455},
"base_url": self.template_path.as_uri(),
}
try:
@@ -99,12 +198,24 @@ class CeobeCanteenTheme(Theme):
await page.goto(self.template_path.as_uri())
await page.set_content(html)
await page.wait_for_timeout(1)
- img_raw = await page.locator("#ceobecanteen-card").screenshot(
- type="png",
+ card_body = await page.locator("#ceobecanteen-card").screenshot(
+ type="jpeg",
+ quality=90,
)
except Exception as e:
raise ThemeRenderError(f"Render error: {e}") from e
- msgs: list[MessageSegmentFactory] = [Image(img_raw)]
+
+ msgs: list[MessageSegmentFactory] = []
+ if need_card_link and head_pic:
+ card_pil = self.card_link(
+ head_pic=PILImage.open(BytesIO(head_pic)),
+ card_body=PILImage.open(BytesIO(card_body)),
+ )
+ card_data = BytesIO()
+ card_pil.save(card_data, format="PNG")
+ msgs.append(Image(card_data.getvalue()))
+ else:
+ msgs.append(Image(card_body))
text = f"来源: {post.platform.name} {post.nickname or ''}\n"
if post.url:
diff --git a/nonebot_bison/theme/themes/ceobe_canteen/templates/bison_logo.jpg b/nonebot_bison/theme/themes/ceobe_canteen/templates/bison_logo.jpg
deleted file mode 100644
index 9e95800..0000000
Binary files a/nonebot_bison/theme/themes/ceobe_canteen/templates/bison_logo.jpg and /dev/null differ
diff --git a/nonebot_bison/theme/themes/ceobe_canteen/templates/bison_logo.png b/nonebot_bison/theme/themes/ceobe_canteen/templates/bison_logo.png
new file mode 100644
index 0000000..faeccfc
Binary files /dev/null and b/nonebot_bison/theme/themes/ceobe_canteen/templates/bison_logo.png differ
diff --git a/nonebot_bison/theme/themes/ceobe_canteen/templates/ceobe_canteen.html.jinja b/nonebot_bison/theme/themes/ceobe_canteen/templates/ceobe_canteen.html.jinja
index baba226..37b680b 100644
--- a/nonebot_bison/theme/themes/ceobe_canteen/templates/ceobe_canteen.html.jinja
+++ b/nonebot_bison/theme/themes/ceobe_canteen/templates/ceobe_canteen.html.jinja
@@ -11,6 +11,19 @@
{% endif %}
{% if card.content.text %}