From 65a5976897382876178cebea859bf7f55cc3c6c9 Mon Sep 17 00:00:00 2001 From: suyiiyii Date: Mon, 9 Sep 2024 11:06:59 +0800 Subject: [PATCH] :recycles: CookieClientManager --- nonebot_bison/utils/site.py | 225 ++++++++++++++++++------------------ 1 file changed, 112 insertions(+), 113 deletions(-) diff --git a/nonebot_bison/utils/site.py b/nonebot_bison/utils/site.py index 1ad2f4e..69503b7 100644 --- a/nonebot_bison/utils/site.py +++ b/nonebot_bison/utils/site.py @@ -46,122 +46,121 @@ class DefaultClientManager(ClientManager): def is_cookie_client_manager(manger: type[ClientManager]) -> bool: return hasattr(manger, "_cookie_client_manger_") -# TODO: Implement CookieClientManager class CookieClientManager(ClientManager): -# _cookie_client_manger_ = True - _platform_name: str -# _cookie_cd: int = 10 -# -# @classmethod -# async def init_universal_cookie(cls): -# """移除已有的匿名cookie,添加一个新的匿名cookie""" -# universal_cookies = await config.get_unviersal_cookie(cls._platform_name) -# universal_cookie = Cookie( -# platform_name=cls._platform_name, content="{}", is_universal=True, tags={"temporary": True} -# ) -# for cookie in universal_cookies: -# if not cookie.tags.get("temporary"): -# continue -# await config.delete_cookie_by_id(cookie.id) -# universal_cookie.id = cookie.id # 保持原有的id -# await config.add_cookie(universal_cookie) -# -# @classmethod -# async def init_cookie(cls, cookie: Cookie) -> Cookie: -# """初始化 cookie,添加用户 cookie 时使用""" -# cookie.cd = cls._cookie_cd -# cookie.last_usage = datetime.now() # 使得优先使用用户 cookie -# return cookie -# -# @classmethod -# async def valid_cookie(cls, content: str) -> bool: -# """验证 cookie 内容是否有效,添加 cookie 时用,可根据平台的具体情况进行重写""" -# try: -# data = json.loads(content) -# if not isinstance(data, dict): -# return False -# except JSONDecodeError: -# return False -# return True -# -# @classmethod -# async def get_cookie_friendly_name(cls, cookie: Cookie) -> str: -# """获取 cookie 的友好名字,用于展示""" -# from . import text_fletten -# -# return text_fletten(f"{cookie.platform_name} [{cookie.content[:10]}]") -# -# def _generate_hook(self, cookie: Cookie) -> callable: -# """hook 函数生成器,用于回写请求状态到数据库""" -# -# async def _response_hook(resp: httpx.Response): -# if resp.status_code == 200: -# logger.debug(f"请求成功: {cookie.id} {resp.request.url}") -# cookie.status = "success" -# else: -# logger.error(f"请求失败:{cookie.id} {resp.request.url}, 状态码: {resp.status_code}") -# cookie.status = "failed" -# cookie.last_usage = datetime.now() -# await config.update_cookie(cookie) -# -# return _response_hook -# -# async def _choose_cookie(self, target: Target) -> Cookie: -# """选择 cookie 的具体算法""" -# cookies = await config.get_universal_cookie(self._platform_name) -# if target: -# cookies += await config.get_cookie(self._platform_name, target) -# cookies = (cookie for cookie in cookies if cookie.last_usage + timedelta(seconds=cookie.cd) < datetime.now()) -# cookie = min(cookies, key=lambda x: x.last_usage) -# return cookie -# -# async def _check_cookie(self, cookie: Cookie) -> Cookie: -# """检查Cookie,可以做一些自定义的逻辑,比如说Site的统一风控""" -# return cookie -# -# async def get_client(self, target: Target | None) -> AsyncClient: -# """获取 client,根据 target 选择 cookie""" -# client = http_client() -# cookie = await self._choose_cookie(target) -# if cookie.is_universal: -# logger.debug(f"平台 {self._platform_name} 未获取到用户cookie, 使用匿名cookie") -# else: -# logger.debug(f"平台 {self._platform_name} 获取到用户cookie: {cookie.id}") -# -# return await self._assemble_client(client, cookie) -# -# async def _assemble_client(self, client, cookie) -> AsyncClient: -# """组装 client,可以自定义 cookie 对象的 content 装配到 client 中的方式""" -# cookies = httpx.Cookies() -# if cookie: -# cookies.update(json.loads(cookie.content)) -# client.cookies = cookies -# client._bison_cookie = cookie -# client.event_hooks = {"response": [self._generate_hook(cookie)]} -# return client -# -# async def get_client_for_static(self) -> AsyncClient: -# return http_client() -# -# async def get_query_name_client(self) -> AsyncClient: -# return http_client() -# -# async def refresh_client(self): -# pass -# -# -# def create_cookie_client_manager(platform_name: str) -> type[CookieClientManager]: -# """创建一个平台特化的 CookieClientManger""" -# return type( -# "CookieClientManager", -# (CookieClientManager,), -# {"_platform_name": platform_name}, -# ) + _cookie_client_manger_ = True + _site_name: str + _cookie_cd: int = 10 -def create_cookie_client_manager(platform_name: str): + @classmethod + async def init_universal_cookie(cls): + """移除已有的匿名cookie,添加一个新的匿名cookie""" + universal_cookies = await config.get_unviersal_cookie(cls._site_name) + universal_cookie = Cookie( + site_name=cls._site_name, content="{}", is_universal=True, tags={"temporary": True} + ) + for cookie in universal_cookies: + if not cookie.tags.get("temporary"): + continue + await config.delete_cookie_by_id(cookie.id) + universal_cookie.id = cookie.id # 保持原有的id + await config.add_cookie(universal_cookie) + + @classmethod + async def init_cookie(cls, cookie: Cookie) -> Cookie: + """初始化 cookie,添加用户 cookie 时使用""" + cookie.cd = cls._cookie_cd + cookie.last_usage = datetime.now() # 使得优先使用用户 cookie + return cookie + + @classmethod + async def valid_cookie(cls, content: str) -> bool: + """验证 cookie 内容是否有效,添加 cookie 时用,可根据平台的具体情况进行重写""" + try: + data = json.loads(content) + if not isinstance(data, dict): + return False + except JSONDecodeError: + return False + return True + + @classmethod + async def get_cookie_friendly_name(cls, cookie: Cookie) -> str: + """获取 cookie 的友好名字,用于展示""" + from . import text_fletten + + return text_fletten(f"{cookie.site_name} [{cookie.content[:10]}]") + + def _generate_hook(self, cookie: Cookie) -> callable: + """hook 函数生成器,用于回写请求状态到数据库""" + + async def _response_hook(resp: httpx.Response): + if resp.status_code == 200: + logger.debug(f"请求成功: {cookie.id} {resp.request.url}") + cookie.status = "success" + else: + logger.error(f"请求失败:{cookie.id} {resp.request.url}, 状态码: {resp.status_code}") + cookie.status = "failed" + cookie.last_usage = datetime.now() + await config.update_cookie(cookie) + + return _response_hook + + async def _choose_cookie(self, target: Target) -> Cookie: + """选择 cookie 的具体算法""" + cookies = await config.get_universal_cookie(self._site_name) + if target: + cookies += await config.get_cookie(self._site_name, target) + cookies = (cookie for cookie in cookies if cookie.last_usage + timedelta(seconds=cookie.cd) < datetime.now()) + cookie = min(cookies, key=lambda x: x.last_usage) + return cookie + + async def _check_cookie(self, cookie: Cookie) -> Cookie: + """检查Cookie,可以做一些自定义的逻辑,比如说Site的统一风控""" + return cookie + + async def get_client(self, target: Target | None) -> AsyncClient: + """获取 client,根据 target 选择 cookie""" + client = http_client() + cookie = await self._choose_cookie(target) + if cookie.is_universal: + logger.debug(f"平台 {self._site_name} 未获取到用户cookie, 使用匿名cookie") + else: + logger.debug(f"平台 {self._site_name} 获取到用户cookie: {cookie.id}") + + return await self._assemble_client(client, cookie) + + async def _assemble_client(self, client, cookie) -> AsyncClient: + """组装 client,可以自定义 cookie 对象的 content 装配到 client 中的方式""" + cookies = httpx.Cookies() + if cookie: + cookies.update(json.loads(cookie.content)) + client.cookies = cookies + client._bison_cookie = cookie + client.event_hooks = {"response": [self._generate_hook(cookie)]} + return client + + async def get_client_for_static(self) -> AsyncClient: + return http_client() + + async def get_query_name_client(self) -> AsyncClient: + return http_client() + + async def refresh_client(self): + pass + + +def create_cookie_client_manager(site_name: str) -> type[CookieClientManager]: """创建一个平台特化的 CookieClientManger""" - return DefaultClientManager - + return type( + "CookieClientManager", + (CookieClientManager,), + {"_site_name": site_name}, + ) +# +# def create_cookie_client_manager(platform_name: str): +# """创建一个平台特化的 CookieClientManger""" +# return DefaultClientManager +# class Site(metaclass=RegistryMeta,base=True):