Azide e2a97a9e56
适配小刻食堂平台 (#379)
* 🐛 插入新的Schedulable时应传入use_batch参数

*  适配ceobecanteen平台

Co-authored-by: phidiaLam <2957035701@qq.com>

*   明日方舟公告与官网采用截图分享 (#480)

*  明日方舟公告与官网采用截图分享

* 💄 auto fix by pre-commit hooks

* 🐛 修复缺少的导入,优化逻辑

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Azide <rukuy@qq.com>

* 🐛 优化截图图片效果

* 🐛 修复错误将转发内图片视作头图的问题

* 🍱 使用正式 Bison Logo

* 💄 auto fix by pre-commit hooks

* 🐛 请求小刻API时不在headers里添加过多字段

* 🐛 get_comb_id方法删除无用的targets参数

* 💡 get_comb_id方法更新注释

* 🔥 移除发送部分的更改

*  在命名中明确表示cond_func意图

* ♻️ 拆分get_comb_id功能

* ♻️ 调整缓存逻辑

*  使用uri在theme中调用platform截图

* ♻️ 重构截图逻辑

*  添加模糊匹配提示

*  适配新版Site

* 💄 auto fix by pre-commit hooks

* 🐛 去掉不必要的排序

* 🐛 修正不应出现的驼峰变量名

* ♻️ 按review意见修改

* ♻️ 调整截图函数逻辑

* 🔊 调低日志等级

* ✏️ 修复一些拼写和格式

---------

Co-authored-by: phidiaLam <2957035701@qq.com>
Co-authored-by: 洛梧藤 <67498817+phidiaLam@users.noreply.github.com>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
2024-07-13 01:06:42 +08:00

120 lines
3.8 KiB
Python

from typing import TypeAlias
from functools import partial
from datetime import timedelta
from types import MappingProxyType
from collections.abc import Callable
from httpx import AsyncClient, AsyncHTTPTransport
from expiringdictx import SimpleCache, ExpiringDict
from hishel import Controller, AsyncCacheTransport, AsyncInMemoryStorage
from .const import DATASOURCE_URL
from .utils import process_response
from .models import CeobeSource, CeobeTarget, DataSourceResponse
cache_transport = AsyncCacheTransport(
AsyncHTTPTransport(),
storage=AsyncInMemoryStorage(),
controller=Controller(
always_revalidate=True,
),
)
CeobeClient = partial(
AsyncClient,
transport=cache_transport,
)
UniqueId: TypeAlias = str
class CeobeCache:
# 不在 __init__ 中初始化,让多个实例共享一个缓存
_cache = SimpleCache()
def __init__(self, lifetime: timedelta, store_key: str | None = None):
self.store_key = store_key
self.lifetime = lifetime
def __set_name__(self, owner, name: str):
self.key = self.store_key or name
def __get__(self, instance, owner):
return self._cache.get(self.key)
def __set__(self, instance, value):
self._cache[self.key, self.lifetime] = value
class CeobeDataSourceCache:
"""数据源缓存, 以unique_id为key存储数据源"""
def __init__(self):
self._cache = ExpiringDict[UniqueId, CeobeTarget](capacity=100, default_age=timedelta(days=1))
self.client = CeobeClient()
self.url = DATASOURCE_URL
@property
def cache(self) -> MappingProxyType[str, CeobeTarget]:
return MappingProxyType(self._cache)
async def refresh_data_sources(self) -> MappingProxyType[UniqueId, CeobeTarget]:
"""请求数据源API刷新缓存"""
data_sources_resp = await self.client.get(self.url)
data_sources = process_response(data_sources_resp, DataSourceResponse).data
for ds in data_sources:
self._cache[ds.unique_id] = ds
return self.cache
async def get_all(self, force_refresh: bool = False) -> MappingProxyType[UniqueId, CeobeTarget]:
"""获取所有数据源, 如果缓存为空则尝试刷新缓存"""
if not self.cache or force_refresh:
await self.refresh_data_sources()
return self.cache
def select_one(self, cond_func: Callable[[CeobeTarget], bool]) -> CeobeTarget | None:
"""根据条件获取数据源
不会刷新缓存
"""
cache = self._cache.values()
return next(filter(cond_func, cache), None)
async def get_by_unique_id(self, unique_id: str) -> CeobeTarget | None:
"""根据unique_id获取数据源
如果在缓存中找不到,会刷新缓存
"""
if target := self._cache.get(unique_id):
return target
await self.refresh_data_sources()
return self._cache.get(unique_id)
async def get_by_nickname(self, nickname: str) -> CeobeTarget | None:
"""根据nickname获取数据源
如果在缓存中找不到,会刷新缓存
"""
def select_by_nickname(target: CeobeTarget):
return target.nickname == nickname
if target := self.select_one(select_by_nickname):
return target
await self.refresh_data_sources()
return self.select_one(select_by_nickname)
async def get_by_source(self, source: CeobeSource) -> CeobeTarget | None:
"""根据source获取数据源
如果在缓存中找不到,会刷新缓存
"""
def select_by_source(target: CeobeTarget):
return target.db_unique_key == source.data and target.datasource == source.type
if target := self.select_one(select_by_source):
return target
await self.refresh_data_sources()
return self.select_one(select_by_source)