mirror of
https://github.com/suyiiyii/nonebot-bison.git
synced 2025-06-04 02:26:11 +08:00
139 lines
4.3 KiB
Python
139 lines
4.3 KiB
Python
import json
|
||
from typing import Literal
|
||
from abc import ABC, abstractmethod
|
||
from datetime import datetime, timedelta
|
||
|
||
import httpx
|
||
from httpx import AsyncClient
|
||
from nonebot.log import logger
|
||
|
||
from ..types import Target
|
||
from ..config import config
|
||
from .http import http_client
|
||
from ..config.db_model import Cookie
|
||
|
||
|
||
class ClientManager(ABC):
    """Abstract interface for objects that hand out ``AsyncClient`` instances.

    Concrete managers decide how a client is built for each kind of request.
    """

    @abstractmethod
    async def get_client(self, target: Target | None) -> AsyncClient:
        """Return the client to use for ``target`` (``target`` may be ``None``)."""

    @abstractmethod
    async def get_client_for_static(self) -> AsyncClient:
        """Return a client for static requests.

        NOTE(review): presumably used for static resources; confirm with callers.
        """

    @abstractmethod
    async def get_query_name_client(self) -> AsyncClient:
        """Return a client for name queries.

        NOTE(review): presumably used to look up a target's name; confirm with callers.
        """

    @abstractmethod
    async def refresh_client(self):
        """Refresh whatever client state the manager holds."""
|
||
|
||
|
||
class DefaultClientManager(ClientManager):
    """Client manager that returns a plain ``http_client()`` for every request kind."""

    async def get_client(self, target: Target | None) -> AsyncClient:
        """Return a client from ``http_client()``; ``target`` is not consulted."""
        client = http_client()
        return client

    async def get_client_for_static(self) -> AsyncClient:
        """Return a client from ``http_client()``."""
        client = http_client()
        return client

    async def get_query_name_client(self) -> AsyncClient:
        """Return a client from ``http_client()``."""
        client = http_client()
        return client

    async def refresh_client(self):
        """Do nothing: this manager keeps no state to refresh."""
        return None
|
||
|
||
|
||
class CookieClientManager(ClientManager):
    """Client manager that attaches a stored cookie to the client it returns.

    A cookie whose ``content`` is the string ``"{}"`` is treated throughout
    this class as "no cookie" (an empty placeholder).
    """

    # Platform name passed to ``config.get_cookie_by_target``; expected to be
    # provided by a subclass (see ``create_cookie_client_manager``).
    _platform_name: str
    # Cooldown in seconds: a cookie is eligible only when its ``last_usage``
    # is more than this long ago.
    _cookie_cd: int = 10

    def _generate_hook(self, cookie: Cookie):
        """Build a response hook that writes the request outcome back to the database.

        The returned coroutine marks ``cookie`` as ``"success"`` on HTTP 200 and
        ``"failed"`` otherwise, stamps ``last_usage`` and persists it through
        ``config.update_cookie``. Placeholder cookies are only logged.
        """

        async def _response_hook(resp: httpx.Response):
            if cookie.content == "{}":
                # Placeholder cookie: nothing to record.
                logger.debug(f"未携带bison cookie: {resp.request.url}")
                return
            if resp.status_code == 200:
                logger.debug(f"请求成功: {cookie.id} {resp.request.url}")
                cookie.status = "success"
            else:
                logger.error(f"请求失败:{cookie.id} {resp.request.url}, 状态码: {resp.status_code}")
                cookie.status = "failed"
            cookie.last_usage = datetime.now()
            await config.update_cookie(cookie)

        return _response_hook

    async def _choose_cookie(self, target: Target | None) -> Cookie:
        """Pick the cookie to use for ``target``.

        Returns a placeholder ``Cookie(content="{}")`` when ``target`` is falsy
        or when no stored cookie has passed its cooldown; otherwise returns the
        eligible cookie with the greatest ``last_usage``.
        """
        if not target:
            return Cookie(content="{}")

        stored = await config.get_cookie_by_target(target, self._platform_name)
        now = datetime.now()
        cooldown = timedelta(seconds=self._cookie_cd)
        # Materialize a list: a generator expression is always truthy, so the
        # emptiness check below would never fire and max() would raise
        # ValueError when every cookie is still cooling down.
        available = [cookie for cookie in stored if cookie.last_usage + cooldown < now]
        if not available:
            return Cookie(content="{}")
        # NOTE(review): this prefers the most recently used eligible cookie;
        # confirm that is intended rather than the least recently used one.
        return max(available, key=lambda x: x.last_usage)

    async def get_client(self, target: Target | None) -> AsyncClient:
        """Return a client for ``target`` with the chosen cookie attached."""
        client = http_client()
        cookie = await self._choose_cookie(target)
        if cookie.content != "{}":
            logger.debug(f"平台 {self._platform_name} 获取到用户cookie: {cookie.id}")
        else:
            logger.debug(f"平台 {self._platform_name} 未获取到用户cookie, 使用空cookie")

        return await self.assemble_client(client, cookie)

    async def assemble_client(self, client, cookie):
        """Install ``cookie`` and its status-reporting hook on ``client``.

        ``cookie.content`` is parsed as JSON and loaded into the client's cookie
        jar. The cookie object is also stored on ``client._bison_cookie``, and
        the client's event hooks are replaced with a single response hook.
        Returns the same ``client`` object.
        """
        cookies = httpx.Cookies()
        if cookie:
            cookies.update(json.loads(cookie.content))
        client.cookies = cookies
        client._bison_cookie = cookie
        client.event_hooks = {"response": [self._generate_hook(cookie)]}
        return client

    async def get_client_for_static(self) -> AsyncClient:
        """Return a plain ``http_client()`` with no cookie attached."""
        return http_client()

    async def get_query_name_client(self) -> AsyncClient:
        """Return a plain ``http_client()`` with no cookie attached."""
        return http_client()

    async def refresh_client(self):
        """Do nothing: this manager keeps no state to refresh."""
        pass
|
||
|
||
|
||
def create_cookie_client_manager(platform_name: str) -> type[CookieClientManager]:
    """Create a ``CookieClientManager`` subclass specialized for one platform.

    The new class is named ``"CookieClientManager"`` and carries
    ``platform_name`` as its ``_platform_name`` class attribute.
    """
    bases = (CookieClientManager,)
    namespace = {"_platform_name": platform_name}
    return type("CookieClientManager", bases, namespace)
|
||
|
||
|
||
class Site:
    """Description of a site: its schedule, name and client manager.

    ``schedule_type``, ``schedule_setting`` and ``name`` are declared without
    defaults here, so they must be supplied by a subclass.
    """

    # Kind of schedule; one of "date", "interval" or "cron".
    schedule_type: Literal["date", "interval", "cron"]
    # Settings for the schedule.
    # NOTE(review): presumably keyword arguments for the scheduler trigger; confirm.
    schedule_setting: dict
    # Site name, shown by ``__str__``.
    name: str
    # Client manager class for this site; defaults to the plain manager.
    client_mgr: type[ClientManager] = DefaultClientManager
    # Whether the site needs a browser.
    # NOTE(review): meaning inferred from the attribute name; confirm.
    require_browser: bool = False

    def __str__(self):
        """Return ``[schedule_type]-name-schedule_setting``."""
        # The bracketed prefix used to repeat ``name``, so the schedule type
        # never appeared in the output.
        return f"[{self.schedule_type}]-{self.name}-{self.schedule_setting}"
|
||
|
||
|
||
def anonymous_site(schedule_type: Literal["date", "interval", "cron"], schedule_setting: dict) -> type[Site]:
    """Create an unnamed ``Site`` subclass with the given schedule.

    The new class is called ``"AnonymousSite"`` and uses
    ``DefaultClientManager`` as its ``client_mgr``. It does not set ``name``.
    """
    namespace = dict(
        schedule_type=schedule_type,
        schedule_setting=schedule_setting,
        client_mgr=DefaultClientManager,
    )
    return type("AnonymousSite", (Site,), namespace)
|