Merge 43fb5231b8c727b83da51f2350f85fc7c4e1e7d9 into cf6b7fcd6dbc453af2cf5f2e98247afebd7b00d3

This commit is contained in:
suyiiyii 2024-09-21 12:34:51 +08:00 committed by GitHub
commit aecb6b2baf
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
28 changed files with 1458 additions and 43 deletions

View File

@ -1,3 +1,5 @@
from typing import cast
import nonebot
from fastapi import status
from fastapi.routing import APIRouter
@ -12,14 +14,18 @@ from ..apis import check_sub_target
from .jwt import load_jwt, pack_jwt
from ..types import Target as T_Target
from ..utils.get_bot import get_groups
from ..platform import platform_manager
from .token_manager import token_manager
from ..config.db_config import SubscribeDupException
from ..platform import site_manager, platform_manager
from ..utils.site import CookieClientManager, is_cookie_client_manager
from ..config import NoSuchUserException, NoSuchTargetException, NoSuchSubscribeException, config
from .types import (
Cookie,
TokenResp,
GlobalConf,
SiteConfig,
StatusResp,
CookieTarget,
SubscribeResp,
PlatformConfig,
AddSubscribeReq,
@ -54,16 +60,20 @@ async def check_is_superuser(token_obj: dict = Depends(get_jwt_obj)):
@router.get("/global_conf")
async def get_global_conf() -> GlobalConf:
res = {}
platform_res = {}
for platform_name, platform in platform_manager.items():
res[platform_name] = PlatformConfig(
platform_res[platform_name] = PlatformConfig(
platformName=platform_name,
categories=platform.categories,
enabledTag=platform.enable_tag,
site_name=platform.site.name,
name=platform.name,
hasTarget=getattr(platform, "has_target"),
)
return GlobalConf(platformConf=res)
site_res = {}
for site_name, site in site_manager.items():
site_res[site_name] = SiteConfig(name=site_name, enable_cookie=is_cookie_client_manager(site.client_mgr))
return GlobalConf(platformConf=platform_res, siteConf=site_res)
async def get_admin_groups(qq: int):
@ -197,3 +207,64 @@ async def update_weigth_config(platformName: str, target: str, weight_config: We
except NoSuchTargetException:
raise HTTPException(status.HTTP_400_BAD_REQUEST, "no such subscribe")
return StatusResp(ok=True, msg="")
@router.get("/cookie", dependencies=[Depends(check_is_superuser)])
async def get_cookie(site_name: str = None, target: str = None) -> list[Cookie]:
cookies_in_db = await config.get_cookie(site_name, is_anonymous=False)
client_mgr = cast(CookieClientManager, site_manager[site_name].client_mgr)
friendly_names = [await client_mgr.get_cookie_friendly_name(x) for x in cookies_in_db]
return [
Cookie(
id=cookies_in_db[i].id,
friendly_name=friendly_names[i],
site_name=cookies_in_db[i].site_name,
last_usage=cookies_in_db[i].last_usage,
status=cookies_in_db[i].status,
cd_milliseconds=cookies_in_db[i].cd_milliseconds,
is_universal=cookies_in_db[i].is_universal,
is_anonymous=cookies_in_db[i].is_anonymous,
tags=cookies_in_db[i].tags,
)
for i in range(len(cookies_in_db))
]
@router.post("/cookie", dependencies=[Depends(check_is_superuser)])
async def add_cookie(site_name: str, content: str) -> StatusResp:
client_mgr = cast(CookieClientManager, site_manager[site_name].client_mgr)
await client_mgr.add_user_cookie(content)
return StatusResp(ok=True, msg="")
@router.delete("/cookie/{cookie_id}", dependencies=[Depends(check_is_superuser)])
async def delete_cookie_by_id(cookie_id: int) -> StatusResp:
await config.delete_cookie_by_id(cookie_id)
return StatusResp(ok=True, msg="")
@router.get("/cookie_target", dependencies=[Depends(check_is_superuser)])
async def get_cookie_target(
site_name: str | None = None, target: str | None = None, cookie_id: int | None = None
) -> list[CookieTarget]:
cookie_targets = await config.get_cookie_target()
# TODO: filter in SQL
return [
x
for x in cookie_targets
if (site_name is None or x.cookie.site_name == site_name)
and (target is None or x.target.target == target)
and (cookie_id is None or x.cookie.id == cookie_id)
]
@router.post("/cookie_target", dependencies=[Depends(check_is_superuser)])
async def add_cookie_target(platform_name: str, target: str, cookie_id: int) -> StatusResp:
await config.add_cookie_target(target, platform_name, cookie_id)
return StatusResp(ok=True, msg="")
@router.delete("/cookie_target", dependencies=[Depends(check_is_superuser)])
async def del_cookie_target(platform_name: str, target: str, cookie_id: int) -> StatusResp:
await config.delete_cookie_target(target, platform_name, cookie_id)
return StatusResp(ok=True, msg="")

View File

@ -6,14 +6,22 @@ class PlatformConfig(BaseModel):
categories: dict[int, str]
enabledTag: bool
platformName: str
site_name: str
hasTarget: bool
class SiteConfig(BaseModel):
name: str
enable_cookie: bool
AllPlatformConf = dict[str, PlatformConfig]
AllSiteConf = dict[str, SiteConfig]
class GlobalConf(BaseModel):
platformConf: AllPlatformConf
siteConf: AllSiteConf
class TokenResp(BaseModel):
@ -50,3 +58,32 @@ class AddSubscribeReq(BaseModel):
class StatusResp(BaseModel):
ok: bool
msg: str
from typing import Any
from datetime import datetime
from pydantic import BaseModel
class Target(BaseModel):
platform_name: str
target_name: str
target: str
class Cookie(BaseModel):
id: int
site_name: str
friendly_name: str
last_usage: datetime
status: str
cd_milliseconds: int
is_universal: bool
is_anonymous: bool
tags: dict[str, Any]
class CookieTarget(BaseModel):
target: Target
cookie_id: int

View File

@ -12,8 +12,8 @@ from nonebot_plugin_datastore import create_session
from ..types import Tag
from ..types import Target as T_Target
from .utils import NoSuchTargetException
from .db_model import User, Target, Subscribe, ScheduleTimeWeight
from .utils import NoSuchTargetException, DuplicateCookieTargetException
from .db_model import User, Cookie, Target, Subscribe, CookieTarget, ScheduleTimeWeight
from ..types import Category, UserSubInfo, WeightConfig, TimeWeightConfig, PlatformWeightConfigResp
@ -259,5 +259,108 @@ class DBConfig:
)
return res
async def get_cookie(
self,
site_name: str | None = None,
target: T_Target | None = None,
is_universal: bool | None = None,
is_anonymous: bool | None = None,
) -> Sequence[Cookie]:
"""获取满足传入条件的所有 cookie"""
async with create_session() as sess:
query = select(Cookie).distinct()
if is_universal is not None:
query = query.where(Cookie.is_universal == is_universal)
if is_anonymous is not None:
query = query.where(Cookie.is_anonymous == is_anonymous)
if site_name:
query = query.where(Cookie.site_name == site_name)
query = query.outerjoin(CookieTarget).options(selectinload(Cookie.targets))
res = (await sess.scalars(query)).all()
if target:
# 如果指定了 target过滤掉不满足要求的cookie
query = select(CookieTarget.cookie_id).join(Target).where(Target.target == target)
ids = set((await sess.scalars(query)).all())
# 如果指定了 target 且未指定 is_universal则添加返回 universal cookie
res = [cookie for cookie in res if cookie.id in ids or cookie.is_universal]
return res
async def add_cookie(self, cookie: Cookie) -> int:
async with create_session() as sess:
sess.add(cookie)
await sess.commit()
await sess.refresh(cookie)
return cookie.id
async def update_cookie(self, cookie: Cookie):
async with create_session() as sess:
cookie_in_db: Cookie | None = await sess.scalar(select(Cookie).where(Cookie.id == cookie.id))
if not cookie_in_db:
raise ValueError(f"cookie {cookie.id} not found")
cookie_in_db.content = cookie.content
cookie_in_db.last_usage = cookie.last_usage
cookie_in_db.status = cookie.status
cookie_in_db.tags = cookie.tags
await sess.commit()
async def delete_cookie_by_id(self, cookie_id: int):
async with create_session() as sess:
cookie = await sess.scalar(
select(Cookie)
.where(Cookie.id == cookie_id)
.outerjoin(CookieTarget)
.options(selectinload(Cookie.targets))
)
if len(cookie.targets) > 0:
raise Exception(f"cookie {cookie.id} in use")
await sess.execute(delete(Cookie).where(Cookie.id == cookie_id))
await sess.commit()
async def add_cookie_target(self, target: T_Target, platform_name: str, cookie_id: int):
"""通过 cookie_id 可以唯一确定一个 Cookie通过 target 和 platform_name 可以唯一确定一个 Target"""
async with create_session() as sess:
target_obj = await sess.scalar(
select(Target).where(Target.platform_name == platform_name, Target.target == target)
)
# check if relation exists
cookie_target = await sess.scalar(
select(CookieTarget).where(CookieTarget.target == target_obj, CookieTarget.cookie_id == cookie_id)
)
if cookie_target:
raise DuplicateCookieTargetException()
cookie_obj = await sess.scalar(select(Cookie).where(Cookie.id == cookie_id))
cookie_target = CookieTarget(target=target_obj, cookie=cookie_obj)
sess.add(cookie_target)
await sess.commit()
async def delete_cookie_target(self, target: T_Target, platform_name: str, cookie_id: int):
async with create_session() as sess:
target_obj = await sess.scalar(
select(Target).where(Target.platform_name == platform_name, Target.target == target)
)
cookie_obj = await sess.scalar(select(Cookie).where(Cookie.id == cookie_id))
await sess.execute(
delete(CookieTarget).where(CookieTarget.target == target_obj, CookieTarget.cookie == cookie_obj)
)
await sess.commit()
async def delete_cookie_target_by_id(self, cookie_target_id: int):
async with create_session() as sess:
await sess.execute(delete(CookieTarget).where(CookieTarget.id == cookie_target_id))
await sess.commit()
async def get_cookie_target(self) -> list[CookieTarget]:
async with create_session() as sess:
query = (
select(CookieTarget)
.outerjoin(Target)
.options(selectinload(CookieTarget.target))
.outerjoin(Cookie)
.options(selectinload(CookieTarget.cookie))
)
res = list((await sess.scalars(query)).all())
res.sort(key=lambda x: (x.target.platform_name, x.cookie_id, x.target_id))
return res
config = DBConfig()

View File

@ -1,4 +1,5 @@
import datetime
from typing import Any
from pathlib import Path
from nonebot_plugin_saa import PlatformTarget
@ -6,7 +7,7 @@ from sqlalchemy.dialects.postgresql import JSONB
from nonebot.compat import PYDANTIC_V2, ConfigDict
from nonebot_plugin_datastore import get_plugin_data
from sqlalchemy.orm import Mapped, relationship, mapped_column
from sqlalchemy import JSON, String, ForeignKey, UniqueConstraint
from sqlalchemy import JSON, String, DateTime, ForeignKey, UniqueConstraint
from ..types import Tag, Category
@ -36,6 +37,7 @@ class Target(Model):
subscribes: Mapped[list["Subscribe"]] = relationship(back_populates="target")
time_weight: Mapped[list["ScheduleTimeWeight"]] = relationship(back_populates="target")
cookies: Mapped[list["CookieTarget"]] = relationship(back_populates="target")
class ScheduleTimeWeight(Model):
@ -66,3 +68,40 @@ class Subscribe(Model):
target: Mapped[Target] = relationship(back_populates="subscribes")
user: Mapped[User] = relationship(back_populates="subscribes")
class Cookie(Model):
id: Mapped[int] = mapped_column(primary_key=True)
site_name: Mapped[str] = mapped_column(String(100))
content: Mapped[str] = mapped_column(String(1024))
# 最后使用的时刻
last_usage: Mapped[datetime.datetime] = mapped_column(DateTime, default=datetime.datetime(1970, 1, 1))
# Cookie 当前的状态
status: Mapped[str] = mapped_column(String(20), default="")
# 使用一次之后,需要的冷却时间
cd_milliseconds: Mapped[int] = mapped_column(default=0)
# 是否是通用 Cookie对所有Target都有效
is_universal: Mapped[bool] = mapped_column(default=False)
# 是否是匿名 Cookie
is_anonymous: Mapped[bool] = mapped_column(default=False)
# 标签,扩展用
tags: Mapped[dict[str, Any]] = mapped_column(JSON().with_variant(JSONB, "postgresql"), default={})
targets: Mapped[list["CookieTarget"]] = relationship(back_populates="cookie")
@property
def cd(self) -> datetime.timedelta:
return datetime.timedelta(milliseconds=self.cd_milliseconds)
@cd.setter
def cd(self, value: datetime.timedelta):
self.cd_milliseconds = int(value.total_seconds() * 1000)
class CookieTarget(Model):
id: Mapped[int] = mapped_column(primary_key=True)
target_id: Mapped[int] = mapped_column(ForeignKey("nonebot_bison_target.id", ondelete="CASCADE"))
cookie_id: Mapped[int] = mapped_column(ForeignKey("nonebot_bison_cookie.id", ondelete="CASCADE"))
target: Mapped[Target] = relationship(back_populates="cookies")
cookie: Mapped[Cookie] = relationship(back_populates="targets")

View File

@ -0,0 +1,62 @@
"""empty message
Revision ID: ef796b74b0fe
Revises: f9baef347cc8
Create Date: 2024-09-13 00:34:08.601438
"""
import sqlalchemy as sa
from alembic import op
from sqlalchemy import Text
from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic.
revision = "ef796b74b0fe"
down_revision = "f9baef347cc8"
branch_labels = None
depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
"nonebot_bison_cookie",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("site_name", sa.String(length=100), nullable=False),
sa.Column("content", sa.String(length=1024), nullable=False),
sa.Column("last_usage", sa.DateTime(), nullable=False),
sa.Column("status", sa.String(length=20), nullable=False),
sa.Column("cd_milliseconds", sa.Integer(), nullable=False),
sa.Column("is_universal", sa.Boolean(), nullable=False),
sa.Column("is_anonymous", sa.Boolean(), nullable=False),
sa.Column("tags", sa.JSON().with_variant(postgresql.JSONB(astext_type=Text()), "postgresql"), nullable=False),
sa.PrimaryKeyConstraint("id", name=op.f("pk_nonebot_bison_cookie")),
)
op.create_table(
"nonebot_bison_cookietarget",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("target_id", sa.Integer(), nullable=False),
sa.Column("cookie_id", sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(
["cookie_id"],
["nonebot_bison_cookie.id"],
name=op.f("fk_nonebot_bison_cookietarget_cookie_id_nonebot_bison_cookie"),
ondelete="CASCADE",
),
sa.ForeignKeyConstraint(
["target_id"],
["nonebot_bison_target.id"],
name=op.f("fk_nonebot_bison_cookietarget_target_id_nonebot_bison_target"),
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_nonebot_bison_cookietarget")),
)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table("nonebot_bison_cookietarget")
op.drop_table("nonebot_bison_cookie")
# ### end Alembic commands ###

View File

@ -0,0 +1,106 @@
"""nbesf is Nonebot Bison Enchangable Subscribes File! ver.2"""
from typing import Any
from functools import partial
from nonebot.log import logger
from pydantic import BaseModel
from nonebot_plugin_saa.registries import AllSupportedPlatformTarget
from nonebot.compat import PYDANTIC_V2, ConfigDict, model_dump, type_validate_json, type_validate_python
from ..utils import NBESFParseErr
from ....types import Tag, Category
from .base import NBESFBase, SubReceipt
from ...db_config import SubscribeDupException, config
# ===== nbesf 定义格式 ====== #
NBESF_VERSION = 3
class Target(BaseModel):
"""Bsion快递包发货信息"""
target_name: str
target: str
platform_name: str
default_schedule_weight: int
if PYDANTIC_V2:
model_config = ConfigDict(from_attributes=True)
else:
class Config:
orm_mode = True
class SubPayload(BaseModel):
"""Bison快递包里的单件货物"""
categories: list[Category]
tags: list[Tag]
target: Target
if PYDANTIC_V2:
model_config = ConfigDict(from_attributes=True)
else:
class Config:
orm_mode = True
class SubPack(BaseModel):
"""Bison给指定用户派送的快递包"""
# user_target: Bison快递包收货信息
user_target: AllSupportedPlatformTarget
subs: list[SubPayload]
class SubGroup(NBESFBase):
"""
Bison的全部订单(按用户分组)
结构参见`nbesf_model`下的对应版本
"""
version: int = NBESF_VERSION
groups: list[SubPack] = []
# ======================= #
async def subs_receipt_gen(nbesf_data: SubGroup):
for item in nbesf_data.groups:
sub_receipt = partial(SubReceipt, user=item.user_target)
for sub in item.subs:
receipt = sub_receipt(
target=sub.target.target,
target_name=sub.target.target_name,
platform_name=sub.target.platform_name,
cats=sub.categories,
tags=sub.tags,
)
try:
await config.add_subscribe(receipt.user, **model_dump(receipt, exclude={"user"}))
except SubscribeDupException:
logger.warning(f"!添加订阅条目 {repr(receipt)} 失败: 相同的订阅已存在")
except Exception as e:
logger.error(f"!添加订阅条目 {repr(receipt)} 失败: {repr(e)}")
else:
logger.success(f"添加订阅条目 {repr(receipt)} 成功!")
def nbesf_parser(raw_data: Any) -> SubGroup:
try:
if isinstance(raw_data, str):
nbesf_data = type_validate_json(SubGroup, raw_data)
else:
nbesf_data = type_validate_python(SubGroup, raw_data)
except Exception as e:
logger.error("数据解析失败该数据格式可能不满足NBESF格式标准")
raise NBESFParseErr("数据解析失败") from e
else:
return nbesf_data

View File

@ -8,3 +8,7 @@ class NoSuchSubscribeException(Exception):
class NoSuchTargetException(Exception):
pass
class DuplicateCookieTargetException(Exception):
pass

View File

@ -3,6 +3,7 @@ from pkgutil import iter_modules
from collections import defaultdict
from importlib import import_module
from ..utils import Site
from ..plugin_config import plugin_config
from .platform import Platform, make_no_target_group
@ -35,3 +36,10 @@ def _get_unavailable_platforms() -> dict[str, str]:
# platform => reason for not available
unavailable_paltforms: dict[str, str] = _get_unavailable_platforms()
site_manager: dict[str, type[Site]] = {}
for site in Site.registry:
if not hasattr(site, "name"):
continue
site_manager[site.name] = site

View File

@ -16,7 +16,7 @@ from nonebot_plugin_saa import PlatformTarget
from ..post import Post
from ..utils import Site, ProcessContext
from ..plugin_config import plugin_config
from ..types import Tag, Target, RawPost, SubUnit, Category
from ..types import Tag, Target, RawPost, SubUnit, Category, RegistryMeta
class CategoryNotSupport(Exception):
@ -29,21 +29,6 @@ class CategoryNotRecognize(Exception):
"""raise in get_category, when you don't know the category of post"""
class RegistryMeta(type):
def __new__(cls, name, bases, namespace, **kwargs):
return super().__new__(cls, name, bases, namespace)
def __init__(cls, name, bases, namespace, **kwargs):
if kwargs.get("base"):
# this is the base class
cls.registry = []
elif not kwargs.get("abstract"):
# this is the subclass
cls.registry.append(cls)
super().__init__(name, bases, namespace, **kwargs)
P = ParamSpec("P")
R = TypeVar("R")

View File

@ -10,12 +10,14 @@ from ..post import Post
from .platform import NewMessage
from ..types import Target, RawPost
from ..utils import Site, text_similarity
from ..utils.site import create_cookie_client_manager
class RssSite(Site):
name = "rss"
schedule_type = "interval"
schedule_setting = {"seconds": 30}
client_mgr = create_cookie_client_manager("rss")
class RssPost(Post):
@ -63,7 +65,7 @@ class Rss(NewMessage):
return post.id
async def get_sub_list(self, target: Target) -> list[RawPost]:
client = await self.ctx.get_client()
client = await self.ctx.get_client(target)
res = await client.get(target, timeout=10.0)
feed = feedparser.parse(res)
entries = feed.entries

View File

@ -13,6 +13,7 @@ from bs4 import BeautifulSoup as bs
from ..post import Post
from .platform import NewMessage
from ..utils import Site, http_client
from ..utils.site import create_cookie_client_manager
from ..types import Tag, Target, RawPost, ApiError, Category
_HEADER = {
@ -39,6 +40,7 @@ class WeiboSite(Site):
name = "weibo.com"
schedule_type = "interval"
schedule_setting = {"seconds": 3}
client_mgr = create_cookie_client_manager(name)
class Weibo(NewMessage):
@ -78,9 +80,11 @@ class Weibo(NewMessage):
raise cls.ParseTargetException(prompt="正确格式:\n1. 用户数字UID\n2. https://weibo.com/u/xxxx")
async def get_sub_list(self, target: Target) -> list[RawPost]:
client = await self.ctx.get_client()
client = await self.ctx.get_client(target)
header = {"Referer": f"https://m.weibo.cn/u/{target}", "MWeibo-Pwa": "1", "X-Requested-With": "XMLHttpRequest"}
# 获取 cookie 见 https://docs.rsshub.app/zh/deploy/config#%E5%BE%AE%E5%8D%9A
params = {"containerid": "107603" + target}
res = await client.get("https://m.weibo.cn/api/container/getIndex?", params=params, timeout=4.0)
res = await client.get("https://m.weibo.cn/api/container/getIndex?", headers=header, params=params, timeout=4.0)
res_data = json.loads(res.text)
if not res_data["ok"] and res_data["msg"] != "这里还没有内容":
raise ApiError(res.request.url)

View File

@ -1,3 +1,5 @@
from typing import cast
from nonebot.log import logger
from ..utils import Site
@ -7,6 +9,7 @@ from ..config.db_model import Target
from ..types import Target as T_Target
from ..platform import platform_manager
from ..plugin_config import plugin_config
from ..utils.site import CookieClientManager, is_cookie_client_manager
scheduler_dict: dict[type[Site], Scheduler] = {}
@ -30,6 +33,9 @@ async def init_scheduler():
else:
_schedule_class_platform_dict[site].append(platform_name)
for site, target_list in _schedule_class_dict.items():
if is_cookie_client_manager(site.client_mgr):
client_mgr = cast(CookieClientManager, site.client_mgr)
await client_mgr.refresh_anonymous_cookie()
if not plugin_config.bison_use_browser and site.require_browser:
logger.warning(f"{site.name} requires browser, it will not schedule.")
continue

View File

@ -12,6 +12,7 @@ from ..send import send_msgs
from ..types import Target, SubUnit
from ..platform import platform_manager
from ..utils import Site, ProcessContext
from ..utils.site import SkipRequestException
@dataclass
@ -107,6 +108,8 @@ class Scheduler:
schedulable.platform_name, schedulable.target
)
to_send = await platform_obj.do_fetch_new_post(SubUnit(schedulable.target, send_userinfo_list))
except SkipRequestException as err:
logger.debug(f"skip request: {err}")
except Exception as err:
records = context.gen_req_records()
for record in records:

View File

@ -14,6 +14,10 @@ from nonebot.adapters.onebot.v11.event import PrivateMessageEvent
from .add_sub import do_add_sub
from .del_sub import do_del_sub
from .query_sub import do_query_sub
from .add_cookie import do_add_cookie
from .del_cookie import do_del_cookie
from .add_cookie_target import do_add_cookie_target
from .del_cookie_target import do_del_cookie_target
from .utils import common_platform, admin_permission, gen_handle_cancel, configurable_to_me, set_target_user_info
add_sub_matcher = on_command(
@ -26,12 +30,10 @@ add_sub_matcher = on_command(
add_sub_matcher.handle()(set_target_user_info)
do_add_sub(add_sub_matcher)
query_sub_matcher = on_command("查询订阅", rule=configurable_to_me, priority=5, block=True)
query_sub_matcher.handle()(set_target_user_info)
do_query_sub(query_sub_matcher)
del_sub_matcher = on_command(
"删除订阅",
rule=configurable_to_me,
@ -42,6 +44,46 @@ del_sub_matcher = on_command(
del_sub_matcher.handle()(set_target_user_info)
do_del_sub(del_sub_matcher)
add_cookie_matcher = on_command(
"添加cookie",
aliases={"添加Cookie"},
rule=to_me(),
permission=SUPERUSER,
priority=5,
block=True,
)
do_add_cookie(add_cookie_matcher)
add_cookie_target_matcher = on_command(
"关联cookie",
aliases={"关联Cookie"},
rule=to_me(),
permission=SUPERUSER,
priority=5,
block=True,
)
do_add_cookie_target(add_cookie_target_matcher)
del_cookie_target_matcher = on_command(
"取消关联cookie",
aliases={"取消关联Cookie"},
rule=to_me(),
permission=SUPERUSER,
priority=5,
block=True,
)
do_del_cookie_target(del_cookie_target_matcher)
del_cookie_matcher = on_command(
"删除cookie",
aliases={"删除Cookie"},
rule=to_me(),
permission=SUPERUSER,
priority=5,
block=True,
)
do_del_cookie(del_cookie_matcher)
group_manage_matcher = on_command("群管理", rule=to_me(), permission=SUPERUSER, priority=4, block=True)
group_handle_cancel = gen_handle_cancel(group_manage_matcher, "已取消")
@ -125,4 +167,8 @@ __all__ = [
"del_sub_matcher",
"group_manage_matcher",
"no_permission_matcher",
"add_cookie_matcher",
"add_cookie_target_matcher",
"del_cookie_target_matcher",
"del_cookie_matcher",
]

View File

@ -0,0 +1,69 @@
from typing import cast
from nonebot.typing import T_State
from nonebot.matcher import Matcher
from nonebot.params import Arg, ArgPlainText
from nonebot.adapters import Message, MessageTemplate
from ..platform import platform_manager
from .utils import common_platform, gen_handle_cancel
from ..utils.site import CookieClientManager, is_cookie_client_manager
def do_add_cookie(add_cookie: type[Matcher]):
handle_cancel = gen_handle_cancel(add_cookie, "已中止添加cookie")
@add_cookie.handle()
async def init_promote(state: T_State):
state["_prompt"] = (
"请输入想要添加 Cookie 的平台,目前支持,请输入冒号左边的名称:\n"
+ "".join(
[
f"{platform_name}: {platform_manager[platform_name].name}\n"
for platform_name in common_platform
if is_cookie_client_manager(platform_manager[platform_name].site.client_mgr)
]
)
+ "要查看全部平台请输入:“全部”\n中止添加cookie过程请输入“取消”"
)
@add_cookie.got("platform", MessageTemplate("{_prompt}"), [handle_cancel])
async def parse_platform(state: T_State, platform: str = ArgPlainText()) -> None:
if platform == "全部":
message = "全部平台\n" + "\n".join(
[
f"{platform_name}: {platform.name}"
for platform_name, platform in platform_manager.items()
if is_cookie_client_manager(platform_manager[platform_name].site.client_mgr)
]
)
await add_cookie.reject(message)
elif platform == "取消":
await add_cookie.finish("已中止添加cookie")
elif platform in platform_manager:
state["platform"] = platform
state["site"] = platform_manager[platform].site
else:
await add_cookie.reject("平台输入错误")
@add_cookie.handle()
async def prepare_get_id(state: T_State):
state["_prompt"] = "请输入 Cookie"
@add_cookie.got("cookie", MessageTemplate("{_prompt}"), [handle_cancel])
async def got_cookie(state: T_State, cookie: Message = Arg()):
client_mgr: type[CookieClientManager] = cast(
type[CookieClientManager], platform_manager[state["platform"]].site.client_mgr
)
cookie_text = cookie.extract_plain_text()
if not await client_mgr.validate_cookie(cookie_text):
await add_cookie.reject(state["site"].cookie_format_prompt)
state["cookie"] = cookie_text
@add_cookie.handle()
async def add_cookie_process(state: T_State):
client_mgr = cast(CookieClientManager, platform_manager[state["platform"]].site.client_mgr)
await client_mgr.add_user_cookie(state["cookie"])
await add_cookie.finish(
f"已添加 Cookie: {state['cookie']} 到平台 {state['platform']}" + "\n请使用“关联cookie”为 Cookie 关联订阅"
)

View File

@ -0,0 +1,75 @@
from typing import cast
from nonebot.typing import T_State
from nonebot.matcher import Matcher
from nonebot.params import ArgPlainText
from nonebot_plugin_saa import MessageFactory
from nonebot.internal.adapter import MessageTemplate
from ..config import config
from ..utils import parse_text
from ..platform import platform_manager
from ..utils.site import CookieClientManager
from .utils import gen_handle_cancel, generate_sub_list_text
def do_add_cookie_target(add_cookie_target_matcher: type[Matcher]):
handle_cancel = gen_handle_cancel(add_cookie_target_matcher, "已中止关联 cookie")
@add_cookie_target_matcher.handle()
async def init_promote(state: T_State):
res = await generate_sub_list_text(
add_cookie_target_matcher, state, is_index=True, is_show_cookie=True, is_hide_no_cookie_platfrom=True
)
res += "请输入要关联 cookie 的订阅的序号\n输入'取消'中止"
await MessageFactory(await parse_text(res)).send()
@add_cookie_target_matcher.got("target_idx", parameterless=[handle_cancel])
async def got_target_idx(state: T_State, target_idx: str = ArgPlainText()):
try:
target_idx = int(target_idx)
state["target"] = state["sub_table"][target_idx]
state["site"] = platform_manager[state["target"]["platform_name"]].site
except Exception:
await add_cookie_target_matcher.reject("序号错误")
@add_cookie_target_matcher.handle()
async def init_promote_cookie(state: T_State):
# 获取 site 的所有用户 cookie再排除掉已经关联的 cookie剩下的就是可以关联的 cookie
cookies = await config.get_cookie(site_name=state["site"].name, is_anonymous=False)
associated_cookies = await config.get_cookie(
target=state["target"]["target"],
site_name=state["site"].name,
is_anonymous=False,
)
associated_cookie_ids = {cookie.id for cookie in associated_cookies}
cookies = [cookie for cookie in cookies if cookie.id not in associated_cookie_ids]
if not cookies:
await add_cookie_target_matcher.finish(
"当前平台暂无可关联的 Cookie请使用“添加cookie”命令添加或检查已关联的 Cookie"
)
state["cookies"] = cookies
client_mgr = cast(CookieClientManager, state["site"].client_mgr)
state["_prompt"] = "请选择一个 Cookie已关联的 Cookie 不会显示\n" + "\n".join(
[f"{idx}. {await client_mgr.get_cookie_friendly_name(cookie)}" for idx, cookie in enumerate(cookies, 1)]
)
@add_cookie_target_matcher.got("cookie_idx", MessageTemplate("{_prompt}"), [handle_cancel])
async def got_cookie_idx(state: T_State, cookie_idx: str = ArgPlainText()):
try:
cookie_idx = int(cookie_idx)
state["cookie"] = state["cookies"][cookie_idx - 1]
except Exception:
await add_cookie_target_matcher.reject("序号错误")
@add_cookie_target_matcher.handle()
async def add_cookie_target_process(state: T_State):
await config.add_cookie_target(state["target"]["target"], state["target"]["platform_name"], state["cookie"].id)
cookie = state["cookie"]
client_mgr = cast(CookieClientManager, state["site"].client_mgr)
await add_cookie_target_matcher.finish(
f"已关联 Cookie: {await client_mgr.get_cookie_friendly_name(cookie)} "
f"到订阅 {state['site'].name} {state['target']['target']}"
)

View File

@ -0,0 +1,48 @@
from nonebot.typing import T_State
from nonebot.matcher import Matcher
from nonebot.params import EventPlainText
from nonebot_plugin_saa import MessageFactory
from ..config import config
from ..utils import parse_text
from ..platform import site_manager
from .utils import gen_handle_cancel
def do_del_cookie(del_cookie: type[Matcher]):
handle_cancel = gen_handle_cancel(del_cookie, "删除中止")
@del_cookie.handle()
async def send_list(state: T_State):
cookies = await config.get_cookie(is_anonymous=False)
if not cookies:
await del_cookie.finish("暂无已添加的 Cookie\n请使用“添加cookie”命令添加")
res = "已添加的 Cookie 为:\n"
state["cookie_table"] = {}
for index, cookie in enumerate(cookies, 1):
state["cookie_table"][index] = cookie
client_mgr = site_manager[cookie.site_name].client_mgr
friendly_name = await client_mgr.get_cookie_friendly_name(cookie)
res += f"{index} {cookie.site_name} {friendly_name} {len(cookie.targets)}个关联\n"
if res[-1] != "\n":
res += "\n"
res += "请输入要删除的 Cookie 的序号\n输入'取消'中止"
await MessageFactory(await parse_text(res)).send()
@del_cookie.receive(parameterless=[handle_cancel])
async def do_del(
state: T_State,
index_str: str = EventPlainText(),
):
try:
index = int(index_str)
cookie = state["cookie_table"][index]
if cookie.targets:
await del_cookie.reject("只能删除未关联的 Cookie请使用“取消关联cookie”命令取消关联")
await config.delete_cookie_by_id(cookie.id)
except KeyError:
await del_cookie.reject("序号错误")
except Exception:
await del_cookie.reject("删除错误")
else:
await del_cookie.finish("删除成功")

View File

@ -0,0 +1,51 @@
from typing import cast
from nonebot.typing import T_State
from nonebot.matcher import Matcher
from nonebot.params import EventPlainText
from nonebot_plugin_saa import MessageFactory
from ..config import config
from ..utils import parse_text
from .utils import gen_handle_cancel
from ..platform import platform_manager
from ..utils.site import CookieClientManager
def do_del_cookie_target(del_cookie_target: type[Matcher]):
handle_cancel = gen_handle_cancel(del_cookie_target, "取消关联中止")
@del_cookie_target.handle()
async def send_list(state: T_State):
cookie_targets = await config.get_cookie_target()
if not cookie_targets:
await del_cookie_target.finish("暂无已关联 Cookie\n请使用“添加cookie”命令添加关联")
res = "已关联的 Cookie 为:\n"
state["cookie_target_table"] = {}
for index, cookie_target in enumerate(cookie_targets, 1):
client_mgr = cast(CookieClientManager, platform_manager[cookie_target.target.platform_name].site.client_mgr)
friendly_name = await client_mgr.get_cookie_friendly_name(cookie_target.cookie)
state["cookie_target_table"][index] = {
"platform_name": cookie_target.target.platform_name,
"target": cookie_target.target,
"friendly_name": friendly_name,
"cookie_target": cookie_target,
}
res += f"{index} {cookie_target.target.platform_name} {cookie_target.target.target_name} {friendly_name}\n"
if res[-1] != "\n":
res += "\n"
res += "请输入要删除的关联的序号\n输入'取消'中止"
await MessageFactory(await parse_text(res)).send()
@del_cookie_target.receive(parameterless=[handle_cancel])
async def do_del(
state: T_State,
index_str: str = EventPlainText(),
):
try:
index = int(index_str)
await config.delete_cookie_target_by_id(state["cookie_target_table"][index]["cookie_target"].id)
except Exception:
await del_cookie_target.reject("删除错误")
else:
await del_cookie_target.finish("删除成功")

View File

@ -1,16 +1,21 @@
import contextlib
from typing import Annotated
from itertools import groupby
from operator import attrgetter
from typing import Annotated, cast
from nonebot.rule import Rule
from nonebot.adapters import Event
from nonebot.typing import T_State
from nonebot.matcher import Matcher
from nonebot.permission import SUPERUSER
from nonebot_plugin_saa import extract_target
from nonebot.params import Depends, EventToMe, EventPlainText
from nonebot_plugin_saa import PlatformTarget, extract_target
from ..platform import platform_manager
from ..config import config
from ..types import Category
from ..plugin_config import plugin_config
from ..platform import site_manager, platform_manager
from ..utils.site import CookieClientManager, is_cookie_client_manager
def _configurable_to_me(to_me: bool = EventToMe()):
@ -60,3 +65,59 @@ def admin_permission():
permission = permission | GROUP_ADMIN | GROUP_OWNER
return permission
async def generate_sub_list_text(
matcher: type[Matcher],
state: T_State,
user_info: PlatformTarget = None,
is_index=False,
is_show_cookie=False,
is_hide_no_cookie_platfrom=False,
):
"""根据配置参数生产订阅列表文本同时将订阅信息存入state["sub_table"]"""
if user_info:
sub_list = await config.list_subscribe(user_info)
else:
sub_list = await config.list_subs_with_all_info()
sub_list = [
next(group)
for key, group in groupby(sorted(sub_list, key=attrgetter("target_id")), key=attrgetter("target_id"))
]
if is_hide_no_cookie_platfrom:
sub_list = [
sub
for sub in sub_list
if is_cookie_client_manager(platform_manager.get(sub.target.platform_name).site.client_mgr)
]
if not sub_list:
await matcher.finish("暂无已订阅账号\n请使用“添加订阅”命令添加订阅")
res = "订阅的帐号为:\n"
state["sub_table"] = {}
for index, sub in enumerate(sub_list, 1):
state["sub_table"][index] = {
"platform_name": sub.target.platform_name,
"target": sub.target.target,
}
res += f"{index} " if is_index else ""
res += f"{sub.target.platform_name} {sub.target.target_name} {sub.target.target}\n"
if platform := platform_manager.get(sub.target.platform_name):
if platform.categories:
res += " [{}]".format(", ".join(platform.categories[Category(x)] for x in sub.categories)) + "\n"
if platform.enable_tag:
if sub.tags:
res += " {}".format(", ".join(sub.tags)) + "\n"
if is_show_cookie:
target_cookies = await config.get_cookie(
target=sub.target.target, site_name=platform.site.name, is_anonymous=False
)
if target_cookies:
res += " 关联的 Cookie\n"
for cookie in target_cookies:
client_mgr = cast(CookieClientManager, site_manager[platform.site.name].client_mgr)
res += f" \t{await client_mgr.get_cookie_friendly_name(cookie)}\n"
else:
res += f" (平台 {sub.target.platform_name} 已失效,请删除此订阅)"
return res

View File

@ -58,3 +58,18 @@ class ApiError(Exception):
class SubUnit(NamedTuple):
sub_target: Target
user_sub_infos: list[UserSubInfo]
class RegistryMeta(type):
def __new__(cls, name, bases, namespace, **kwargs):
return super().__new__(cls, name, bases, namespace)
def __init__(cls, name, bases, namespace, **kwargs):
if kwargs.get("base"):
# this is the base class
cls.registry = []
elif not kwargs.get("abstract"):
# this is the subclass
cls.registry.append(cls)
super().__init__(name, bases, namespace, **kwargs)

View File

@ -1,25 +1,25 @@
import difflib
import re
import sys
import difflib
import nonebot
from nonebot.plugin import require
from bs4 import BeautifulSoup as bs
from nonebot.log import logger, default_format
from nonebot.plugin import require
from nonebot_plugin_saa import Text, Image, MessageSegmentFactory
from .site import Site as Site
from ..plugin_config import plugin_config
from .image import pic_merge as pic_merge
from .context import ProcessContext as ProcessContext
from .http import http_client as http_client
from .image import capture_html as capture_html
from .site import ClientManager as ClientManager
from .image import text_to_image as text_to_image
from .site import anonymous_site as anonymous_site
from .context import ProcessContext as ProcessContext
from .image import is_pics_mergable as is_pics_mergable
from .image import pic_merge as pic_merge
from .image import pic_url_to_image as pic_url_to_image
from .image import text_to_image as text_to_image
from .site import ClientManager as ClientManager
from .site import DefaultClientManager as DefaultClientManager
from .site import Site as Site
from .site import anonymous_site as anonymous_site
from ..plugin_config import plugin_config
class Singleton(type):

View File

@ -22,8 +22,9 @@ class ProcessContext:
async def _log_to_ctx(r: Response):
self._log_response(r)
existing_hooks = client.event_hooks["response"]
hooks = {
"response": [_log_to_ctx],
"response": [*existing_hooks, _log_to_ctx],
}
client.event_hooks = hooks

View File

@ -1,10 +1,17 @@
import json
from typing import Literal
from json import JSONDecodeError
from abc import ABC, abstractmethod
from datetime import datetime, timedelta
import httpx
from httpx import AsyncClient
from nonebot.log import logger
from ..types import Target
from ..config import config
from .http import http_client
from ..config.db_model import Cookie
from ..types import Target, RegistryMeta
class ClientManager(ABC):
@ -35,12 +42,121 @@ class DefaultClientManager(ClientManager):
pass
class Site:
class CookieClientManager(ClientManager):
_site_name: str
_default_cd: int = timedelta(seconds=10)
@classmethod
async def refresh_anonymous_cookie(cls):
"""移除已有的匿名cookie添加一个新的匿名cookie"""
anonymous_cookies = await config.get_cookie(cls._site_name, is_anonymous=True)
anonymous_cookie = Cookie(site_name=cls._site_name, content="{}", is_universal=True, is_anonymous=True)
for cookie in anonymous_cookies:
if not cookie.is_anonymous:
continue
await config.delete_cookie_by_id(cookie.id)
anonymous_cookie.id = cookie.id # 保持原有的id
anonymous_cookie.last_usage = datetime.now() # 使得第一次请求优先使用用户 cookie
await config.add_cookie(anonymous_cookie)
@classmethod
async def add_user_cookie(cls, content: str):
"""添加用户 cookie"""
cookie = Cookie(site_name=cls._site_name, content=content)
cookie.cd = cls._default_cd
await config.add_cookie(cookie)
@classmethod
async def validate_cookie(cls, content: str) -> bool:
"""验证 cookie 内容是否有效,添加 cookie 时用,可根据平台的具体情况进行重写"""
try:
data = json.loads(content)
if not isinstance(data, dict):
return False
except JSONDecodeError:
return False
return True
@classmethod
async def get_cookie_friendly_name(cls, cookie: Cookie) -> str:
"""获取 cookie 的友好名字,用于展示"""
from . import text_fletten
return text_fletten(f"{cookie.site_name} [{cookie.content[:10]}]")
def _generate_hook(self, cookie: Cookie) -> callable:
"""hook 函数生成器,用于回写请求状态到数据库"""
async def _response_hook(resp: httpx.Response):
if resp.status_code == 200:
logger.trace(f"请求成功: {cookie.id} {resp.request.url}")
cookie.status = "success"
else:
logger.warning(f"请求失败:{cookie.id} {resp.request.url}, 状态码: {resp.status_code}")
cookie.status = "failed"
cookie.last_usage = datetime.now()
await config.update_cookie(cookie)
return _response_hook
async def _choose_cookie(self, target: Target | None) -> Cookie:
"""选择 cookie 的具体算法"""
cookies = await config.get_cookie(self._site_name, target)
cookies = (cookie for cookie in cookies if cookie.last_usage + cookie.cd < datetime.now())
cookie = min(cookies, key=lambda x: x.last_usage)
return cookie
async def get_client(self, target: Target | None) -> AsyncClient:
"""获取 client根据 target 选择 cookie"""
client = http_client()
cookie = await self._choose_cookie(target)
if cookie.is_universal:
logger.trace(f"平台 {self._site_name} 未获取到用户cookie, 使用匿名cookie")
else:
logger.trace(f"平台 {self._site_name} 获取到用户cookie: {cookie.id}")
return await self._assemble_client(client, cookie)
async def _assemble_client(self, client, cookie) -> AsyncClient:
"""组装 client可以自定义 cookie 对象的 content 装配到 client 中的方式"""
cookies = httpx.Cookies()
if cookie:
cookies.update(json.loads(cookie.content))
client.cookies = cookies
client.event_hooks = {"response": [self._generate_hook(cookie)]}
return client
async def get_client_for_static(self) -> AsyncClient:
return http_client()
async def get_query_name_client(self) -> AsyncClient:
return http_client()
async def refresh_client(self):
pass
def is_cookie_client_manager(manger: type[ClientManager]) -> bool:
return issubclass(manger, CookieClientManager)
def create_cookie_client_manager(site_name: str) -> type[CookieClientManager]:
"""创建一个平台特化的 CookieClientManger"""
return type(
"CookieClientManager",
(CookieClientManager,),
{"_site_name": site_name},
)
class Site(metaclass=RegistryMeta, base=True):
schedule_type: Literal["date", "interval", "cron"]
schedule_setting: dict
name: str
client_mgr: type[ClientManager] = DefaultClientManager
require_browser: bool = False
registry: list[type["Site"]]
cookie_format_prompt = "无效的 Cookie请检查后重新输入详情见<待添加的文档>"
def __str__(self):
return f"[{self.name}]-{self.name}-{self.schedule_setting}"
@ -56,3 +172,7 @@ def anonymous_site(schedule_type: Literal["date", "interval", "cron"], schedule_
"client_mgr": DefaultClientManager,
},
)
class SkipRequestException(Exception):
pass

0
tests/config/__init__.py Normal file
View File

123
tests/config/test_cookie.py Normal file
View File

@ -0,0 +1,123 @@
import json
from typing import cast
from datetime import datetime
import pytest
from nonebug import App
async def test_cookie(app: App, init_scheduler):
from nonebot_plugin_saa import TargetQQGroup
from nonebot_bison.platform import site_manager
from nonebot_bison.config.db_config import config
from nonebot_bison.types import Target as T_Target
from nonebot_bison.utils.site import CookieClientManager
from nonebot_bison.config.utils import DuplicateCookieTargetException
target = T_Target("weibo_id")
platform_name = "weibo"
await config.add_subscribe(
TargetQQGroup(group_id=123),
target=target,
target_name="weibo_name",
platform_name=platform_name,
cats=[],
tags=[],
)
site = site_manager["weibo.com"]
client_mgr = cast(CookieClientManager, site.client_mgr)
# 刷新匿名cookie
await client_mgr.refresh_anonymous_cookie()
cookies = await config.get_cookie(site_name=site.name)
assert len(cookies) == 1
# 添加用户cookie
await client_mgr.add_user_cookie(json.dumps({"test_cookie": "1"}))
await client_mgr.add_user_cookie(json.dumps({"test_cookie": "2"}))
cookies = await config.get_cookie(site_name=site.name)
assert len(cookies) == 3
cookies = await config.get_cookie(site_name=site.name, is_anonymous=False)
assert len(cookies) == 2
# 单个target多个cookie
await config.add_cookie_target(target, platform_name, cookies[0].id)
await config.add_cookie_target(target, platform_name, cookies[1].id)
cookies = await config.get_cookie(site_name=site.name, target=target)
assert len(cookies) == 3
cookies = await config.get_cookie(site_name=site.name, target=target, is_anonymous=False)
assert len(cookies) == 2
cookies = await config.get_cookie(site_name=site.name, target=target, is_universal=False)
assert len(cookies) == 2
# 测试不同的target
target2 = T_Target("weibo_id2")
await config.add_subscribe(
TargetQQGroup(group_id=123),
target=target2,
target_name="weibo_name2",
platform_name=platform_name,
cats=[],
tags=[],
)
await client_mgr.add_user_cookie(json.dumps({"test_cookie": "3"}))
cookies = await config.get_cookie(site_name=site.name, is_anonymous=False)
# 多个target多个cookie
await config.add_cookie_target(target2, platform_name, cookies[0].id)
await config.add_cookie_target(target2, platform_name, cookies[2].id)
cookies = await config.get_cookie(site_name=site.name, target=target2)
assert len(cookies) == 3
# 重复关联 target
with pytest.raises(DuplicateCookieTargetException) as e:
await config.add_cookie_target(target2, platform_name, cookies[2].id)
assert isinstance(e.value, DuplicateCookieTargetException)
cookies = await config.get_cookie(site_name=site.name, target=target2, is_anonymous=False)
assert len(cookies) == 2
# 有关联的cookie不能删除
with pytest.raises(Exception, match="cookie") as e:
await config.delete_cookie_by_id(cookies[1].id)
cookies = await config.get_cookie(site_name=site.name, target=target2, is_anonymous=False)
assert len(cookies) == 2
await config.delete_cookie_target(target2, platform_name, cookies[1].id)
await config.delete_cookie_by_id(cookies[1].id)
cookies = await config.get_cookie(site_name=site.name, target=target2, is_anonymous=False)
assert len(cookies) == 1
cookie = cookies[0]
cookie_id = cookie.id
cookie.last_usage = datetime(2024, 9, 13)
cookie.status = "test"
await config.update_cookie(cookie)
cookies = await config.get_cookie(site_name=site.name, target=target2, is_anonymous=False)
assert len(cookies) == 1
assert cookies[0].id == cookie_id
assert cookies[0].last_usage == datetime(2024, 9, 13)
assert cookies[0].status == "test"
# 不存在的 cookie_id
cookie.id = 114514
with pytest.raises(ValueError, match="cookie") as e:
await config.update_cookie(cookie)
# 获取所有关联对象
cookie_targets = await config.get_cookie_target()
assert len(cookie_targets) == 3
# 删除关联对象
await config.delete_cookie_target_by_id(cookie_targets[0].id)
cookie_targets = await config.get_cookie_target()
assert len(cookie_targets) == 2

View File

@ -0,0 +1,212 @@
import json
from nonebug.app import App
from pytest_mock import MockerFixture
from ..utils import BotReply, fake_superuser, fake_admin_user, fake_private_message_event
async def test_add_cookie_rule(app: App, mocker: MockerFixture):
from nonebot.adapters.onebot.v11.bot import Bot
from nonebot.adapters.onebot.v11.message import Message
from nonebot_bison.plugin_config import plugin_config
from nonebot_bison.sub_manager import add_cookie_matcher
mocker.patch.object(plugin_config, "bison_to_me", True)
async with app.test_matcher(add_cookie_matcher) as ctx:
bot = ctx.create_bot(base=Bot)
event = fake_private_message_event(message=Message("添加cookie"), sender=fake_superuser)
ctx.receive_event(bot, event)
ctx.should_pass_rule()
ctx.should_pass_permission()
async with app.test_matcher(add_cookie_matcher) as ctx:
bot = ctx.create_bot(base=Bot)
event = fake_private_message_event(message=Message("添加cookie"), sender=fake_admin_user)
ctx.receive_event(bot, event)
ctx.should_not_pass_rule()
ctx.should_pass_permission()
async def test_add_cookie_target_no_cookie(app: App, mocker: MockerFixture):
from nonebot.adapters.onebot.v11.bot import Bot
from nonebot.adapters.onebot.v11.message import Message
from nonebot_bison.sub_manager import add_cookie_target_matcher
async with app.test_matcher(add_cookie_target_matcher) as ctx:
bot = ctx.create_bot(base=Bot)
from nonebug_saa import should_send_saa
from nonebot_plugin_saa import TargetQQGroup, MessageFactory
from nonebot_bison.config import config
from nonebot_bison.types import Target as T_Target
target = T_Target("weibo_id")
platform_name = "weibo"
await config.add_subscribe(
TargetQQGroup(group_id=123),
target=target,
target_name="weibo_name",
platform_name=platform_name,
cats=[],
tags=[],
)
event_1 = fake_private_message_event(
message=Message("关联cookie"), sender=fake_superuser, to_me=True, user_id=fake_superuser.user_id
)
ctx.receive_event(bot, event_1)
ctx.should_pass_rule()
should_send_saa(
ctx,
MessageFactory(
"订阅的帐号为:\n1 weibo weibo_name weibo_id\n []\n请输入要关联 cookie 的订阅的序号\n输入'取消'中止"
),
bot,
event=event_1,
)
event_2 = fake_private_message_event(
message=Message("1"), sender=fake_superuser, to_me=True, user_id=fake_superuser.user_id
)
ctx.receive_event(bot, event_2)
ctx.should_pass_rule()
ctx.should_call_send(
event_2,
"当前平台暂无可关联的 Cookie请使用“添加cookie”命令添加或检查已关联的 Cookie",
True,
)
async def test_add_cookie(app: App, mocker: MockerFixture):
from nonebot.adapters.onebot.v11.bot import Bot
from nonebot.adapters.onebot.v11.message import Message
from nonebot_bison.platform import platform_manager
from nonebot_bison.sub_manager import common_platform, add_cookie_matcher, add_cookie_target_matcher
async with app.test_matcher(add_cookie_matcher) as ctx:
bot = ctx.create_bot(base=Bot)
event_1 = fake_private_message_event(
message=Message("添加Cookie"), sender=fake_superuser, to_me=True, user_id=fake_superuser.user_id
)
ctx.receive_event(bot, event_1)
ctx.should_pass_rule()
ctx.should_call_send(
event_1,
BotReply.add_reply_on_add_cookie(platform_manager, common_platform),
True,
)
event_2 = fake_private_message_event(
message=Message("全部"), sender=fake_superuser, to_me=True, user_id=fake_superuser.user_id
)
ctx.receive_event(bot, event_2)
ctx.should_pass_rule()
ctx.should_rejected()
ctx.should_call_send(
event_2,
BotReply.add_reply_on_add_cookie_input_allplatform(platform_manager),
True,
)
event_3 = fake_private_message_event(
message=Message("weibo"), sender=fake_superuser, to_me=True, user_id=fake_superuser.user_id
)
ctx.receive_event(bot, event_3)
ctx.should_pass_rule()
ctx.should_call_send(event_3, BotReply.add_reply_on_input_cookie)
event_4_err = fake_private_message_event(
message=Message("test"), sender=fake_superuser, to_me=True, user_id=fake_superuser.user_id
)
ctx.receive_event(bot, event_4_err)
ctx.should_call_send(event_4_err, "无效的 Cookie请检查后重新输入详情见<待添加的文档>", True)
ctx.should_rejected()
event_4_ok = fake_private_message_event(
message=Message(json.dumps({"cookie": "test"})),
sender=fake_superuser,
to_me=True,
user_id=fake_superuser.user_id,
)
ctx.receive_event(bot, event_4_ok)
ctx.should_pass_rule()
ctx.should_call_send(
event_4_ok, '已添加 Cookie: {"cookie": "test"} 到平台 weibo\n请使用“关联cookie”为 Cookie 关联订阅', True
)
async with app.test_matcher(add_cookie_target_matcher) as ctx:
from nonebug_saa import should_send_saa
from nonebot_plugin_saa import TargetQQGroup, MessageFactory
from nonebot_bison.config import config
from nonebot_bison.types import Target as T_Target
target = T_Target("weibo_id")
platform_name = "weibo"
await config.add_subscribe(
TargetQQGroup(group_id=123),
target=target,
target_name="weibo_name",
platform_name=platform_name,
cats=[],
tags=[],
)
bot = ctx.create_bot(base=Bot)
event_1 = fake_private_message_event(
message=Message("关联cookie"), sender=fake_superuser, to_me=True, user_id=fake_superuser.user_id
)
ctx.receive_event(bot, event_1)
ctx.should_pass_rule()
should_send_saa(
ctx,
MessageFactory(
"订阅的帐号为:\n1 weibo weibo_name weibo_id\n []\n请输入要关联 cookie 的订阅的序号\n输入'取消'中止"
),
bot,
event=event_1,
)
event_2_err = fake_private_message_event(
message=Message("2"), sender=fake_superuser, to_me=True, user_id=fake_superuser.user_id
)
ctx.receive_event(bot, event_2_err)
ctx.should_call_send(event_2_err, "序号错误", True)
ctx.should_rejected()
event_2_ok = fake_private_message_event(
message=Message("1"), sender=fake_superuser, to_me=True, user_id=fake_superuser.user_id
)
ctx.receive_event(bot, event_2_ok)
ctx.should_pass_rule()
ctx.should_call_send(event_2_ok, '请选择一个 Cookie已关联的 Cookie 不会显示\n1. weibo.com [{"cookie":]', True)
event_3_err = fake_private_message_event(
message=Message("2"), sender=fake_superuser, to_me=True, user_id=fake_superuser.user_id
)
ctx.receive_event(bot, event_3_err)
ctx.should_call_send(event_3_err, "序号错误", True)
ctx.should_rejected()
event_3_ok = fake_private_message_event(
message=Message("1"), sender=fake_superuser, to_me=True, user_id=fake_superuser.user_id
)
ctx.receive_event(bot, event_3_ok)
ctx.should_pass_rule()
ctx.should_call_send(event_3_ok, '已关联 Cookie: weibo.com [{"cookie":] 到订阅 weibo.com weibo_id', True)
async def test_add_cookie_target_no_target(app: App, mocker: MockerFixture):
from nonebot.adapters.onebot.v11.bot import Bot
from nonebot.adapters.onebot.v11.message import Message
from nonebot_bison.sub_manager import add_cookie_target_matcher
async with app.test_matcher(add_cookie_target_matcher) as ctx:
bot = ctx.create_bot(base=Bot)
event_1 = fake_private_message_event(
message=Message("关联cookie"), sender=fake_superuser, to_me=True, user_id=fake_superuser.user_id
)
ctx.receive_event(bot, event_1)
ctx.should_pass_rule()
ctx.should_call_send(
event_1,
"暂无已订阅账号\n请使用“添加订阅”命令添加订阅",
True,
)

View File

@ -0,0 +1,133 @@
import json
from nonebug.app import App
from ..utils import fake_superuser, fake_private_message_event
async def test_del_cookie_err(app: App):
from nonebug_saa import should_send_saa
from nonebot.adapters.onebot.v11.bot import Bot
from nonebot.adapters.onebot.v11.message import Message
from nonebot_plugin_saa import TargetQQGroup, MessageFactory
from nonebot_bison.config import config
from nonebot_bison.config.db_model import Cookie
from nonebot_bison.types import Target as T_Target
from nonebot_bison.sub_manager import del_cookie_matcher
async with app.test_matcher(del_cookie_matcher) as ctx:
bot = ctx.create_bot(base=Bot)
event = fake_private_message_event(
message=Message("删除cookie"), sender=fake_superuser, to_me=True, user_id=fake_superuser.user_id
)
ctx.receive_event(bot, event)
ctx.should_pass_rule()
ctx.should_pass_permission()
ctx.should_call_send(event, "暂无已添加的 Cookie\n请使用“添加cookie”命令添加", True)
async with app.test_matcher(del_cookie_matcher) as ctx:
bot = ctx.create_bot(base=Bot)
target = T_Target("weibo_id")
platform_name = "weibo"
await config.add_subscribe(
TargetQQGroup(group_id=123),
target=target,
target_name="weibo_name",
platform_name=platform_name,
cats=[],
tags=[],
)
await config.add_cookie(Cookie(content=json.dumps({"cookie": "test"}), site_name="weibo.com"))
cookies = await config.get_cookie(is_anonymous=False)
await config.add_cookie_target(target, platform_name, cookies[0].id)
event_1 = fake_private_message_event(
message=Message("删除cookie"), sender=fake_superuser, to_me=True, user_id=fake_superuser.user_id
)
ctx.receive_event(bot, event_1)
ctx.should_pass_rule()
ctx.should_pass_permission()
should_send_saa(
ctx,
MessageFactory(
'已添加的 Cookie 为:\n1 weibo.com weibo.com [{"cookie":] '
"1个关联\n请输入要删除的 Cookie 的序号\n输入'取消'中止"
),
bot,
event=event_1,
)
event_2_err = fake_private_message_event(
message=Message("2"), sender=fake_superuser, to_me=True, user_id=fake_superuser.user_id
)
ctx.receive_event(bot, event_2_err)
ctx.should_call_send(event_2_err, "序号错误", True)
ctx.should_rejected()
event_2 = fake_private_message_event(
message=Message("1"), sender=fake_superuser, to_me=True, user_id=fake_superuser.user_id
)
ctx.receive_event(bot, event_2)
ctx.should_pass_rule()
ctx.should_call_send(event_2, "只能删除未关联的 Cookie请使用“取消关联cookie”命令取消关联", True)
ctx.should_call_send(event_2, "删除错误", True)
ctx.should_rejected()
async def test_del_cookie(app: App):
from nonebug_saa import should_send_saa
from nonebot.adapters.onebot.v11.bot import Bot
from nonebot.adapters.onebot.v11.message import Message
from nonebot_plugin_saa import TargetQQGroup, MessageFactory
from nonebot_bison.config import config
from nonebot_bison.config.db_model import Cookie
from nonebot_bison.types import Target as T_Target
from nonebot_bison.sub_manager import del_cookie_matcher
async with app.test_matcher(del_cookie_matcher) as ctx:
bot = ctx.create_bot(base=Bot)
event = fake_private_message_event(
message=Message("删除cookie"), sender=fake_superuser, to_me=True, user_id=fake_superuser.user_id
)
ctx.receive_event(bot, event)
ctx.should_pass_rule()
ctx.should_pass_permission()
ctx.should_call_send(event, "暂无已添加的 Cookie\n请使用“添加cookie”命令添加", True)
async with app.test_matcher(del_cookie_matcher) as ctx:
bot = ctx.create_bot(base=Bot)
target = T_Target("weibo_id")
platform_name = "weibo"
await config.add_subscribe(
TargetQQGroup(group_id=123),
target=target,
target_name="weibo_name",
platform_name=platform_name,
cats=[],
tags=[],
)
await config.add_cookie(Cookie(content=json.dumps({"cookie": "test"}), site_name="weibo.com"))
event_1 = fake_private_message_event(
message=Message("删除cookie"), sender=fake_superuser, to_me=True, user_id=fake_superuser.user_id
)
ctx.receive_event(bot, event_1)
ctx.should_pass_rule()
ctx.should_pass_permission()
should_send_saa(
ctx,
MessageFactory(
'已添加的 Cookie 为:\n1 weibo.com weibo.com [{"cookie":]'
" 0个关联\n请输入要删除的 Cookie 的序号\n输入'取消'中止"
),
bot,
event=event_1,
)
event_2 = fake_private_message_event(
message=Message("1"), sender=fake_superuser, to_me=True, user_id=fake_superuser.user_id
)
ctx.receive_event(bot, event_2)
ctx.should_pass_rule()
ctx.should_pass_permission()
ctx.should_call_send(event_2, "删除成功", True)

View File

@ -89,6 +89,7 @@ add_reply_on_id_input_search = (
class BotReply:
@staticmethod
def add_reply_on_platform(platform_manager, common_platform):
return (
@ -159,3 +160,33 @@ class BotReply:
add_reply_on_tags_need_more_info = "订阅标签直接输入标签内容\n屏蔽标签请在标签名称前添加~号\n详见https://nonebot-bison.netlify.app/usage/#%E5%B9%B3%E5%8F%B0%E8%AE%A2%E9%98%85%E6%A0%87%E7%AD%BE-tag"
add_reply_abort = "已中止订阅"
no_permission = "您没有权限进行此操作,请联系 Bot 管理员"
@staticmethod
def add_reply_on_add_cookie(platform_manager, common_platform):
from nonebot_bison.utils.site import is_cookie_client_manager
return (
"请输入想要添加 Cookie 的平台,目前支持,请输入冒号左边的名称:\n"
+ "".join(
[
f"{platform_name}: {platform_manager[platform_name].name}\n"
for platform_name in common_platform
if is_cookie_client_manager(platform_manager[platform_name].site.client_mgr)
]
)
+ "要查看全部平台请输入:“全部”\n中止添加cookie过程请输入“取消”"
)
@staticmethod
def add_reply_on_add_cookie_input_allplatform(platform_manager):
from nonebot_bison.utils.site import is_cookie_client_manager
return "全部平台\n" + "\n".join(
[
f"{platform_name}: {platform.name}"
for platform_name, platform in platform_manager.items()
if is_cookie_client_manager(platform.site.client_mgr)
]
)
add_reply_on_input_cookie = "请输入 Cookie"