This commit is contained in:
Azide 2022-03-13 21:30:00 +08:00
commit 14a5cd84e5
34 changed files with 2575 additions and 8278 deletions

View File

@ -8,6 +8,9 @@ workflows:
- continuation/continue:
configuration_path: ".circleci/main.yml"
parameters: /home/circleci/params.json
filters:
tags:
only: /.*/
pre-steps:
- run:
command: |

View File

@ -14,7 +14,7 @@ orbs:
node: circleci/node@4.7.0
# poetry: frameio/poetry@0.21.0
swissknife: roopakv/swissknife@0.59.0
docker: circleci/docker@1.7.0
docker: circleci/docker@2.0.2
docker-cache: cci-x/docker-registry-image-cache@0.2.0
codecov: codecov/codecov@3.2.2
@ -151,8 +151,8 @@ jobs:
test:
docker:
- image: cimg/python:3.9
- image: browserless/chrome
# environment:
environment:
BISON_SKIP_BROWSER_CHECK: true
# BISON_BROWSER: wsc://localhost:3000
steps:
- checkout
@ -164,7 +164,8 @@ jobs:
command: poetry run playwright install-deps && poetry run playwright install chromium
- run:
name: Coverage test
command: poetry run pytest --cov-report html --cov-report xml --cov=./src/plugins/nonebot_bison --junitxml=test-results/junit.xml -k 'not compare'
command: poetry run pytest --cov-report html --cov-report xml --cov=./src/plugins/nonebot_bison --junitxml=test-results/junit.xml
-k 'not compare and not render'
- store_test_results:
path: test-results
- run:
@ -198,7 +199,7 @@ jobs:
- attach_workspace:
at: .
- run:
command: poetry publish -u $PYPI_USERNAME -p $PYPI_PASSWORD
command: poetry publish -u $PYPI_USERNAME -p $PYPI_PASSWORD || echo "Already pushed to pypi"
name: Publish to Pypi
publish-github-release:
@ -212,6 +213,5 @@ jobs:
name: Publish to Github Release
command: |
go install github.com/tcnksm/ghr@latest
VERSION=$(cat pyproject.toml | grep version | sed 's/version = "\([0-9\.]*\)"/\1/')
ghr -t ${GITHUB_TOKEN} -u ${CIRCLE_PROJECT_USERNAME} -r ${CIRCLE_PROJECT_REPONAME} -c ${CIRCLE_SHA1} -delete ${VERSION} ./dist
ghr -t ${GITHUB_TOKEN} -u ${CIRCLE_PROJECT_USERNAME} -r ${CIRCLE_PROJECT_REPONAME} -c ${CIRCLE_SHA1} -delete ${CIRCLE_TAG} ./dist

1
.gitignore vendored
View File

@ -274,6 +274,7 @@ dist
# vuepress build output
.vuepress/dist
docs/.vuepress/.temp/
# Serverless directories
.serverless/

View File

@ -61,3 +61,10 @@
## [0.5.0]
- 添加了 FF14
- 去掉了自己维护的 playwright转向[nonebot-plugin-htmlrender](https://github.com/kexue-z/nonebot-plugin-htmlrender)
- 支持了 nonebot 2.0.0beta
## [0.5.1]
- 使用了新的私聊进行群管理的方式
- 默认关闭自动重发功能

View File

@ -64,6 +64,8 @@ yarn && yarn build
本项目使用了 Python 3.9 的语法,请将 Python 版本升级到 3.9 及以上,推荐使用 docker 部署
2. bot 不理我
请确认自己是群主或者管理员,并且检查`COMMAND_START`环境变量是否设为`[""]`
或者按照`COMMAND_START`中的设置添加命令前缀,例:
`COMMAND_START=["/"]`则应发送`/添加订阅`
3. 微博漏订阅了
微博更新了新的风控措施,某些含有某些关键词的微博会获取不到。

View File

@ -14,7 +14,6 @@ services:
HOST: 0.0.0.0
# SUPERUSERS: '[<your QQ>]'
BISON_CONFIG_PATH: /data
BISON_BROWSER: ws://browserless:3000
# BISON_OUTER_URL: 'http://<your server ip>:8080/bison'
BISON_FILTER_LOG: true
BISON_USE_PIC: false # 如果需要将文字转为图片发送请改为true

View File

@ -10,7 +10,7 @@ RUN apt-get update && apt-get install -y xvfb fonts-noto-color-emoji ttf-unifont
libxdamage1 libxext6 libxfixes3 libxrandr2 libxshmfence1 \
&& rm -rf /var/lib/apt/lists/*
COPY ./pyproject.toml ./poetry.lock* /app/
RUN poetry install --no-root --no-dev
RUN poetry install --no-dev --no-root
RUN playwright install chromium
ADD src /app/src
ADD bot.py /app/

View File

@ -11,8 +11,8 @@ RUN apt-get update && apt-get install -y xvfb fonts-noto-color-emoji ttf-unifont
&& rm -rf /var/lib/apt/lists/*
COPY ./pyproject.toml ./poetry.lock* ./bot.py /app/
RUN poetry add nonebot-plugin-sentry && \
sed '/nonebot.load_builtin_plugins()/a nonebot.load_plugin("nonebot_plugin_sentry")' -i bot.py
RUN poetry install --no-root --no-dev
sed '/nonebot.load_builtin_plugins("echo")/a nonebot.load_plugin("nonebot_plugin_sentry")' -i bot.py
RUN poetry install --no-dev --no-root
RUN playwright install chromium
ADD src /app/src
ENV HOST=0.0.0.0

View File

@ -2,7 +2,7 @@ module.exports = {
title: 'Nonebot Bison',
description: 'Docs for Nonebot Bison',
themeConfig: {
nav: [
navbar: [
{ text: '主页', link: '/' },
{ text: '部署与使用', link: '/usage/' },
{ text: '开发', link: '/dev/' },

View File

@ -42,7 +42,7 @@ sidebar: auto
servers:
- ws-reverse:
universal: ws://nonebot:8080/cqhttp/ws # 将这个字段写为这个值
universal: ws://nonebot:8080/onebot/v11/ws/ # 将这个字段写为这个值
```
3. 登录 go-cqhttp
@ -107,18 +107,40 @@ sidebar: auto
截止发布时,本项目尚不能完全与 browserless 兼容,目前建议使用镜像内自带的浏览器,即
不要配置这个变量
:::
- `BISON_SKIP_BROWSER_CHECK`: 是否在启动时自动下载浏览器,如果选择`False`会在用到浏览器时自动下载,
默认`True`
- `BISON_OUTER_URL`: 从外部访问服务器的地址,默认为`http://localhost:8080/bison`,如果你的插件部署
在服务器上,建议配置为`http://<你的服务器ip>:8080/bison`
- `BISON_FILTER_LOG`: 是否过滤来自`nonebot`的 warning 级以下的 log如果你的 bot 只运行了这个插件可以考虑
开启,默认关
- `BISON_USE_QUEUE`: 是否用队列的方式发送消息,降低发送频率,默认开
- `BISON_RESEND_TIMES`: 最大重发次数,默认 0
- `BISON_USE_PIC_MERGE`: 是否启用多图片时合并转发(仅限群)
- `0`: 不启用(默认)
- `1`: 首条消息单独发送,剩余图片合并转发
- `2`: 所有消息全部合并转发
::: details 配置项示例
- 当`BISON_USE_PIC_MERGE=1`时:
![simple1](./pic/forward-msg-simple1.png)
- 当`BISON_USE_PIC_MERGE=2`时:
![simple1](./pic/forward-msg-simple2.png)
:::
::: warning
启用此功能时,可能会因为待推送图片过大/过多而导致文字消息与合并转发图片消息推送间隔过大(选择模式`1`时),请谨慎考虑开启。或者选择模式`2`,使图文消息一同合并转发(可能会使消息推送延迟过长)
:::
## 使用
::: warning
本节假设`COMMAND_START`设置中包含`''`,如果出现 bot 不响应的问题,请先
排查这个设置
:::
本节假设`COMMAND_START`设置中包含`''`
- 如果出现 bot 不响应的问题,请先排查这个设置
- 尝试在命令前添加设置的命令前缀,如`COMMAND_START=['/']`,则尝试使用`/添加订阅`
:::
### 命令

Binary file not shown.

After

Width:  |  Height:  |  Size: 36 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 12 KiB

View File

@ -8,7 +8,7 @@
"license": "MIT",
"private": false,
"devDependencies": {
"vuepress": "^1.8.2"
"vuepress": "^2.0.0-beta.36"
},
"scripts": {
"docs:dev": "vuepress dev docs",

609
poetry.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -1,6 +1,6 @@
[tool.poetry]
name = "nonebot-bison"
version = "0.4.4"
version = "0.5.0"
description = "Subscribe message from social medias"
authors = ["felinae98 <felinae225@qq.com>"]
license = "MIT"
@ -33,23 +33,23 @@ pillow = "^8.1.0"
apscheduler = "^3.7.0"
expiringdict = "^1.2.1"
pyjwt = "^2.1.0"
aiofiles = "^0.7.0"
aiofiles = "^0.8.0"
python-socketio = "^5.4.0"
playwright = "^1.17.2"
nonebot-plugin-help = "^0.1.5"
nonebot-adapter-onebot = "^2.0.0-beta.1"
nonebot-plugin-htmlrender = "^0.0.4"
[tool.poetry.dev-dependencies]
ipdb = "^0.13.4"
pytest = "^6.2.4"
pytest-asyncio = "^0.16"
pytest = "^7.0.1"
pytest-asyncio = "^0.18.1"
respx = "^0.19.0"
pytest-cov = "^3.0.0"
nonebug = "^0.2.0"
nonebug = {git = "https://github.com/nonebot/nonebug.git", rev = "40fcd4f"}
black = "^22.1.0"
isort = "^5.10.1"
pre-commit = "^2.17.0"
nb-cli = "^0.6.6"
flaky = "^3.7.0"
[build-system]
requires = ["poetry>=0.12"]
@ -60,6 +60,7 @@ markers = [
"compare: compare fetching result with rsshub",
"render: render img by chrome"
]
asyncio_mode = "auto"
[tool.black]
line-length = 88

View File

@ -1,5 +1,3 @@
import nonebot
from . import (
admin_page,
config,

View File

@ -45,10 +45,10 @@ class SinglePageApplication(StaticFiles):
self.index = index
super().__init__(directory=directory, packages=None, html=True, check_dir=True)
async def lookup_path(self, path: str) -> tuple[str, Union[os.stat_result, None]]:
full_path, stat_res = await super().lookup_path(path)
def lookup_path(self, path: str) -> tuple[str, Union[os.stat_result, None]]:
full_path, stat_res = super().lookup_path(path)
if stat_res is None:
return await super().lookup_path(self.index)
return super().lookup_path(self.index)
return (full_path, stat_res)

View File

@ -1,13 +1,18 @@
from typing import Type
import asyncio
from asyncio.tasks import Task
from datetime import datetime
from typing import Optional, Type
from nonebot import on_command
from nonebot.adapters import Event as AbstractEvent
from nonebot.adapters.onebot.v11 import Bot, Event
from nonebot.adapters.onebot.v11 import Bot, Event, MessageEvent
from nonebot.adapters.onebot.v11.event import GroupMessageEvent, PrivateMessageEvent
from nonebot.adapters.onebot.v11.message import Message
from nonebot.adapters.onebot.v11.permission import GROUP_ADMIN, GROUP_OWNER
from nonebot.internal.params import ArgStr
from nonebot.internal.rule import Rule
from nonebot.log import logger
from nonebot.matcher import Matcher
from nonebot.params import Depends, EventToMe
from nonebot.params import Depends, EventMessage, EventPlainText, EventToMe
from nonebot.permission import SUPERUSER
from nonebot.rule import to_me
from nonebot.typing import T_State
@ -15,7 +20,7 @@ from nonebot.typing import T_State
from .config import Config
from .platform import check_sub_target, platform_manager
from .plugin_config import plugin_config
from .types import Category, Target
from .types import Category, Target, User
from .utils import parse_text
@ -44,7 +49,28 @@ common_platform = [
]
def ensure_user_info(matcher: Type[Matcher]):
async def _check_user_info(state: T_State):
if not state.get("target_user_info"):
await matcher.finish(
"No target_user_info set, this shouldn't happen, please issue"
)
return _check_user_info
async def set_target_user_info(event: MessageEvent, state: T_State):
if isinstance(event, GroupMessageEvent):
user = User(event.group_id, "group")
state["target_user_info"] = user
elif isinstance(event, PrivateMessageEvent):
user = User(event.user_id, "private")
state["target_user_info"] = user
def do_add_sub(add_sub: Type[Matcher]):
add_sub.handle()(ensure_user_info(add_sub))
@add_sub.handle()
async def init_promote(state: T_State):
state["_prompt"] = (
@ -57,10 +83,10 @@ def do_add_sub(add_sub: Type[Matcher]):
for platform_name in common_platform
]
)
+ "要查看全部平台请输入:“全部”"
+ "要查看全部平台请输入:“全部”\n中止订阅过程请输入:“取消”"
)
async def parse_platform(event: AbstractEvent, state: T_State) -> None:
async def parse_platform(event: MessageEvent, state: T_State) -> None:
if not isinstance(state["platform"], Message):
return
platform = str(event.get_message()).strip()
@ -72,6 +98,8 @@ def do_add_sub(add_sub: Type[Matcher]):
]
)
await add_sub.reject(message)
elif platform == "取消":
await add_sub.finish("已中止订阅")
elif platform in platform_manager:
state["platform"] = platform
else:
@ -82,27 +110,44 @@ def do_add_sub(add_sub: Type[Matcher]):
)
async def init_id(state: T_State):
if platform_manager[state["platform"]].has_target:
state[
"_prompt"
] = "请输入订阅用户的id详情查阅https://nonebot-bison.vercel.app/usage/#%E6%89%80%E6%94%AF%E6%8C%81%E5%B9%B3%E5%8F%B0%E7%9A%84uid"
state["_prompt"] = "请输入订阅用户的id:\n查询id获取方法请回复:“查询”"
else:
state["id"] = "default"
state["name"] = await platform_manager[state["platform"]].get_target_name(
Target("")
)
async def parse_id(event: AbstractEvent, state: T_State):
async def parse_id(event: MessageEvent, state: T_State):
if not isinstance(state["id"], Message):
return
target = str(event.get_message()).strip()
try:
if target == "查询":
raise LookupError
if target == "取消":
raise KeyboardInterrupt
name = await check_sub_target(state["platform"], target)
if not name:
raise ValueError
state["id"] = target
state["name"] = name
except:
except (LookupError):
url = "https://nonebot-bison.vercel.app/usage/#%E6%89%80%E6%94%AF%E6%8C%81%E5%B9%B3%E5%8F%B0%E7%9A%84-uid"
title = "Bison所支持的平台UID"
content = "查询相关平台的uid格式或获取方式"
image = "https://s3.bmp.ovh/imgs/2022/03/ab3cc45d83bd3dd3.jpg"
getId_share = f"[CQ:share,url={url},title={title},content={content},image={image}]" # 缩短字符串格式长度,以及方便后续修改为消息段格式
await add_sub.reject(Message(getId_share))
except (KeyboardInterrupt):
await add_sub.finish("已中止订阅")
except (ValueError):
await add_sub.reject("id输入错误")
else:
await add_sub.send(
"即将订阅的用户为:{} {} {}\n如有错误请输入“取消”重新订阅".format(
state["platform"], state["name"], state["id"]
)
)
@add_sub.got("id", _gen_prompt_template("{_prompt}"), [Depends(parse_id)])
async def init_cat(state: T_State):
@ -113,12 +158,14 @@ def do_add_sub(add_sub: Type[Matcher]):
" ".join(list(platform_manager[state["platform"]].categories.values()))
)
async def parser_cats(event: AbstractEvent, state: T_State):
async def parser_cats(event: MessageEvent, state: T_State):
if not isinstance(state["cats"], Message):
return
res = []
for cat in str(event.get_message()).strip().split():
if cat not in platform_manager[state["platform"]].reverse_category:
if cat == "取消":
await add_sub.finish("已中止订阅")
elif cat not in platform_manager[state["platform"]].reverse_category:
await add_sub.reject("不支持 {}".format(cat))
res.append(platform_manager[state["platform"]].reverse_category[cat])
state["cats"] = res
@ -130,9 +177,11 @@ def do_add_sub(add_sub: Type[Matcher]):
return
state["_prompt"] = '请输入要订阅的tag订阅所有tag输入"全部标签"'
async def parser_tags(event: AbstractEvent, state: T_State):
async def parser_tags(event: MessageEvent, state: T_State):
if not isinstance(state["tags"], Message):
return
if str(event.get_message()).strip() == "取消": # 一般不会有叫 取消 的tag吧
await add_sub.finish("已中止订阅")
if str(event.get_message()).strip() == "全部标签":
state["tags"] = []
else:
@ -141,9 +190,13 @@ def do_add_sub(add_sub: Type[Matcher]):
@add_sub.got("tags", _gen_prompt_template("{_prompt}"), [Depends(parser_tags)])
async def add_sub_process(event: Event, state: T_State):
config = Config()
user = state.get("target_user_info")
assert isinstance(user, User)
config.add_subscribe(
state.get("_user_id") or event.group_id,
user_type="group",
# state.get("_user_id") or event.group_id,
# user_type="group",
user=user.user,
user_type=user.user_type,
target=state["id"],
target_name=state["name"],
target_type=state["platform"],
@ -154,11 +207,17 @@ def do_add_sub(add_sub: Type[Matcher]):
def do_query_sub(query_sub: Type[Matcher]):
query_sub.handle()(ensure_user_info(query_sub))
@query_sub.handle()
async def _(event: Event, state: T_State):
async def _(state: T_State):
config: Config = Config()
user_info = state["target_user_info"]
assert isinstance(user_info, User)
sub_list = config.list_subscribe(
state.get("_user_id") or event.group_id, "group"
# state.get("_user_id") or event.group_id, "group"
user_info.user,
user_info.user_type,
)
res = "订阅的帐号为:\n"
for sub in sub_list:
@ -179,11 +238,17 @@ def do_query_sub(query_sub: Type[Matcher]):
def do_del_sub(del_sub: Type[Matcher]):
del_sub.handle()(ensure_user_info(del_sub))
@del_sub.handle()
async def send_list(bot: Bot, event: Event, state: T_State):
config: Config = Config()
user_info = state["target_user_info"]
assert isinstance(user_info, User)
sub_list = config.list_subscribe(
state.get("_user_id") or event.group_id, "group"
# state.get("_user_id") or event.group_id, "group"
user_info.user,
user_info.user_type,
)
res = "订阅的帐号为:\n"
state["sub_table"] = {}
@ -213,9 +278,13 @@ def do_del_sub(del_sub: Type[Matcher]):
try:
index = int(str(event.get_message()).strip())
config = Config()
user_info = state["target_user_info"]
assert isinstance(user_info, User)
config.del_subscribe(
state.get("_user_id") or event.group_id,
"group",
# state.get("_user_id") or event.group_id,
# "group",
user_info.user,
user_info.user_type,
**state["sub_table"][index],
)
except Exception as e:
@ -224,41 +293,19 @@ def do_del_sub(del_sub: Type[Matcher]):
await del_sub.finish("删除成功")
async def parse_group_number(event: AbstractEvent, state: T_State):
if not isinstance(state["_user_id"], Message):
return
state["_user_id"] = int(str(event.get_message()))
add_sub_matcher = on_command(
"添加订阅",
rule=configurable_to_me,
permission=GROUP_ADMIN | GROUP_OWNER | SUPERUSER,
priority=5,
)
add_sub_matcher.handle()(set_target_user_info)
do_add_sub(add_sub_matcher)
manage_add_sub_matcher = on_command("管理-添加订阅", permission=SUPERUSER, priority=5)
@manage_add_sub_matcher.got("_user_id", "群号", [Depends(parse_group_number)])
async def add_sub_handle():
pass
do_add_sub(manage_add_sub_matcher)
query_sub_matcher = on_command("查询订阅", rule=configurable_to_me, priority=5)
query_sub_matcher.handle()(set_target_user_info)
do_query_sub(query_sub_matcher)
manage_query_sub_matcher = on_command("管理-查询订阅", permission=SUPERUSER, priority=5)
@manage_query_sub_matcher.got("_user_id", "群号", [Depends(parse_group_number)])
async def query_sub_handle():
pass
do_query_sub(manage_query_sub_matcher)
del_sub_matcher = on_command(
@ -267,13 +314,117 @@ del_sub_matcher = on_command(
permission=GROUP_ADMIN | GROUP_OWNER | SUPERUSER,
priority=5,
)
del_sub_matcher.handle()(set_target_user_info)
do_del_sub(del_sub_matcher)
manage_del_sub_matcher = on_command("管理-删除订阅", permission=SUPERUSER, priority=5)
group_manage_matcher = on_command("群管理")
@manage_del_sub_matcher.got("_user_id", "群号", [Depends(parse_group_number)])
async def del_sub_handle():
pass
@group_manage_matcher.handle()
async def send_group_list(bot: Bot, state: T_State):
groups = await bot.call_api("get_group_list")
res_text = "请选择需要管理的群:\n"
group_number_idx = {}
for idx, group in enumerate(groups, 1):
group_number_idx[idx] = group["group_id"]
res_text += f'{idx}. {group["group_id"]} - {group["group_name"]}\n'
res_text += "请输入左侧序号"
# await group_manage_matcher.send(res_text)
state["_prompt"] = res_text
state["group_number_idx"] = group_number_idx
do_del_sub(manage_del_sub_matcher)
async def _parse_group_idx(state: T_State, event_msg: str = EventPlainText()):
if not isinstance(state["group_idx"], Message):
return
group_number_idx: Optional[dict[int, int]] = state.get("group_number_idx")
assert group_number_idx
try:
idx = int(event_msg)
assert idx in group_number_idx.keys()
state["group_idx"] = idx
except:
await group_manage_matcher.reject("请输入正确序号")
@group_manage_matcher.got(
"group_idx", _gen_prompt_template("{_prompt}"), [Depends(_parse_group_idx)]
)
async def do_choose_group_number(state: T_State):
group_number_idx: dict[int, int] = state["group_number_idx"]
idx: int = state["group_idx"]
group_id = group_number_idx[idx]
state["target_user_info"] = User(user=group_id, user_type="group")
async def _check_command(event_msg: str = EventPlainText()):
if event_msg not in {"添加订阅", "查询订阅", "删除订阅"}:
await group_manage_matcher.reject("请输入正确的命令")
return
@group_manage_matcher.got(
"command", "请输入需要使用的命令:添加订阅,查询订阅,删除订阅", [Depends(_check_command)]
)
async def do_dispatch_command(
bot: Bot,
event: MessageEvent,
state: T_State,
matcher: Matcher,
command: str = ArgStr(),
):
permission = await matcher.update_permission(bot, event)
new_matcher = Matcher.new(
"message",
Rule(),
permission,
None,
True,
priority=0,
block=True,
plugin=matcher.plugin,
module=matcher.module,
expire_time=datetime.now() + bot.config.session_expire_timeout,
default_state=matcher.state,
default_type_updater=matcher.__class__._default_type_updater,
default_permission_updater=matcher.__class__._default_permission_updater,
)
if command == "查询订阅":
do_query_sub(new_matcher)
elif command == "添加订阅":
do_add_sub(new_matcher)
else:
do_del_sub(new_matcher)
new_matcher_ins = new_matcher()
asyncio.create_task(new_matcher_ins.run(bot, event, state))
test_matcher = on_command("testtt")
@test_matcher.handle()
async def _handler(bot: Bot, event: Event, matcher: Matcher, state: T_State):
permission = await matcher.update_permission(bot, event)
new_matcher = Matcher.new(
"message",
Rule(),
permission,
None,
True,
priority=0,
block=True,
plugin=matcher.plugin,
module=matcher.module,
expire_time=datetime.now() + bot.config.session_expire_timeout,
default_state=matcher.state,
default_type_updater=matcher.__class__._default_type_updater,
default_permission_updater=matcher.__class__._default_permission_updater,
)
async def h():
logger.warning("yes")
await new_matcher.send("666")
new_matcher.handle()(h)
new_matcher_ins = new_matcher()
await new_matcher_ins.run(bot, event, state)

View File

@ -3,10 +3,10 @@ from typing import Any
import httpx
from bs4 import BeautifulSoup as bs
from nonebot.plugin import require
from ..post import Post
from ..types import Category, RawPost, Target
from ..utils import Render
from .platform import CategoryNotSupport, NewMessage, StatusChange
@ -46,21 +46,30 @@ class Arknights(NewMessage):
text = ""
async with httpx.AsyncClient() as client:
raw_html = await client.get(announce_url)
soup = bs(raw_html, "html.parser")
soup = bs(raw_html.text, "html.parser")
pics = []
if soup.find("div", class_="standerd-container"):
# 图文
render = Render()
viewport = {"width": 320, "height": 6400, "deviceScaleFactor": 3}
pic_data = await render.render(
announce_url, viewport=viewport, target="div.main"
require("nonebot_plugin_htmlrender")
from nonebot_plugin_htmlrender import capture_element
pic_data = await capture_element(
announce_url,
"div.main",
viewport={"width": 320, "height": 6400},
device_scale_factor=3,
)
# render = Render()
# viewport = {"width": 320, "height": 6400, "deviceScaleFactor": 3}
# pic_data = await render.render(
# announce_url, viewport=viewport, target="div.main"
# )
if pic_data:
pics.append(pic_data)
else:
text = "图片渲染失败"
elif pic := soup.find("img", class_="banner-image"):
pics.append(pic["src"])
pics.append(pic["src"]) # type: ignore
else:
raise CategoryNotSupport()
return Post(

View File

@ -1,77 +0,0 @@
import hashlib
import json
import re
from datetime import datetime
from typing import Any, Optional
import httpx
from bs4 import BeautifulSoup as bs
from ..types import *
# from .platform import Platform
# class Wechat(Platform):
# categories = {}
# enable_tag = False
# platform_name = 'wechat'
# enabled = False
# is_common = False
# name = '微信公众号'
# @classmethod
# def _get_query_url(cls, target: Target):
# return 'https://weixin.sogou.com/weixin?type=1&s_from=input&query={}&ie=utf8&_sug_=n&_sug_type_='.format(target)
# @classmethod
# async def _get_target_soup(cls, target: Target) -> Optional[bs]:
# target_url = cls._get_query_url(target)
# async with httpx.AsyncClient() as client:
# res = await client.get(target_url)
# soup = bs(res.text, 'html.parser')
# blocks = soup.find(class_='news-list2').find_all('li',recursive=False)
# for block in blocks:
# if block.find(string=[target]):
# return block
# @classmethod
# async def get_account_name(cls, target: Target) -> Optional[str]:
# if not (block := await cls._get_target_soup(target)):
# return None
# return block.find('p', class_='tit').find('a').text
# async def get_sub_list(self, target: Target) -> list[RawPost]:
# block = await self._get_target_soup(target)
# if (last_post_dt := block.find('dt', string='最近文章:')):
# post = {
# 'title': last_post_dt.find_parent().find('a').text,
# 'target': target,
# 'page_url': self._get_query_url(target),
# 'name': block.find('p', class_='tit').find('a').text
# }
# return [post]
# else:
# return []
# def get_id(self, post: RawPost) -> Any:
# return post['title']
# def get_date(self, post: RawPost):
# return None
# def get_tags(self, post: RawPost):
# return None
# def get_category(self, post: RawPost):
# return None
# async def parse(self, raw_post: RawPost) -> Post:
# # TODO get content of post
# return Post(target_type='wechat',
# text='{}\n详细内容请自行查看公众号'.format(raw_post['title']),
# target_name=raw_post['name'],
# pics=[],
# url=''
# )

View File

@ -1,5 +1,3 @@
import warnings
import nonebot
from pydantic import BaseSettings
@ -8,15 +6,15 @@ class PlugConfig(BaseSettings):
bison_config_path: str = ""
bison_use_pic: bool = False
bison_use_local: bool = False
bison_use_forward_pic = True#当图片超过1张时改为合并消息发送
#bison_browser: str = "local:C:\Program Files (x86)\Microsoft\Edge\Application\msedge.exe"
bison_browser: str = ""
bison_init_filter: bool = True
bison_use_queue: bool = True
bison_outer_url: str = "http://localhost:15556/bison/"
bison_filter_log: bool = False
bison_to_me: bool = True
bison_skip_browser_check: bool = False
bison_use_pic_merge: int = 0 # 多图片时启用图片合并转发(仅限群),当bison_use_queue为False时该配置不会生效
# 0不启用1首条消息单独发送剩余照片合并转发2以及以上所有消息全部合并转发
bison_resend_times: int = 0
class Config:
extra = "ignore"
@ -24,5 +22,3 @@ class PlugConfig(BaseSettings):
global_config = nonebot.get_driver().config
plugin_config = PlugConfig(**global_config.dict())
if plugin_config.bison_use_local:
warnings.warn("BISON_USE_LOCAL is deprecated, please use BISON_BROWSER")

View File

@ -2,23 +2,36 @@ from email import message
import time
from typing import List, Literal, Union
from nonebot.adapters import Message, MessageSegment
from nonebot.adapters.onebot.v11.bot import Bot
from nonebot.adapters.onebot.v11.message import Message, MessageSegment
from nonebot.log import logger
from .plugin_config import plugin_config
QUEUE = []
QUEUE: list[
tuple[
Bot,
int,
Literal["private", "group", "group-forward"],
Union[str, Message],
int,
]
] = []
LAST_SEND_TIME = time.time()
async def _do_send(
bot: "Bot", user: str, user_type: str, msg: Union[str, Message, MessageSegment]
bot: "Bot",
user: int,
user_type: Literal["group", "private", "group-forward"],
msg: Union[str, Message],
):
if user_type == "group":
await bot.call_api("send_group_msg", group_id=user, message=msg)
await bot.send_group_msg(group_id=user, message=msg)
elif user_type == "private":
await bot.call_api("send_private_msg", user_id=user, message=msg)
await bot.send_private_msg(user_id=user, message=msg)
elif user_type == "group-forward":
await bot.send_group_forward_msg(group_id=user, messages=msg)
async def _do_send_forward(
bot: "Bot", user: str, msgs: list
@ -56,13 +69,40 @@ async def do_send_msgs():
LAST_SEND_TIME = time.time()
async def send_msgs(bot: Bot, user, user_type: Literal["private", "group"], msgs: list):
if plugin_config.bison_use_forward_pic and user_type!="private":
if msgs:
_do_send_forward(bot, user, msgs)
elif plugin_config.bison_use_queue:
for msg in msgs:
QUEUE.append((bot, user, user_type, msg, 2))
async def _send_msgs_dispatch(
bot: Bot,
user,
user_type: Literal["private", "group", "group-forward"],
msg: Union[str, Message],
):
if plugin_config.bison_use_queue:
QUEUE.append((bot, user, user_type, msg, plugin_config.bison_resend_times))
else:
await _do_send(bot, user, user_type, msg)
async def send_msgs(bot: Bot, user, user_type: Literal["private", "group"], msgs: list):
if not plugin_config.bison_use_pic_merge or user_type == "private":
for msg in msgs:
await _do_send(bot, user, user_type, msg)
await _send_msgs_dispatch(bot, user, user_type, msg)
return
if plugin_config.bison_use_pic_merge == 1:
await _send_msgs_dispatch(bot, user, "group", msgs.pop(0))
if msgs:
if len(msgs) == 1: # 只有一条消息序列就不合并转发
await _send_msgs_dispatch(bot, user, "group", msgs.pop(0))
else:
group_bot_info = await bot.get_group_member_info(
group_id=user, user_id=int(bot.self_id), no_cache=True
) # 调用api获取群内bot的相关参数
forward_msg = Message(
[
MessageSegment.node_custom(
group_bot_info["user_id"],
nickname=group_bot_info["card"] or group_bot_info["nickname"],
content=msg,
)
for msg in msgs
]
)
await _send_msgs_dispatch(bot, user, "group-forward", forward_msg)

View File

@ -9,7 +9,7 @@ Tag = str
@dataclass(eq=True, frozen=True)
class User:
user: str
user: int
user_type: Literal["group", "private"]

View File

@ -1,23 +1,12 @@
import asyncio
import base64
import os
import platform
import re
import subprocess
import sys
from html import escape
from pathlib import Path
from time import asctime
from typing import Awaitable, Callable, Optional, Union
from typing import Union
import nonebot
from bs4 import BeautifulSoup as bs
from nonebot.adapters.onebot.v11.message import MessageSegment
from nonebot.log import default_format, logger
from playwright._impl._driver import compute_driver_executable
from playwright.async_api import Browser, Page, Playwright, async_playwright
from uvicorn import config
from uvicorn.loops import asyncio as _asyncio
from nonebot.plugin import require
from .plugin_config import plugin_config
@ -31,158 +20,21 @@ class Singleton(type):
return cls._instances[cls]
@nonebot.get_driver().on_startup
def download_browser():
if not plugin_config.bison_browser and not plugin_config.bison_use_local:
system = platform.system()
if system == "Linux":
browser_path = Path.home() / ".cache" / "ms-playwright"
elif system == "Windows":
browser_path = Path.home() / "AppData" / "Local" / "ms-playwright"
else:
raise RuntimeError("platform not supported")
if browser_path.exists() and os.listdir(str(browser_path)):
logger.warning("Browser Exists, skip")
return
env = os.environ.copy()
driver_executable = compute_driver_executable()
env["PW_CLI_TARGET_LANG"] = "python"
subprocess.run([str(driver_executable), "install", "chromium"], env=env)
class Render(metaclass=Singleton):
def __init__(self):
self.lock = asyncio.Lock()
self.browser: Browser
self.interval_log = ""
self.remote_browser = False
async def get_browser(self, playwright: Playwright) -> Browser:
if plugin_config.bison_browser:
if plugin_config.bison_browser.startswith("local:"):
path = plugin_config.bison_browser.split("local:", 1)[1]
return await playwright.chromium.launch(
executable_path=path, args=["--no-sandbox"]
)
if plugin_config.bison_browser.startswith("ws:"):
self.remote_browser = True
return await playwright.chromium.connect(plugin_config.bison_browser)
if plugin_config.bison_browser.startswith("wsc:"):
self.remote_browser = True
return await playwright.chromium.connect_over_cdp(
"ws:" + plugin_config.bison_browser[4:]
)
raise RuntimeError("bison_BROWSER error")
if plugin_config.bison_use_local:
return await playwright.chromium.launch(
executable_path="/usr/bin/chromium", args=["--no-sandbox"]
)
return await playwright.chromium.launch(args=["--no-sandbox"])
async def close_browser(self):
if not self.remote_browser:
await self.browser.close()
async def render(
self,
url: str,
viewport: Optional[dict] = None,
target: Optional[str] = None,
operation: Optional[Callable[[Page], Awaitable[None]]] = None,
) -> Optional[bytes]:
retry_times = 0
self.interval_log = ""
while retry_times < 3:
try:
return await asyncio.wait_for(
self.do_render(url, viewport, target, operation), 20
)
except asyncio.TimeoutError:
retry_times += 1
logger.warning(
"render error {}\n".format(retry_times) + self.interval_log
)
self.interval_log = ""
# if self.browser:
# await self.browser.close()
# self.lock.release()
def _inter_log(self, message: str) -> None:
self.interval_log += asctime() + "" + message + "\n"
async def do_render(
self,
url: str,
viewport: Optional[dict] = None,
target: Optional[str] = None,
operation: Optional[Callable[[Page], Awaitable[None]]] = None,
) -> Optional[bytes]:
async with self.lock:
async with async_playwright() as playwright:
self.browser = await self.get_browser(playwright)
self._inter_log("open browser")
if viewport:
constext = await self.browser.new_context(
viewport={
"width": viewport["width"],
"height": viewport["height"],
},
device_scale_factor=viewport.get("deviceScaleFactor", 1),
)
page = await constext.new_page()
else:
page = await self.browser.new_page()
if operation:
await operation(page)
else:
await page.goto(url)
self._inter_log("open page")
if target:
target_ele = page.locator(target)
if not target_ele:
return None
data = await target_ele.screenshot(type="jpeg")
else:
data = await page.screenshot(type="jpeg")
self._inter_log("screenshot")
await page.close()
self._inter_log("close page")
await self.close_browser()
self._inter_log("close browser")
assert isinstance(data, bytes)
return data
async def text_to_pic(self, text: str) -> Optional[bytes]:
lines = text.split("\n")
parsed_lines = list(map(lambda x: "<p>{}</p>".format(escape(x)), lines))
html_text = '<div style="width:17em;padding:1em">{}</div>'.format(
"".join(parsed_lines)
)
url = "data:text/html;charset=UTF-8;base64,{}".format(
base64.b64encode(html_text.encode()).decode()
)
data = await self.render(url, target="div")
return data
async def text_to_pic_cqcode(self, text: str) -> MessageSegment:
data = await self.text_to_pic(text)
# logger.debug('file size: {}'.format(len(data)))
if data:
# logger.debug(code)
return MessageSegment.image(data)
else:
return MessageSegment.text("生成图片错误")
async def parse_text(text: str) -> MessageSegment:
"return raw text if don't use pic, otherwise return rendered opcode"
if plugin_config.bison_use_pic:
render = Render()
return await render.text_to_pic_cqcode(text)
require("nonebot_plugin_htmlrender")
from nonebot_plugin_htmlrender import text_to_pic as _text_to_pic
return MessageSegment.image(await _text_to_pic(text))
else:
return MessageSegment.text(text)
if not plugin_config.bison_skip_browser_check:
require("nonebot_plugin_htmlrender")
def html_to_text(html: str, query_dict: dict = {}) -> str:
html = re.sub(r"<br\s*/?>", "<br>\n", html)
html = html.replace("</p>", "</p>\n")
@ -233,19 +85,3 @@ if plugin_config.bison_filter_log:
if config.log_level is None
else config.log_level
)
# monkey patch
def asyncio_setup():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
@property
def should_reload(self):
return False
if platform.system() == "Windows":
_asyncio.asyncio_setup = asyncio_setup
config.Config.should_reload = should_reload # type:ignore
logger.warning("检测到当前为 Windows 系统,已自动注入猴子补丁")

View File

@ -1,3 +1,4 @@
import asyncio
import typing
from pathlib import Path
@ -13,6 +14,9 @@ async def app(nonebug_init: None, tmp_path: Path, monkeypatch: pytest.MonkeyPatc
config = nonebot.get_driver().config
config.bison_config_path = str(tmp_path)
config.command_start = {""}
config.superusers = {"10001"}
config.log_level = "TRACE"
config.bison_filter_log = False
return App(monkeypatch)
@ -20,5 +24,24 @@ async def app(nonebug_init: None, tmp_path: Path, monkeypatch: pytest.MonkeyPatc
def dummy_user_subinfo(app: App):
from nonebot_bison.types import User, UserSubInfo
user = User("123", "group")
user = User(123, "group")
return UserSubInfo(user=user, category_getter=lambda _: [], tag_getter=lambda _: [])
@pytest.fixture
def task_watchdog(request):
def cancel_test_on_exception(task: asyncio.Task):
def maybe_cancel_clbk(t: asyncio.Task):
exception = t.exception()
if exception is None:
return
for task in asyncio.all_tasks():
coro = task.get_coro()
if coro.__qualname__ == request.function.__qualname__:
task.cancel()
return
task.add_done_callback(maybe_cancel_clbk)
return cancel_test_on_exception

View File

@ -0,0 +1,283 @@
import pytest
import respx
from httpx import Response
from nonebug.app import App
from .platforms.utils import get_json
from .utils import BotReply, fake_admin_user, fake_group_message_event
# 选择platform阶段中止
@pytest.mark.asyncio
@respx.mock
async def test_abort_add_on_platform(app: App):
from nonebot.adapters.onebot.v11.event import Sender
from nonebot.adapters.onebot.v11.message import Message
from nonebot_bison.config import Config
from nonebot_bison.config_manager import add_sub_matcher, common_platform
from nonebot_bison.platform import platform_manager
config = Config()
config.user_target.truncate()
ak_list_router = respx.get(
"https://m.weibo.cn/api/container/getIndex?containerid=1005056279793937"
)
ak_list_router.mock(
return_value=Response(200, json=get_json("weibo_ak_profile.json"))
)
ak_list_bad_router = respx.get(
"https://m.weibo.cn/api/container/getIndex?containerid=100505000"
)
ak_list_bad_router.mock(
return_value=Response(200, json=get_json("weibo_err_profile.json"))
)
async with app.test_matcher(add_sub_matcher) as ctx:
bot = ctx.create_bot()
event_1 = fake_group_message_event(
message=Message("添加订阅"),
sender=Sender(card="", nickname="test", role="admin"),
to_me=True,
)
ctx.receive_event(bot, event_1)
ctx.should_pass_rule()
ctx.should_call_send(
event_1,
Message(BotReply.add_reply_on_platform(platform_manager, common_platform)),
True,
)
event_abort = fake_group_message_event(
message=Message("取消"), sender=Sender(card="", nickname="test", role="admin")
)
ctx.receive_event(bot, event_abort)
ctx.should_call_send(
event_abort,
BotReply.add_reply_abort,
True,
)
ctx.should_finished()
# 输入id阶段中止
@pytest.mark.asyncio
@respx.mock
async def test_abort_add_on_id(app: App):
from nonebot.adapters.onebot.v11.event import Sender
from nonebot.adapters.onebot.v11.message import Message
from nonebot_bison.config import Config
from nonebot_bison.config_manager import add_sub_matcher, common_platform
from nonebot_bison.platform import platform_manager
config = Config()
config.user_target.truncate()
ak_list_router = respx.get(
"https://m.weibo.cn/api/container/getIndex?containerid=1005056279793937"
)
ak_list_router.mock(
return_value=Response(200, json=get_json("weibo_ak_profile.json"))
)
ak_list_bad_router = respx.get(
"https://m.weibo.cn/api/container/getIndex?containerid=100505000"
)
ak_list_bad_router.mock(
return_value=Response(200, json=get_json("weibo_err_profile.json"))
)
async with app.test_matcher(add_sub_matcher) as ctx:
bot = ctx.create_bot()
event_1 = fake_group_message_event(
message=Message("添加订阅"),
sender=Sender(card="", nickname="test", role="admin"),
to_me=True,
)
ctx.receive_event(bot, event_1)
ctx.should_pass_rule()
ctx.should_call_send(
event_1,
Message(BotReply.add_reply_on_platform(platform_manager, common_platform)),
True,
)
event_2 = fake_group_message_event(
message=Message("weibo"), sender=fake_admin_user
)
ctx.receive_event(bot, event_2)
ctx.should_call_send(
event_2,
Message(BotReply.add_reply_on_id),
True,
)
event_abort = fake_group_message_event(
message=Message("取消"), sender=Sender(card="", nickname="test", role="admin")
)
ctx.receive_event(bot, event_abort)
ctx.should_call_send(
event_abort,
BotReply.add_reply_abort,
True,
)
ctx.should_finished()
# 输入订阅类别阶段中止
@pytest.mark.asyncio
@respx.mock
async def test_abort_add_on_cats(app: App):
from nonebot.adapters.onebot.v11.event import Sender
from nonebot.adapters.onebot.v11.message import Message
from nonebot_bison.config import Config
from nonebot_bison.config_manager import add_sub_matcher, common_platform
from nonebot_bison.platform import platform_manager
config = Config()
config.user_target.truncate()
ak_list_router = respx.get(
"https://m.weibo.cn/api/container/getIndex?containerid=1005056279793937"
)
ak_list_router.mock(
return_value=Response(200, json=get_json("weibo_ak_profile.json"))
)
ak_list_bad_router = respx.get(
"https://m.weibo.cn/api/container/getIndex?containerid=100505000"
)
ak_list_bad_router.mock(
return_value=Response(200, json=get_json("weibo_err_profile.json"))
)
async with app.test_matcher(add_sub_matcher) as ctx:
bot = ctx.create_bot()
event_1 = fake_group_message_event(
message=Message("添加订阅"),
sender=Sender(card="", nickname="test", role="admin"),
to_me=True,
)
ctx.receive_event(bot, event_1)
ctx.should_pass_rule()
ctx.should_call_send(
event_1,
Message(
BotReply.add_reply_on_platform(
platform_manager=platform_manager, common_platform=common_platform
)
),
True,
)
event_2 = fake_group_message_event(
message=Message("weibo"), sender=fake_admin_user
)
ctx.receive_event(bot, event_2)
ctx.should_call_send(
event_2,
Message(BotReply.add_reply_on_id),
True,
)
event_3 = fake_group_message_event(
message=Message("6279793937"), sender=fake_admin_user
)
ctx.receive_event(bot, event_3)
ctx.should_call_send(
event_3,
BotReply.add_reply_on_target_confirm(
"weibo", "明日方舟Arknights", "6279793937"
),
True,
)
ctx.should_call_send(
event_3,
Message(BotReply.add_reply_on_cats(platform_manager, "weibo")),
True,
)
event_abort = fake_group_message_event(
message=Message("取消"), sender=Sender(card="", nickname="test", role="admin")
)
ctx.receive_event(bot, event_abort)
ctx.should_call_send(
event_abort,
BotReply.add_reply_abort,
True,
)
ctx.should_finished()
# 输入标签阶段中止
@pytest.mark.asyncio
@respx.mock
async def test_abort_add_on_tag(app: App):
from nonebot.adapters.onebot.v11.event import Sender
from nonebot.adapters.onebot.v11.message import Message
from nonebot_bison.config import Config
from nonebot_bison.config_manager import add_sub_matcher, common_platform
from nonebot_bison.platform import platform_manager
config = Config()
config.user_target.truncate()
ak_list_router = respx.get(
"https://m.weibo.cn/api/container/getIndex?containerid=1005056279793937"
)
ak_list_router.mock(
return_value=Response(200, json=get_json("weibo_ak_profile.json"))
)
ak_list_bad_router = respx.get(
"https://m.weibo.cn/api/container/getIndex?containerid=100505000"
)
ak_list_bad_router.mock(
return_value=Response(200, json=get_json("weibo_err_profile.json"))
)
async with app.test_matcher(add_sub_matcher) as ctx:
bot = ctx.create_bot()
event_1 = fake_group_message_event(
message=Message("添加订阅"),
sender=Sender(card="", nickname="test", role="admin"),
to_me=True,
)
ctx.receive_event(bot, event_1)
ctx.should_pass_rule()
ctx.should_call_send(
event_1,
Message(
BotReply.add_reply_on_platform(
platform_manager=platform_manager, common_platform=common_platform
)
),
True,
)
event_2 = fake_group_message_event(
message=Message("weibo"), sender=fake_admin_user
)
ctx.receive_event(bot, event_2)
ctx.should_call_send(
event_2,
Message(BotReply.add_reply_on_id),
True,
)
event_3 = fake_group_message_event(
message=Message("6279793937"), sender=fake_admin_user
)
ctx.receive_event(bot, event_3)
ctx.should_call_send(
event_3,
BotReply.add_reply_on_target_confirm(
"weibo", "明日方舟Arknights", "6279793937"
),
True,
)
ctx.should_call_send(
event_3,
Message(BotReply.add_reply_on_cats(platform_manager, "weibo")),
True,
)
event_4 = fake_group_message_event(
message=Message("图文 文字"), sender=fake_admin_user
)
ctx.receive_event(bot, event_4)
ctx.should_call_send(event_4, Message(BotReply.add_reply_on_tags), True)
event_abort = fake_group_message_event(
message=Message("取消"), sender=Sender(card="", nickname="test", role="admin")
)
ctx.receive_event(bot, event_abort)
ctx.should_call_send(
event_abort,
BotReply.add_reply_abort,
True,
)
ctx.should_finished()

View File

@ -4,7 +4,7 @@ from httpx import Response
from nonebug.app import App
from .platforms.utils import get_json
from .utils import fake_admin_user, fake_group_message_event
from .utils import BotReply, fake_admin_user, fake_group_message_event
@pytest.mark.asyncio
@ -49,18 +49,7 @@ async def test_configurable_at_me_false(app: App):
ctx.receive_event(bot, event)
ctx.should_call_send(
event,
Message(
"请输入想要订阅的平台,目前支持,请输入冒号左边的名称:\n"
+ "".join(
[
"{}{}\n".format(
platform_name, platform_manager[platform_name].name
)
for platform_name in common_platform
]
)
+ "要查看全部平台请输入:“全部”"
),
Message(BotReply.add_reply_on_platform(platform_manager, common_platform)),
True,
)
ctx.should_pass_rule()
@ -104,16 +93,9 @@ async def test_add_with_target(app: App):
ctx.should_call_send(
event_1,
Message(
"请输入想要订阅的平台,目前支持,请输入冒号左边的名称:\n"
+ "".join(
[
"{}{}\n".format(
platform_name, platform_manager[platform_name].name
)
for platform_name in common_platform
]
BotReply.add_reply_on_platform(
platform_manager=platform_manager, common_platform=common_platform
)
+ "要查看全部平台请输入:“全部”"
),
True,
)
@ -124,15 +106,7 @@ async def test_add_with_target(app: App):
ctx.should_rejected()
ctx.should_call_send(
event_2,
(
"全部平台\n"
+ "\n".join(
[
"{}{}".format(platform_name, platform.name)
for platform_name, platform in platform_manager.items()
]
)
),
BotReply.add_reply_on_platform_input_allplatform(platform_manager),
True,
)
event_3 = fake_group_message_event(
@ -141,16 +115,14 @@ async def test_add_with_target(app: App):
ctx.receive_event(bot, event_3)
ctx.should_call_send(
event_3,
Message(
"请输入订阅用户的id详情查阅https://nonebot-bison.vercel.app/usage/#%E6%89%80%E6%94%AF%E6%8C%81%E5%B9%B3%E5%8F%B0%E7%9A%84uid"
),
Message(BotReply.add_reply_on_id),
True,
)
event_4_err = fake_group_message_event(
message=Message("000"), sender=fake_admin_user
)
ctx.receive_event(bot, event_4_err)
ctx.should_call_send(event_4_err, "id输入错误", True)
ctx.should_call_send(event_4_err, BotReply.add_reply_on_id_input_error, True)
ctx.should_rejected()
event_4_ok = fake_group_message_event(
message=Message("6279793937"), sender=fake_admin_user
@ -158,29 +130,36 @@ async def test_add_with_target(app: App):
ctx.receive_event(bot, event_4_ok)
ctx.should_call_send(
event_4_ok,
Message(
"请输入要订阅的类别,以空格分隔,支持的类别有:{}".format(
" ".join(list(platform_manager["weibo"].categories.values()))
)
BotReply.add_reply_on_target_confirm(
"weibo", "明日方舟Arknights", "6279793937"
),
True,
)
ctx.should_call_send(
event_4_ok,
Message(BotReply.add_reply_on_cats(platform_manager, "weibo")),
True,
)
event_5_err = fake_group_message_event(
message=Message("图文 文字 err"), sender=fake_admin_user
)
ctx.receive_event(bot, event_5_err)
ctx.should_call_send(event_5_err, "不支持 err", True)
ctx.should_call_send(
event_5_err, BotReply.add_reply_on_cats_input_error("err"), True
)
ctx.should_rejected()
event_5_ok = fake_group_message_event(
message=Message("图文 文字"), sender=fake_admin_user
)
ctx.receive_event(bot, event_5_ok)
ctx.should_call_send(event_5_ok, Message('请输入要订阅的tag订阅所有tag输入"全部标签"'), True)
ctx.should_call_send(event_5_ok, Message(BotReply.add_reply_on_tags), True)
event_6 = fake_group_message_event(
message=Message("全部标签"), sender=fake_admin_user
)
ctx.receive_event(bot, event_6)
ctx.should_call_send(event_6, ("添加 明日方舟Arknights 成功"), True)
ctx.should_call_send(
event_6, BotReply.add_reply_subscribe_success("明日方舟Arknights"), True
)
ctx.should_finished()
subs = config.list_subscribe(10000, "group")
assert len(subs) == 1
@ -220,18 +199,7 @@ async def test_add_with_target_no_cat(app: App):
ctx.should_pass_rule()
ctx.should_call_send(
event_1,
Message(
"请输入想要订阅的平台,目前支持,请输入冒号左边的名称:\n"
+ "".join(
[
"{}{}\n".format(
platform_name, platform_manager[platform_name].name
)
for platform_name in common_platform
]
)
+ "要查看全部平台请输入:“全部”"
),
Message(BotReply.add_reply_on_platform(platform_manager, common_platform)),
True,
)
event_3 = fake_group_message_event(
@ -240,16 +208,21 @@ async def test_add_with_target_no_cat(app: App):
ctx.receive_event(bot, event_3)
ctx.should_call_send(
event_3,
Message(
"请输入订阅用户的id详情查阅https://nonebot-bison.vercel.app/usage/#%E6%89%80%E6%94%AF%E6%8C%81%E5%B9%B3%E5%8F%B0%E7%9A%84uid"
),
Message(BotReply.add_reply_on_id),
True,
)
event_4_ok = fake_group_message_event(
message=Message("32540734"), sender=fake_admin_user
)
ctx.receive_event(bot, event_4_ok)
ctx.should_call_send(event_4_ok, ("添加 塞壬唱片-MSR 成功"), True)
ctx.should_call_send(
event_4_ok,
BotReply.add_reply_on_target_confirm("ncm-artist", "塞壬唱片-MSR", "32540734"),
True,
)
ctx.should_call_send(
event_4_ok, BotReply.add_reply_subscribe_success("塞壬唱片-MSR"), True
)
ctx.should_finished()
subs = config.list_subscribe(10000, "group")
assert len(subs) == 1
@ -284,18 +257,7 @@ async def test_add_no_target(app: App):
ctx.should_pass_rule()
ctx.should_call_send(
event_1,
Message(
"请输入想要订阅的平台,目前支持,请输入冒号左边的名称:\n"
+ "".join(
[
"{}{}\n".format(
platform_name, platform_manager[platform_name].name
)
for platform_name in common_platform
]
)
+ "要查看全部平台请输入:“全部”"
),
Message(BotReply.add_reply_on_platform(platform_manager, common_platform)),
True,
)
event_3 = fake_group_message_event(
@ -304,18 +266,16 @@ async def test_add_no_target(app: App):
ctx.receive_event(bot, event_3)
ctx.should_call_send(
event_3,
Message(
"请输入要订阅的类别,以空格分隔,支持的类别有:{}".format(
" ".join(list(platform_manager["arknights"].categories.values()))
)
),
Message(BotReply.add_reply_on_cats(platform_manager, "arknights")),
True,
)
event_4 = fake_group_message_event(
message=Message("游戏公告"), sender=fake_admin_user
)
ctx.receive_event(bot, event_4)
ctx.should_call_send(event_4, ("添加 明日方舟游戏信息 成功"), True)
ctx.should_call_send(
event_4, BotReply.add_reply_subscribe_success("明日方舟游戏信息"), True
)
ctx.should_finished()
subs = config.list_subscribe(10000, "group")
assert len(subs) == 1
@ -348,18 +308,7 @@ async def test_platform_name_err(app: App):
ctx.should_pass_rule()
ctx.should_call_send(
event_1,
Message(
"请输入想要订阅的平台,目前支持,请输入冒号左边的名称:\n"
+ "".join(
[
"{}{}\n".format(
platform_name, platform_manager[platform_name].name
)
for platform_name in common_platform
]
)
+ "要查看全部平台请输入:“全部”"
),
Message(BotReply.add_reply_on_platform(platform_manager, common_platform)),
True,
)
event_2 = fake_group_message_event(
@ -370,86 +319,87 @@ async def test_platform_name_err(app: App):
ctx.should_rejected()
ctx.should_call_send(
event_2,
"平台输入错误",
BotReply.add_reply_on_platform_input_error,
True,
)
@pytest.mark.asyncio
async def test_query_sub(app: App):
from nonebot.adapters.onebot.v11.message import Message
@respx.mock
async def test_add_with_get_id(app: App):
from nonebot.adapters.onebot.v11.event import Sender
from nonebot.adapters.onebot.v11.message import Message, MessageSegment
from nonebot_bison.config import Config
from nonebot_bison.config_manager import query_sub_matcher
from nonebot_bison.config_manager import add_sub_matcher, common_platform
from nonebot_bison.platform import platform_manager
config = Config()
config.user_target.truncate()
config.add_subscribe(
10000,
"group",
"6279793937",
"明日方舟Arknights",
"weibo",
[platform_manager["weibo"].reverse_category["图文"]],
["明日方舟"],
ak_list_router = respx.get(
"https://m.weibo.cn/api/container/getIndex?containerid=1005056279793937"
)
async with app.test_matcher(query_sub_matcher) as ctx:
ak_list_router.mock(
return_value=Response(200, json=get_json("weibo_ak_profile.json"))
)
ak_list_bad_router = respx.get(
"https://m.weibo.cn/api/container/getIndex?containerid=100505000"
)
ak_list_bad_router.mock(
return_value=Response(200, json=get_json("weibo_err_profile.json"))
)
async with app.test_matcher(add_sub_matcher) as ctx:
bot = ctx.create_bot()
event = fake_group_message_event(message=Message("查询订阅"), to_me=True)
ctx.receive_event(bot, event)
ctx.should_pass_rule()
ctx.should_pass_permission()
ctx.should_call_send(
event, Message("订阅的帐号为:\nweibo 明日方舟Arknights 6279793937 [图文] 明日方舟\n"), True
event_1 = fake_group_message_event(
message=Message("添加订阅"),
sender=Sender(card="", nickname="test", role="admin"),
to_me=True,
)
@pytest.mark.asyncio
async def test_del_sub(app: App):
from nonebot.adapters.onebot.v11.bot import Bot
from nonebot.adapters.onebot.v11.message import Message
from nonebot_bison.config import Config
from nonebot_bison.config_manager import del_sub_matcher
from nonebot_bison.platform import platform_manager
config = Config()
config.user_target.truncate()
config.add_subscribe(
10000,
"group",
"6279793937",
"明日方舟Arknights",
"weibo",
[platform_manager["weibo"].reverse_category["图文"]],
["明日方舟"],
)
async with app.test_matcher(del_sub_matcher) as ctx:
bot = ctx.create_bot(base=Bot)
assert isinstance(bot, Bot)
event = fake_group_message_event(
message=Message("删除订阅"), to_me=True, sender=fake_admin_user
)
ctx.receive_event(bot, event)
ctx.receive_event(bot, event_1)
ctx.should_pass_rule()
ctx.should_pass_permission()
ctx.should_call_send(
event,
event_1,
Message(
"订阅的帐号为:\n1 weibo 明日方舟Arknights 6279793937\n [图文] 明日方舟\n请输入要删除的订阅的序号"
BotReply.add_reply_on_platform(
platform_manager=platform_manager, common_platform=common_platform
)
),
True,
)
event_1_err = fake_group_message_event(
message=Message("2"), sender=fake_admin_user
event_3 = fake_group_message_event(
message=Message("weibo"), sender=fake_admin_user
)
ctx.receive_event(bot, event_1_err)
ctx.should_call_send(event_1_err, "删除错误", True)
ctx.receive_event(bot, event_3)
ctx.should_call_send(
event_3,
Message(BotReply.add_reply_on_id),
True,
)
event_4_query = fake_group_message_event(
message=Message("查询"), sender=fake_admin_user
)
ctx.receive_event(bot, event_4_query)
ctx.should_rejected()
event_1_ok = fake_group_message_event(
message=Message("1"), sender=fake_admin_user
ctx.should_call_send(
event_4_query,
[MessageSegment(*BotReply.add_reply_on_id_input_search())],
True,
)
"""
line 362:
鬼知道为什么要在这里这样写
没有[]的话assert不了(should_call_send使用[MessageSegment(...)]的格式进行比较)
不在这里MessageSegment()的话也assert不了(指不能让add_reply_on_id_input_search直接返回一个MessageSegment对象)
amen
"""
event_abort = fake_group_message_event(
message=Message("取消"), sender=Sender(card="", nickname="test", role="admin")
)
ctx.receive_event(bot, event_abort)
ctx.should_call_send(
event_abort,
BotReply.add_reply_abort,
True,
)
ctx.receive_event(bot, event_1_ok)
ctx.should_call_send(event_1_ok, "删除成功", True)
ctx.should_finished()
subs = config.list_subscribe(10000, "group")
assert len(subs) == 0

View File

@ -0,0 +1,45 @@
from nonebug import App
from .utils import fake_admin_user, fake_private_message_event, fake_superuser
async def test_query(app: App):
from nonebot.adapters.onebot.v11.bot import Bot
from nonebot.adapters.onebot.v11.message import Message
from nonebot_bison.config_manager import group_manage_matcher
async with app.test_matcher(group_manage_matcher) as ctx:
bot = ctx.create_bot(base=Bot)
event = fake_private_message_event(
message=Message("群管理"), sender=fake_superuser
)
ctx.receive_event(bot, event)
ctx.should_pass_rule()
ctx.should_pass_permission()
ctx.should_call_api(
"get_group_list", {}, [{"group_id": 101, "group_name": "test group"}]
)
ctx.should_call_send(
event, Message("请选择需要管理的群:\n1. 101 - test group\n请输入左侧序号"), True
)
event_1_err = fake_private_message_event(
message=Message("0"), sender=fake_superuser
)
ctx.receive_event(bot, event_1_err)
ctx.should_rejected()
ctx.should_call_send(event_1_err, "请输入正确序号", True)
event_1_ok = fake_private_message_event(
message=Message("1"), sender=fake_superuser
)
ctx.receive_event(bot, event_1_ok)
ctx.should_call_send(event_1_ok, "请输入需要使用的命令:添加订阅,查询订阅,删除订阅", True)
event_2_err = fake_private_message_event(
message=Message("222"), sender=fake_superuser
)
ctx.receive_event(bot, event_2_err)
ctx.should_rejected()
ctx.should_call_send(event_2_err, "请输入正确的命令", True)
event_2_ok = fake_private_message_event(
message=Message("查询订阅"), sender=fake_superuser
)
ctx.receive_event(bot, event_2_ok)

View File

@ -0,0 +1,101 @@
import pytest
import respx
from httpx import Response
from nonebug.app import App
from .platforms.utils import get_json
from .utils import fake_admin_user, fake_group_message_event
@pytest.mark.asyncio
async def test_query_sub(app: App):
from nonebot.adapters.onebot.v11.message import Message
from nonebot_bison.config import Config
from nonebot_bison.config_manager import query_sub_matcher
from nonebot_bison.platform import platform_manager
config = Config()
config.user_target.truncate()
config.add_subscribe(
10000,
"group",
"6279793937",
"明日方舟Arknights",
"weibo",
[platform_manager["weibo"].reverse_category["图文"]],
["明日方舟"],
)
async with app.test_matcher(query_sub_matcher) as ctx:
bot = ctx.create_bot()
event = fake_group_message_event(message=Message("查询订阅"), to_me=True)
ctx.receive_event(bot, event)
ctx.should_pass_rule()
ctx.should_pass_permission()
ctx.should_call_send(
event, Message("订阅的帐号为:\nweibo 明日方舟Arknights 6279793937 [图文] 明日方舟\n"), True
)
@pytest.mark.asyncio
async def test_del_sub(app: App):
from nonebot.adapters.onebot.v11.bot import Bot
from nonebot.adapters.onebot.v11.message import Message
from nonebot_bison.config import Config
from nonebot_bison.config_manager import del_sub_matcher
from nonebot_bison.platform import platform_manager
config = Config()
config.user_target.truncate()
config.add_subscribe(
10000,
"group",
"6279793937",
"明日方舟Arknights",
"weibo",
[platform_manager["weibo"].reverse_category["图文"]],
["明日方舟"],
)
async with app.test_matcher(del_sub_matcher) as ctx:
bot = ctx.create_bot(base=Bot)
assert isinstance(bot, Bot)
event = fake_group_message_event(
message=Message("删除订阅"), to_me=True, sender=fake_admin_user
)
ctx.receive_event(bot, event)
ctx.should_pass_rule()
ctx.should_pass_permission()
ctx.should_call_send(
event,
Message(
"订阅的帐号为:\n1 weibo 明日方舟Arknights 6279793937\n [图文] 明日方舟\n请输入要删除的订阅的序号"
),
True,
)
event_1_err = fake_group_message_event(
message=Message("2"), sender=fake_admin_user
)
ctx.receive_event(bot, event_1_err)
ctx.should_call_send(event_1_err, "删除错误", True)
ctx.should_rejected()
event_1_ok = fake_group_message_event(
message=Message("1"), sender=fake_admin_user
)
ctx.receive_event(bot, event_1_ok)
ctx.should_call_send(event_1_ok, "删除成功", True)
ctx.should_finished()
subs = config.list_subscribe(10000, "group")
assert len(subs) == 0
async def test_test(app: App):
from nonebot.adapters.onebot.v11.bot import Bot
from nonebot.adapters.onebot.v11.message import Message
from nonebot_bison.config_manager import test_matcher
async with app.test_matcher(test_matcher) as ctx:
bot = ctx.create_bot(base=Bot)
event = fake_group_message_event(message=Message("testtt"))
ctx.receive_event(bot, event)
ctx.should_pass_permission()
ctx.should_pass_rule()
ctx.should_call_send(event, "666", True)

View File

@ -1,6 +1,7 @@
import typing
import pytest
from flaky import flaky
from nonebug.app import App
if typing.TYPE_CHECKING:
@ -40,7 +41,7 @@ merge_source_9_2 = [
]
@pytest.mark.asyncio
@flaky
async def test_9_merge(app: App):
from nonebot_bison.post import Post
@ -50,7 +51,7 @@ async def test_9_merge(app: App):
await post.generate_messages()
@pytest.mark.asyncio
@flaky
async def test_9_merge_2(app: App):
from nonebot_bison.post import Post
@ -60,7 +61,7 @@ async def test_9_merge_2(app: App):
await post.generate_messages()
@pytest.mark.asyncio
@flaky
async def test_6_merge(app: App):
from nonebot_bison.post import Post
@ -69,7 +70,7 @@ async def test_6_merge(app: App):
assert len(post.pics) == 5
@pytest.mark.asyncio
@flaky
async def test_3_merge(app: App):
from nonebot_bison.post import Post
@ -78,7 +79,7 @@ async def test_3_merge(app: App):
assert len(post.pics) == 5
@pytest.mark.asyncio
@flaky
async def test_6_merge_only(app: App):
from nonebot_bison.post import Post
@ -87,7 +88,7 @@ async def test_6_merge_only(app: App):
assert len(post.pics) == 1
@pytest.mark.asyncio
@flaky
async def test_3_merge_only(app: App):
from nonebot_bison.post import Post

View File

@ -3,20 +3,16 @@ import typing
import pytest
from nonebug.app import App
if typing.TYPE_CHECKING:
import sys
sys.path.append("./src/plugins")
import nonebot_bison
@pytest.mark.asyncio
@pytest.mark.render
async def test_render(app: App):
from nonebot_bison.utils import Render
from nonebot_bison.plugin_config import plugin_config
from nonebot_bison.utils import parse_text
render = Render()
res = await render.text_to_pic(
plugin_config.bison_use_pic = True
res = await parse_text(
"""a\nbbbbbbbbbbbbbbbbbbbbbb\ncd
<h1>中文</h1>
VuePress 由两部分组成第一部分是一个极简静态网站生成器
@ -26,3 +22,14 @@ VuePress 由两部分组成:第一部分是一个极简静态网站生成器
每一个由 VuePress 生成的页面都带有预渲染好的 HTML也因此具有非常好的加载性能和搜索引擎优化SEO同时一旦页面被加载Vue 将接管这些静态内容并将其转换成一个完整的单页应用SPA其他的页面则会只在用户浏览到的时候才按需加载
"""
)
@pytest.mark.asyncio
@pytest.mark.render
async def test_arknights(app: App):
from nonebot_bison.platform.arknights import Arknights
ak = Arknights()
res = await ak.parse(
{"webUrl": "https://ak.hycdn.cn/announce/IOS/announcement/854_1644580545.html"}
)

View File

@ -1,13 +1,13 @@
import asyncio
import pytest
from nonebot.adapters.onebot.v11.bot import Bot
from nonebug import App
@pytest.mark.asyncio
async def test_send_no_queue(app: App):
import nonebot
from nonebot.adapters.onebot.v11.bot import Bot
from nonebot_bison.plugin_config import plugin_config
from nonebot_bison.send import send_msgs
@ -32,6 +32,7 @@ async def test_send_no_queue(app: App):
@pytest.mark.asyncio
async def test_send_queue(app: App):
import nonebot
from nonebot.adapters.onebot.v11.bot import Bot
from nonebot_bison import send
from nonebot_bison.plugin_config import plugin_config
from nonebot_bison.send import LAST_SEND_TIME, do_send_msgs, send_msgs
@ -56,3 +57,163 @@ async def test_send_queue(app: App):
app.monkeypatch.setattr(send, "LAST_SEND_TIME", 0, True)
await do_send_msgs()
assert ctx.wait_list.empty()
@pytest.mark.asyncio
async def test_send_merge_no_queue(app: App):
from nonebot.adapters.onebot.v11.bot import Bot
from nonebot.adapters.onebot.v11.message import Message, MessageSegment
from nonebot_bison.plugin_config import plugin_config
from nonebot_bison.send import send_msgs
plugin_config.bison_use_pic_merge = 1
plugin_config.bison_use_queue = False
async with app.test_api() as ctx:
bot = ctx.create_bot(base=Bot, self_id="8888")
assert isinstance(bot, Bot)
message = [
Message(MessageSegment.text("test msg")),
Message(MessageSegment.image("https://picsum.photos/200/300")),
]
ctx.should_call_api(
"send_group_msg",
{"group_id": 633, "message": Message(MessageSegment.text("test msg"))},
None,
)
ctx.should_call_api(
"send_group_msg",
{"group_id": 633, "message": message[1]},
None,
)
await send_msgs(bot, 633, "group", message)
message = [
Message(MessageSegment.text("test msg")),
Message(MessageSegment.image("https://picsum.photos/200/300")),
Message(MessageSegment.image("https://picsum.photos/200/300")),
]
ctx.should_call_api(
"send_group_msg",
{"group_id": 633, "message": Message(MessageSegment.text("test msg"))},
None,
)
ctx.should_call_api(
"get_group_member_info",
{"group_id": 633, "user_id": 8888, "no_cache": True},
{"user_id": 8888, "card": "admin", "nickname": "adminuser"},
)
merged_message = Message(
[
MessageSegment.node_custom(
user_id=8888, nickname="admin", content=message[1]
),
MessageSegment.node_custom(
user_id=8888, nickname="admin", content=message[2]
),
]
)
ctx.should_call_api(
"send_group_forward_msg",
{"group_id": 633, "messages": merged_message},
None,
)
await send_msgs(bot, 633, "group", message)
message = [
Message(MessageSegment.text("test msg")),
Message(MessageSegment.image("https://picsum.photos/200/300")),
Message(MessageSegment.image("https://picsum.photos/200/300")),
Message(MessageSegment.image("https://picsum.photos/200/300")),
]
ctx.should_call_api(
"send_group_msg",
{"group_id": 633, "message": Message(MessageSegment.text("test msg"))},
None,
)
ctx.should_call_api(
"get_group_member_info",
{"group_id": 633, "user_id": 8888, "no_cache": True},
{"user_id": 8888, "card": None, "nickname": "adminuser"},
)
merged_message = Message(
[
MessageSegment.node_custom(
user_id=8888, nickname="adminuser", content=message[1]
),
MessageSegment.node_custom(
user_id=8888, nickname="adminuser", content=message[2]
),
MessageSegment.node_custom(
user_id=8888, nickname="adminuser", content=message[3]
),
]
)
ctx.should_call_api(
"send_group_forward_msg",
{"group_id": 633, "messages": merged_message},
None,
)
await send_msgs(bot, 633, "group", message)
# private user should not send in forward
message = [
Message(MessageSegment.text("test msg")),
Message(MessageSegment.image("https://picsum.photos/200/300")),
Message(MessageSegment.image("https://picsum.photos/200/300")),
]
ctx.should_call_api(
"send_private_msg",
{"user_id": 633, "message": Message(MessageSegment.text("test msg"))},
None,
)
ctx.should_call_api(
"send_private_msg", {"user_id": 633, "message": message[1]}, None
)
ctx.should_call_api(
"send_private_msg", {"user_id": 633, "message": message[2]}, None
)
await send_msgs(bot, 633, "private", message)
async def test_send_merge2_no_queue(app: App):
from nonebot.adapters.onebot.v11.bot import Bot
from nonebot.adapters.onebot.v11.message import Message, MessageSegment
from nonebot_bison.plugin_config import plugin_config
from nonebot_bison.send import send_msgs
plugin_config.bison_use_pic_merge = 2
plugin_config.bison_use_queue = False
async with app.test_api() as ctx:
bot = ctx.create_bot(base=Bot, self_id="8888")
assert isinstance(bot, Bot)
message = [
Message(MessageSegment.text("test msg")),
Message(MessageSegment.image("https://picsum.photos/200/300")),
Message(MessageSegment.image("https://picsum.photos/200/300")),
]
ctx.should_call_api(
"get_group_member_info",
{"group_id": 633, "user_id": 8888, "no_cache": True},
{"user_id": 8888, "card": "admin", "nickname": "adminuser"},
)
merged_message = Message(
[
MessageSegment.node_custom(
user_id=8888, nickname="admin", content=message[0]
),
MessageSegment.node_custom(
user_id=8888, nickname="admin", content=message[1]
),
MessageSegment.node_custom(
user_id=8888, nickname="admin", content=message[2]
),
]
)
ctx.should_call_api(
"send_group_forward_msg",
{"group_id": 633, "messages": merged_message},
None,
)
await send_msgs(bot, 633, "group", message)

View File

@ -68,3 +68,70 @@ def fake_private_message_event(**field) -> "PrivateMessageEvent":
from nonebot.adapters.onebot.v11.event import Sender
fake_admin_user = Sender(nickname="test", role="admin")
fake_superuser = Sender(user_id=10001, nickname="superuser")
class BotReply:
@staticmethod
def add_reply_on_platform(platform_manager, common_platform):
return (
"请输入想要订阅的平台,目前支持,请输入冒号左边的名称:\n"
+ "".join(
[
"{}{}\n".format(
platform_name, platform_manager[platform_name].name
)
for platform_name in common_platform
]
)
+ "要查看全部平台请输入:“全部”\n中止订阅过程请输入:“取消”"
)
@staticmethod
def add_reply_on_platform_input_allplatform(platform_manager):
return "全部平台\n" + "\n".join(
[
"{}{}".format(platform_name, platform.name)
for platform_name, platform in platform_manager.items()
]
)
@staticmethod
def add_reply_on_id_input_search():
search_url = "https://nonebot-bison.vercel.app/usage/#%E6%89%80%E6%94%AF%E6%8C%81%E5%B9%B3%E5%8F%B0%E7%9A%84-uid"
search_title = "Bison所支持的平台UID"
search_content = "查询相关平台的uid格式或获取方式"
search_image = "https://s3.bmp.ovh/imgs/2022/03/ab3cc45d83bd3dd3.jpg"
type = "share"
data = {
"url": search_url,
"title": search_title,
"content": search_content,
"image": search_image,
}
msg = [type, data]
return msg
@staticmethod
def add_reply_on_target_confirm(platform, name, id):
return f"即将订阅的用户为:{platform} {name} {id}\n如有错误请输入“取消”重新订阅"
@staticmethod
def add_reply_on_cats(platform_manager, platform: str):
return "请输入要订阅的类别,以空格分隔,支持的类别有:{}".format(
" ".join(list(platform_manager[platform].categories.values()))
)
@staticmethod
def add_reply_on_cats_input_error(cat: str):
return "不支持 {}".format(cat)
@staticmethod
def add_reply_subscribe_success(name):
return "添加 {} 成功".format(name)
add_reply_on_id_input_error = "id输入错误"
add_reply_on_platform_input_error = "平台输入错误"
add_reply_on_id = "请输入订阅用户的id:\n查询id获取方法请回复:“查询”"
add_reply_on_tags = '请输入要订阅的tag订阅所有tag输入"全部标签"'
add_reply_abort = "已中止订阅"

8568
yarn.lock

File diff suppressed because it is too large Load Diff