mirror of
https://github.com/suyiiyii/nonebot-bison.git
synced 2026-05-09 10:17:56 +08:00
✨ 适配小刻食堂平台 (#379)
* 🐛 插入新的Schedulable时应传入use_batch参数 * ✨ 适配ceobecanteen平台 Co-authored-by: phidiaLam <2957035701@qq.com> * ✨ ✨ 明日方舟公告与官网采用截图分享 (#480) * ✨ 明日方舟公告与官网采用截图分享 * 💄 auto fix by pre-commit hooks * 🐛 修复缺少的导入,优化逻辑 --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> Co-authored-by: Azide <rukuy@qq.com> * 🐛 优化截图图片效果 * 🐛 修复错误将转发内图片视作头图的问题 * 🍱 使用正式 Bison Logo * 💄 auto fix by pre-commit hooks * 🐛 请求小刻API时不在headers里添加过多字段 * 🐛 get_comb_id方法删除无用的targets参数 * 💡 get_comb_id方法更新注释 * 🔥 移除发送部分的更改 * ✨ 在命名中明确表示cond_func意图 * ♻️ 拆分get_comb_id功能 * ♻️ 调整缓存逻辑 * ✨ 使用uri在theme中调用platform截图 * ♻️ 重构截图逻辑 * ✨ 添加模糊匹配提示 * ✨ 适配新版Site * 💄 auto fix by pre-commit hooks * 🐛 去掉不必要的排序 * 🐛 修正不应出现的驼峰变量名 * ♻️ 按review意见修改 * ♻️ 调整截图函数逻辑 * 🔊 调低日志等级 * ✏️ 修复一些拼写和格式 --------- Co-authored-by: phidiaLam <2957035701@qq.com> Co-authored-by: 洛梧藤 <67498817+phidiaLam@users.noreply.github.com> Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
This commit is contained in:
@@ -1,12 +1,13 @@
|
||||
import random
|
||||
import string
|
||||
from datetime import timedelta
|
||||
|
||||
from expiringdict import ExpiringDict
|
||||
from expiringdictx import ExpiringDict
|
||||
|
||||
|
||||
class TokenManager:
|
||||
def __init__(self):
|
||||
self.token_manager = ExpiringDict(max_len=100, max_age_seconds=60 * 10)
|
||||
self.token_manager = ExpiringDict[str, tuple](capacity=100, default_age=timedelta(minutes=10))
|
||||
|
||||
def get_user(self, token: str) -> tuple | None:
|
||||
res = self.token_manager.get(token)
|
||||
|
||||
@@ -0,0 +1 @@
|
||||
from .platform import CeobeCanteen as CeobeCanteen
|
||||
@@ -0,0 +1,119 @@
|
||||
from typing import TypeAlias
|
||||
from functools import partial
|
||||
from datetime import timedelta
|
||||
from types import MappingProxyType
|
||||
from collections.abc import Callable
|
||||
|
||||
from httpx import AsyncClient, AsyncHTTPTransport
|
||||
from expiringdictx import SimpleCache, ExpiringDict
|
||||
from hishel import Controller, AsyncCacheTransport, AsyncInMemoryStorage
|
||||
|
||||
from .const import DATASOURCE_URL
|
||||
from .utils import process_response
|
||||
from .models import CeobeSource, CeobeTarget, DataSourceResponse
|
||||
|
||||
cache_transport = AsyncCacheTransport(
|
||||
AsyncHTTPTransport(),
|
||||
storage=AsyncInMemoryStorage(),
|
||||
controller=Controller(
|
||||
always_revalidate=True,
|
||||
),
|
||||
)
|
||||
|
||||
CeobeClient = partial(
|
||||
AsyncClient,
|
||||
transport=cache_transport,
|
||||
)
|
||||
|
||||
UniqueId: TypeAlias = str
|
||||
|
||||
|
||||
class CeobeCache:
|
||||
# 不在 __init__ 中初始化,让多个实例共享一个缓存
|
||||
_cache = SimpleCache()
|
||||
|
||||
def __init__(self, lifetime: timedelta, store_key: str | None = None):
|
||||
self.store_key = store_key
|
||||
self.lifetime = lifetime
|
||||
|
||||
def __set_name__(self, owner, name: str):
|
||||
self.key = self.store_key or name
|
||||
|
||||
def __get__(self, instance, owner):
|
||||
return self._cache.get(self.key)
|
||||
|
||||
def __set__(self, instance, value):
|
||||
self._cache[self.key, self.lifetime] = value
|
||||
|
||||
|
||||
class CeobeDataSourceCache:
|
||||
"""数据源缓存, 以unique_id为key存储数据源"""
|
||||
|
||||
def __init__(self):
|
||||
self._cache = ExpiringDict[UniqueId, CeobeTarget](capacity=100, default_age=timedelta(days=1))
|
||||
self.client = CeobeClient()
|
||||
self.url = DATASOURCE_URL
|
||||
|
||||
@property
|
||||
def cache(self) -> MappingProxyType[str, CeobeTarget]:
|
||||
return MappingProxyType(self._cache)
|
||||
|
||||
async def refresh_data_sources(self) -> MappingProxyType[UniqueId, CeobeTarget]:
|
||||
"""请求数据源API刷新缓存"""
|
||||
data_sources_resp = await self.client.get(self.url)
|
||||
data_sources = process_response(data_sources_resp, DataSourceResponse).data
|
||||
for ds in data_sources:
|
||||
self._cache[ds.unique_id] = ds
|
||||
return self.cache
|
||||
|
||||
async def get_all(self, force_refresh: bool = False) -> MappingProxyType[UniqueId, CeobeTarget]:
|
||||
"""获取所有数据源, 如果缓存为空则尝试刷新缓存"""
|
||||
if not self.cache or force_refresh:
|
||||
await self.refresh_data_sources()
|
||||
return self.cache
|
||||
|
||||
def select_one(self, cond_func: Callable[[CeobeTarget], bool]) -> CeobeTarget | None:
|
||||
"""根据条件获取数据源
|
||||
|
||||
不会刷新缓存
|
||||
"""
|
||||
cache = self._cache.values()
|
||||
return next(filter(cond_func, cache), None)
|
||||
|
||||
async def get_by_unique_id(self, unique_id: str) -> CeobeTarget | None:
|
||||
"""根据unique_id获取数据源
|
||||
|
||||
如果在缓存中找不到,会刷新缓存
|
||||
"""
|
||||
if target := self._cache.get(unique_id):
|
||||
return target
|
||||
await self.refresh_data_sources()
|
||||
return self._cache.get(unique_id)
|
||||
|
||||
async def get_by_nickname(self, nickname: str) -> CeobeTarget | None:
|
||||
"""根据nickname获取数据源
|
||||
|
||||
如果在缓存中找不到,会刷新缓存
|
||||
"""
|
||||
|
||||
def select_by_nickname(target: CeobeTarget):
|
||||
return target.nickname == nickname
|
||||
|
||||
if target := self.select_one(select_by_nickname):
|
||||
return target
|
||||
await self.refresh_data_sources()
|
||||
return self.select_one(select_by_nickname)
|
||||
|
||||
async def get_by_source(self, source: CeobeSource) -> CeobeTarget | None:
|
||||
"""根据source获取数据源
|
||||
|
||||
如果在缓存中找不到,会刷新缓存
|
||||
"""
|
||||
|
||||
def select_by_source(target: CeobeTarget):
|
||||
return target.db_unique_key == source.data and target.datasource == source.type
|
||||
|
||||
if target := self.select_one(select_by_source):
|
||||
return target
|
||||
await self.refresh_data_sources()
|
||||
return self.select_one(select_by_source)
|
||||
@@ -0,0 +1,4 @@
|
||||
DATASOURCE_URL = "https://server.ceobecanteen.top/api/v1/canteen/config/datasource/list"
|
||||
COMB_ID_URL = "https://server.ceobecanteen.top/api/v1/canteen/user/getDatasourceComb"
|
||||
COOKIE_ID_URL = "http://cdn.ceobecanteen.top/datasource-comb"
|
||||
COOKIES_URL = "https://server-cdn.ceobecanteen.top/api/v1/cdn/cookie/mainList/cookieList"
|
||||
@@ -0,0 +1,10 @@
|
||||
class CeobeResponseError(Exception): ...
|
||||
|
||||
|
||||
class CeobeSnapshotException(Exception): ...
|
||||
|
||||
|
||||
class CeobeSnapshotSkip(CeobeSnapshotException): ...
|
||||
|
||||
|
||||
class CeobeSnapshotFailed(CeobeSnapshotException): ...
|
||||
@@ -0,0 +1,127 @@
|
||||
from typing import Literal, TypeVar, NamedTuple
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
|
||||
class CeobeTextPic(NamedTuple):
|
||||
text: str
|
||||
pics: list[bytes | str]
|
||||
|
||||
|
||||
class CeobeTarget(BaseModel):
|
||||
"""账户结构"""
|
||||
|
||||
avatar: str
|
||||
"""数据源头像"""
|
||||
datasource: str
|
||||
"""数据源类型"""
|
||||
db_unique_key: str
|
||||
"""数据源相关唯一id"""
|
||||
nickname: str
|
||||
"""数据源昵称"""
|
||||
platform: str
|
||||
"""平台代码"""
|
||||
unique_id: str
|
||||
"""数据源唯一标识(用于前后端交互标识)"""
|
||||
jump_url: str | None = None
|
||||
"""跳转url(null就是没办法跳转)"""
|
||||
|
||||
|
||||
class DataSourceResponse(BaseModel):
|
||||
code: int
|
||||
message: str
|
||||
data: list[CeobeTarget]
|
||||
|
||||
|
||||
class CeobeImage(BaseModel):
|
||||
origin_url: str
|
||||
"""原图"""
|
||||
compress_url: str | None = None
|
||||
"""压缩图,为null就是没有原图对应压缩图"""
|
||||
|
||||
|
||||
class CeobeDefaultCookie(BaseModel):
|
||||
text: str
|
||||
images: list[CeobeImage] | None
|
||||
|
||||
|
||||
class CeobeRetweeted(BaseModel):
|
||||
author_name: str
|
||||
author_avatar: str
|
||||
text: str
|
||||
images: list[CeobeImage] | None = None
|
||||
|
||||
|
||||
class CeobeItem(BaseModel):
|
||||
id: str
|
||||
"""单条id"""
|
||||
url: str
|
||||
"""跳转链接"""
|
||||
type: str | None = None
|
||||
"""类型"""
|
||||
is_long_text: bool | None = None
|
||||
"""是否长文"""
|
||||
is_retweeted: bool = False
|
||||
"""是否转发"""
|
||||
retweeted: CeobeRetweeted | None = None
|
||||
"""展示类型,公告类型的数据源有这个字段"""
|
||||
display_type: int | None = None
|
||||
|
||||
class Config:
|
||||
extra = "allow"
|
||||
|
||||
|
||||
class CeobeSource(BaseModel):
|
||||
data: str
|
||||
"""数据源id"""
|
||||
type: str
|
||||
"""数据源类型"""
|
||||
|
||||
|
||||
class CeobeTimestamp(BaseModel):
|
||||
fetcher: int
|
||||
"""蹲饼时间,毫秒"""
|
||||
platform_precision: Literal["none", "day", "hour", "minute", "second", "ms"]
|
||||
"""平台时间精度,不足的长度补0"""
|
||||
platform: int | None = None
|
||||
"""平台时间戳,毫秒"""
|
||||
|
||||
|
||||
class CeobeCookie(BaseModel):
|
||||
datasource: str
|
||||
"""数据源名字"""
|
||||
icon: str
|
||||
"""数据源头像"""
|
||||
timestamp: CeobeTimestamp
|
||||
"""时间戳"""
|
||||
default_cookie: CeobeDefaultCookie
|
||||
"""原始饼"""
|
||||
item: CeobeItem
|
||||
"""数据源信息,有平台的特殊字段"""
|
||||
source: CeobeSource
|
||||
"""数据源"""
|
||||
|
||||
|
||||
class CeobeData(BaseModel):
|
||||
cookies: list[CeobeCookie]
|
||||
next_page_id: str | None = None
|
||||
|
||||
|
||||
class CookiesResponse(BaseModel):
|
||||
code: int
|
||||
message: str
|
||||
data: CeobeData
|
||||
|
||||
|
||||
class CombIdResponse(BaseModel):
|
||||
code: int
|
||||
message: str
|
||||
data: dict[Literal["datasource_comb_id"], str]
|
||||
|
||||
|
||||
class CookieIdResponse(BaseModel):
|
||||
cookie_id: str
|
||||
update_cookie_id: str
|
||||
|
||||
|
||||
ResponseModel = TypeVar("ResponseModel", bound=CookiesResponse | CombIdResponse | CookieIdResponse | DataSourceResponse)
|
||||
@@ -0,0 +1,324 @@
|
||||
from typing import ParamSpec
|
||||
from functools import partial
|
||||
from datetime import timedelta
|
||||
from collections import defaultdict
|
||||
|
||||
from httpx import AsyncClient
|
||||
from nonebot import logger, require
|
||||
from rapidfuzz import fuzz, process
|
||||
|
||||
from nonebot_bison.post import Post
|
||||
from nonebot_bison.plugin_config import plugin_config
|
||||
from nonebot_bison.types import Target, RawPost, Category
|
||||
from nonebot_bison.utils import Site, ClientManager, capture_html
|
||||
|
||||
from ..platform import NewMessage
|
||||
from .utils import process_response
|
||||
from .const import COMB_ID_URL, COOKIES_URL, COOKIE_ID_URL
|
||||
from .exception import CeobeSnapshotSkip, CeobeSnapshotFailed
|
||||
from .cache import CeobeCache, CeobeClient, CeobeDataSourceCache
|
||||
from .models import CeobeImage, CeobeCookie, CeobeTextPic, CombIdResponse, CookiesResponse, CookieIdResponse
|
||||
|
||||
P = ParamSpec("P")
|
||||
|
||||
|
||||
class CeobeCanteenClientManager(ClientManager):
|
||||
_client: AsyncClient
|
||||
|
||||
def __init__(self):
|
||||
self._client = CeobeClient(
|
||||
headers={
|
||||
"User-Agent": "MountainDash/Nonebot-Bison",
|
||||
}
|
||||
)
|
||||
|
||||
async def get_client(self, target: Target | None) -> AsyncClient:
|
||||
return self._client
|
||||
|
||||
async def get_client_for_static(self) -> AsyncClient:
|
||||
return self._client
|
||||
|
||||
async def get_query_name_client(self) -> AsyncClient:
|
||||
return self._client
|
||||
|
||||
async def refresh_client(self):
|
||||
raise NotImplementedError("refresh_client is not implemented")
|
||||
|
||||
|
||||
class CeobeCanteenSite(Site):
|
||||
name = "ceobe_canteen"
|
||||
schedule_type = "interval"
|
||||
# lwt の 推荐间隔
|
||||
schedule_setting = {"seconds": 15}
|
||||
client_mgr = CeobeCanteenClientManager
|
||||
|
||||
|
||||
class CeobeCanteen(NewMessage):
|
||||
enable_tag: bool = False
|
||||
platform_name: str = "ceobecanteen"
|
||||
name: str = "小刻食堂"
|
||||
enabled: bool = True
|
||||
is_common: bool = False
|
||||
site = CeobeCanteenSite
|
||||
has_target: bool = True
|
||||
use_batch: bool = True
|
||||
default_theme: str = "ceobecanteen"
|
||||
|
||||
categories: dict[Category, str] = {1: "普通", 2: "转发"}
|
||||
|
||||
data_source_cache = CeobeDataSourceCache()
|
||||
|
||||
comb_id = CeobeCache(timedelta(hours=12))
|
||||
cookie_id = CeobeCache(timedelta(hours=1))
|
||||
cookies = CeobeCache(timedelta(hours=1))
|
||||
|
||||
async def get_comb_id(self, target_uuids: list[str]):
|
||||
"""获取数据源的组合id"""
|
||||
payload = {"datasource_push": target_uuids}
|
||||
logger.trace(payload)
|
||||
client = await self.ctx.get_client()
|
||||
resp = await client.post(
|
||||
COMB_ID_URL,
|
||||
json=payload,
|
||||
)
|
||||
comb_id = process_response(resp, CombIdResponse).data["datasource_comb_id"]
|
||||
logger.trace(f"get comb_id: {comb_id}")
|
||||
return comb_id
|
||||
|
||||
async def get_comb_id_of_all(self):
|
||||
"""获取 "全部数据源" 的组合id,获取到的comb_id会缓存12小时"""
|
||||
logger.trace("no comb_id, request")
|
||||
target_uuids = (await self.data_source_cache.get_all()).keys()
|
||||
comb_id = await self.get_comb_id(list(target_uuids))
|
||||
|
||||
logger.trace(f"use comb_id: {comb_id}")
|
||||
return comb_id
|
||||
|
||||
async def get_latest_cookie_id(self, comb_id: str):
|
||||
"""根据comb_id获取最新cookie_id"""
|
||||
client = await self.ctx.get_client()
|
||||
resp = await client.get(f"{COOKIE_ID_URL}/{comb_id}")
|
||||
cookie_id = process_response(resp, CookieIdResponse).cookie_id
|
||||
logger.trace(f"get cookie_id: {cookie_id}")
|
||||
return cookie_id
|
||||
|
||||
async def get_cookies(self, cookie_id: str, comb_id: str | None = None):
|
||||
"""根据cookie_id、comb_id组合获取cookies"""
|
||||
client = await self.ctx.get_client()
|
||||
parmas = {
|
||||
"datasource_comb_id": comb_id,
|
||||
"cookie_id": cookie_id,
|
||||
}
|
||||
logger.trace(f"will reuquest: {parmas}")
|
||||
resp = await client.get(COOKIES_URL, params=parmas)
|
||||
return process_response(resp, CookiesResponse).data.cookies
|
||||
|
||||
async def fetch_ceobe_cookies(self) -> list[CeobeCookie]:
|
||||
if not self.comb_id:
|
||||
self.comb_id = await self.get_comb_id_of_all()
|
||||
|
||||
latest_cookie_id = await self.get_latest_cookie_id(self.comb_id)
|
||||
if not latest_cookie_id:
|
||||
return []
|
||||
|
||||
if latest_cookie_id != self.cookie_id:
|
||||
self.cookie_id = latest_cookie_id
|
||||
self.cookies = await self.get_cookies(latest_cookie_id, self.comb_id)
|
||||
|
||||
return self.cookies or []
|
||||
|
||||
async def batch_get_sub_list(self, targets: list[Target]) -> list[list[CeobeCookie]]:
|
||||
cookies = await self.fetch_ceobe_cookies()
|
||||
|
||||
dispatched_cookies: defaultdict[Target, list[CeobeCookie]] = defaultdict(list)
|
||||
for cookie in cookies:
|
||||
if ceobe_target := await self.data_source_cache.get_by_source(cookie.source):
|
||||
dispatched_cookies[Target(ceobe_target.unique_id)].append(cookie)
|
||||
|
||||
return [dispatched_cookies[target] for target in targets]
|
||||
|
||||
@classmethod
|
||||
async def get_target_name(cls, _, uuid_target: Target) -> str:
|
||||
ceobe_target = await cls.data_source_cache.get_by_unique_id(uuid_target)
|
||||
return ceobe_target.nickname if ceobe_target else "UNKNOWN"
|
||||
|
||||
@classmethod
|
||||
async def parse_target(cls, nickname: str) -> Target:
|
||||
ceobe_target = await cls.data_source_cache.get_by_nickname(nickname)
|
||||
if not ceobe_target:
|
||||
all_targets_name = [target.nickname for target in (await cls.data_source_cache.get_all()).values()]
|
||||
matched_targets_name = process.extract(nickname, all_targets_name, scorer=fuzz.token_sort_ratio, limit=3)
|
||||
logger.debug(f"possible targets: {matched_targets_name}")
|
||||
raise cls.ParseTargetException(
|
||||
prompt="未能匹配到对应的小刻食堂数据源,可能的选择有: \n"
|
||||
+ "\n".join([name for name, *_ in matched_targets_name])
|
||||
+ f"\n\n请检查原输入是否正确: {nickname}"
|
||||
)
|
||||
return Target(ceobe_target.unique_id)
|
||||
|
||||
def get_tags(self, _: RawPost) -> None:
|
||||
return
|
||||
|
||||
def get_category(self, post: CeobeCookie) -> Category:
|
||||
if post.item.is_retweeted:
|
||||
return Category(2)
|
||||
return Category(1)
|
||||
|
||||
def get_id(self, post: CeobeCookie) -> str:
|
||||
return post.item.id
|
||||
|
||||
def get_date(self, post: CeobeCookie) -> int:
|
||||
return post.timestamp.fetcher
|
||||
|
||||
async def parse(self, raw_post: CeobeCookie) -> Post:
|
||||
target = await self.data_source_cache.get_by_source(raw_post.source)
|
||||
assert target, "target not found"
|
||||
|
||||
content, pics = await self.take_snapshot(raw_post)
|
||||
|
||||
timestamp = raw_post.timestamp.platform or raw_post.timestamp.fetcher
|
||||
if timestamp:
|
||||
timestamp /= 1000 # 从毫秒级转换到秒级
|
||||
|
||||
retweet: Post | None = None
|
||||
if raw_post.item.is_retweeted and raw_post.item.retweeted:
|
||||
raw_retweet_pics = raw_post.item.retweeted.images or []
|
||||
retweet_pics = await self.parse_retweet_images(raw_retweet_pics, raw_post.source.type)
|
||||
|
||||
retweet = Post(
|
||||
self,
|
||||
nickname=raw_post.item.retweeted.author_name,
|
||||
avatar=raw_post.item.retweeted.author_avatar,
|
||||
images=list(retweet_pics),
|
||||
content=raw_post.item.retweeted.text,
|
||||
)
|
||||
|
||||
return Post(
|
||||
self,
|
||||
content,
|
||||
url=raw_post.item.url,
|
||||
nickname=raw_post.datasource,
|
||||
images=list(pics),
|
||||
timestamp=timestamp,
|
||||
avatar=target.avatar,
|
||||
description=target.platform,
|
||||
repost=retweet,
|
||||
)
|
||||
|
||||
async def snapshot_official_website(self, url: str) -> bytes:
|
||||
"""截取小刻官网的截图"""
|
||||
require("nonebot_plugin_htmlrender")
|
||||
from nonebot_plugin_htmlrender import get_new_page
|
||||
|
||||
logger.debug(f"snapshot official website url: {url}")
|
||||
|
||||
# /html/body/div[1]/div[1]/div/div[1]/div[1]/div
|
||||
snapshot_selector = "html > body > div:nth-child(1) > div:nth-child(1) > div > div:nth-child(1) > div:nth-child(1) > div" # noqa: E501
|
||||
# /html/body/div[1]/div[1]/div/div[1]/div[1]/div/div[4]/div/div/div
|
||||
calculate_selector = "html > body > div:nth-child(1) > div:nth-child(1) > div > div:nth-child(1) > div:nth-child(1) > div > div:nth-child(4) > div > div > div" # noqa: E501
|
||||
viewport = {"width": 1024, "height": 19990}
|
||||
|
||||
try:
|
||||
async with get_new_page(viewport=viewport) as page:
|
||||
await page.goto(url, wait_until="networkidle")
|
||||
element_width = await page.evaluate(
|
||||
"(selector) => document.querySelector(selector).offsetWidth", calculate_selector
|
||||
)
|
||||
logger.debug(f"element width: {element_width}")
|
||||
element_height = await page.evaluate(
|
||||
"(selector) => document.querySelector(selector).offsetHeight", calculate_selector
|
||||
)
|
||||
logger.debug(f"element height: {element_height}")
|
||||
element_height += 1000
|
||||
|
||||
await page.set_viewport_size({"width": 1024, "height": element_height})
|
||||
|
||||
element = await page.locator(snapshot_selector).element_handle()
|
||||
# add padding to make the screenshot more beautiful
|
||||
await element.evaluate("(element) => {element.style.padding = '20px';}", element)
|
||||
|
||||
pic_data = await element.screenshot(
|
||||
type="png",
|
||||
)
|
||||
except Exception as e:
|
||||
raise CeobeSnapshotFailed("渲染错误") from e
|
||||
else:
|
||||
return pic_data
|
||||
|
||||
async def snapshot_bulletin_list(self, url: str) -> bytes:
|
||||
"""截取小刻公告列表的截图"""
|
||||
selector = "body > div.main > div.container"
|
||||
viewport = {"width": 1024, "height": 19990}
|
||||
|
||||
try:
|
||||
pic_data = await capture_html(
|
||||
url,
|
||||
selector,
|
||||
timeout=30000,
|
||||
wait_until="networkidle",
|
||||
viewport=viewport,
|
||||
)
|
||||
assert pic_data
|
||||
except Exception:
|
||||
raise CeobeSnapshotFailed("渲染错误")
|
||||
else:
|
||||
return pic_data
|
||||
|
||||
async def take_snapshot(
|
||||
self,
|
||||
raw_post: CeobeCookie,
|
||||
) -> CeobeTextPic:
|
||||
"""判断数据源类型,判断是否需要截图"""
|
||||
|
||||
match raw_post.source.type:
|
||||
case "arknights-website:official-website":
|
||||
|
||||
async def owss(url: str) -> CeobeTextPic:
|
||||
return CeobeTextPic(text="", pics=[await self.snapshot_official_website(url)])
|
||||
|
||||
snapshot_func = partial(owss, raw_post.item.url)
|
||||
case "arknights-game:bulletin-list" if raw_post.item.display_type != 2:
|
||||
|
||||
async def blss(url: str) -> CeobeTextPic:
|
||||
return CeobeTextPic(text="", pics=[await self.snapshot_bulletin_list(url)])
|
||||
|
||||
snapshot_func = partial(blss, raw_post.item.url)
|
||||
case _:
|
||||
|
||||
async def npss() -> CeobeTextPic:
|
||||
raise CeobeSnapshotSkip("无需截图的数据源")
|
||||
|
||||
snapshot_func = partial(npss)
|
||||
|
||||
raw_pics = raw_post.default_cookie.images or []
|
||||
try:
|
||||
if not plugin_config.bison_use_browser:
|
||||
raise CeobeSnapshotSkip("未启用浏览器")
|
||||
res = await snapshot_func()
|
||||
except CeobeSnapshotSkip as e:
|
||||
logger.info(f"skip snapshot: {e}")
|
||||
pics = await self.parse_retweet_images(raw_pics, raw_post.source.type)
|
||||
res = CeobeTextPic(text=raw_post.default_cookie.text, pics=list(pics))
|
||||
except CeobeSnapshotFailed:
|
||||
logger.exception("snapshot failed")
|
||||
pics = await self.parse_retweet_images(raw_pics, raw_post.source.type)
|
||||
res = CeobeTextPic(text=raw_post.default_cookie.text, pics=list(pics))
|
||||
|
||||
return res
|
||||
|
||||
async def parse_retweet_images(self, images: list[CeobeImage], source_type: str) -> list[bytes] | list[str]:
|
||||
if source_type.startswith("weibo"):
|
||||
retweet_pics = await self.download_weibo_image([image.origin_url for image in images])
|
||||
else:
|
||||
retweet_pics = [image.origin_url for image in images]
|
||||
return retweet_pics
|
||||
|
||||
async def download_weibo_image(self, image_urls: list[str]) -> list[bytes]:
|
||||
headers = {"referer": "https://weibo.cn/"}
|
||||
pics = []
|
||||
async with CeobeClient(headers=headers) as client:
|
||||
for url in image_urls:
|
||||
resp = await client.get(url)
|
||||
resp.raise_for_status()
|
||||
pics.append(resp.content)
|
||||
return pics
|
||||
@@ -0,0 +1,20 @@
|
||||
from httpx import Response
|
||||
from nonebot import logger
|
||||
from nonebot.compat import type_validate_python
|
||||
|
||||
from .exception import CeobeResponseError
|
||||
from .models import ResponseModel, CookieIdResponse
|
||||
|
||||
|
||||
def process_response(response: Response, parse_model: type[ResponseModel]) -> ResponseModel:
|
||||
response.raise_for_status()
|
||||
logger.trace(f"小刻食堂请求结果: {response.json().get('message')} {parse_model=}")
|
||||
|
||||
try:
|
||||
data = type_validate_python(parse_model, response.json())
|
||||
except Exception as e:
|
||||
raise CeobeResponseError(f"解析小刻食堂响应失败: {e}")
|
||||
|
||||
if not isinstance(data, CookieIdResponse) and data.code != 0:
|
||||
raise CeobeResponseError(f"获取饼数据失败: {data.message}")
|
||||
return data
|
||||
@@ -31,8 +31,8 @@ class Post(AbstractPost):
|
||||
"""标题"""
|
||||
images: list[str | bytes | Path | BytesIO] | None = None
|
||||
"""图片列表"""
|
||||
timestamp: int | None = None
|
||||
"""发布/获取时间戳"""
|
||||
timestamp: float | None = None
|
||||
"""发布/获取时间戳, 秒"""
|
||||
url: str | None = None
|
||||
"""来源链接"""
|
||||
avatar: str | bytes | Path | BytesIO | None = None
|
||||
|
||||
@@ -1,9 +1,12 @@
|
||||
from io import BytesIO
|
||||
from pathlib import Path
|
||||
from dataclasses import dataclass
|
||||
from typing import TYPE_CHECKING, Literal
|
||||
|
||||
from nonebot_plugin_saa import Text, Image, MessageSegmentFactory
|
||||
|
||||
from nonebot_bison.utils import text_fletten
|
||||
from nonebot_bison.theme.utils import web_embed_image
|
||||
from nonebot_bison.theme import Theme, ThemeRenderError, ThemeRenderUnsupportError
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@@ -34,16 +37,21 @@ class ArknightsTheme(Theme):
|
||||
|
||||
if not post.title:
|
||||
raise ThemeRenderUnsupportError("标题为空")
|
||||
if post.images and len(post.images) > 1:
|
||||
raise ThemeRenderUnsupportError("图片数量大于1")
|
||||
|
||||
banner = post.images[0] if post.images else None
|
||||
|
||||
if banner is not None and not isinstance(banner, str | Path):
|
||||
raise ThemeRenderUnsupportError(f"图片类型错误, 期望 str 或 Path, 实际为 {type(banner)}")
|
||||
match banner:
|
||||
case bytes() | BytesIO():
|
||||
banner = web_embed_image(banner)
|
||||
case str() | Path() | None:
|
||||
pass
|
||||
case _:
|
||||
raise ThemeRenderUnsupportError(
|
||||
f"图片类型错误, 期望 str | Path | bytes | BytesIO | None, 实际为 {type(banner)}"
|
||||
)
|
||||
|
||||
ark_data = ArkData(
|
||||
announce_title=post.title,
|
||||
announce_title=text_fletten(post.title),
|
||||
content=post.content,
|
||||
banner_image_url=banner,
|
||||
)
|
||||
@@ -64,6 +72,10 @@ class ArknightsTheme(Theme):
|
||||
raise ThemeRenderError(f"渲染文本失败: {e}")
|
||||
msgs: list[MessageSegmentFactory] = []
|
||||
msgs.append(Image(announce_pic))
|
||||
|
||||
if post.url:
|
||||
msgs.append(Text(f"前往:{post.url}"))
|
||||
return [Image(announce_pic)]
|
||||
if post.images:
|
||||
msgs.extend(Image(img) for img in post.images[1:])
|
||||
|
||||
return msgs
|
||||
|
||||
@@ -3,8 +3,12 @@
|
||||
## LOGO图片
|
||||
|
||||
- `templates/ceobecanteen_logo.png`
|
||||
- `templates/bison_logo.png`
|
||||
|
||||
### 版权声明
|
||||
|
||||
<a rel="license" href="http://creativecommons.org/licenses/by-nc-sa/4.0/"><img alt="知识共享许可协议" style="border-width:0" src="https://i.creativecommons.org/l/by-nc-sa/4.0/88x31.png" /></a><br />logo图片采用<a rel="license" href="http://creativecommons.org/licenses/by-nc-sa/4.0/">知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议</a>进行许可。
|
||||
本项目<img src="templates/ceobecanteen_logo.png" style="width:100px">使用已经过 [Ceobe Canteen](https://github.com/Enraged-Dun-Cookie-Development-Team) 授权许可使用。
|
||||
<a rel="license" href="http://creativecommons.org/licenses/by-nc-sa/4.0/"><img alt="知识共享许可协议" style="border-width:0" src="https://i.creativecommons.org/l/by-nc-sa/4.0/88x31.png" /></a><br />所声明的 LOGO 图片采用<a rel="license" href="http://creativecommons.org/licenses/by-nc-sa/4.0/">知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议</a>进行许可。
|
||||
|
||||
本项目使用的 小刻食堂 LOGO <img src="templates/ceobecanteen_logo.png" alt="ceobecanteen-logo" style="width:100px">已经过 [小刻食堂](https://github.com/Enraged-Dun-Cookie-Development-Team) 授权许可使用。
|
||||
|
||||
本项目使用的 Nonebot-Bison LOGO <img src="templates/bison_logo.png" alt="nonebot-bison-logo" style="width:100px">由画师 不画涩图の企鹅 倾情贡献,非常感谢!
|
||||
|
||||
@@ -4,12 +4,15 @@ from datetime import datetime
|
||||
from typing import TYPE_CHECKING, Literal
|
||||
|
||||
import jinja2
|
||||
from yarl import URL
|
||||
from httpx import AsyncClient
|
||||
from pydantic import BaseModel
|
||||
from PIL import Image as PILImage
|
||||
from nonebot_plugin_saa import Text, Image, MessageSegmentFactory
|
||||
|
||||
from nonebot_bison.compat import model_validator
|
||||
from nonebot_bison.theme.utils import convert_to_qr
|
||||
from nonebot_bison.utils.image import pic_merge, is_pics_mergable
|
||||
from nonebot_bison.utils import pic_merge, is_pics_mergable
|
||||
from nonebot_bison.theme.utils import convert_to_qr, web_embed_image
|
||||
from nonebot_bison.theme import Theme, ThemeRenderError, ThemeRenderUnsupportError
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@@ -35,13 +38,26 @@ class CeoboContent(BaseModel):
|
||||
text: 文字内容
|
||||
"""
|
||||
|
||||
image: str | None = None
|
||||
text: str
|
||||
|
||||
|
||||
class CeoboRetweet(BaseModel):
|
||||
"""卡片的转发部分
|
||||
|
||||
author: 原作者
|
||||
image: 图片链接
|
||||
content: 文字内容
|
||||
"""
|
||||
|
||||
image: str | None
|
||||
text: str | None
|
||||
content: str | None
|
||||
author: str
|
||||
|
||||
@model_validator(mode="before")
|
||||
def check(cls, values):
|
||||
if values["image"] is None and values["text"] is None:
|
||||
raise ValueError("image and text cannot be both None")
|
||||
if values["image"] is None and values["content"] is None:
|
||||
raise ValueError("image and content cannot be both None")
|
||||
return values
|
||||
|
||||
|
||||
@@ -49,6 +65,7 @@ class CeobeCard(BaseModel):
|
||||
info: CeobeInfo
|
||||
content: CeoboContent
|
||||
qr: str | None
|
||||
retweet: CeoboRetweet | None
|
||||
|
||||
|
||||
class CeobeCanteenTheme(Theme):
|
||||
@@ -63,8 +80,8 @@ class CeobeCanteenTheme(Theme):
|
||||
template_path: Path = Path(__file__).parent / "templates"
|
||||
template_name: str = "ceobe_canteen.html.jinja"
|
||||
|
||||
def parse(self, post: "Post") -> CeobeCard:
|
||||
"""解析 Post 为 CeobeCard"""
|
||||
async def parse(self, post: "Post") -> tuple[CeobeCard, list[str | bytes | Path | BytesIO]]:
|
||||
"""解析 Post 为 CeobeCard与处理好的图片列表"""
|
||||
if not post.nickname:
|
||||
raise ThemeRenderUnsupportError("post.nickname is None")
|
||||
if not post.timestamp:
|
||||
@@ -73,15 +90,96 @@ class CeobeCanteenTheme(Theme):
|
||||
datasource=post.nickname, time=datetime.fromtimestamp(post.timestamp).strftime("%Y-%m-%d %H:%M:%S")
|
||||
)
|
||||
|
||||
head_pic = post.images[0] if post.images else None
|
||||
if head_pic is not None and not isinstance(head_pic, str):
|
||||
raise ThemeRenderUnsupportError("post.images[0] is not str")
|
||||
http_client = await post.platform.ctx.get_client_for_static()
|
||||
images: list[str | bytes | Path | BytesIO] = []
|
||||
if post.images:
|
||||
images = await self.merge_pics(post.images, http_client)
|
||||
|
||||
content = CeoboContent(image=head_pic, text=post.content)
|
||||
return CeobeCard(info=info, content=content, qr=convert_to_qr(post.url or "No URL"))
|
||||
content = CeoboContent(text=post.content)
|
||||
|
||||
retweet: CeoboRetweet | None = None
|
||||
if post.repost:
|
||||
repost_head_pic: str | None = None
|
||||
if post.repost.images:
|
||||
repost_images = await self.merge_pics(post.repost.images, http_client)
|
||||
repost_head_pic = self.extract_head_pic(repost_images)
|
||||
images.extend(repost_images)
|
||||
|
||||
repost_nickname = f"@{post.repost.nickname}:" if post.repost.nickname else ""
|
||||
retweet = CeoboRetweet(image=repost_head_pic, content=post.repost.content, author=repost_nickname)
|
||||
|
||||
return (
|
||||
CeobeCard(
|
||||
info=info,
|
||||
content=content,
|
||||
qr=web_embed_image(convert_to_qr(post.url or "No URL", back_color=(240, 236, 233))),
|
||||
retweet=retweet,
|
||||
),
|
||||
images,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
async def merge_pics(
|
||||
images: list[str | bytes | Path | BytesIO],
|
||||
client: AsyncClient,
|
||||
) -> list[str | bytes | Path | BytesIO]:
|
||||
if is_pics_mergable(images):
|
||||
pics = await pic_merge(images, client)
|
||||
else:
|
||||
pics = images
|
||||
return list(pics)
|
||||
|
||||
@staticmethod
|
||||
def extract_head_pic(pics: list[str | bytes | Path | BytesIO]) -> str:
|
||||
head_pic = web_embed_image(pics[0]) if not isinstance(pics[0], str) else pics[0]
|
||||
return head_pic
|
||||
|
||||
@staticmethod
|
||||
def card_link(head_pic: PILImage.Image, card_body: PILImage.Image) -> PILImage.Image:
|
||||
"""将头像与卡片合并"""
|
||||
|
||||
def resize_image(img: PILImage.Image, size: tuple[int, int]) -> PILImage.Image:
|
||||
return img.resize(size)
|
||||
|
||||
# 统一图片宽度
|
||||
head_pic_w, head_pic_h = head_pic.size
|
||||
card_body_w, card_body_h = card_body.size
|
||||
|
||||
if head_pic_w > card_body_w:
|
||||
head_pic = resize_image(head_pic, (card_body_w, int(head_pic_h * card_body_w / head_pic_w)))
|
||||
else:
|
||||
card_body = resize_image(card_body, (head_pic_w, int(card_body_h * head_pic_w / card_body_w)))
|
||||
|
||||
# 合并图片
|
||||
card = PILImage.new("RGBA", (head_pic.width, head_pic.height + card_body.height))
|
||||
card.paste(head_pic, (0, 0))
|
||||
card.paste(card_body, (0, head_pic.height))
|
||||
return card
|
||||
|
||||
async def render(self, post: "Post") -> list[MessageSegmentFactory]:
|
||||
ceobe_card = self.parse(post)
|
||||
ceobe_card, merged_images = await self.parse(post)
|
||||
|
||||
need_card_link: bool = True
|
||||
head_pic = None
|
||||
|
||||
# 如果没有 post.images,则全部都是转发里的图片,不需要头图
|
||||
if post.images:
|
||||
match merged_images[0]:
|
||||
case bytes():
|
||||
head_pic = merged_images[0]
|
||||
merged_images = merged_images[1:]
|
||||
case BytesIO():
|
||||
head_pic = merged_images[0].getvalue()
|
||||
merged_images = merged_images[1:]
|
||||
case str(s) if URL(s).scheme in ("http", "https"):
|
||||
ceobe_card.content.image = merged_images[0]
|
||||
need_card_link = False
|
||||
case Path():
|
||||
ceobe_card.content.image = merged_images[0].as_uri()
|
||||
need_card_link = False
|
||||
case _:
|
||||
raise ThemeRenderError(f"Unknown image type: {type(merged_images[0])}")
|
||||
|
||||
from nonebot_plugin_htmlrender import get_new_page
|
||||
|
||||
template_env = jinja2.Environment(
|
||||
@@ -91,7 +189,8 @@ class CeobeCanteenTheme(Theme):
|
||||
template = template_env.get_template(self.template_name)
|
||||
html = await template.render_async(card=ceobe_card)
|
||||
pages = {
|
||||
"viewport": {"width": 1000, "height": 3000},
|
||||
"device_scale_factor": 2,
|
||||
"viewport": {"width": 512, "height": 455},
|
||||
"base_url": self.template_path.as_uri(),
|
||||
}
|
||||
try:
|
||||
@@ -99,12 +198,24 @@ class CeobeCanteenTheme(Theme):
|
||||
await page.goto(self.template_path.as_uri())
|
||||
await page.set_content(html)
|
||||
await page.wait_for_timeout(1)
|
||||
img_raw = await page.locator("#ceobecanteen-card").screenshot(
|
||||
type="png",
|
||||
card_body = await page.locator("#ceobecanteen-card").screenshot(
|
||||
type="jpeg",
|
||||
quality=90,
|
||||
)
|
||||
except Exception as e:
|
||||
raise ThemeRenderError(f"Render error: {e}") from e
|
||||
msgs: list[MessageSegmentFactory] = [Image(img_raw)]
|
||||
|
||||
msgs: list[MessageSegmentFactory] = []
|
||||
if need_card_link and head_pic:
|
||||
card_pil = self.card_link(
|
||||
head_pic=PILImage.open(BytesIO(head_pic)),
|
||||
card_body=PILImage.open(BytesIO(card_body)),
|
||||
)
|
||||
card_data = BytesIO()
|
||||
card_pil.save(card_data, format="PNG")
|
||||
msgs.append(Image(card_data.getvalue()))
|
||||
else:
|
||||
msgs.append(Image(card_body))
|
||||
|
||||
text = f"来源: {post.platform.name} {post.nickname or ''}\n"
|
||||
if post.url:
|
||||
|
||||
Binary file not shown.
|
Before Width: | Height: | Size: 106 KiB |
Binary file not shown.
|
After Width: | Height: | Size: 640 KiB |
@@ -11,6 +11,19 @@
|
||||
{% endif %}
|
||||
{% if card.content.text %}
|
||||
<div class="main-content">{{ card.content.text }}</div>
|
||||
{% if card.retweet %}
|
||||
<div class="retweet">
|
||||
{% if card.retweet.author %}
|
||||
<div class="origin-author">{{ card.retweet.author }}</div>
|
||||
{% endif %}
|
||||
{% if card.retweet.content %}
|
||||
<div class="retweet-content">{{ card.retweet.content }}</div>
|
||||
{% endif %}
|
||||
{% if card.retweet.image %}
|
||||
<img class='retweet-image' src="{{ card.retweet.image }}">
|
||||
{% endif %}
|
||||
</div>
|
||||
{% endif %}
|
||||
{% endif %}
|
||||
<div class="footer">
|
||||
<div class="datasource">
|
||||
@@ -21,7 +34,7 @@
|
||||
<img class='qr' src="{{ card.qr }}">
|
||||
</div>
|
||||
<div class="source">
|
||||
<img class='bison-logo' src="bison_logo.jpg">
|
||||
<img class='bison-logo' src="bison_logo.png">
|
||||
<div class="source-text">
|
||||
<div class="slogan">小刻吃到饼啦!</div>
|
||||
<div class="linkage">bison&小刻食堂联动</div>
|
||||
@@ -46,6 +59,16 @@
|
||||
padding: 30px;
|
||||
white-space: pre-line;
|
||||
}
|
||||
.retweet {
|
||||
margin: -20px 30px 20px;
|
||||
background-color: rgb(226, 223, 219);
|
||||
border: solid 1px rgb(212, 210, 207);
|
||||
border-radius: 3px;
|
||||
padding: 5px;
|
||||
}
|
||||
.retweet .retweet-image {
|
||||
width: 100%;
|
||||
}
|
||||
.footer {
|
||||
margin: 0 2%;
|
||||
height: 80px;
|
||||
|
||||
@@ -1,9 +1,13 @@
|
||||
from io import BytesIO
|
||||
from pathlib import Path
|
||||
from base64 import b64encode
|
||||
|
||||
from qrcode import constants
|
||||
from qrcode.main import QRCode
|
||||
from qrcode.image.svg import SvgFragmentImage
|
||||
from qrcode.image.pil import PilImage
|
||||
|
||||
|
||||
def convert_to_qr(data: str) -> str:
|
||||
def convert_to_qr(data: str, **kwarg) -> bytes:
|
||||
"""Convert data to QR code
|
||||
Args:
|
||||
data (str): data to be converted
|
||||
@@ -15,8 +19,24 @@ def convert_to_qr(data: str) -> str:
|
||||
error_correction=constants.ERROR_CORRECT_L,
|
||||
box_size=10,
|
||||
border=2,
|
||||
image_factory=SvgFragmentImage,
|
||||
image_factory=PilImage,
|
||||
)
|
||||
qr.add_data(data)
|
||||
qr.make(fit=True)
|
||||
return qr.make_image().to_string().decode("utf-8")
|
||||
f = BytesIO()
|
||||
qr.make_image(**kwarg).save(f)
|
||||
return f.getvalue()
|
||||
|
||||
|
||||
def web_embed_image(pic_data: bytes | Path | BytesIO, *, ext: str = "png"):
|
||||
"""将图片数据转换为Base64编码的Data URI"""
|
||||
match pic_data:
|
||||
case bytes():
|
||||
pic_bytes = pic_data
|
||||
case Path():
|
||||
pic_bytes = Path(pic_data).read_bytes()
|
||||
case BytesIO():
|
||||
pic_bytes = pic_data.getvalue()
|
||||
case _:
|
||||
raise TypeError("pic_data must be bytes, Path or BytesIO")
|
||||
return f"data:image/{ext};base64,{b64encode(pic_bytes).decode()}"
|
||||
|
||||
@@ -12,6 +12,7 @@ from .site import Site as Site
|
||||
from ..plugin_config import plugin_config
|
||||
from .image import pic_merge as pic_merge
|
||||
from .http import http_client as http_client
|
||||
from .image import capture_html as capture_html
|
||||
from .site import ClientManager as ClientManager
|
||||
from .image import text_to_image as text_to_image
|
||||
from .site import anonymous_site as anonymous_site
|
||||
@@ -108,3 +109,8 @@ def decode_unicode_escapes(s: str):
|
||||
|
||||
regex = re.compile(r"\\[rnt]|\\u[0-9a-fA-F]{4}")
|
||||
return regex.sub(decode_match, s)
|
||||
|
||||
|
||||
def text_fletten(text: str, *, banned: str = "\n\r\t", replace: str = " ") -> str:
|
||||
"""将文本中的格式化字符去除"""
|
||||
return "".join(c if c not in banned else replace for c in text)
|
||||
|
||||
@@ -1,7 +1,8 @@
|
||||
from io import BytesIO
|
||||
from typing import TypeGuard
|
||||
from functools import partial
|
||||
from typing import Literal, TypeGuard
|
||||
|
||||
from yarl import URL
|
||||
from PIL import Image
|
||||
from httpx import AsyncClient
|
||||
from nonebot import logger, require
|
||||
@@ -96,7 +97,11 @@ async def pic_merge(pics: list[str | bytes], http_client: AsyncClient) -> list[s
|
||||
|
||||
|
||||
def is_pics_mergable(imgs: list) -> TypeGuard[list[str | bytes]]:
|
||||
return all(isinstance(img, str | bytes) for img in imgs)
|
||||
if any(not isinstance(img, str | bytes) for img in imgs):
|
||||
return False
|
||||
|
||||
url = [URL(img) for img in imgs if isinstance(img, str)]
|
||||
return all(u.scheme in ("http", "https") for u in url)
|
||||
|
||||
|
||||
async def text_to_image(saa_text: SaaText) -> SaaImage:
|
||||
@@ -108,3 +113,32 @@ async def text_to_image(saa_text: SaaText) -> SaaImage:
|
||||
|
||||
render_data = await text_to_pic(str(saa_text))
|
||||
return SaaImage(render_data)
|
||||
|
||||
|
||||
async def capture_html(
|
||||
url: str,
|
||||
selector: str,
|
||||
timeout: float = 0,
|
||||
type: Literal["jpeg", "png"] = "png",
|
||||
quality: int | None = None,
|
||||
wait_until: Literal["commit", "domcontentloaded", "load", "networkidle"] | None = None,
|
||||
viewport: dict = {"width": 1024, "height": 990},
|
||||
device_scale_factor: int = 2,
|
||||
**page_kwargs,
|
||||
) -> bytes | None:
|
||||
"""
|
||||
将给定的url网页的指定CSS选择器部分渲染成图片
|
||||
|
||||
timeout: 超时时间,单位毫秒
|
||||
"""
|
||||
require("nonebot_plugin_htmlrender")
|
||||
from nonebot_plugin_htmlrender import get_new_page
|
||||
|
||||
assert url
|
||||
async with get_new_page(device_scale_factor=device_scale_factor, viewport=viewport, **page_kwargs) as page:
|
||||
await page.goto(url, timeout=timeout, wait_until=wait_until)
|
||||
pic_data = await page.locator(selector).screenshot(
|
||||
type=type,
|
||||
quality=quality,
|
||||
)
|
||||
return pic_data
|
||||
|
||||
Reference in New Issue
Block a user