mirror of
https://github.com/suyiiyii/nonebot-bison.git
synced 2025-05-31 16:16:11 +08:00
♻️ 初步移除CookieSite
This commit is contained in:
parent
07190a7f64
commit
c5dea7e252
@ -18,7 +18,7 @@ from ..utils.get_bot import get_groups
|
||||
from .token_manager import token_manager
|
||||
from ..config.db_config import SubscribeDupException
|
||||
from ..platform import site_manager, platform_manager
|
||||
from ..utils.site import CookieSite, CookieClientManager, is_cookie_client_manager
|
||||
from ..utils.site import CookieClientManager, is_cookie_client_manager
|
||||
from ..config import NoSuchUserException, NoSuchTargetException, NoSuchSubscribeException, config
|
||||
from .types import (
|
||||
Cookie,
|
||||
@ -272,8 +272,8 @@ async def del_cookie_target(platform_name: str, target: str, cookie_id: int) ->
|
||||
|
||||
@router.post("/cookie/validate", dependencies=[Depends(check_is_superuser)])
|
||||
async def get_cookie_valid(site_name: str, content: str) -> StatusResp:
|
||||
site = cast(CookieSite, site_manager[site_name])
|
||||
if await site.validate_cookie(content):
|
||||
client_mgr = cast(CookieClientManager, scheduler_dict[site_manager[site_name]].client_mgr)
|
||||
if await client_mgr.validate_cookie(content):
|
||||
return StatusResp(ok=True, msg="")
|
||||
else:
|
||||
return StatusResp(ok=False, msg="")
|
||||
|
@ -1,8 +1,8 @@
|
||||
import re
|
||||
import json
|
||||
from typing import Any
|
||||
from datetime import datetime
|
||||
from typing import Any, override
|
||||
from urllib.parse import unquote
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
from yarl import URL
|
||||
from lxml.etree import HTML
|
||||
@ -36,15 +36,10 @@ _HEADER = {
|
||||
}
|
||||
|
||||
|
||||
class WeiboSite(CookieSite):
|
||||
name = "weibo.com"
|
||||
schedule_type = "interval"
|
||||
schedule_setting = {"seconds": 3}
|
||||
client_mgr = CookieClientManager
|
||||
default_cookie_cd: int = timedelta(seconds=15)
|
||||
class WeiboClientManager(CookieClientManager):
|
||||
_site_name = "weibo.com"
|
||||
|
||||
@classmethod
|
||||
async def _get_current_user_name(cls, cookies: dict) -> str:
|
||||
async def _get_current_user_name(self, cookies: dict) -> str:
|
||||
url = "https://m.weibo.cn/setup/nick/detail"
|
||||
async with http_client() as client:
|
||||
r = await client.get(url, headers=_HEADER, cookies=cookies)
|
||||
@ -52,14 +47,21 @@ class WeiboSite(CookieSite):
|
||||
name = data["data"]["user"]["screen_name"]
|
||||
return name
|
||||
|
||||
@classmethod
|
||||
async def get_cookie_name(cls, content: str) -> str:
|
||||
@override
|
||||
async def get_cookie_name(self, content: str) -> str:
|
||||
"""从cookie内容中获取cookie的友好名字,添加cookie时调用,持久化在数据库中"""
|
||||
name = await cls._get_current_user_name(json.loads(content))
|
||||
name = await self._get_current_user_name(json.loads(content))
|
||||
|
||||
return text_fletten(f"weibo: [{name[:10]}]")
|
||||
|
||||
|
||||
class WeiboSite(CookieSite):
|
||||
name = "weibo.com"
|
||||
schedule_type = "interval"
|
||||
schedule_setting = {"seconds": 3}
|
||||
client_mgr = CookieClientManager
|
||||
|
||||
|
||||
class Weibo(NewMessage):
|
||||
categories = {
|
||||
1: "转发",
|
||||
|
@ -57,13 +57,14 @@ def do_add_cookie(add_cookie: type[Matcher]):
|
||||
@add_cookie.got("cookie", MessageTemplate("{_prompt}"), [handle_cancel])
|
||||
async def got_cookie(state: T_State, cookie: Message = Arg()):
|
||||
cookie_site = cast(type[CookieSite], platform_manager[state["platform"]].site)
|
||||
client_mgr = cast(CookieClientManager, scheduler_dict[cookie_site].client_mgr)
|
||||
cookie_text = cookie.extract_plain_text()
|
||||
if not await cookie_site.validate_cookie(cookie_text):
|
||||
if not await client_mgr.validate_cookie(cookie_text):
|
||||
await add_cookie.reject(
|
||||
"无效的 Cookie,请检查后重新输入,详情见https://nonebot-bison.netlify.app/usage/cookie.html"
|
||||
)
|
||||
try:
|
||||
cookie_name = await cookie_site.get_cookie_name(cookie_text)
|
||||
cookie_name = await client_mgr.get_cookie_name(cookie_text)
|
||||
state["cookie"] = cookie_text
|
||||
state["cookie_name"] = cookie_name
|
||||
except JSONDecodeError as e:
|
||||
|
@ -1,6 +1,6 @@
|
||||
import json
|
||||
from typing import Literal
|
||||
from json import JSONDecodeError
|
||||
from typing import Literal, cast
|
||||
from abc import ABC, abstractmethod
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
@ -43,6 +43,8 @@ class DefaultClientManager(ClientManager):
|
||||
|
||||
|
||||
class CookieClientManager(ClientManager):
|
||||
_default_cookie_cd: int = timedelta(seconds=15)
|
||||
|
||||
def __init__(self, site: "Site") -> None:
|
||||
self._site = site
|
||||
self._site_name = site.name
|
||||
@ -74,17 +76,31 @@ class CookieClientManager(ClientManager):
|
||||
|
||||
async def add_user_cookie(self, content: str, cookie_name: str | None = None) -> Cookie:
|
||||
"""添加用户 cookie"""
|
||||
from ..platform import site_manager
|
||||
|
||||
cookie_site = cast(type[CookieSite], site_manager[self._site_name])
|
||||
if not await cookie_site.validate_cookie(content):
|
||||
if not await self.validate_cookie(content):
|
||||
raise ValueError()
|
||||
cookie = Cookie(site_name=self._site_name, content=content)
|
||||
cookie.cookie_name = cookie_name if cookie_name else await cookie_site.get_cookie_name(content)
|
||||
cookie.cd = cookie_site.default_cookie_cd
|
||||
cookie.cookie_name = cookie_name if cookie_name else await self.get_cookie_name(content)
|
||||
cookie.cd = self._default_cookie_cd
|
||||
cookie_id = await config.add_cookie(cookie)
|
||||
return await config.get_cookie_by_id(cookie_id)
|
||||
|
||||
async def get_cookie_name(self, content: str) -> str:
|
||||
"""从cookie内容中获取cookie的友好名字,添加cookie时调用,持久化在数据库中"""
|
||||
from . import text_fletten
|
||||
|
||||
return text_fletten(f"{self._site_name} [{content[:10]}]")
|
||||
|
||||
async def validate_cookie(self, content: str) -> bool:
|
||||
"""验证 cookie 内容是否有效,添加 cookie 时用,可根据平台的具体情况进行重写"""
|
||||
try:
|
||||
data = json.loads(content)
|
||||
if not isinstance(data, dict):
|
||||
return False
|
||||
except JSONDecodeError:
|
||||
return False
|
||||
return True
|
||||
|
||||
def _generate_hook(self, cookie: Cookie) -> callable:
|
||||
"""hook 函数生成器,用于回写请求状态到数据库"""
|
||||
|
||||
@ -153,27 +169,17 @@ class Site(metaclass=RegistryMeta, base=True):
|
||||
return f"[{self.name}]-{self.name}-{self.schedule_setting}"
|
||||
|
||||
|
||||
def create_cookie_client_manager(site_name: str) -> type[CookieClientManager]:
|
||||
"""创建一个平台特化的 CookieClientManger"""
|
||||
return type(
|
||||
"CookieClientManager",
|
||||
(CookieClientManager,),
|
||||
{"_site_name": site_name},
|
||||
)
|
||||
|
||||
|
||||
class CookieSite(Site):
|
||||
client_mgr: type[CookieClientManager] = CookieClientManager
|
||||
default_cookie_cd: int = timedelta(seconds=10)
|
||||
|
||||
@classmethod
|
||||
async def get_cookie_name(cls, content: str) -> str:
|
||||
"""从cookie内容中获取cookie的友好名字,添加cookie时调用,持久化在数据库中"""
|
||||
from . import text_fletten
|
||||
|
||||
return text_fletten(f"{cls.name} [{content[:10]}]")
|
||||
|
||||
@classmethod
|
||||
async def validate_cookie(cls, content: str) -> bool:
|
||||
"""验证 cookie 内容是否有效,添加 cookie 时用,可根据平台的具体情况进行重写"""
|
||||
try:
|
||||
data = json.loads(content)
|
||||
if not isinstance(data, dict):
|
||||
return False
|
||||
except JSONDecodeError:
|
||||
return False
|
||||
return True
|
||||
pass
|
||||
|
||||
|
||||
def anonymous_site(schedule_type: Literal["date", "interval", "cron"], schedule_setting: dict) -> type[Site]:
|
||||
|
@ -153,6 +153,6 @@ async def _clear_db(app: App):
|
||||
def _patch_weibo_get_cookie_name(app: App, mocker: MockerFixture):
|
||||
from nonebot_bison.platform import weibo
|
||||
|
||||
mocker.patch.object(weibo.WeiboSite, "_get_current_user_name", return_value="test_name")
|
||||
mocker.patch.object(weibo.WeiboClientManager, "_get_current_user_name", return_value="test_name")
|
||||
yield
|
||||
mocker.stopall()
|
||||
|
@ -221,7 +221,10 @@ async def test_parse_target(weibo: "Weibo"):
|
||||
|
||||
@respx.mock
|
||||
async def test_get_cookie_name(weibo: "Weibo"):
|
||||
from nonebot_bison.platform.weibo import WeiboClientManager
|
||||
|
||||
router = respx.get("https://m.weibo.cn/setup/nick/detail")
|
||||
router.mock(return_value=Response(200, json=get_json("weibo_get-cookie-name.json")))
|
||||
name = await weibo.site.get_cookie_name("{}")
|
||||
weibo_client_mgr = WeiboClientManager()
|
||||
name = await weibo_client_mgr.get_cookie_name("{}")
|
||||
assert name == "weibo: [suyiiyii]"
|
||||
|
Loading…
x
Reference in New Issue
Block a user