mirror of
https://github.com/suyiiyii/nonebot-bison.git
synced 2025-06-04 02:26:11 +08:00
✨ 将get_cookie_friendly_name和valid_cookie移动到ccm内部
This commit is contained in:
parent
b61bde6e3f
commit
afd1bee762
@ -1,6 +1,4 @@
|
|||||||
from .types import Target
|
from .types import Target
|
||||||
from .utils import text_fletten
|
|
||||||
from .config.db_model import Cookie
|
|
||||||
from .scheduler import scheduler_dict
|
from .scheduler import scheduler_dict
|
||||||
from .platform import platform_manager
|
from .platform import platform_manager
|
||||||
|
|
||||||
@ -12,13 +10,3 @@ async def check_sub_target(platform_name: str, target: Target):
|
|||||||
client = await scheduler.client_mgr.get_query_name_client()
|
client = await scheduler.client_mgr.get_query_name_client()
|
||||||
|
|
||||||
return await platform_manager[platform_name].get_target_name(client, target)
|
return await platform_manager[platform_name].get_target_name(client, target)
|
||||||
|
|
||||||
|
|
||||||
async def check_sub_target_cookie(platform_name: str, target: Target, cookie: str):
|
|
||||||
# TODO
|
|
||||||
return "check pass"
|
|
||||||
|
|
||||||
|
|
||||||
async def get_cookie_friendly_name(cookie: Cookie):
|
|
||||||
# TODO
|
|
||||||
return text_fletten(f"{cookie.platform_name} [{cookie.content[:10]}]")
|
|
||||||
|
@ -1,15 +1,15 @@
|
|||||||
|
from typing import cast
|
||||||
|
|
||||||
from nonebot.typing import T_State
|
from nonebot.typing import T_State
|
||||||
from nonebot.matcher import Matcher
|
from nonebot.matcher import Matcher
|
||||||
from nonebot.params import Arg, ArgPlainText
|
from nonebot.params import Arg, ArgPlainText
|
||||||
from nonebot.adapters import Message, MessageTemplate
|
from nonebot.adapters import Message, MessageTemplate
|
||||||
|
|
||||||
from ..types import Target
|
|
||||||
from ..config import config
|
from ..config import config
|
||||||
from ..config.db_model import Cookie
|
from ..config.db_model import Cookie
|
||||||
from ..platform import platform_manager
|
from ..platform import platform_manager
|
||||||
from ..apis import check_sub_target_cookie
|
|
||||||
from ..utils.site import is_cookie_client_manager
|
|
||||||
from .utils import common_platform, gen_handle_cancel
|
from .utils import common_platform, gen_handle_cancel
|
||||||
|
from ..utils.site import CookieClientManager, is_cookie_client_manager
|
||||||
|
|
||||||
|
|
||||||
def do_add_cookie(add_cookie: type[Matcher]):
|
def do_add_cookie(add_cookie: type[Matcher]):
|
||||||
@ -48,24 +48,21 @@ def do_add_cookie(add_cookie: type[Matcher]):
|
|||||||
await add_cookie.reject("平台输入错误")
|
await add_cookie.reject("平台输入错误")
|
||||||
|
|
||||||
@add_cookie.handle()
|
@add_cookie.handle()
|
||||||
async def prepare_get_id(matcher: Matcher, state: T_State):
|
async def prepare_get_id(state: T_State):
|
||||||
cur_platform = platform_manager[state["platform"]]
|
state["_prompt"] = "请输入 Cookie"
|
||||||
if cur_platform.has_target:
|
|
||||||
state["_prompt"] = "请输入 Cookie"
|
|
||||||
else:
|
|
||||||
matcher.set_arg("cookie", None) # type: ignore
|
|
||||||
state["id"] = "default"
|
|
||||||
|
|
||||||
@add_cookie.got("cookie", MessageTemplate("{_prompt}"), [handle_cancel])
|
@add_cookie.got("cookie", MessageTemplate("{_prompt}"), [handle_cancel])
|
||||||
async def got_cookie(state: T_State, cookie: Message = Arg()):
|
async def got_cookie(state: T_State, cookie: Message = Arg()):
|
||||||
|
client_mgr: CookieClientManager = platform_manager[state["platform"]].site.client_mgr
|
||||||
cookie_text = cookie.extract_plain_text()
|
cookie_text = cookie.extract_plain_text()
|
||||||
state["cookie"] = cookie_text
|
state["cookie"] = cookie_text
|
||||||
state["name"] = await check_sub_target_cookie(state["platform"], Target(""), cookie_text)
|
state["name"] = await client_mgr.valid_cookie(cookie_text)
|
||||||
|
|
||||||
@add_cookie.handle()
|
@add_cookie.handle()
|
||||||
async def add_cookie_process(state: T_State):
|
async def add_cookie_process(state: T_State):
|
||||||
cookie = Cookie(platform_name=state["platform"], content=state["cookie"])
|
cookie = Cookie(platform_name=state["platform"], content=state["cookie"])
|
||||||
cookie = platform_manager[state["platform"]].site.init_cookie(cookie)
|
client_mgr = cast(CookieClientManager, platform_manager[state["platform"]].site.client_mgr)
|
||||||
|
cookie = await client_mgr.init_cookie(cookie)
|
||||||
await config.add_cookie(cookie)
|
await config.add_cookie(cookie)
|
||||||
await add_cookie.finish(
|
await add_cookie.finish(
|
||||||
f"已添加 Cookie: {state['cookie']} 到平台 {state['platform']}" + "\n请使用“关联cookie”为 Cookie 关联订阅"
|
f"已添加 Cookie: {state['cookie']} 到平台 {state['platform']}" + "\n请使用“关联cookie”为 Cookie 关联订阅"
|
||||||
|
@ -1,3 +1,5 @@
|
|||||||
|
from typing import cast
|
||||||
|
|
||||||
from nonebot.typing import T_State
|
from nonebot.typing import T_State
|
||||||
from nonebot.matcher import Matcher
|
from nonebot.matcher import Matcher
|
||||||
from nonebot.params import ArgPlainText
|
from nonebot.params import ArgPlainText
|
||||||
@ -6,7 +8,8 @@ from nonebot.internal.adapter import MessageTemplate
|
|||||||
|
|
||||||
from ..config import config
|
from ..config import config
|
||||||
from ..utils import parse_text
|
from ..utils import parse_text
|
||||||
from ..apis import get_cookie_friendly_name
|
from ..platform import platform_manager
|
||||||
|
from ..utils.site import CookieClientManager
|
||||||
from .utils import gen_handle_cancel, generate_sub_list_text
|
from .utils import gen_handle_cancel, generate_sub_list_text
|
||||||
|
|
||||||
|
|
||||||
@ -44,8 +47,10 @@ def do_add_cookie_target(add_cookie_target_matcher: type[Matcher]):
|
|||||||
"当前平台暂无可关联的 Cookie,请使用“添加cookie”命令添加或检查已关联的 Cookie"
|
"当前平台暂无可关联的 Cookie,请使用“添加cookie”命令添加或检查已关联的 Cookie"
|
||||||
)
|
)
|
||||||
state["cookies"] = cookies
|
state["cookies"] = cookies
|
||||||
|
|
||||||
|
client_mgr = cast(CookieClientManager, platform_manager[cookies[0].platform_name].site.client_mgr)
|
||||||
state["_prompt"] = "请选择一个 Cookie,已关联的 Cookie 不会显示\n" + "\n".join(
|
state["_prompt"] = "请选择一个 Cookie,已关联的 Cookie 不会显示\n" + "\n".join(
|
||||||
[f"{idx}. {await get_cookie_friendly_name(cookie)}" for idx, cookie in enumerate(cookies, 1)]
|
[f"{idx}. {await client_mgr.get_cookie_friendly_name(cookie)}" for idx, cookie in enumerate(cookies, 1)]
|
||||||
)
|
)
|
||||||
|
|
||||||
@add_cookie_target_matcher.got("cookie_idx", MessageTemplate("{_prompt}"), [handle_cancel])
|
@add_cookie_target_matcher.got("cookie_idx", MessageTemplate("{_prompt}"), [handle_cancel])
|
||||||
@ -59,7 +64,9 @@ def do_add_cookie_target(add_cookie_target_matcher: type[Matcher]):
|
|||||||
@add_cookie_target_matcher.handle()
|
@add_cookie_target_matcher.handle()
|
||||||
async def add_cookie_target_process(state: T_State):
|
async def add_cookie_target_process(state: T_State):
|
||||||
await config.add_cookie_target(state["target"]["target"], state["target"]["platform_name"], state["cookie"].id)
|
await config.add_cookie_target(state["target"]["target"], state["target"]["platform_name"], state["cookie"].id)
|
||||||
|
cookie = state["cookie"]
|
||||||
|
client_mgr = cast(CookieClientManager, platform_manager[cookie.platform_name].site.client_mgr)
|
||||||
await add_cookie_target_matcher.finish(
|
await add_cookie_target_matcher.finish(
|
||||||
f"已关联 Cookie: {await get_cookie_friendly_name(state['cookie'])} "
|
f"已关联 Cookie: {await client_mgr.get_cookie_friendly_name(cookie)} "
|
||||||
f"到订阅 {state['target']['platform_name']} {state['target']['target']}"
|
f"到订阅 {state['target']['platform_name']} {state['target']['target']}"
|
||||||
)
|
)
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
import contextlib
|
import contextlib
|
||||||
from typing import Annotated
|
|
||||||
from itertools import groupby
|
from itertools import groupby
|
||||||
from operator import attrgetter
|
from operator import attrgetter
|
||||||
|
from typing import Annotated, cast
|
||||||
|
|
||||||
from nonebot.rule import Rule
|
from nonebot.rule import Rule
|
||||||
from nonebot.adapters import Event
|
from nonebot.adapters import Event
|
||||||
@ -15,8 +15,7 @@ from ..config import config
|
|||||||
from ..types import Category
|
from ..types import Category
|
||||||
from ..platform import platform_manager
|
from ..platform import platform_manager
|
||||||
from ..plugin_config import plugin_config
|
from ..plugin_config import plugin_config
|
||||||
from ..apis import get_cookie_friendly_name
|
from ..utils.site import CookieClientManager, is_cookie_client_manager
|
||||||
from ..utils.site import is_cookie_client_manager
|
|
||||||
|
|
||||||
|
|
||||||
def _configurable_to_me(to_me: bool = EventToMe()):
|
def _configurable_to_me(to_me: bool = EventToMe()):
|
||||||
@ -114,7 +113,8 @@ async def generate_sub_list_text(
|
|||||||
if target_cookies:
|
if target_cookies:
|
||||||
res += " 关联的 Cookie:\n"
|
res += " 关联的 Cookie:\n"
|
||||||
for cookie in target_cookies:
|
for cookie in target_cookies:
|
||||||
res += f" \t{await get_cookie_friendly_name(cookie)}\n"
|
client_mgr = cast(CookieClientManager, platform_manager[cookie.platform_name].site.client_mgr)
|
||||||
|
res += f" \t{await client_mgr.get_cookie_friendly_name(cookie)}\n"
|
||||||
|
|
||||||
else:
|
else:
|
||||||
res += f" (平台 {sub.target.platform_name} 已失效,请删除此订阅)"
|
res += f" (平台 {sub.target.platform_name} 已失效,请删除此订阅)"
|
||||||
|
@ -50,6 +50,39 @@ class CookieClientManager(ClientManager):
|
|||||||
_platform_name: str
|
_platform_name: str
|
||||||
_cookie_cd: int = 10
|
_cookie_cd: int = 10
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
async def init_universal_cookie(cls):
|
||||||
|
"""移除已有的匿名cookie,添加一个新的匿名cookie"""
|
||||||
|
universal_cookies = await config.get_unviersal_cookie(cls._platform_name)
|
||||||
|
for cookie in universal_cookies:
|
||||||
|
await config.delete_cookie(cookie.id)
|
||||||
|
universal_cookie = Cookie(platform_name=cls._platform_name, content="{}", is_universal=True)
|
||||||
|
await config.add_cookie(universal_cookie)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
async def init_cookie(cls, cookie: Cookie) -> Cookie:
|
||||||
|
"""初始化cookie,添加用户cookie时使用"""
|
||||||
|
cookie.cd = cls._cookie_cd
|
||||||
|
return cookie
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
async def valid_cookie(cls, content: str) -> bool:
|
||||||
|
"""验证cookie是否有效,添加cookie时考用,可根据平台的具体情况进行重写"""
|
||||||
|
try:
|
||||||
|
data = json.loads(content)
|
||||||
|
if not isinstance(data, dict):
|
||||||
|
raise ValueError
|
||||||
|
except Exception:
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
async def get_cookie_friendly_name(cls, cookie: Cookie) -> str:
|
||||||
|
"""获取友好的cookie名字,用于展示"""
|
||||||
|
from . import text_fletten
|
||||||
|
|
||||||
|
return text_fletten(f"{cookie.platform_name} [{cookie.content[:10]}]")
|
||||||
|
|
||||||
def _generate_hook(self, cookie: Cookie):
|
def _generate_hook(self, cookie: Cookie):
|
||||||
"""hook函数生成器,用于回写请求状态到数据库"""
|
"""hook函数生成器,用于回写请求状态到数据库"""
|
||||||
|
|
||||||
@ -65,20 +98,6 @@ class CookieClientManager(ClientManager):
|
|||||||
|
|
||||||
return _response_hook
|
return _response_hook
|
||||||
|
|
||||||
@classmethod
|
|
||||||
async def init_universal_cookie(cls):
|
|
||||||
"""移除已有的匿名cookie,添加一个新的匿名cookie"""
|
|
||||||
universal_cookies = await config.get_unviersal_cookie(cls._platform_name)
|
|
||||||
for cookie in universal_cookies:
|
|
||||||
await config.delete_cookie(cookie.id)
|
|
||||||
universal_cookie = Cookie(platform_name=cls._platform_name, content="{}", is_universal=True)
|
|
||||||
await config.add_cookie(universal_cookie)
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
async def init_cookie(cls, cookie: Cookie):
|
|
||||||
"""初始化cookie,添加用户cookie时使用"""
|
|
||||||
cookie.cd = cls._cookie_cd
|
|
||||||
|
|
||||||
async def _choose_cookie(self, target: Target) -> Cookie:
|
async def _choose_cookie(self, target: Target) -> Cookie:
|
||||||
"""选择 cookie 的具体算法"""
|
"""选择 cookie 的具体算法"""
|
||||||
if not target:
|
if not target:
|
||||||
|
Loading…
x
Reference in New Issue
Block a user