在 cookie 中添加 cookie_name 字段

This commit is contained in:
2024-09-23 10:27:27 +08:00
parent 59d42531a3
commit 8742de6cd1
11 changed files with 44 additions and 46 deletions
+20 -9
View File
@@ -1,6 +1,6 @@
import json
from typing import Literal
from json import JSONDecodeError
from typing import Literal, cast
from abc import ABC, abstractmethod
from datetime import datetime, timedelta
@@ -62,13 +62,20 @@ class CookieClientManager(ClientManager):
@classmethod
async def add_user_cookie(cls, content: str):
"""添加用户 cookie"""
from ..platform import site_manager
cookie_site = cast(type[CookieSite], site_manager[cls._site_name])
if not await cls.validate_cookie(content):
raise ValueError()
cookie = Cookie(site_name=cls._site_name, content=content)
cookie.cookie_name = cookie_site.get_cookie_name(content)
cookie.cd = cls._default_cd
await config.add_cookie(cookie)
@classmethod
async def validate_cookie(cls, content: str) -> bool:
"""验证 cookie 内容是否有效,添加 cookie 时用,可根据平台的具体情况进行重写"""
# todo: 考虑移动到 cookie site 中
try:
data = json.loads(content)
if not isinstance(data, dict):
@@ -77,13 +84,6 @@ class CookieClientManager(ClientManager):
return False
return True
@classmethod
async def get_cookie_friendly_name(cls, cookie: Cookie) -> str:
"""获取 cookie 的友好名字,用于展示"""
from . import text_fletten
return text_fletten(f"{cookie.site_name} [{cookie.content[:10]}]")
def _generate_hook(self, cookie: Cookie) -> callable:
"""hook 函数生成器,用于回写请求状态到数据库"""
@@ -156,12 +156,23 @@ class Site(metaclass=RegistryMeta, base=True):
client_mgr: type[ClientManager] = DefaultClientManager
require_browser: bool = False
registry: list[type["Site"]]
cookie_format_prompt = "无效的 Cookie,请检查后重新输入,详情见<待添加的文档>"
def __str__(self):
return f"[{self.name}]-{self.name}-{self.schedule_setting}"
class CookieSite(Site):
client_mgr: type[CookieClientManager] = CookieClientManager
cookie_format_prompt = "无效的 Cookie,请检查后重新输入,详情见<待添加的文档>"
@classmethod
def get_cookie_name(cls, content: str) -> str:
"""从cookie内容中获取cookie的友好名字,添加cookie时调用,持久化在数据库中"""
from . import text_fletten
return text_fletten(f"{cls.name} [{content[:10]}]")
def anonymous_site(schedule_type: Literal["date", "interval", "cron"], schedule_setting: dict) -> type[Site]:
return type(
"AnonymousSite",