136 lines
4.9 KiB
Python

from collections.abc import Callable
from datetime import datetime, timedelta
import json
import random
from typing import TYPE_CHECKING, ClassVar, TypeVar
from typing_extensions import override
from httpx import AsyncClient, Response
from nonebot import logger, require
from playwright.async_api import Cookie
from nonebot_bison.config import config
from nonebot_bison.config.db_model import Cookie as CookieModel
from nonebot_bison.config.db_model import Target
from nonebot_bison.plugin_config import plugin_config
from nonebot_bison.utils import Site, http_client
from nonebot_bison.utils.site import CookieClientManager
if TYPE_CHECKING:
from .platforms import Bilibili
if plugin_config.bison_use_browser:
require("nonebot_plugin_htmlrender")
from nonebot_plugin_htmlrender import get_browser
B = TypeVar("B", bound="Bilibili")
class BilibiliClientManager(CookieClientManager):
_default_cookie_cd = timedelta(seconds=120)
_current_user_cookie: CookieModel | None = None
_site_name = "bilibili.com"
async def _get_cookies(self) -> list[Cookie]:
browser = await get_browser()
async with await browser.new_page() as page:
await page.goto(f"https://space.bilibili.com/{random.randint(1, 1000)}/dynamic")
await page.wait_for_load_state("load") # 等待基本加载完成
await page.wait_for_function('document.cookie.includes("bili_ticket")') # 期望保证 GenWebTicket 请求完成
await page.wait_for_load_state("networkidle") # 期望保证 ExClimbWuzhi 请求完成
cookies = await page.context.cookies()
return cookies
def _gen_json_cookie(self, cookies: list[Cookie]):
cookie_dict = {}
for cookie in cookies:
cookie_dict[cookie.get("name", "")] = cookie.get("value", "")
return cookie_dict
@override
async def _generate_anonymous_cookie(self) -> CookieModel:
cookies = await self._get_cookies()
cookie = CookieModel(
cookie_name=f"{self._site_name} anonymous",
site_name=self._site_name,
content=json.dumps(self._gen_json_cookie(cookies)),
is_universal=True,
is_anonymous=True,
last_usage=datetime.now(),
cd_milliseconds=0,
tags="{}",
status="",
)
return cookie
def _generate_hook(self, cookie: CookieModel) -> Callable:
"""hook 函数生成器,用于回写请求状态到数据库"""
async def _response_hook(resp: Response):
await resp.aread()
if resp.status_code == 200 and "-352" not in resp.text:
logger.trace(f"请求成功: {cookie.id} {resp.request.url}")
cookie.status = "success"
else:
logger.warning(f"请求失败: {cookie.id} {resp.request.url}, 状态码: {resp.status_code}")
cookie.status = "failed"
self._current_user_cookie = None
cookie.last_usage = datetime.now()
await config.update_cookie(cookie)
return _response_hook
async def _get_next_user_cookie(self) -> CookieModel | None:
"""选择下一个用户 cookie"""
cookies = await config.get_cookie(self._site_name, is_anonymous=False)
available_cookies = [cookie for cookie in cookies if cookie.last_usage + cookie.cd < datetime.now()]
if not available_cookies:
return None
cookie = min(available_cookies, key=lambda x: x.last_usage)
return cookie
async def _choose_cookie(self, target: Target | None) -> CookieModel:
"""选择 cookie 的具体算法"""
if self._current_user_cookie is None:
# 若当前没有选定用户 cookie 则尝试获取
self._current_user_cookie = await self._get_next_user_cookie()
if self._current_user_cookie:
# 如果当前有选定的用户 cookie 则直接返回
return self._current_user_cookie
# 否则返回匿名 cookie
return (await config.get_cookie(self._site_name, is_anonymous=True))[0]
@override
async def refresh_client(self):
await self._refresh_anonymous_cookie()
logger.debug("刷新B站客户端的cookie")
@override
async def get_client_for_static(self) -> AsyncClient:
return http_client()
@override
async def get_query_name_client(self) -> AsyncClient:
return http_client()
class BilibiliSite(Site):
name = "bilibili.com"
schedule_setting: ClassVar[dict] = {"seconds": 60}
schedule_type = "interval"
client_mgr = BilibiliClientManager
require_browser = True
class BililiveSite(Site):
name = "live.bilibili.com"
schedule_setting: ClassVar[dict] = {"seconds": 5}
schedule_type = "interval"
class BiliBangumiSite(Site):
name = "bilibili.com/bangumi"
schedule_setting: ClassVar[dict] = {"seconds": 30}
schedule_type = "interval"