🎨 按ruff调整测试代码

This commit is contained in:
Azide 2023-07-16 01:35:25 +08:00 committed by felinae98
parent dba8f2a9cb
commit 7d4eb7785c
28 changed files with 414 additions and 842 deletions

View File

@ -79,7 +79,7 @@ asyncio_mode = "auto"
[tool.ruff]
select = ["E", "W", "F", "UP", "C", "T", "PYI", "PT", "Q"]
ignore = ["E402", "C901"]
ignore = ["E402", "C901", "PT023"]
line-length = 120
target-version = "py310"

View File

@ -1,5 +1,4 @@
import typing
from pathlib import Path
import pytest
from nonebug.app import App
@ -11,9 +10,8 @@ if typing.TYPE_CHECKING:
from nonebot_bison.config.config_legacy import Config
@pytest.fixture
@pytest.fixture()
def config_legacy(app: App, use_legacy_config):
from nonebot_bison import config
from nonebot_bison.config import config_legacy as config
config.start_up()
@ -38,9 +36,7 @@ def test_create_and_get(config_legacy: "Config", app: App):
)
confs = config_legacy.list_subscribe(123, "group")
assert len(confs) == 1
assert config_legacy.target_user_cache["weibo"][Target("weibo_id")] == [
types.User(123, "group")
]
assert config_legacy.target_user_cache["weibo"][Target("weibo_id")] == [types.User(123, "group")]
assert confs[0]["cats"] == []
config_legacy.update_subscribe(
user=123,

View File

@ -5,12 +5,10 @@ from pytest_mock import MockerFixture
async def test_create_config(init_scheduler):
from nonebot_plugin_datastore.db import get_engine
from nonebot_plugin_saa import TargetQQGroup
from nonebot_bison.config.db_config import TimeWeightConfig, WeightConfig, config
from nonebot_bison.config.db_model import Subscribe, Target, User
from nonebot_bison.types import Target as T_Target
from nonebot_bison.config.db_config import WeightConfig, TimeWeightConfig, config
await config.add_subscribe(
TargetQQGroup(group_id=123),
@ -33,22 +31,14 @@ async def test_create_config(init_scheduler):
platform_name="weibo",
conf=WeightConfig(
default=10,
time_config=[
TimeWeightConfig(start_time=time(1, 0), end_time=time(2, 0), weight=20)
],
time_config=[TimeWeightConfig(start_time=time(1, 0), end_time=time(2, 0), weight=20)],
),
)
test_config = await config.get_time_weight_config(
target=T_Target("weibo_id"), platform_name="weibo"
)
test_config = await config.get_time_weight_config(target=T_Target("weibo_id"), platform_name="weibo")
assert test_config.default == 10
assert test_config.time_config == [
TimeWeightConfig(start_time=time(1, 0), end_time=time(2, 0), weight=20)
]
test_config1 = await config.get_time_weight_config(
target=T_Target("weibo_id1"), platform_name="weibo"
)
assert test_config.time_config == [TimeWeightConfig(start_time=time(1, 0), end_time=time(2, 0), weight=20)]
test_config1 = await config.get_time_weight_config(target=T_Target("weibo_id1"), platform_name="weibo")
assert test_config1.default == 10
assert test_config1.time_config == []
@ -56,13 +46,11 @@ async def test_create_config(init_scheduler):
async def test_get_current_weight(init_scheduler, mocker: MockerFixture):
from datetime import time
from nonebot_plugin_datastore.db import get_engine
from nonebot_plugin_saa import TargetQQGroup
from nonebot_bison.config import db_config
from nonebot_bison.config.db_config import TimeWeightConfig, WeightConfig, config
from nonebot_bison.config.db_model import Subscribe, Target, User
from nonebot_bison.types import Target as T_Target
from nonebot_bison.config.db_config import WeightConfig, TimeWeightConfig, config
await config.add_subscribe(
TargetQQGroup(group_id=123),
@ -120,14 +108,13 @@ async def test_get_current_weight(init_scheduler, mocker: MockerFixture):
async def test_get_platform_target(app: App, init_scheduler):
from nonebot_plugin_datastore.db import get_engine
from nonebot_plugin_saa import TargetQQGroup
from sqlalchemy.ext.asyncio.session import AsyncSession
from sqlalchemy.sql.expression import select
from nonebot_plugin_datastore.db import get_engine
from sqlalchemy.ext.asyncio.session import AsyncSession
from nonebot_bison.config import db_config
from nonebot_bison.config.db_config import TimeWeightConfig, WeightConfig, config
from nonebot_bison.config.db_model import Subscribe, Target, User
from nonebot_bison.config.db_model import Target
from nonebot_bison.config.db_config import config
from nonebot_bison.types import Target as T_Target
await config.add_subscribe(
@ -156,14 +143,10 @@ async def test_get_platform_target(app: App, init_scheduler):
)
res = await config.get_platform_target("weibo")
assert len(res) == 2
await config.del_subscribe(
TargetQQGroup(group_id=123), T_Target("weibo_id1"), "weibo"
)
await config.del_subscribe(TargetQQGroup(group_id=123), T_Target("weibo_id1"), "weibo")
res = await config.get_platform_target("weibo")
assert len(res) == 2
await config.del_subscribe(
TargetQQGroup(group_id=123), T_Target("weibo_id"), "weibo"
)
await config.del_subscribe(TargetQQGroup(group_id=123), T_Target("weibo_id"), "weibo")
res = await config.get_platform_target("weibo")
assert len(res) == 1
@ -173,16 +156,11 @@ async def test_get_platform_target(app: App, init_scheduler):
async def test_get_platform_target_subscribers(app: App, init_scheduler):
from nonebot_plugin_datastore.db import get_engine
from nonebot_plugin_saa import TargetQQGroup
from sqlalchemy.ext.asyncio.session import AsyncSession
from sqlalchemy.sql.expression import select
from nonebot_bison.config import db_config
from nonebot_bison.config.db_config import TimeWeightConfig, WeightConfig, config
from nonebot_bison.config.db_model import Subscribe, Target, User
from nonebot_bison.types import Target as T_Target
from nonebot_bison.types import UserSubInfo
from nonebot_bison.config.db_config import config
from nonebot_bison.types import Target as T_Target
await config.add_subscribe(
TargetQQGroup(group_id=123),

View File

@ -1,12 +1,12 @@
import sys
from pathlib import Path
import nonebot
import pytest
from nonebot.adapters.onebot.v11 import Adapter as OnebotV11Adapter
import nonebot
from sqlalchemy import delete
from nonebug import NONEBOT_INIT_KWARGS, App
from pytest_mock.plugin import MockerFixture
from sqlalchemy import delete
from nonebot.adapters.onebot.v11 import Adapter as OnebotV11Adapter
from .utils import AppReq
@ -24,24 +24,20 @@ def pytest_configure(config: pytest.Config) -> None:
def load_adapters(nonebug_init: None):
driver = nonebot.get_driver()
driver.register_adapter(OnebotV11Adapter)
return driver
@pytest.fixture
@pytest.fixture()
async def app(tmp_path: Path, request: pytest.FixtureRequest, mocker: MockerFixture):
sys.path.append(str(Path(__file__).parent.parent / "src" / "plugins"))
nonebot.require("nonebot_bison")
from nonebot_plugin_datastore.config import plugin_config as datastore_config
from nonebot_plugin_datastore.db import create_session, init_db
from nonebot_plugin_htmlrender.browser import shutdown_browser
from nonebot_plugin_datastore.db import init_db, create_session
from nonebot_plugin_datastore.config import plugin_config as datastore_config
from nonebot_bison import plugin_config
from nonebot_bison.config.db_model import (
ScheduleTimeWeight,
Subscribe,
Target,
User,
)
from nonebot_bison.config.db_model import User, Target, Subscribe, ScheduleTimeWeight
plugin_config.bison_config_path = str(tmp_path / "legacy_config")
plugin_config.bison_filter_log = False
@ -72,7 +68,7 @@ async def app(tmp_path: Path, request: pytest.FixtureRequest, mocker: MockerFixt
await shutdown_browser()
@pytest.fixture
@pytest.fixture()
def dummy_user_subinfo(app: App):
from nonebot_plugin_saa import TargetQQGroup
@ -82,19 +78,19 @@ def dummy_user_subinfo(app: App):
return UserSubInfo(user=user, categories=[], tags=[])
@pytest.fixture
@pytest.fixture()
async def init_scheduler(app: App):
from nonebot_bison.scheduler.manager import init_scheduler
await init_scheduler()
return await init_scheduler()
@pytest.fixture
@pytest.fixture()
async def use_legacy_config(app: App):
import aiofiles
from nonebot_bison.config.config_legacy import Config, get_config_path
from nonebot_bison.utils import Singleton
from nonebot_bison.config.config_legacy import Config, get_config_path
# 默认不创建配置所在的文件夹
# 如果需要测试需要手动创建相关文件夹
@ -106,7 +102,7 @@ async def use_legacy_config(app: App):
Config()._do_init()
yield
yield None
# 清除单例的缓存
Singleton._instances.clear()

View File

@ -1,15 +1,15 @@
import pytest
import respx
from httpx import AsyncClient, Response
import pytest
from nonebug.app import App
from httpx import Response, AsyncClient
from .utils import get_file, get_json
@pytest.fixture
@pytest.fixture()
def arknights(app: App):
from nonebot_bison.platform import platform_manager
from nonebot_bison.utils import ProcessContext
from nonebot_bison.platform import platform_manager
return platform_manager["arknights"](ProcessContext(), AsyncClient())
@ -39,7 +39,7 @@ def monster_siren_list_1():
return get_json("monster-siren_list_1.json")
@pytest.mark.asyncio
@pytest.mark.asyncio()
@respx.mock
async def test_fetch_new(
arknights,
@ -49,30 +49,18 @@ async def test_fetch_new(
monster_siren_list_0,
monster_siren_list_1,
):
ak_list_router = respx.get(
"https://ak-conf.hypergryph.com/config/prod/announce_meta/IOS/announcement.meta.json"
)
detail_router = respx.get(
"https://ak.hycdn.cn/announce/IOS/announcement/807_1640060583.html"
)
version_router = respx.get(
"https://ak-conf.hypergryph.com/config/prod/official/IOS/version"
)
ak_list_router = respx.get("https://ak-conf.hypergryph.com/config/prod/announce_meta/IOS/announcement.meta.json")
detail_router = respx.get("https://ak.hycdn.cn/announce/IOS/announcement/807_1640060583.html")
version_router = respx.get("https://ak-conf.hypergryph.com/config/prod/official/IOS/version")
preannouncement_router = respx.get(
"https://ak-conf.hypergryph.com/config/prod/announce_meta/IOS/preannouncement.meta.json"
)
monster_siren_router = respx.get("https://monster-siren.hypergryph.com/api/news")
terra_list = respx.get("https://terra-historicus.hypergryph.com/api/recentUpdate")
ak_list_router.mock(return_value=Response(200, json=arknights_list__1))
detail_router.mock(
return_value=Response(200, text=get_file("arknights-detail-807"))
)
version_router.mock(
return_value=Response(200, json=get_json("arknights-version-0.json"))
)
preannouncement_router.mock(
return_value=Response(200, json=get_json("arknights-pre-0.json"))
)
detail_router.mock(return_value=Response(200, text=get_file("arknights-detail-807")))
version_router.mock(return_value=Response(200, json=get_json("arknights-version-0.json")))
preannouncement_router.mock(return_value=Response(200, json=get_json("arknights-pre-0.json")))
monster_siren_router.mock(return_value=Response(200, json=monster_siren_list_0))
terra_list.mock(return_value=Response(200, json=get_json("terra-hist-0.json")))
target = ""
@ -92,7 +80,6 @@ async def test_fetch_new(
assert post.target_name == "明日方舟游戏内公告"
assert len(post.pics) == 1
# assert(post.pics == ['https://ak-fs.hypergryph.com/announce/images/20210623/e6f49aeb9547a2278678368a43b95b07.jpg'])
print(res3[0][1])
await post.generate_messages()
terra_list.mock(return_value=Response(200, json=get_json("terra-hist-1.json")))
res = await arknights.fetch_new_post(target, [dummy_user_subinfo])
@ -101,12 +88,10 @@ async def test_fetch_new(
assert post.target_type == "terra-historicus"
assert post.text == "123罗德岛 - 「掠风」篇"
assert post.url == "https://terra-historicus.hypergryph.com/comic/6253/episode/4938"
assert post.pics == [
"https://web.hycdn.cn/comic/pic/20220507/ab8a2ff408ec7d587775aed70b178ec0.png"
]
assert post.pics == ["https://web.hycdn.cn/comic/pic/20220507/ab8a2ff408ec7d587775aed70b178ec0.png"]
@pytest.mark.render
@pytest.mark.render()
@respx.mock
async def test_send_with_render(
arknights,
@ -116,30 +101,18 @@ async def test_send_with_render(
monster_siren_list_0,
monster_siren_list_1,
):
ak_list_router = respx.get(
"https://ak-conf.hypergryph.com/config/prod/announce_meta/IOS/announcement.meta.json"
)
detail_router = respx.get(
"https://ak.hycdn.cn/announce/IOS/announcement/805_1640074952.html"
)
version_router = respx.get(
"https://ak-conf.hypergryph.com/config/prod/official/IOS/version"
)
ak_list_router = respx.get("https://ak-conf.hypergryph.com/config/prod/announce_meta/IOS/announcement.meta.json")
detail_router = respx.get("https://ak.hycdn.cn/announce/IOS/announcement/805_1640074952.html")
version_router = respx.get("https://ak-conf.hypergryph.com/config/prod/official/IOS/version")
preannouncement_router = respx.get(
"https://ak-conf.hypergryph.com/config/prod/announce_meta/IOS/preannouncement.meta.json"
)
monster_siren_router = respx.get("https://monster-siren.hypergryph.com/api/news")
terra_list = respx.get("https://terra-historicus.hypergryph.com/api/recentUpdate")
ak_list_router.mock(return_value=Response(200, json=arknights_list_0))
detail_router.mock(
return_value=Response(200, text=get_file("arknights-detail-805"))
)
version_router.mock(
return_value=Response(200, json=get_json("arknights-version-0.json"))
)
preannouncement_router.mock(
return_value=Response(200, json=get_json("arknights-pre-0.json"))
)
detail_router.mock(return_value=Response(200, text=get_file("arknights-detail-805")))
version_router.mock(return_value=Response(200, json=get_json("arknights-version-0.json")))
preannouncement_router.mock(return_value=Response(200, json=get_json("arknights-pre-0.json")))
monster_siren_router.mock(return_value=Response(200, json=monster_siren_list_0))
terra_list.mock(return_value=Response(200, json=get_json("terra-hist-0.json")))
target = ""
@ -159,5 +132,5 @@ async def test_send_with_render(
assert post.target_name == "明日方舟游戏内公告"
assert len(post.pics) == 1
# assert(post.pics == ['https://ak-fs.hypergryph.com/announce/images/20210623/e6f49aeb9547a2278678368a43b95b07.jpg'])
print(res3[0][1])
r = await post.generate_messages()
assert r

View File

@ -1,13 +1,12 @@
import typing
from datetime import datetime
import pytest
import respx
from httpx import AsyncClient, Response
import pytest
from nonebug.app import App
from pytz import timezone
from httpx import Response, AsyncClient
from .utils import get_file, get_json
from .utils import get_json
@pytest.fixture(scope="module")
@ -19,10 +18,10 @@ if typing.TYPE_CHECKING:
from nonebot_bison.platform.bilibili import Bilibili
@pytest.fixture
@pytest.fixture()
def bilibili(app: App):
from nonebot_bison.platform import platform_manager
from nonebot_bison.utils import ProcessContext
from nonebot_bison.platform import platform_manager
return platform_manager["bilibili"](ProcessContext(), AsyncClient())
@ -46,7 +45,7 @@ async def test_video_forward(bilibili, bing_dy_list):
post = await bilibili.parse(bing_dy_list[1])
assert (
post.text
== "答案揭晓:宿舍!来看看投票结果\nhttps://t.bilibili.com/568093580488553786\n--------------\n#可露希尔的秘密档案# \n11来宿舍休息一下吧 \n档案来源lambda:\\罗德岛内务\\秘密档案 \n发布时间9/12 1:00 P.M. \n档案类型:可见 \n档案描述:今天请了病假在宿舍休息。很舒适。 \n提供者:赫默\n=================\n《可露希尔的秘密档案》11话来宿舍休息一下吧"
== "答案揭晓:宿舍!来看看投票结果\nhttps://t.bilibili.com/568093580488553786\n--------------\n#可露希尔的秘密档案# \n11来宿舍休息一下吧 \n档案来源lambda:\\罗德岛内务\\秘密档案 \n发布时间9/12 1:00 P.M. \n档案类型:可见 \n档案描述:今天请了病假在宿舍休息。很舒适。 \n提供者:赫默\n=================\n《可露希尔的秘密档案》11话来宿舍休息一下吧" # noqa: E501
)
@ -57,7 +56,7 @@ async def test_article_forward(bilibili, bing_dy_list):
post.text
== "#明日方舟##饼学大厦#\n9.11专栏更新完毕,这还塌了实属没跟新运营对上\n后边除了周日发饼和PV没提及的中文语音稳了\n别忘了来参加#可露希尔的秘密档案#的主题投票\nhttps://t.bilibili.com/568093580488553786?tab=2"
+ "\n--------------\n"
+ "【明日方舟】饼学大厦#12~14风暴瞭望&玛莉娅·临光&红松林&感谢庆典9.11更新 更新记录09.11更新覆盖09.10更新以及排期更新猜测周一周五开活动09.10更新以周五开活动为底PV/公告调整位置整体结构更新09.08更新:饼学大厦#12更新新增一件六星商店服饰周日发饼09.06更新饼学大厦整栋整栋翻新改为9.16开主线四日无饼09.05凌晨更新10.13后的排期(两日无饼,鹰角背刺,心狠手辣)前言感谢楪筱祈ぺ的动态-哔哩哔哩 (bilibili.com) 对饼学的贡献后续排期9.17【风暴瞭望】、10.01【玛莉娅·临光】复刻、10.1"
+ "【明日方舟】饼学大厦#12~14风暴瞭望&玛莉娅·临光&红松林&感谢庆典9.11更新 更新记录09.11更新覆盖09.10更新以及排期更新猜测周一周五开活动09.10更新以周五开活动为底PV/公告调整位置整体结构更新09.08更新:饼学大厦#12更新新增一件六星商店服饰周日发饼09.06更新饼学大厦整栋整栋翻新改为9.16开主线四日无饼09.05凌晨更新10.13后的排期(两日无饼,鹰角背刺,心狠手辣)前言感谢楪筱祈ぺ的动态-哔哩哔哩 (bilibili.com) 对饼学的贡献后续排期9.17【风暴瞭望】、10.01【玛莉娅·临光】复刻、10.1" # noqa: E501
)
@ -66,9 +65,9 @@ async def test_dynamic_forward(bilibili, bing_dy_list):
post = await bilibili.parse(bing_dy_list[5])
assert (
post.text
== "饼组主线饼学预测——9.11版\n①今日结果\n9.11 殿堂上的游禽-星极(x新运营实锤了)\n②后续预测\n9.12 #罗德岛相簿#+#可露希尔的秘密档案#11话\n9.13 六星先锋(执旗手)干员-琴柳\n9.14 宣传策略-空弦+家具\n9.15 轮换池(+中文语音前瞻)\n9.16 停机\n9.17 #罗德岛闲逛部#+新六星EP+EP09·风暴瞭望开启\n9.19 #罗德岛相簿#"
== "饼组主线饼学预测——9.11版\n①今日结果\n9.11 殿堂上的游禽-星极(x新运营实锤了)\n②后续预测\n9.12 #罗德岛相簿#+#可露希尔的秘密档案#11话\n9.13 六星先锋(执旗手)干员-琴柳\n9.14 宣传策略-空弦+家具\n9.15 轮换池(+中文语音前瞻)\n9.16 停机\n9.17 #罗德岛闲逛部#+新六星EP+EP09·风暴瞭望开启\n9.19 #罗德岛相簿#" # noqa: E501
+ "\n--------------\n"
+ "#明日方舟#\n【新增服饰】\n//殿堂上的游禽 - 星极\n塞壬唱片偶像企划《闪耀阶梯》特供服饰/殿堂上的游禽。星极自费参加了这项企划,尝试着用大众能接受的方式演绎天空之上的故事。\n\n_____________\n谦逊留给观众,骄傲发自歌喉,此夜,唯我璀璨。 "
+ "#明日方舟#\n【新增服饰】\n//殿堂上的游禽 - 星极\n塞壬唱片偶像企划《闪耀阶梯》特供服饰/殿堂上的游禽。星极自费参加了这项企划,尝试着用大众能接受的方式演绎天空之上的故事。\n\n_____________\n谦逊留给观众,骄傲发自歌喉,此夜,唯我璀璨。 " # noqa: E501
)
@ -93,9 +92,7 @@ async def test_fetch_new(bilibili, dummy_user_subinfo):
post_router = respx.get(
"https://api.vc.bilibili.com/dynamic_svr/v1/dynamic_svr/space_history?host_uid=161775300&offset=0&need_top=0"
)
post_router.mock(
return_value=Response(200, json=get_json("bilibili_strange_post-0.json"))
)
post_router.mock(return_value=Response(200, json=get_json("bilibili_strange_post-0.json")))
bilibili_main_page_router = respx.get("https://www.bilibili.com/")
bilibili_main_page_router.mock(return_value=Response(200))
target = "161775300"
@ -111,7 +108,7 @@ async def test_fetch_new(bilibili, dummy_user_subinfo):
post = res2[0][1][0]
assert (
post.text
== "#罗德厨房——回甘##明日方舟#\r\n明日方舟官方美食漫画,正式开餐。\r\n往事如烟,安然即好。\r\nMenu 01高脚羽兽烤串与罗德岛的领袖\r\n\r\n哔哩哔哩漫画阅读https://manga.bilibili.com/detail/mc31998?from=manga_search\r\n\r\n关注并转发本动态我们将会在5月27日抽取10位博士赠送【兔兔奇境】周边礼盒一份。 互动抽奖"
== "#罗德厨房——回甘##明日方舟#\r\n明日方舟官方美食漫画,正式开餐。\r\n往事如烟,安然即好。\r\nMenu 01高脚羽兽烤串与罗德岛的领袖\r\n\r\n哔哩哔哩漫画阅读https://manga.bilibili.com/detail/mc31998?from=manga_search\r\n\r\n关注并转发本动态我们将会在5月27日抽取10位博士赠送【兔兔奇境】周边礼盒一份。 互动抽奖" # noqa: E501
)
@ -127,9 +124,7 @@ async def test_parse_target(bilibili: "Bilibili"):
)
assert res2 == "161775300"
with pytest.raises(Platform.ParseTargetException):
await bilibili.parse_target(
"https://www.bilibili.com/video/BV1qP4y1g738?spm_id_from=333.999.0.0"
)
await bilibili.parse_target("https://www.bilibili.com/video/BV1qP4y1g738?spm_id_from=333.999.0.0")
@pytest.fixture(scope="module")
@ -140,7 +135,6 @@ def post_list():
# 测试新tag机制的平台推送情况
@pytest.mark.asyncio
async def test_filter_user_custom(bilibili, post_list):
only_banned_tags = ["~可露希尔的秘密档案"]
res0 = await bilibili.filter_user_custom(post_list, [], only_banned_tags)
assert len(res0) == 8

View File

@ -1,9 +1,9 @@
import typing
import pytest
import respx
from httpx import AsyncClient, Response
import pytest
from nonebug.app import App
from httpx import Response, AsyncClient
from .utils import get_json
@ -11,10 +11,10 @@ if typing.TYPE_CHECKING:
from nonebot_bison.platform.bilibili import BilibiliBangumi
@pytest.fixture
@pytest.fixture()
def bili_bangumi(app: App):
from nonebot_bison.platform import platform_manager
from nonebot_bison.utils import ProcessContext
from nonebot_bison.platform import platform_manager
return platform_manager["bilibili-bangumi"](ProcessContext(), AsyncClient())
@ -26,32 +26,20 @@ async def test_parse_target(bili_bangumi: "BilibiliBangumi"):
assert res1 == "28339726"
res2 = await bili_bangumi.parse_target("md28339726")
assert res2 == "28339726"
res3 = await bili_bangumi.parse_target(
"https://www.bilibili.com/bangumi/media/md28339726"
)
res3 = await bili_bangumi.parse_target("https://www.bilibili.com/bangumi/media/md28339726")
assert res3 == "28339726"
with pytest.raises(Platform.ParseTargetException):
await bili_bangumi.parse_target(
"https://www.bilibili.com/bangumi/play/ep683045"
)
await bili_bangumi.parse_target("https://www.bilibili.com/bangumi/play/ep683045")
@pytest.mark.asyncio
@respx.mock
async def test_fetch_bilibili_bangumi_status(
bili_bangumi: "BilibiliBangumi", dummy_user_subinfo
):
async def test_fetch_bilibili_bangumi_status(bili_bangumi: "BilibiliBangumi", dummy_user_subinfo):
from nonebot_bison.types import Target
bili_bangumi_router = respx.get(
"https://api.bilibili.com/pgc/review/user?media_id=28235413"
)
bili_bangumi_detail_router = respx.get(
"https://api.bilibili.com/pgc/view/web/season?season_id=39719"
)
bili_bangumi_router.mock(
return_value=Response(200, json=get_json("bilibili-gangumi-hanhua0.json"))
)
bili_bangumi_router = respx.get("https://api.bilibili.com/pgc/review/user?media_id=28235413")
bili_bangumi_detail_router = respx.get("https://api.bilibili.com/pgc/view/web/season?season_id=39719")
bili_bangumi_router.mock(return_value=Response(200, json=get_json("bilibili-gangumi-hanhua0.json")))
bilibili_main_page_router = respx.get("https://www.bilibili.com/")
bilibili_main_page_router.mock(return_value=Response(200))
target = Target("28235413")
@ -61,14 +49,8 @@ async def test_fetch_bilibili_bangumi_status(
res = await bili_bangumi.fetch_new_post(target, [dummy_user_subinfo])
assert len(res) == 0
bili_bangumi_router.mock(
return_value=Response(200, json=get_json("bilibili-gangumi-hanhua1.json"))
)
bili_bangumi_detail_router.mock(
return_value=Response(
200, json=get_json("bilibili-gangumi-hanhua1-detail.json")
)
)
bili_bangumi_router.mock(return_value=Response(200, json=get_json("bilibili-gangumi-hanhua1.json")))
bili_bangumi_detail_router.mock(return_value=Response(200, json=get_json("bilibili-gangumi-hanhua1-detail.json")))
res2 = await bili_bangumi.fetch_new_post(target, [dummy_user_subinfo])
post = res2[0][1][0]
@ -76,7 +58,5 @@ async def test_fetch_bilibili_bangumi_status(
assert post.text == "《汉化日记 第三季》第2话 什么是战区导弹防御系统工作日"
assert post.url == "https://www.bilibili.com/bangumi/play/ep519207"
assert post.target_name == "汉化日记 第三季"
assert post.pics == [
"http://i0.hdslb.com/bfs/archive/ea0a302c954f9dbc3d593e676486396c551529c9.jpg"
]
assert post.compress == True
assert post.pics == ["http://i0.hdslb.com/bfs/archive/ea0a302c954f9dbc3d593e676486396c551529c9.jpg"]
assert post.compress is True

View File

@ -2,21 +2,21 @@ from copy import deepcopy
import pytest
import respx
from httpx import AsyncClient, Response
from nonebug.app import App
from httpx import Response, AsyncClient
from .utils import get_json
@pytest.fixture
@pytest.fixture()
def bili_live(app: App):
from nonebot_bison.platform import platform_manager
from nonebot_bison.utils import ProcessContext
from nonebot_bison.platform import platform_manager
return platform_manager["bilibili-live"](ProcessContext(), AsyncClient())
@pytest.fixture
@pytest.fixture()
def dummy_only_open_user_subinfo(app: App):
from nonebot_plugin_saa import TargetQQGroup
@ -77,7 +77,7 @@ async def test_fetch_first_live(bili_live, dummy_only_open_user_subinfo):
assert post.pics == [
"https://i0.hdslb.com/bfs/live/new_room_cover/fd357f0f3cbbb48e9acfbcda616b946c2454c56c.jpg"
]
assert post.compress == True
assert post.compress is True
@pytest.mark.asyncio
@ -85,9 +85,7 @@ async def test_fetch_first_live(bili_live, dummy_only_open_user_subinfo):
async def test_fetch_bililive_only_live_open(bili_live, dummy_only_open_user_subinfo):
mock_bili_live_status = get_json("bili_live_status.json")
bili_live_router = respx.get(
"https://api.live.bilibili.com/room/v1/Room/get_status_info_by_uids?uids[]=13164144"
)
bili_live_router = respx.get("https://api.live.bilibili.com/room/v1/Room/get_status_info_by_uids?uids[]=13164144")
bili_live_router.mock(return_value=Response(200, json=mock_bili_live_status))
bilibili_main_page_router = respx.get("https://www.bilibili.com/")
@ -106,10 +104,8 @@ async def test_fetch_bililive_only_live_open(bili_live, dummy_only_open_user_sub
assert post.text == "[开播] 【Zc】从0挑战到15肉鸽目前10难度"
assert post.url == "https://live.bilibili.com/3044248"
assert post.target_name == "魔法Zc目录 其他单机"
assert post.pics == [
"https://i0.hdslb.com/bfs/live/new_room_cover/fd357f0f3cbbb48e9acfbcda616b946c2454c56c.jpg"
]
assert post.compress == True
assert post.pics == ["https://i0.hdslb.com/bfs/live/new_room_cover/fd357f0f3cbbb48e9acfbcda616b946c2454c56c.jpg"]
assert post.compress is True
# 标题变更
mock_bili_live_status["data"][target]["title"] = "【Zc】从0挑战到15肉鸽目前11难度"
bili_live_router.mock(return_value=Response(200, json=mock_bili_live_status))
@ -124,7 +120,7 @@ async def test_fetch_bililive_only_live_open(bili_live, dummy_only_open_user_sub
assert len(res4[0][1]) == 0
@pytest.fixture
@pytest.fixture()
def dummy_only_title_user_subinfo(app: App):
from nonebot_plugin_saa import TargetQQGroup
@ -134,17 +130,13 @@ def dummy_only_title_user_subinfo(app: App):
return UserSubInfo(user=user, categories=[2], tags=[])
@pytest.mark.asyncio
@pytest.mark.asyncio()
@respx.mock
async def test_fetch_bililive_only_title_change(
bili_live, dummy_only_title_user_subinfo
):
async def test_fetch_bililive_only_title_change(bili_live, dummy_only_title_user_subinfo):
mock_bili_live_status = get_json("bili_live_status.json")
target = "13164144"
bili_live_router = respx.get(
"https://api.live.bilibili.com/room/v1/Room/get_status_info_by_uids?uids[]=13164144"
)
bili_live_router = respx.get("https://api.live.bilibili.com/room/v1/Room/get_status_info_by_uids?uids[]=13164144")
bili_live_router.mock(return_value=Response(200, json=mock_bili_live_status))
bilibili_main_page_router = respx.get("https://www.bilibili.com/")
@ -174,10 +166,8 @@ async def test_fetch_bililive_only_title_change(
assert post.text == "[标题更新] 【Zc】从0挑战到15肉鸽目前12难度"
assert post.url == "https://live.bilibili.com/3044248"
assert post.target_name == "魔法Zc目录 其他单机"
assert post.pics == [
"https://i0.hdslb.com/bfs/live-key-frame/keyframe10170435000003044248mwowx0.jpg"
]
assert post.compress == True
assert post.pics == ["https://i0.hdslb.com/bfs/live-key-frame/keyframe10170435000003044248mwowx0.jpg"]
assert post.compress is True
# 直播状态更新-下播
mock_bili_live_status["data"][target]["live_status"] = 0
bili_live_router.mock(return_value=Response(200, json=mock_bili_live_status))
@ -186,7 +176,7 @@ async def test_fetch_bililive_only_title_change(
assert len(res4[0][1]) == 0
@pytest.fixture
@pytest.fixture()
def dummy_only_close_user_subinfo(app: App):
from nonebot_plugin_saa import TargetQQGroup
@ -202,9 +192,7 @@ async def test_fetch_bililive_only_close(bili_live, dummy_only_close_user_subinf
mock_bili_live_status = get_json("bili_live_status.json")
target = "13164144"
bili_live_router = respx.get(
"https://api.live.bilibili.com/room/v1/Room/get_status_info_by_uids?uids[]=13164144"
)
bili_live_router = respx.get("https://api.live.bilibili.com/room/v1/Room/get_status_info_by_uids?uids[]=13164144")
bili_live_router.mock(return_value=Response(200, json=mock_bili_live_status))
bilibili_main_page_router = respx.get("https://www.bilibili.com/")
@ -241,13 +229,11 @@ async def test_fetch_bililive_only_close(bili_live, dummy_only_close_user_subinf
assert post.text == "[下播] 【Zc】从0挑战到15肉鸽目前12难度"
assert post.url == "https://live.bilibili.com/3044248"
assert post.target_name == "魔法Zc目录 其他单机"
assert post.pics == [
"https://i0.hdslb.com/bfs/live-key-frame/keyframe10170435000003044248mwowx0.jpg"
]
assert post.compress == True
assert post.pics == ["https://i0.hdslb.com/bfs/live-key-frame/keyframe10170435000003044248mwowx0.jpg"]
assert post.compress is True
@pytest.fixture
@pytest.fixture()
def dummy_bililive_user_subinfo(app: App):
from nonebot_plugin_saa import TargetQQGroup
@ -263,9 +249,7 @@ async def test_fetch_bililive_combo(bili_live, dummy_bililive_user_subinfo):
mock_bili_live_status = get_json("bili_live_status.json")
target = "13164144"
bili_live_router = respx.get(
"https://api.live.bilibili.com/room/v1/Room/get_status_info_by_uids?uids[]=13164144"
)
bili_live_router = respx.get("https://api.live.bilibili.com/room/v1/Room/get_status_info_by_uids?uids[]=13164144")
bili_live_router.mock(return_value=Response(200, json=mock_bili_live_status))
bilibili_main_page_router = respx.get("https://www.bilibili.com/")
@ -289,10 +273,8 @@ async def test_fetch_bililive_combo(bili_live, dummy_bililive_user_subinfo):
assert post2.text == "[开播] 【Zc】从0挑战到15肉鸽目前11难度"
assert post2.url == "https://live.bilibili.com/3044248"
assert post2.target_name == "魔法Zc目录 其他单机"
assert post2.pics == [
"https://i0.hdslb.com/bfs/live/new_room_cover/fd357f0f3cbbb48e9acfbcda616b946c2454c56c.jpg"
]
assert post2.compress == True
assert post2.pics == ["https://i0.hdslb.com/bfs/live/new_room_cover/fd357f0f3cbbb48e9acfbcda616b946c2454c56c.jpg"]
assert post2.compress is True
# 标题变更
mock_bili_live_status["data"][target]["title"] = "【Zc】从0挑战到15肉鸽目前12难度"
bili_live_router.mock(return_value=Response(200, json=mock_bili_live_status))
@ -302,10 +284,8 @@ async def test_fetch_bililive_combo(bili_live, dummy_bililive_user_subinfo):
assert post3.text == "[标题更新] 【Zc】从0挑战到15肉鸽目前12难度"
assert post3.url == "https://live.bilibili.com/3044248"
assert post3.target_name == "魔法Zc目录 其他单机"
assert post3.pics == [
"https://i0.hdslb.com/bfs/live-key-frame/keyframe10170435000003044248mwowx0.jpg"
]
assert post3.compress == True
assert post3.pics == ["https://i0.hdslb.com/bfs/live-key-frame/keyframe10170435000003044248mwowx0.jpg"]
assert post3.compress is True
# 直播状态更新-下播
mock_bili_live_status["data"][target]["live_status"] = 0
bili_live_router.mock(return_value=Response(200, json=mock_bili_live_status))
@ -315,7 +295,5 @@ async def test_fetch_bililive_combo(bili_live, dummy_bililive_user_subinfo):
assert post4.text == "[下播] 【Zc】从0挑战到15肉鸽目前12难度"
assert post4.url == "https://live.bilibili.com/3044248"
assert post4.target_name == "魔法Zc目录 其他单机"
assert post4.pics == [
"https://i0.hdslb.com/bfs/live-key-frame/keyframe10170435000003044248mwowx0.jpg"
]
assert post4.compress == True
assert post4.pics == ["https://i0.hdslb.com/bfs/live-key-frame/keyframe10170435000003044248mwowx0.jpg"]
assert post4.compress is True

View File

@ -1,15 +1,15 @@
import pytest
import respx
from httpx import AsyncClient, Response
import pytest
from nonebug.app import App
from httpx import Response, AsyncClient
from .utils import get_json
@pytest.fixture
@pytest.fixture()
def ff14(app: App):
from nonebot_bison.platform import platform_manager
from nonebot_bison.utils import ProcessContext
from nonebot_bison.platform import platform_manager
return platform_manager["ff14"](ProcessContext(), AsyncClient())
@ -26,9 +26,7 @@ def ff14_newdata_json_1():
@pytest.mark.asyncio
@respx.mock
async def test_fetch_new(
ff14, dummy_user_subinfo, ff14_newdata_json_0, ff14_newdata_json_1
):
async def test_fetch_new(ff14, dummy_user_subinfo, ff14_newdata_json_0, ff14_newdata_json_1):
newdata = respx.get(
"https://cqnews.web.sdo.com/api/news/newsList?gameCode=ff&CategoryCode=5309,5310,5311,5312,5313&pageIndex=0&pageSize=5"
)

View File

@ -1,16 +1,16 @@
import pytest
import respx
import pytest
from flaky import flaky
from httpx import AsyncClient, Response
from nonebug.app import App
from httpx import Response, AsyncClient
from .utils import get_file, get_json
@pytest.fixture
@pytest.fixture()
def mcbbsnews(app: App):
from nonebot_bison.platform import platform_manager
from nonebot_bison.utils import ProcessContext
from nonebot_bison.platform import platform_manager
return platform_manager["mcbbsnews"](ProcessContext(), AsyncClient())
@ -26,26 +26,14 @@ def raw_post_list():
@flaky(max_runs=3, min_passes=1)
async def test_fetch_new(mcbbsnews, dummy_user_subinfo, raw_post_list):
news_router = respx.get("https://www.mcbbs.net/forum-news-1.html")
news_router.mock(
return_value=Response(
200, text=get_file("mcbbsnews/mock/mcbbsnews_post_list_html-0.html")
)
)
news_router.mock(return_value=Response(200, text=get_file("mcbbsnews/mock/mcbbsnews_post_list_html-0.html")))
new_post = respx.get("https://www.mcbbs.net/thread-1340927-1-1.html")
new_post.mock(
return_value=Response(
200, text=get_file("mcbbsnews/mock/mcbbsnews_new_post_html.html")
)
)
new_post.mock(return_value=Response(200, text=get_file("mcbbsnews/mock/mcbbsnews_new_post_html.html")))
target = ""
res = await mcbbsnews.fetch_new_post(target, [dummy_user_subinfo])
assert news_router.called
assert len(res) == 0
news_router.mock(
return_value=Response(
200, text=get_file("mcbbsnews/mock/mcbbsnews_post_list_html-1.html")
)
)
news_router.mock(return_value=Response(200, text=get_file("mcbbsnews/mock/mcbbsnews_post_list_html-1.html")))
res = await mcbbsnews.fetch_new_post(target, [dummy_user_subinfo])
assert news_router.called
post = res[0][1][0]
@ -63,20 +51,12 @@ async def test_fetch_new(mcbbsnews, dummy_user_subinfo, raw_post_list):
@flaky(max_runs=3, min_passes=1)
async def test_news_render(mcbbsnews, dummy_user_subinfo):
new_post = respx.get("https://www.mcbbs.net/thread-1340927-1-1.html")
new_post.mock(
return_value=Response(
200, text=get_file("mcbbsnews/mock/mcbbsnews_new_post_html.html")
)
)
pics = await mcbbsnews._news_render(
"https://www.mcbbs.net/thread-1340927-1-1.html", "#post_25849603"
)
new_post.mock(return_value=Response(200, text=get_file("mcbbsnews/mock/mcbbsnews_new_post_html.html")))
pics = await mcbbsnews._news_render("https://www.mcbbs.net/thread-1340927-1-1.html", "#post_25849603")
assert len(pics) == 1
pics_err_on_assert = await mcbbsnews._news_render("", "##post_25849603")
assert len(pics_err_on_assert) == 2
pics_err_on_other = await mcbbsnews._news_render(
"https://www.mcbbs.net/thread-1340927-1-1.html", "#post_err"
)
pics_err_on_other = await mcbbsnews._news_render("https://www.mcbbs.net/thread-1340927-1-1.html", "#post_err")
assert len(pics_err_on_other) == 2

View File

@ -1,10 +1,10 @@
import time
import typing
import pytest
import respx
from httpx import AsyncClient, Response
import pytest
from nonebug.app import App
from httpx import Response, AsyncClient
from .utils import get_json
@ -12,10 +12,10 @@ if typing.TYPE_CHECKING:
from nonebot_bison.platform.ncm import NcmArtist
@pytest.fixture
@pytest.fixture()
def ncm_artist(app: App):
from nonebot_bison.platform import platform_manager
from nonebot_bison.utils import ProcessContext
from nonebot_bison.platform import platform_manager
return platform_manager["ncm-artist"](ProcessContext(), AsyncClient())

View File

@ -1,10 +1,10 @@
import time
import typing
import pytest
import respx
from httpx import AsyncClient, Response
import pytest
from nonebug.app import App
from httpx import Response, AsyncClient
from .utils import get_json
@ -12,10 +12,10 @@ if typing.TYPE_CHECKING:
from nonebot_bison.platform.ncm import NcmRadio
@pytest.fixture
@pytest.fixture()
def ncm_radio(app: App):
from nonebot_bison.platform import platform_manager
from nonebot_bison.utils import ProcessContext
from nonebot_bison.platform import platform_manager
return platform_manager["ncm-radio"](ProcessContext(), AsyncClient())
@ -50,13 +50,10 @@ async def test_fetch_new(ncm_radio, ncm_radio_0, ncm_radio_1, dummy_user_subinfo
ncm_router.mock(return_value=Response(200, json=ncm_radio_1))
res2 = await ncm_radio.fetch_new_post(target, [dummy_user_subinfo])
post = res2[0][1][0]
print(post)
assert post.target_type == "ncm-radio"
assert post.text == "网易云电台更新:「松烟行动」灰齐山麓"
assert post.url == "https://music.163.com/#/program/2494997688"
assert post.pics == [
"http://p1.music.126.net/H5em5xUNIYXcjJhOmeaSqQ==/109951166647436789.jpg"
]
assert post.pics == ["http://p1.music.126.net/H5em5xUNIYXcjJhOmeaSqQ==/109951166647436789.jpg"]
assert post.target_name == "《明日方舟》游戏原声OST"

View File

@ -1,19 +1,14 @@
from time import time
from typing import TYPE_CHECKING, Any
from typing import Any
import pytest
from httpx import AsyncClient
from nonebug.app import App
if TYPE_CHECKING:
from nonebot_bison.platform import Platform
from httpx import AsyncClient
now = time()
passed = now - 3 * 60 * 60
raw_post_list_1 = [
{"id": 1, "text": "p1", "date": now, "tags": ["tag1"], "category": 1}
]
raw_post_list_1 = [{"id": 1, "text": "p1", "date": now, "tags": ["tag1"], "category": 1}]
raw_post_list_2 = raw_post_list_1 + [
{"id": 2, "text": "p2", "date": now, "tags": ["tag1"], "category": 1},
@ -22,7 +17,7 @@ raw_post_list_2 = raw_post_list_1 + [
]
@pytest.fixture
@pytest.fixture()
def dummy_user(app: App):
from nonebot_plugin_saa import TargetQQGroup
@ -30,7 +25,7 @@ def dummy_user(app: App):
return user
@pytest.fixture
@pytest.fixture()
def user_info_factory(app: App, dummy_user):
from nonebot_bison.types import UserSubInfo
@ -40,14 +35,13 @@ def user_info_factory(app: App, dummy_user):
return _user_info
@pytest.fixture
@pytest.fixture()
def mock_platform_without_cats_tags(app: App):
from nonebot_bison.platform.platform import NewMessage
from nonebot_bison.post import Post
from nonebot_bison.types import RawPost, Target
from nonebot_bison.types import Target, RawPost
from nonebot_bison.platform.platform import NewMessage
class MockPlatform(NewMessage):
platform_name = "mock_platform"
name = "Mock Platform"
enabled = True
@ -88,21 +82,19 @@ def mock_platform_without_cats_tags(app: App):
return MockPlatform
@pytest.fixture
@pytest.fixture()
def mock_platform(app: App):
from nonebot_bison.platform.platform import NewMessage
from nonebot_bison.post import Post
from nonebot_bison.types import Category, RawPost, Tag, Target
from nonebot_bison.utils import SchedulerConfig
from nonebot_bison.platform.platform import NewMessage
from nonebot_bison.types import Tag, Target, RawPost, Category
class MockPlatformSchedConf(SchedulerConfig):
name = "mock"
schedule_type = "interval"
schedule_setting = {"seconds": 100}
class MockPlatform(NewMessage):
platform_name = "mock_platform"
name = "Mock Platform"
enabled = True
@ -152,12 +144,11 @@ def mock_platform(app: App):
return MockPlatform
@pytest.fixture
@pytest.fixture()
def mock_scheduler_conf(app):
from nonebot_bison.utils import SchedulerConfig
class MockPlatformSchedConf(SchedulerConfig):
name = "mock"
schedule_type = "interval"
schedule_setting = {"seconds": 100}
@ -165,14 +156,13 @@ def mock_scheduler_conf(app):
return MockPlatformSchedConf
@pytest.fixture
@pytest.fixture()
def mock_platform_no_target(app: App, mock_scheduler_conf):
from nonebot_bison.platform.platform import CategoryNotSupport, NewMessage
from nonebot_bison.post import Post
from nonebot_bison.types import Category, RawPost, Tag, Target
from nonebot_bison.types import Tag, Target, RawPost, Category
from nonebot_bison.platform.platform import NewMessage, CategoryNotSupport
class MockPlatform(NewMessage):
platform_name = "mock_platform"
name = "Mock Platform"
enabled = True
@ -221,15 +211,13 @@ def mock_platform_no_target(app: App, mock_scheduler_conf):
return MockPlatform
@pytest.fixture
@pytest.fixture()
def mock_platform_no_target_2(app: App, mock_scheduler_conf):
from nonebot_bison.platform.platform import NewMessage
from nonebot_bison.post import Post
from nonebot_bison.types import Category, RawPost, Tag, Target
from nonebot_bison.utils import SchedulerConfig
from nonebot_bison.platform.platform import NewMessage
from nonebot_bison.types import Tag, Target, RawPost, Category
class MockPlatform(NewMessage):
platform_name = "mock_platform"
name = "Mock Platform"
enabled = True
@ -270,9 +258,7 @@ def mock_platform_no_target_2(app: App, mock_scheduler_conf):
@classmethod
async def get_sub_list(cls, _: "Target"):
list_1 = [
{"id": 5, "text": "p5", "date": now, "tags": ["tag1"], "category": 4}
]
list_1 = [{"id": 5, "text": "p5", "date": now, "tags": ["tag1"], "category": 4}]
list_2 = list_1 + [
{"id": 6, "text": "p6", "date": now, "tags": ["tag1"], "category": 4},
@ -287,14 +273,13 @@ def mock_platform_no_target_2(app: App, mock_scheduler_conf):
return MockPlatform
@pytest.fixture
@pytest.fixture()
def mock_status_change(app: App):
from nonebot_bison.platform.platform import StatusChange
from nonebot_bison.post import Post
from nonebot_bison.types import Category, RawPost, Tag, Target
from nonebot_bison.platform.platform import StatusChange
from nonebot_bison.types import Target, RawPost, Category
class MockPlatform(StatusChange):
platform_name = "mock_platform"
name = "Mock Platform"
enabled = True
@ -322,9 +307,9 @@ def mock_status_change(app: App):
return {"s": False}
def compare_status(self, target, old_status, new_status) -> list["RawPost"]:
if old_status["s"] == False and new_status["s"] == True:
if old_status["s"] is False and new_status["s"] is True:
return [{"text": "on", "cat": 1}]
elif old_status["s"] == True and new_status["s"] == False:
elif old_status["s"] is True and new_status["s"] is False:
return [{"text": "off", "cat": 2}]
return []
@ -338,18 +323,14 @@ def mock_status_change(app: App):
@pytest.mark.asyncio
async def test_new_message_target_without_cats_tags(
mock_platform_without_cats_tags, user_info_factory
):
async def test_new_message_target_without_cats_tags(mock_platform_without_cats_tags, user_info_factory):
from nonebot_bison.utils import ProcessContext
res1 = await mock_platform_without_cats_tags(
ProcessContext(), AsyncClient()
).fetch_new_post("dummy", [user_info_factory([1, 2], [])])
res1 = await mock_platform_without_cats_tags(ProcessContext(), AsyncClient()).fetch_new_post(
"dummy", [user_info_factory([1, 2], [])]
)
assert len(res1) == 0
res2 = await mock_platform_without_cats_tags(
ProcessContext(), AsyncClient()
).fetch_new_post(
res2 = await mock_platform_without_cats_tags(ProcessContext(), AsyncClient()).fetch_new_post(
"dummy",
[
user_info_factory([], []),
@ -358,8 +339,10 @@ async def test_new_message_target_without_cats_tags(
assert len(res2) == 1
posts_1 = res2[0][1]
assert len(posts_1) == 3
id_set_1 = set(map(lambda x: x.text, posts_1))
assert "p2" in id_set_1 and "p3" in id_set_1 and "p4" in id_set_1
id_set_1 = {x.text for x in posts_1}
assert "p2" in id_set_1
assert "p3" in id_set_1
assert "p4" in id_set_1
@pytest.mark.asyncio
@ -385,10 +368,11 @@ async def test_new_message_target(mock_platform, user_info_factory):
assert len(posts_1) == 2
assert len(posts_2) == 1
assert len(posts_3) == 1
id_set_1 = set(map(lambda x: x.text, posts_1))
id_set_2 = set(map(lambda x: x.text, posts_2))
id_set_3 = set(map(lambda x: x.text, posts_3))
assert "p2" in id_set_1 and "p3" in id_set_1
id_set_1 = {x.text for x in posts_1}
id_set_2 = {x.text for x in posts_2}
id_set_3 = {x.text for x in posts_3}
assert "p2" in id_set_1
assert "p3" in id_set_1
assert "p2" in id_set_2
assert "p2" in id_set_3
@ -397,13 +381,11 @@ async def test_new_message_target(mock_platform, user_info_factory):
async def test_new_message_no_target(mock_platform_no_target, user_info_factory):
from nonebot_bison.utils import ProcessContext
res1 = await mock_platform_no_target(
ProcessContext(), AsyncClient()
).fetch_new_post("dummy", [user_info_factory([1, 2], [])])
res1 = await mock_platform_no_target(ProcessContext(), AsyncClient()).fetch_new_post(
"dummy", [user_info_factory([1, 2], [])]
)
assert len(res1) == 0
res2 = await mock_platform_no_target(
ProcessContext(), AsyncClient()
).fetch_new_post(
res2 = await mock_platform_no_target(ProcessContext(), AsyncClient()).fetch_new_post(
"dummy",
[
user_info_factory([1, 2], []),
@ -418,15 +400,16 @@ async def test_new_message_no_target(mock_platform_no_target, user_info_factory)
assert len(posts_1) == 2
assert len(posts_2) == 1
assert len(posts_3) == 1
id_set_1 = set(map(lambda x: x.text, posts_1))
id_set_2 = set(map(lambda x: x.text, posts_2))
id_set_3 = set(map(lambda x: x.text, posts_3))
assert "p2" in id_set_1 and "p3" in id_set_1
id_set_1 = {x.text for x in posts_1}
id_set_2 = {x.text for x in posts_2}
id_set_3 = {x.text for x in posts_3}
assert "p2" in id_set_1
assert "p3" in id_set_1
assert "p2" in id_set_2
assert "p2" in id_set_3
res3 = await mock_platform_no_target(
ProcessContext(), AsyncClient()
).fetch_new_post("dummy", [user_info_factory([1, 2], [])])
res3 = await mock_platform_no_target(ProcessContext(), AsyncClient()).fetch_new_post(
"dummy", [user_info_factory([1, 2], [])]
)
assert len(res3) == 0
@ -469,23 +452,22 @@ async def test_group(
mock_platform_no_target_2,
user_info_factory,
):
from nonebot_bison.types import Target
from nonebot_bison.utils import ProcessContext, http_client
from nonebot_bison.platform.platform import make_no_target_group
from nonebot_bison.post import Post
from nonebot_bison.types import Category, RawPost, Tag, Target
from nonebot_bison.utils import ProcessContext
group_platform_class = make_no_target_group(
[mock_platform_no_target, mock_platform_no_target_2]
)
group_platform = group_platform_class(ProcessContext(), None)
res1 = await group_platform.fetch_new_post("dummy", [user_info_factory([1, 4], [])])
dummy = Target("dummy")
group_platform_class = make_no_target_group([mock_platform_no_target, mock_platform_no_target_2])
group_platform = group_platform_class(ProcessContext(), http_client())
res1 = await group_platform.fetch_new_post(dummy, [user_info_factory([1, 4], [])])
assert len(res1) == 0
res2 = await group_platform.fetch_new_post("dummy", [user_info_factory([1, 4], [])])
res2 = await group_platform.fetch_new_post(dummy, [user_info_factory([1, 4], [])])
assert len(res2) == 1
posts = res2[0][1]
assert len(posts) == 2
id_set_2 = set(map(lambda x: x.text, posts))
assert "p2" in id_set_2 and "p6" in id_set_2
res3 = await group_platform.fetch_new_post("dummy", [user_info_factory([1, 4], [])])
id_set_2 = {x.text for x in posts}
assert "p2" in id_set_2
assert "p6" in id_set_2
res3 = await group_platform.fetch_new_post(dummy, [user_info_factory([1, 4], [])])
assert len(res3) == 0

View File

@ -1,27 +1,25 @@
import typing
from datetime import datetime
import feedparser
import pytest
import respx
from httpx import AsyncClient, Response
from nonebug.app import App
import pytest
import feedparser
from pytz import timezone
from nonebug.app import App
from httpx import Response, AsyncClient
from .utils import get_file, get_json
if typing.TYPE_CHECKING:
from nonebot_bison.platform.weibo import Weibo
image_cdn_router = respx.route(
host__regex=r"wx\d.sinaimg.cn", path__startswith="/large/"
)
image_cdn_router = respx.route(host__regex=r"wx\d.sinaimg.cn", path__startswith="/large/")
@pytest.fixture
@pytest.fixture()
def weibo(app: App):
from nonebot_bison.platform import platform_manager
from nonebot_bison.utils import ProcessContext
from nonebot_bison.platform import platform_manager
return platform_manager["weibo"](ProcessContext(), AsyncClient())
@ -34,12 +32,8 @@ def weibo_ak_list_1():
@pytest.mark.asyncio
@respx.mock
async def test_get_name(weibo):
profile_router = respx.get(
"https://m.weibo.cn/api/container/getIndex?containerid=1005056279793937"
)
profile_router.mock(
return_value=Response(200, json=get_json("weibo_ak_profile.json"))
)
profile_router = respx.get("https://m.weibo.cn/api/container/getIndex?containerid=1005056279793937")
profile_router.mock(return_value=Response(200, json=get_json("weibo_ak_profile.json")))
name = await weibo.get_target_name(AsyncClient(), "6279793937")
assert name == "明日方舟Arknights"
@ -47,16 +41,10 @@ async def test_get_name(weibo):
@pytest.mark.asyncio
@respx.mock
async def test_fetch_new(weibo, dummy_user_subinfo):
ak_list_router = respx.get(
"https://m.weibo.cn/api/container/getIndex?containerid=1076036279793937"
)
ak_list_router = respx.get("https://m.weibo.cn/api/container/getIndex?containerid=1076036279793937")
detail_router = respx.get("https://m.weibo.cn/detail/4649031014551911")
ak_list_router.mock(
return_value=Response(200, json=get_json("weibo_ak_list_0.json"))
)
detail_router.mock(
return_value=Response(200, text=get_file("weibo_detail_4649031014551911"))
)
ak_list_router.mock(return_value=Response(200, json=get_json("weibo_ak_list_0.json")))
detail_router.mock(return_value=Response(200, text=get_file("weibo_detail_4649031014551911")))
image_cdn_router.mock(Response(200, content=b""))
target = "6279793937"
res = await weibo.fetch_new_post(target, [dummy_user_subinfo])
@ -67,9 +55,9 @@ async def test_fetch_new(weibo, dummy_user_subinfo):
ak_list_router.mock(return_value=Response(200, json=mock_data))
res2 = await weibo.fetch_new_post(target, [dummy_user_subinfo])
assert len(res2) == 0
mock_data["data"]["cards"][1]["mblog"]["created_at"] = datetime.now(
timezone("Asia/Shanghai")
).strftime("%a %b %d %H:%M:%S %z %Y")
mock_data["data"]["cards"][1]["mblog"]["created_at"] = datetime.now(timezone("Asia/Shanghai")).strftime(
"%a %b %d %H:%M:%S %z %Y"
)
ak_list_router.mock(return_value=Response(200, json=mock_data))
res3 = await weibo.fetch_new_post(target, [dummy_user_subinfo])
assert len(res3[0][1]) == 1
@ -100,12 +88,10 @@ async def test_classification(weibo):
@respx.mock
async def test_parse_long(weibo):
detail_router = respx.get("https://m.weibo.cn/detail/4645748019299849")
detail_router.mock(
return_value=Response(200, text=get_file("weibo_detail_4645748019299849"))
)
detail_router.mock(return_value=Response(200, text=get_file("weibo_detail_4645748019299849")))
raw_post = get_json("weibo_ak_list_1.json")["data"]["cards"][0]
post = await weibo.parse(raw_post)
assert not "全文" in post.text
assert "全文" not in post.text
assert detail_router.called
@ -122,7 +108,7 @@ async def test_rsshub_compare(weibo):
posts = []
for raw_post in raw_posts:
posts.append(await weibo.parse(raw_post))
url_set = set(map(lambda x: x.url, posts))
url_set = {x.url for x in posts}
feedres = feedparser.parse("https://rsshub.app/weibo/user/6279793937")
for entry in feedres.entries[:5]:
# print(entry)
@ -131,7 +117,7 @@ async def test_rsshub_compare(weibo):
test_post = {
"mblog": {
"text": '<a href="https://m.weibo.cn/search?containerid=231522type%3D1%26t%3D10%26q%3D%23%E5%88%9A%E5%87%BA%E7%94%9F%E7%9A%84%E5%B0%8F%E7%BE%8A%E9%A9%BC%E9%95%BF%E5%95%A5%E6%A0%B7%23&extparam=%23%E5%88%9A%E5%87%BA%E7%94%9F%E7%9A%84%E5%B0%8F%E7%BE%8A%E9%A9%BC%E9%95%BF%E5%95%A5%E6%A0%B7%23&luicode=10000011&lfid=1076036003966749" data-hide=""><span class="surl-text">#刚出生的小羊驼长啥样#</span></a> <br />小羊驼三三来也<span class="url-icon"><img alt=[好喜欢] src="https://h5.sinaimg.cn/m/emoticon/icon/lxh/lxh_haoxihuan-51860b62e6.png" style="width:1em; height:1em;" /></span><br /><a href="https://m.weibo.cn/p/index?extparam=%E5%B0%8F%E7%BE%8A%E9%A9%BC%E4%B8%89%E4%B8%89&containerid=1008085ae16d2046db677de1b8491d2b708597&luicode=10000011&lfid=1076036003966749" data-hide=""><span class=\'url-icon\'><img style=\'width: 1rem;height: 1rem\' src=\'https://n.sinaimg.cn/photo/5213b46e/20180926/timeline_card_small_super_default.png\'></span><span class="surl-text">小羊驼三三</span></a> ',
"text": '<a href="https://m.weibo.cn/search?containerid=231522type%3D1%26t%3D10%26q%3D%23%E5%88%9A%E5%87%BA%E7%94%9F%E7%9A%84%E5%B0%8F%E7%BE%8A%E9%A9%BC%E9%95%BF%E5%95%A5%E6%A0%B7%23&extparam=%23%E5%88%9A%E5%87%BA%E7%94%9F%E7%9A%84%E5%B0%8F%E7%BE%8A%E9%A9%BC%E9%95%BF%E5%95%A5%E6%A0%B7%23&luicode=10000011&lfid=1076036003966749" data-hide=""><span class="surl-text">#刚出生的小羊驼长啥样#</span></a> <br />小羊驼三三来也<span class="url-icon"><img alt=[好喜欢] src="https://h5.sinaimg.cn/m/emoticon/icon/lxh/lxh_haoxihuan-51860b62e6.png" style="width:1em; height:1em;" /></span><br /><a href="https://m.weibo.cn/p/index?extparam=%E5%B0%8F%E7%BE%8A%E9%A9%BC%E4%B8%89%E4%B8%89&containerid=1008085ae16d2046db677de1b8491d2b708597&luicode=10000011&lfid=1076036003966749" data-hide=""><span class=\'url-icon\'><img style=\'width: 1rem;height: 1rem\' src=\'https://n.sinaimg.cn/photo/5213b46e/20180926/timeline_card_small_super_default.png\'></span><span class="surl-text">小羊驼三三</span></a> ', # noqa
"bid": "KnssqeqKK",
}
}

View File

@ -5,12 +5,12 @@ path = Path(__file__).parent / "static"
def get_json(file_name: str):
with open(path / file_name, "r", encoding="utf8") as f:
with open(path / file_name, encoding="utf8") as f:
file_text = f.read()
return json.loads(file_text)
def get_file(file_name: str):
with open(path / file_name, "r", encoding="utf8") as f:
with open(path / file_name, encoding="utf8") as f:
file_text = f.read()
return file_text

View File

@ -1,6 +1,5 @@
import typing
from datetime import time
from typing import Type
from nonebug import App
from pytest_mock import MockerFixture
@ -9,11 +8,7 @@ if typing.TYPE_CHECKING:
from nonebot_bison.utils.scheduler_config import SchedulerConfig
async def get_schedule_times(
scheduler_config: Type["SchedulerConfig"], time: int
) -> dict[str, int]:
from nonebot_plugin_saa import TargetQQGroup
async def get_schedule_times(scheduler_config: type["SchedulerConfig"], time: int) -> dict[str, int]:
from nonebot_bison.scheduler import scheduler_dict
scheduler = scheduler_dict[scheduler_config]
@ -30,27 +25,17 @@ async def test_scheduler_without_time(init_scheduler):
from nonebot_plugin_saa import TargetQQGroup
from nonebot_bison.config import config
from nonebot_bison.config.db_config import WeightConfig
from nonebot_bison.platform.bilibili import BilibiliSchedConf
from nonebot_bison.scheduler.manager import init_scheduler
from nonebot_bison.types import Target as T_Target
from nonebot_bison.config.db_config import WeightConfig
from nonebot_bison.scheduler.manager import init_scheduler
from nonebot_bison.platform.bilibili import BilibiliSchedConf
await config.add_subscribe(
TargetQQGroup(group_id=123), T_Target("t1"), "target1", "bilibili", [], []
)
await config.add_subscribe(
TargetQQGroup(group_id=123), T_Target("t2"), "target1", "bilibili", [], []
)
await config.add_subscribe(
TargetQQGroup(group_id=123), T_Target("t2"), "target1", "bilibili-live", [], []
)
await config.add_subscribe(TargetQQGroup(group_id=123), T_Target("t1"), "target1", "bilibili", [], [])
await config.add_subscribe(TargetQQGroup(group_id=123), T_Target("t2"), "target1", "bilibili", [], [])
await config.add_subscribe(TargetQQGroup(group_id=123), T_Target("t2"), "target1", "bilibili-live", [], [])
await config.update_time_weight_config(
T_Target("t2"), "bilibili", WeightConfig(default=20, time_config=[])
)
await config.update_time_weight_config(
T_Target("t2"), "bilibili-live", WeightConfig(default=30, time_config=[])
)
await config.update_time_weight_config(T_Target("t2"), "bilibili", WeightConfig(default=20, time_config=[]))
await config.update_time_weight_config(T_Target("t2"), "bilibili-live", WeightConfig(default=30, time_config=[]))
await init_scheduler()
@ -69,34 +54,24 @@ async def test_scheduler_with_time(app: App, init_scheduler, mocker: MockerFixtu
from nonebot_plugin_saa import TargetQQGroup
from nonebot_bison.config import config, db_config
from nonebot_bison.config.db_config import TimeWeightConfig, WeightConfig
from nonebot_bison.platform.bilibili import BilibiliSchedConf
from nonebot_bison.scheduler.manager import init_scheduler
from nonebot_bison.types import Target as T_Target
from nonebot_bison.scheduler.manager import init_scheduler
from nonebot_bison.platform.bilibili import BilibiliSchedConf
from nonebot_bison.config.db_config import WeightConfig, TimeWeightConfig
await config.add_subscribe(
TargetQQGroup(group_id=123), T_Target("t1"), "target1", "bilibili", [], []
)
await config.add_subscribe(
TargetQQGroup(group_id=123), T_Target("t2"), "target1", "bilibili", [], []
)
await config.add_subscribe(
TargetQQGroup(group_id=123), T_Target("t2"), "target1", "bilibili-live", [], []
)
await config.add_subscribe(TargetQQGroup(group_id=123), T_Target("t1"), "target1", "bilibili", [], [])
await config.add_subscribe(TargetQQGroup(group_id=123), T_Target("t2"), "target1", "bilibili", [], [])
await config.add_subscribe(TargetQQGroup(group_id=123), T_Target("t2"), "target1", "bilibili-live", [], [])
await config.update_time_weight_config(
T_Target("t2"),
"bilibili",
WeightConfig(
default=20,
time_config=[
TimeWeightConfig(start_time=time(10), end_time=time(11), weight=1000)
],
time_config=[TimeWeightConfig(start_time=time(10), end_time=time(11), weight=1000)],
),
)
await config.update_time_weight_config(
T_Target("t2"), "bilibili-live", WeightConfig(default=30, time_config=[])
)
await config.update_time_weight_config(T_Target("t2"), "bilibili-live", WeightConfig(default=30, time_config=[]))
await init_scheduler()
@ -122,22 +97,16 @@ async def test_scheduler_add_new(init_scheduler):
from nonebot_plugin_saa import TargetQQGroup
from nonebot_bison.config import config
from nonebot_bison.platform.bilibili import BilibiliSchedConf
from nonebot_bison.scheduler.manager import init_scheduler
from nonebot_bison.types import Target as T_Target
from nonebot_bison.scheduler.manager import init_scheduler
from nonebot_bison.platform.bilibili import BilibiliSchedConf
await config.add_subscribe(
TargetQQGroup(group_id=123), T_Target("t1"), "target1", "bilibili", [], []
)
await config.add_subscribe(TargetQQGroup(group_id=123), T_Target("t1"), "target1", "bilibili", [], [])
await init_scheduler()
await config.add_subscribe(
TargetQQGroup(group_id=2345), T_Target("t1"), "target1", "bilibili", [], []
)
await config.add_subscribe(
TargetQQGroup(group_id=123), T_Target("t2"), "target2", "bilibili", [], []
)
await config.add_subscribe(TargetQQGroup(group_id=2345), T_Target("t1"), "target1", "bilibili", [], [])
await config.add_subscribe(TargetQQGroup(group_id=123), T_Target("t2"), "target2", "bilibili", [], [])
stat_res = await get_schedule_times(BilibiliSchedConf, 1)
assert stat_res["bilibili-t2"] == 1
@ -146,16 +115,12 @@ async def test_schedule_delete(init_scheduler):
from nonebot_plugin_saa import TargetQQGroup
from nonebot_bison.config import config
from nonebot_bison.platform.bilibili import BilibiliSchedConf
from nonebot_bison.scheduler.manager import init_scheduler
from nonebot_bison.types import Target as T_Target
from nonebot_bison.scheduler.manager import init_scheduler
from nonebot_bison.platform.bilibili import BilibiliSchedConf
await config.add_subscribe(
TargetQQGroup(group_id=123), T_Target("t1"), "target1", "bilibili", [], []
)
await config.add_subscribe(
TargetQQGroup(group_id=123), T_Target("t2"), "target1", "bilibili", [], []
)
await config.add_subscribe(TargetQQGroup(group_id=123), T_Target("t1"), "target1", "bilibili", [], [])
await config.add_subscribe(TargetQQGroup(group_id=123), T_Target("t2"), "target1", "bilibili", [], [])
await init_scheduler()

View File

@ -1,5 +1,5 @@
import pytest
import respx
import pytest
from httpx import Response
from nonebug.app import App
from nonebug_saa import should_send_saa
@ -18,18 +18,10 @@ async def test_abort_add_on_platform(app: App, init_scheduler):
from nonebot_bison.platform import platform_manager
from nonebot_bison.sub_manager import add_sub_matcher, common_platform
ak_list_router = respx.get(
"https://m.weibo.cn/api/container/getIndex?containerid=1005056279793937"
)
ak_list_router.mock(
return_value=Response(200, json=get_json("weibo_ak_profile.json"))
)
ak_list_bad_router = respx.get(
"https://m.weibo.cn/api/container/getIndex?containerid=100505000"
)
ak_list_bad_router.mock(
return_value=Response(200, json=get_json("weibo_err_profile.json"))
)
ak_list_router = respx.get("https://m.weibo.cn/api/container/getIndex?containerid=1005056279793937")
ak_list_router.mock(return_value=Response(200, json=get_json("weibo_ak_profile.json")))
ak_list_bad_router = respx.get("https://m.weibo.cn/api/container/getIndex?containerid=100505000")
ak_list_bad_router.mock(return_value=Response(200, json=get_json("weibo_err_profile.json")))
async with app.test_matcher(add_sub_matcher) as ctx:
bot = ctx.create_bot()
event_1 = fake_group_message_event(
@ -63,22 +55,14 @@ async def test_abort_add_on_id(app: App, init_scheduler):
from nonebot.adapters.onebot.v11.event import Sender
from nonebot.adapters.onebot.v11.message import Message
from nonebot_bison.platform import platform_manager
from nonebot_bison.platform.weibo import Weibo
from nonebot_bison.platform import platform_manager
from nonebot_bison.sub_manager import add_sub_matcher, common_platform
ak_list_router = respx.get(
"https://m.weibo.cn/api/container/getIndex?containerid=1005056279793937"
)
ak_list_router.mock(
return_value=Response(200, json=get_json("weibo_ak_profile.json"))
)
ak_list_bad_router = respx.get(
"https://m.weibo.cn/api/container/getIndex?containerid=100505000"
)
ak_list_bad_router.mock(
return_value=Response(200, json=get_json("weibo_err_profile.json"))
)
ak_list_router = respx.get("https://m.weibo.cn/api/container/getIndex?containerid=1005056279793937")
ak_list_router.mock(return_value=Response(200, json=get_json("weibo_ak_profile.json")))
ak_list_bad_router = respx.get("https://m.weibo.cn/api/container/getIndex?containerid=100505000")
ak_list_bad_router.mock(return_value=Response(200, json=get_json("weibo_err_profile.json")))
async with app.test_matcher(add_sub_matcher) as ctx:
bot = ctx.create_bot()
event_1 = fake_group_message_event(
@ -93,9 +77,7 @@ async def test_abort_add_on_id(app: App, init_scheduler):
BotReply.add_reply_on_platform(platform_manager, common_platform),
True,
)
event_2 = fake_group_message_event(
message=Message("weibo"), sender=fake_admin_user
)
event_2 = fake_group_message_event(message=Message("weibo"), sender=fake_admin_user)
ctx.receive_event(bot, event_2)
ctx.should_call_send(
event_2,
@ -121,22 +103,14 @@ async def test_abort_add_on_cats(app: App, init_scheduler):
from nonebot.adapters.onebot.v11.event import Sender
from nonebot.adapters.onebot.v11.message import Message
from nonebot_bison.platform import platform_manager
from nonebot_bison.platform.weibo import Weibo
from nonebot_bison.platform import platform_manager
from nonebot_bison.sub_manager import add_sub_matcher, common_platform
ak_list_router = respx.get(
"https://m.weibo.cn/api/container/getIndex?containerid=1005056279793937"
)
ak_list_router.mock(
return_value=Response(200, json=get_json("weibo_ak_profile.json"))
)
ak_list_bad_router = respx.get(
"https://m.weibo.cn/api/container/getIndex?containerid=100505000"
)
ak_list_bad_router.mock(
return_value=Response(200, json=get_json("weibo_err_profile.json"))
)
ak_list_router = respx.get("https://m.weibo.cn/api/container/getIndex?containerid=1005056279793937")
ak_list_router.mock(return_value=Response(200, json=get_json("weibo_ak_profile.json")))
ak_list_bad_router = respx.get("https://m.weibo.cn/api/container/getIndex?containerid=100505000")
ak_list_bad_router.mock(return_value=Response(200, json=get_json("weibo_err_profile.json")))
async with app.test_matcher(add_sub_matcher) as ctx:
bot = ctx.create_bot()
event_1 = fake_group_message_event(
@ -148,29 +122,21 @@ async def test_abort_add_on_cats(app: App, init_scheduler):
ctx.should_pass_rule()
ctx.should_call_send(
event_1,
BotReply.add_reply_on_platform(
platform_manager=platform_manager, common_platform=common_platform
),
BotReply.add_reply_on_platform(platform_manager=platform_manager, common_platform=common_platform),
True,
)
event_2 = fake_group_message_event(
message=Message("weibo"), sender=fake_admin_user
)
event_2 = fake_group_message_event(message=Message("weibo"), sender=fake_admin_user)
ctx.receive_event(bot, event_2)
ctx.should_call_send(
event_2,
BotReply.add_reply_on_id(Weibo),
True,
)
event_3 = fake_group_message_event(
message=Message("6279793937"), sender=fake_admin_user
)
event_3 = fake_group_message_event(message=Message("6279793937"), sender=fake_admin_user)
ctx.receive_event(bot, event_3)
ctx.should_call_send(
event_3,
BotReply.add_reply_on_target_confirm(
"weibo", "明日方舟Arknights", "6279793937"
),
BotReply.add_reply_on_target_confirm("weibo", "明日方舟Arknights", "6279793937"),
True,
)
ctx.should_call_send(
@ -197,22 +163,14 @@ async def test_abort_add_on_tag(app: App, init_scheduler):
from nonebot.adapters.onebot.v11.event import Sender
from nonebot.adapters.onebot.v11.message import Message
from nonebot_bison.platform import platform_manager
from nonebot_bison.platform.weibo import Weibo
from nonebot_bison.platform import platform_manager
from nonebot_bison.sub_manager import add_sub_matcher, common_platform
ak_list_router = respx.get(
"https://m.weibo.cn/api/container/getIndex?containerid=1005056279793937"
)
ak_list_router.mock(
return_value=Response(200, json=get_json("weibo_ak_profile.json"))
)
ak_list_bad_router = respx.get(
"https://m.weibo.cn/api/container/getIndex?containerid=100505000"
)
ak_list_bad_router.mock(
return_value=Response(200, json=get_json("weibo_err_profile.json"))
)
ak_list_router = respx.get("https://m.weibo.cn/api/container/getIndex?containerid=1005056279793937")
ak_list_router.mock(return_value=Response(200, json=get_json("weibo_ak_profile.json")))
ak_list_bad_router = respx.get("https://m.weibo.cn/api/container/getIndex?containerid=100505000")
ak_list_bad_router.mock(return_value=Response(200, json=get_json("weibo_err_profile.json")))
async with app.test_matcher(add_sub_matcher) as ctx:
bot = ctx.create_bot()
event_1 = fake_group_message_event(
@ -224,29 +182,21 @@ async def test_abort_add_on_tag(app: App, init_scheduler):
ctx.should_pass_rule()
ctx.should_call_send(
event_1,
BotReply.add_reply_on_platform(
platform_manager=platform_manager, common_platform=common_platform
),
BotReply.add_reply_on_platform(platform_manager=platform_manager, common_platform=common_platform),
True,
)
event_2 = fake_group_message_event(
message=Message("weibo"), sender=fake_admin_user
)
event_2 = fake_group_message_event(message=Message("weibo"), sender=fake_admin_user)
ctx.receive_event(bot, event_2)
ctx.should_call_send(
event_2,
BotReply.add_reply_on_id(Weibo),
True,
)
event_3 = fake_group_message_event(
message=Message("6279793937"), sender=fake_admin_user
)
event_3 = fake_group_message_event(message=Message("6279793937"), sender=fake_admin_user)
ctx.receive_event(bot, event_3)
ctx.should_call_send(
event_3,
BotReply.add_reply_on_target_confirm(
"weibo", "明日方舟Arknights", "6279793937"
),
BotReply.add_reply_on_target_confirm("weibo", "明日方舟Arknights", "6279793937"),
True,
)
ctx.should_call_send(
@ -254,9 +204,7 @@ async def test_abort_add_on_tag(app: App, init_scheduler):
BotReply.add_reply_on_cats(platform_manager, "weibo"),
True,
)
event_4 = fake_group_message_event(
message=Message("图文 文字"), sender=fake_admin_user
)
event_4 = fake_group_message_event(message=Message("图文 文字"), sender=fake_admin_user)
ctx.receive_event(bot, event_4)
ctx.should_call_send(event_4, BotReply.add_reply_on_tags, True)
event_abort = fake_group_message_event(
@ -276,12 +224,12 @@ async def test_abort_add_on_tag(app: App, init_scheduler):
async def test_abort_del_sub(app: App, init_scheduler):
from nonebot.adapters.onebot.v11.bot import Bot
from nonebot.adapters.onebot.v11.message import Message
from nonebot_plugin_saa import MessageFactory, TargetQQGroup
from nonebot_plugin_saa import TargetQQGroup, MessageFactory
from nonebot_bison.config import config
from nonebot_bison.types import Target as T_Target
from nonebot_bison.platform import platform_manager
from nonebot_bison.sub_manager import del_sub_matcher
from nonebot_bison.types import Target as T_Target
await config.add_subscribe(
TargetQQGroup(group_id=10000),
@ -294,23 +242,19 @@ async def test_abort_del_sub(app: App, init_scheduler):
async with app.test_matcher(del_sub_matcher) as ctx:
bot = ctx.create_bot(base=Bot)
assert isinstance(bot, Bot)
event = fake_group_message_event(
message=Message("删除订阅"), to_me=True, sender=fake_admin_user
)
event = fake_group_message_event(message=Message("删除订阅"), to_me=True, sender=fake_admin_user)
ctx.receive_event(bot, event)
ctx.should_pass_rule()
ctx.should_pass_permission()
should_send_saa(
ctx,
MessageFactory(
"订阅的帐号为:\n1 weibo 明日方舟Arknights 6279793937\n [图文] 明日方舟\n请输入要删除的订阅的序号\n输入'取消'中止"
"订阅的帐号为:\n1 weibo 明日方舟Arknights 6279793937\n [图文] 明日方舟\n请输入要删除的订阅的序号\n输入'取消'中止" # noqa: E501
),
bot,
event=event,
)
event_abort = fake_group_message_event(
message=Message("取消"), sender=fake_admin_user
)
event_abort = fake_group_message_event(message=Message("取消"), sender=fake_admin_user)
ctx.receive_event(bot, event_abort)
ctx.should_call_send(event_abort, "删除中止", True)
ctx.should_finished()

View File

@ -1,17 +1,11 @@
import pytest
import respx
import pytest
from httpx import Response
from nonebug.app import App
from nonebug_saa import should_send_saa
from pytest_mock import MockerFixture
from ..platforms.utils import get_json
from ..utils import (
BotReply,
add_reply_on_id_input_search,
fake_admin_user,
fake_group_message_event,
)
from ..utils import BotReply, fake_admin_user, fake_group_message_event, add_reply_on_id_input_search
@pytest.mark.asyncio
@ -25,9 +19,7 @@ async def test_configurable_at_me_true_failed(app: App):
plugin_config.bison_to_me = True
async with app.test_matcher(add_sub_matcher) as ctx:
bot = ctx.create_bot(base=Bot)
event = fake_group_message_event(
message=Message("添加订阅"), sender=fake_admin_user
)
event = fake_group_message_event(message=Message("添加订阅"), sender=fake_admin_user)
ctx.receive_event(bot, event)
ctx.should_not_pass_rule()
ctx.should_pass_permission()
@ -52,9 +44,7 @@ async def test_configurable_at_me_false(app: App):
plugin_config.bison_to_me = False
async with app.test_matcher(add_sub_matcher) as ctx:
bot = ctx.create_bot(base=Bot)
event = fake_group_message_event(
message=Message("添加订阅"), sender=fake_admin_user
)
event = fake_group_message_event(message=Message("添加订阅"), sender=fake_admin_user)
ctx.receive_event(bot, event)
ctx.should_call_send(
event,
@ -68,27 +58,19 @@ async def test_configurable_at_me_false(app: App):
@pytest.mark.asyncio
@respx.mock
async def test_add_with_target(app: App, init_scheduler):
from nonebot_plugin_saa import TargetQQGroup
from nonebot.adapters.onebot.v11.event import Sender
from nonebot.adapters.onebot.v11.message import Message
from nonebot_plugin_saa import TargetQQGroup
from nonebot_bison.config import config
from nonebot_bison.platform import platform_manager
from nonebot_bison.platform.weibo import Weibo
from nonebot_bison.platform import platform_manager
from nonebot_bison.sub_manager import add_sub_matcher, common_platform
ak_list_router = respx.get(
"https://m.weibo.cn/api/container/getIndex?containerid=1005056279793937"
)
ak_list_router.mock(
return_value=Response(200, json=get_json("weibo_ak_profile.json"))
)
ak_list_bad_router = respx.get(
"https://m.weibo.cn/api/container/getIndex?containerid=100505000"
)
ak_list_bad_router.mock(
return_value=Response(200, json=get_json("weibo_err_profile.json"))
)
ak_list_router = respx.get("https://m.weibo.cn/api/container/getIndex?containerid=1005056279793937")
ak_list_router.mock(return_value=Response(200, json=get_json("weibo_ak_profile.json")))
ak_list_bad_router = respx.get("https://m.weibo.cn/api/container/getIndex?containerid=100505000")
ak_list_bad_router.mock(return_value=Response(200, json=get_json("weibo_err_profile.json")))
async with app.test_matcher(add_sub_matcher) as ctx:
bot = ctx.create_bot()
@ -101,9 +83,7 @@ async def test_add_with_target(app: App, init_scheduler):
ctx.should_pass_rule()
ctx.should_call_send(
event_1,
BotReply.add_reply_on_platform(
platform_manager=platform_manager, common_platform=common_platform
),
BotReply.add_reply_on_platform(platform_manager=platform_manager, common_platform=common_platform),
True,
)
event_2 = fake_group_message_event(
@ -116,30 +96,22 @@ async def test_add_with_target(app: App, init_scheduler):
BotReply.add_reply_on_platform_input_allplatform(platform_manager),
True,
)
event_3 = fake_group_message_event(
message=Message("weibo"), sender=fake_admin_user
)
event_3 = fake_group_message_event(message=Message("weibo"), sender=fake_admin_user)
ctx.receive_event(bot, event_3)
ctx.should_call_send(
event_3,
BotReply.add_reply_on_id(Weibo),
True,
)
event_4_err = fake_group_message_event(
message=Message("000"), sender=fake_admin_user
)
event_4_err = fake_group_message_event(message=Message("000"), sender=fake_admin_user)
ctx.receive_event(bot, event_4_err)
ctx.should_call_send(event_4_err, BotReply.add_reply_on_id_input_error, True)
ctx.should_rejected()
event_4_ok = fake_group_message_event(
message=Message("6279793937"), sender=fake_admin_user
)
event_4_ok = fake_group_message_event(message=Message("6279793937"), sender=fake_admin_user)
ctx.receive_event(bot, event_4_ok)
ctx.should_call_send(
event_4_ok,
BotReply.add_reply_on_target_confirm(
"weibo", "明日方舟Arknights", "6279793937"
),
BotReply.add_reply_on_target_confirm("weibo", "明日方舟Arknights", "6279793937"),
True,
)
ctx.should_call_send(
@ -147,34 +119,20 @@ async def test_add_with_target(app: App, init_scheduler):
BotReply.add_reply_on_cats(platform_manager, "weibo"),
True,
)
event_5_err = fake_group_message_event(
message=Message("图文 文字 err"), sender=fake_admin_user
)
event_5_err = fake_group_message_event(message=Message("图文 文字 err"), sender=fake_admin_user)
ctx.receive_event(bot, event_5_err)
ctx.should_call_send(
event_5_err, BotReply.add_reply_on_cats_input_error("err"), True
)
ctx.should_call_send(event_5_err, BotReply.add_reply_on_cats_input_error("err"), True)
ctx.should_rejected()
event_5_ok = fake_group_message_event(
message=Message("图文 文字"), sender=fake_admin_user
)
event_5_ok = fake_group_message_event(message=Message("图文 文字"), sender=fake_admin_user)
ctx.receive_event(bot, event_5_ok)
ctx.should_call_send(event_5_ok, BotReply.add_reply_on_tags, True)
event_6_more_info = fake_group_message_event(
message=Message("详情"), sender=fake_admin_user
)
event_6_more_info = fake_group_message_event(message=Message("详情"), sender=fake_admin_user)
ctx.receive_event(bot, event_6_more_info)
ctx.should_call_send(
event_6_more_info, BotReply.add_reply_on_tags_need_more_info, True
)
ctx.should_call_send(event_6_more_info, BotReply.add_reply_on_tags_need_more_info, True)
ctx.should_rejected()
event_6_ok = fake_group_message_event(
message=Message("全部标签"), sender=fake_admin_user
)
event_6_ok = fake_group_message_event(message=Message("全部标签"), sender=fake_admin_user)
ctx.receive_event(bot, event_6_ok)
ctx.should_call_send(
event_6_ok, BotReply.add_reply_subscribe_success("明日方舟Arknights"), True
)
ctx.should_call_send(event_6_ok, BotReply.add_reply_subscribe_success("明日方舟Arknights"), True)
ctx.should_finished()
subs = await config.list_subscribe(TargetQQGroup(group_id=10000))
assert len(subs) == 1
@ -191,13 +149,13 @@ async def test_add_with_target(app: App, init_scheduler):
@pytest.mark.asyncio
@respx.mock
async def test_add_with_target_no_cat(app: App, init_scheduler):
from nonebot_plugin_saa import TargetQQGroup
from nonebot.adapters.onebot.v11.event import Sender
from nonebot.adapters.onebot.v11.message import Message
from nonebot_plugin_saa import TargetQQGroup
from nonebot_bison.config import config
from nonebot_bison.platform import platform_manager
from nonebot_bison.platform.ncm import NcmArtist
from nonebot_bison.platform import platform_manager
from nonebot_bison.sub_manager import add_sub_matcher, common_platform
ncm_router = respx.get("https://music.163.com/api/artist/albums/32540734")
@ -217,27 +175,21 @@ async def test_add_with_target_no_cat(app: App, init_scheduler):
BotReply.add_reply_on_platform(platform_manager, common_platform),
True,
)
event_3 = fake_group_message_event(
message=Message("ncm-artist"), sender=fake_admin_user
)
event_3 = fake_group_message_event(message=Message("ncm-artist"), sender=fake_admin_user)
ctx.receive_event(bot, event_3)
ctx.should_call_send(
event_3,
BotReply.add_reply_on_id(NcmArtist),
True,
)
event_4_ok = fake_group_message_event(
message=Message("32540734"), sender=fake_admin_user
)
event_4_ok = fake_group_message_event(message=Message("32540734"), sender=fake_admin_user)
ctx.receive_event(bot, event_4_ok)
ctx.should_call_send(
event_4_ok,
BotReply.add_reply_on_target_confirm("ncm-artist", "塞壬唱片-MSR", "32540734"),
True,
)
ctx.should_call_send(
event_4_ok, BotReply.add_reply_subscribe_success("塞壬唱片-MSR"), True
)
ctx.should_call_send(event_4_ok, BotReply.add_reply_subscribe_success("塞壬唱片-MSR"), True)
ctx.should_finished()
subs = await config.list_subscribe(TargetQQGroup(group_id=10000))
assert len(subs) == 1
@ -252,9 +204,9 @@ async def test_add_with_target_no_cat(app: App, init_scheduler):
@pytest.mark.asyncio
@respx.mock
async def test_add_no_target(app: App, init_scheduler):
from nonebot_plugin_saa import TargetQQGroup
from nonebot.adapters.onebot.v11.event import Sender
from nonebot.adapters.onebot.v11.message import Message
from nonebot_plugin_saa import TargetQQGroup
from nonebot_bison.config import config
from nonebot_bison.platform import platform_manager
@ -274,22 +226,16 @@ async def test_add_no_target(app: App, init_scheduler):
BotReply.add_reply_on_platform(platform_manager, common_platform),
True,
)
event_3 = fake_group_message_event(
message=Message("arknights"), sender=fake_admin_user
)
event_3 = fake_group_message_event(message=Message("arknights"), sender=fake_admin_user)
ctx.receive_event(bot, event_3)
ctx.should_call_send(
event_3,
BotReply.add_reply_on_cats(platform_manager, "arknights"),
True,
)
event_4 = fake_group_message_event(
message=Message("游戏公告"), sender=fake_admin_user
)
event_4 = fake_group_message_event(message=Message("游戏公告"), sender=fake_admin_user)
ctx.receive_event(bot, event_4)
ctx.should_call_send(
event_4, BotReply.add_reply_subscribe_success("明日方舟游戏信息"), True
)
ctx.should_call_send(event_4, BotReply.add_reply_subscribe_success("明日方舟游戏信息"), True)
ctx.should_finished()
subs = await config.list_subscribe(TargetQQGroup(group_id=10000))
assert len(subs) == 1
@ -340,32 +286,18 @@ async def test_platform_name_err(app: App):
@respx.mock
async def test_add_with_get_id(app: App):
from nonebot.adapters.onebot.v11.event import Sender
from nonebot.adapters.onebot.v11.message import Message, MessageSegment
from nonebot_plugin_saa import (
Custom,
MessageFactory,
SupportedAdapters,
TargetQQGroup,
Text,
)
from nonebot.adapters.onebot.v11.message import Message
from nonebot_plugin_saa import Text, TargetQQGroup, MessageFactory, SupportedAdapters
from nonebot_bison.config import config
from nonebot_bison.platform import platform_manager
from nonebot_bison.platform.weibo import Weibo
from nonebot_bison.platform import platform_manager
from nonebot_bison.sub_manager import add_sub_matcher, common_platform
ak_list_router = respx.get(
"https://m.weibo.cn/api/container/getIndex?containerid=1005056279793937"
)
ak_list_router.mock(
return_value=Response(200, json=get_json("weibo_ak_profile.json"))
)
ak_list_bad_router = respx.get(
"https://m.weibo.cn/api/container/getIndex?containerid=100505000"
)
ak_list_bad_router.mock(
return_value=Response(200, json=get_json("weibo_err_profile.json"))
)
ak_list_router = respx.get("https://m.weibo.cn/api/container/getIndex?containerid=1005056279793937")
ak_list_router.mock(return_value=Response(200, json=get_json("weibo_ak_profile.json")))
ak_list_bad_router = respx.get("https://m.weibo.cn/api/container/getIndex?containerid=100505000")
ak_list_bad_router.mock(return_value=Response(200, json=get_json("weibo_err_profile.json")))
async with app.test_matcher(add_sub_matcher) as ctx:
bot = ctx.create_bot()
@ -378,23 +310,17 @@ async def test_add_with_get_id(app: App):
ctx.should_pass_rule()
ctx.should_call_send(
event_1,
BotReply.add_reply_on_platform(
platform_manager=platform_manager, common_platform=common_platform
),
BotReply.add_reply_on_platform(platform_manager=platform_manager, common_platform=common_platform),
True,
)
event_3 = fake_group_message_event(
message=Message("weibo"), sender=fake_admin_user
)
event_3 = fake_group_message_event(message=Message("weibo"), sender=fake_admin_user)
ctx.receive_event(bot, event_3)
ctx.should_call_send(
event_3,
BotReply.add_reply_on_id(Weibo),
True,
)
event_4_query = fake_group_message_event(
message=Message("查询"), sender=fake_admin_user
)
event_4_query = fake_group_message_event(message=Message("查询"), sender=fake_admin_user)
ctx.receive_event(bot, event_4_query)
should_send_saa(
ctx,
@ -427,21 +353,17 @@ async def test_add_with_get_id(app: App):
@pytest.mark.asyncio
@respx.mock
async def test_add_with_bilibili_target_parser(app: App, init_scheduler):
from nonebot_plugin_saa import TargetQQGroup
from nonebot.adapters.onebot.v11.event import Sender
from nonebot.adapters.onebot.v11.message import Message
from nonebot_plugin_saa import TargetQQGroup
from nonebot_bison.config import config
from nonebot_bison.platform import platform_manager
from nonebot_bison.platform.bilibili import Bilibili
from nonebot_bison.sub_manager import add_sub_matcher, common_platform
ak_list_router = respx.get(
"https://api.bilibili.com/x/web-interface/card?mid=161775300"
)
ak_list_router.mock(
return_value=Response(200, json=get_json("bilibili_arknights_profile.json"))
)
ak_list_router = respx.get("https://api.bilibili.com/x/web-interface/card?mid=161775300")
ak_list_router.mock(return_value=Response(200, json=get_json("bilibili_arknights_profile.json")))
bilibili_main_page_router = respx.get("https://www.bilibili.com/")
bilibili_main_page_router.mock(return_value=Response(200))
@ -457,9 +379,7 @@ async def test_add_with_bilibili_target_parser(app: App, init_scheduler):
ctx.should_pass_rule()
ctx.should_call_send(
event_1,
BotReply.add_reply_on_platform(
platform_manager=platform_manager, common_platform=common_platform
),
BotReply.add_reply_on_platform(platform_manager=platform_manager, common_platform=common_platform),
True,
)
event_2 = fake_group_message_event(
@ -472,9 +392,7 @@ async def test_add_with_bilibili_target_parser(app: App, init_scheduler):
BotReply.add_reply_on_platform_input_allplatform(platform_manager),
True,
)
event_3 = fake_group_message_event(
message=Message("bilibili"), sender=fake_admin_user
)
event_3 = fake_group_message_event(message=Message("bilibili"), sender=fake_admin_user)
ctx.receive_event(bot, event_3)
assert Bilibili.parse_target_promot
ctx.should_call_send(
@ -489,9 +407,7 @@ async def test_add_with_bilibili_target_parser(app: App, init_scheduler):
sender=fake_admin_user,
)
ctx.receive_event(bot, event_4_err1)
ctx.should_call_send(
event_4_err1, BotReply.add_reply_on_target_parse_input_error, True
)
ctx.should_call_send(event_4_err1, BotReply.add_reply_on_target_parse_input_error, True)
ctx.should_rejected()
event_4_err1 = fake_group_message_event(
@ -501,9 +417,7 @@ async def test_add_with_bilibili_target_parser(app: App, init_scheduler):
sender=fake_admin_user,
)
ctx.receive_event(bot, event_4_err1)
ctx.should_call_send(
event_4_err1, BotReply.add_reply_on_target_parse_input_error, True
)
ctx.should_call_send(event_4_err1, BotReply.add_reply_on_target_parse_input_error, True)
ctx.should_rejected()
event_4_ok = fake_group_message_event(
@ -523,18 +437,12 @@ async def test_add_with_bilibili_target_parser(app: App, init_scheduler):
BotReply.add_reply_on_cats(platform_manager, "bilibili"),
True,
)
event_5_ok = fake_group_message_event(
message=Message("视频"), sender=fake_admin_user
)
event_5_ok = fake_group_message_event(message=Message("视频"), sender=fake_admin_user)
ctx.receive_event(bot, event_5_ok)
ctx.should_call_send(event_5_ok, BotReply.add_reply_on_tags, True)
event_6 = fake_group_message_event(
message=Message("全部标签"), sender=fake_admin_user
)
event_6 = fake_group_message_event(message=Message("全部标签"), sender=fake_admin_user)
ctx.receive_event(bot, event_6)
ctx.should_call_send(
event_6, BotReply.add_reply_subscribe_success("明日方舟"), True
)
ctx.should_call_send(event_6, BotReply.add_reply_subscribe_success("明日方舟"), True)
ctx.should_finished()
subs = await config.list_subscribe(TargetQQGroup(group_id=10000))
assert len(subs) == 1
@ -549,21 +457,17 @@ async def test_add_with_bilibili_target_parser(app: App, init_scheduler):
@pytest.mark.asyncio
@respx.mock
async def test_add_with_bilibili_live_target_parser(app: App, init_scheduler):
from nonebot_plugin_saa import TargetQQGroup
from nonebot.adapters.onebot.v11.event import Sender
from nonebot.adapters.onebot.v11.message import Message
from nonebot_plugin_saa import TargetQQGroup
from nonebot_bison.config import config
from nonebot_bison.platform import platform_manager
from nonebot_bison.platform.bilibili import Bilibililive
from nonebot_bison.sub_manager import add_sub_matcher, common_platform
ak_list_router = respx.get(
"https://api.bilibili.com/x/web-interface/card?mid=161775300"
)
ak_list_router.mock(
return_value=Response(200, json=get_json("bilibili_arknights_profile.json"))
)
ak_list_router = respx.get("https://api.bilibili.com/x/web-interface/card?mid=161775300")
ak_list_router.mock(return_value=Response(200, json=get_json("bilibili_arknights_profile.json")))
bilibili_main_page_router = respx.get("https://www.bilibili.com/")
bilibili_main_page_router.mock(return_value=Response(200))
@ -579,9 +483,7 @@ async def test_add_with_bilibili_live_target_parser(app: App, init_scheduler):
ctx.should_pass_rule()
ctx.should_call_send(
event_1,
BotReply.add_reply_on_platform(
platform_manager=platform_manager, common_platform=common_platform
),
BotReply.add_reply_on_platform(platform_manager=platform_manager, common_platform=common_platform),
True,
)
event_2 = fake_group_message_event(
@ -594,9 +496,7 @@ async def test_add_with_bilibili_live_target_parser(app: App, init_scheduler):
BotReply.add_reply_on_platform_input_allplatform(platform_manager),
True,
)
event_3 = fake_group_message_event(
message=Message("bilibili-live"), sender=fake_admin_user
)
event_3 = fake_group_message_event(message=Message("bilibili-live"), sender=fake_admin_user)
ctx.receive_event(bot, event_3)
ctx.should_call_send(
event_3,
@ -619,22 +519,16 @@ async def test_add_with_bilibili_live_target_parser(app: App, init_scheduler):
BotReply.add_reply_on_cats(platform_manager, "bilibili-live"),
True,
)
event_5_ok = fake_group_message_event(
message=Message("开播提醒"), sender=fake_admin_user
)
event_5_ok = fake_group_message_event(message=Message("开播提醒"), sender=fake_admin_user)
ctx.receive_event(bot, event_5_ok)
ctx.should_call_send(
event_5_ok, BotReply.add_reply_subscribe_success("明日方舟"), True
)
ctx.should_call_send(event_5_ok, BotReply.add_reply_subscribe_success("明日方舟"), True)
ctx.should_finished()
subs = await config.list_subscribe(TargetQQGroup(group_id=10000))
assert len(subs) == 1
sub = subs[0]
assert sub.target.target == "161775300"
assert sub.tags == []
assert sub.categories == [
platform_manager["bilibili-live"].reverse_category["开播提醒"]
]
assert sub.categories == [platform_manager["bilibili-live"].reverse_category["开播提醒"]]
assert sub.target.platform_name == "bilibili-live"
assert sub.target.target_name == "明日方舟"
@ -642,21 +536,17 @@ async def test_add_with_bilibili_live_target_parser(app: App, init_scheduler):
@pytest.mark.asyncio
@respx.mock
async def test_add_with_bilibili_bangumi_target_parser(app: App, init_scheduler):
from nonebot_plugin_saa import TargetQQGroup
from nonebot.adapters.onebot.v11.event import Sender
from nonebot.adapters.onebot.v11.message import Message
from nonebot_plugin_saa import TargetQQGroup
from nonebot_bison.config import config
from nonebot_bison.platform import platform_manager
from nonebot_bison.platform.bilibili import BilibiliBangumi
from nonebot_bison.sub_manager import add_sub_matcher, common_platform
ak_list_router = respx.get(
"https://api.bilibili.com/pgc/review/user?media_id=28235413"
)
ak_list_router.mock(
return_value=Response(200, json=get_json("bilibili-gangumi-hanhua1.json"))
)
ak_list_router = respx.get("https://api.bilibili.com/pgc/review/user?media_id=28235413")
ak_list_router.mock(return_value=Response(200, json=get_json("bilibili-gangumi-hanhua1.json")))
bilibili_main_page_router = respx.get("https://www.bilibili.com/")
bilibili_main_page_router.mock(return_value=Response(200))
@ -672,9 +562,7 @@ async def test_add_with_bilibili_bangumi_target_parser(app: App, init_scheduler)
ctx.should_pass_rule()
ctx.should_call_send(
event_1,
BotReply.add_reply_on_platform(
platform_manager=platform_manager, common_platform=common_platform
),
BotReply.add_reply_on_platform(platform_manager=platform_manager, common_platform=common_platform),
True,
)
event_2 = fake_group_message_event(
@ -687,9 +575,7 @@ async def test_add_with_bilibili_bangumi_target_parser(app: App, init_scheduler)
BotReply.add_reply_on_platform_input_allplatform(platform_manager),
True,
)
event_3 = fake_group_message_event(
message=Message("bilibili-bangumi"), sender=fake_admin_user
)
event_3 = fake_group_message_event(message=Message("bilibili-bangumi"), sender=fake_admin_user)
ctx.receive_event(bot, event_3)
ctx.should_call_send(
event_3,
@ -704,14 +590,10 @@ async def test_add_with_bilibili_bangumi_target_parser(app: App, init_scheduler)
ctx.receive_event(bot, event_4_ok)
ctx.should_call_send(
event_4_ok,
BotReply.add_reply_on_target_confirm(
"bilibili-bangumi", "汉化日记 第三季", "28235413"
),
BotReply.add_reply_on_target_confirm("bilibili-bangumi", "汉化日记 第三季", "28235413"),
True,
)
ctx.should_call_send(
event_4_ok, BotReply.add_reply_subscribe_success("汉化日记 第三季"), True
)
ctx.should_call_send(event_4_ok, BotReply.add_reply_subscribe_success("汉化日记 第三季"), True)
ctx.should_finished()
subs = await config.list_subscribe(TargetQQGroup(group_id=10000))
assert len(subs) == 1

View File

@ -2,20 +2,18 @@ import pytest
from nonebug.app import App
from nonebug_saa import should_send_saa
from ..platforms.utils import get_json
from ..utils import fake_admin_user, fake_group_message_event
@pytest.mark.asyncio
async def test_query_sub(app: App, init_scheduler):
from nonebot import get_driver
from nonebot.adapters.onebot.v11 import Bot, Message
from nonebot_plugin_saa import MessageFactory, SupportedAdapters, TargetQQGroup
from nonebot_plugin_saa import TargetQQGroup, MessageFactory
from nonebot_bison.types import Target
from nonebot_bison.config import config
from nonebot_bison.platform import platform_manager
from nonebot_bison.sub_manager import query_sub_matcher
from nonebot_bison.types import Target
await config.add_subscribe(
TargetQQGroup(group_id=10000),
@ -44,12 +42,12 @@ async def test_query_sub(app: App, init_scheduler):
async def test_del_sub(app: App, init_scheduler):
from nonebot.adapters.onebot.v11.bot import Bot
from nonebot.adapters.onebot.v11.message import Message
from nonebot_plugin_saa import MessageFactory, TargetQQGroup
from nonebot_plugin_saa import TargetQQGroup, MessageFactory
from nonebot_bison.types import Target
from nonebot_bison.config import config
from nonebot_bison.platform import platform_manager
from nonebot_bison.sub_manager import del_sub_matcher
from nonebot_bison.types import Target
await config.add_subscribe(
TargetQQGroup(group_id=10000),
@ -62,29 +60,23 @@ async def test_del_sub(app: App, init_scheduler):
async with app.test_matcher(del_sub_matcher) as ctx:
bot = ctx.create_bot(base=Bot)
assert isinstance(bot, Bot)
event = fake_group_message_event(
message=Message("删除订阅"), to_me=True, sender=fake_admin_user
)
event = fake_group_message_event(message=Message("删除订阅"), to_me=True, sender=fake_admin_user)
ctx.receive_event(bot, event)
ctx.should_pass_rule()
ctx.should_pass_permission()
should_send_saa(
ctx,
MessageFactory(
"订阅的帐号为:\n1 weibo 明日方舟Arknights 6279793937\n [图文] 明日方舟\n请输入要删除的订阅的序号\n输入'取消'中止"
"订阅的帐号为:\n1 weibo 明日方舟Arknights 6279793937\n [图文] 明日方舟\n请输入要删除的订阅的序号\n输入'取消'中止" # noqa: E501
),
bot,
event=event,
)
event_1_err = fake_group_message_event(
message=Message("2"), sender=fake_admin_user
)
event_1_err = fake_group_message_event(message=Message("2"), sender=fake_admin_user)
ctx.receive_event(bot, event_1_err)
ctx.should_call_send(event_1_err, "删除错误", True)
ctx.should_rejected()
event_1_ok = fake_group_message_event(
message=Message("1"), sender=fake_admin_user
)
event_1_ok = fake_group_message_event(message=Message("1"), sender=fake_admin_user)
ctx.receive_event(bot, event_1_ok)
ctx.should_call_send(event_1_ok, "删除成功", True)
ctx.should_finished()
@ -97,16 +89,12 @@ async def test_del_empty_sub(app: App, init_scheduler):
from nonebot.adapters.onebot.v11.bot import Bot
from nonebot.adapters.onebot.v11.message import Message
from nonebot_bison.config import config
from nonebot_bison.platform import platform_manager
from nonebot_bison.sub_manager import del_sub_matcher
async with app.test_matcher(del_sub_matcher) as ctx:
bot = ctx.create_bot(base=Bot)
assert isinstance(bot, Bot)
event = fake_group_message_event(
message=Message("删除订阅"), to_me=True, sender=fake_admin_user
)
event = fake_group_message_event(message=Message("删除订阅"), to_me=True, sender=fake_admin_user)
ctx.receive_event(bot, event)
ctx.should_pass_rule()
ctx.should_pass_permission()

View File

@ -1,8 +1,10 @@
from typing import cast
from pathlib import Path
from unittest.mock import patch
from click.testing import CliRunner
from nonebug.app import App
from click.core import BaseCommand
from click.testing import CliRunner
from .utils import get_file
@ -10,6 +12,7 @@ from .utils import get_file
def test_cli_help(app: App):
from nonebot_bison.script.cli import cli
cli = cast(BaseCommand, cli)
runner = CliRunner()
result = runner.invoke(cli, ["--help"])
@ -38,8 +41,10 @@ async def test_subs_export(app: App, tmp_path: Path):
from nonebot_plugin_saa import TargetQQGroup
from nonebot_bison.config.db_config import config
from nonebot_bison.script.cli import cli, run_sync
from nonebot_bison.types import Target as TTarget
from nonebot_bison.script.cli import cli, run_sync
cli = cast(BaseCommand, cli)
await config.add_subscribe(
TargetQQGroup(group_id=123),
@ -74,7 +79,7 @@ async def test_subs_export(app: App, tmp_path: Path):
runner = CliRunner()
# 是否默认导出到工作目录
with runner.isolated_filesystem(temp_dir=tmp_path) as td:
with runner.isolated_filesystem(temp_dir=tmp_path):
result = await run_sync(runner.invoke)(cli, ["export"])
assert result.exit_code == 0
file_path = Path.cwd() / "bison_subscribes_export_1.json"
@ -93,15 +98,11 @@ async def test_subs_export(app: App, tmp_path: Path):
assert '"group_id": 123' in file_path2.read_text()
# 是否拒绝导出到不存在的文件夹
result = await run_sync(runner.invoke)(
cli, ["export", "-p", str(tmp_path / "data2")]
)
result = await run_sync(runner.invoke)(cli, ["export", "-p", str(tmp_path / "data2")])
assert result.exit_code == 1
# 是否可以以yaml格式导出
result = await run_sync(runner.invoke)(
cli, ["export", "-p", str(tmp_path), "--format", "yaml"]
)
result = await run_sync(runner.invoke)(cli, ["export", "-p", str(tmp_path), "--format", "yaml"])
assert result.exit_code == 0
file_path3 = tmp_path / "bison_subscribes_export_1.yaml"
assert file_path3.exists()
@ -110,9 +111,7 @@ async def test_subs_export(app: App, tmp_path: Path):
assert "platform_type: QQ Group" in file_path3.read_text()
# 是否允许以未支持的格式导出
result = await run_sync(runner.invoke)(
cli, ["export", "-p", str(tmp_path), "--format", "toml"]
)
result = await run_sync(runner.invoke)(cli, ["export", "-p", str(tmp_path), "--format", "toml"])
assert result.exit_code == 2
@ -120,6 +119,8 @@ async def test_subs_import_v1(app: App, tmp_path):
from nonebot_bison.config.db_config import config
from nonebot_bison.script.cli import cli, run_sync
cli = cast(BaseCommand, cli)
assert len(await config.list_subs_with_all_info()) == 0
mock_file: Path = tmp_path / "1.json"
@ -140,9 +141,7 @@ async def test_subs_import_v1(app: App, tmp_path):
mock_file: Path = tmp_path / "2.yaml"
mock_file.write_text(get_file("v1/subs_export.yaml"))
result = await run_sync(runner.invoke)(
cli, ["import", "-p", str(mock_file), "--format=yml"]
)
result = await run_sync(runner.invoke)(cli, ["import", "-p", str(mock_file), "--format=yml"])
assert result.exit_code == 0
assert len(await config.list_subs_with_all_info()) == 6
@ -151,6 +150,8 @@ async def test_sub_import_v2(app: App, tmp_path):
from nonebot_bison.config.db_config import config
from nonebot_bison.script.cli import cli, run_sync
cli = cast(BaseCommand, cli)
assert len(await config.list_subs_with_all_info()) == 0
mock_file: Path = tmp_path / "1.json"
@ -171,8 +172,6 @@ async def test_sub_import_v2(app: App, tmp_path):
mock_file: Path = tmp_path / "2.yaml"
mock_file.write_text(get_file("v2/subs_export.yaml"))
result = await run_sync(runner.invoke)(
cli, ["import", "-p", str(mock_file), "--format=yml"]
)
result = await run_sync(runner.invoke)(cli, ["import", "-p", str(mock_file), "--format=yml"])
assert result.exit_code == 0
assert len(await config.list_subs_with_all_info()) == 6

View File

@ -5,14 +5,12 @@ from .utils import get_json
async def test_subs_export(app: App, init_scheduler):
import time
from nonebot_plugin_saa import TargetQQGroup
from nonebot_bison.config.db_config import config
from nonebot_bison.config.db_model import User
from nonebot_bison.config.subs_io import subscribes_export
from nonebot_bison.config.db_config import config
from nonebot_bison.types import Target as TTarget
from nonebot_bison.config.subs_io import subscribes_export
await config.add_subscribe(
TargetQQGroup(group_id=1232),
@ -43,13 +41,10 @@ async def test_subs_export(app: App, init_scheduler):
assert len(data) == 3
nbesf_data = await subscribes_export(lambda x: x)
print(nbesf_data.dict())
assert nbesf_data.dict() == get_json("v2/subs_export.json")
nbesf_data_user_234 = await subscribes_export(
lambda stmt: stmt.where(
User.user_target == {"platform_type": "QQ Group", "group_id": 2342}
)
lambda stmt: stmt.where(User.user_target == {"platform_type": "QQ Group", "group_id": 2342})
)
assert len(nbesf_data_user_234.groups) == 1
assert len(nbesf_data_user_234.groups[0].subs) == 2
@ -81,7 +76,6 @@ async def test_subs_import(app: App, init_scheduler):
async def test_subs_import_dup_err(app: App, init_scheduler):
from nonebot_bison.config.db_config import config
from nonebot_bison.config.subs_io import subscribes_import
from nonebot_bison.config.subs_io.nbesf_model import v1, v2
@ -106,20 +100,19 @@ async def test_subs_import_dup_err(app: App, init_scheduler):
async def test_subs_import_version_disorder(app: App, init_scheduler):
from nonebot_bison.config.db_config import config
from nonebot_bison.config.subs_io import subscribes_import
from nonebot_bison.config.subs_io.nbesf_model import v1, v2
from nonebot_bison.config.subs_io.utils import NBESFParseErr
# use v2 parse v1
with pytest.raises(NBESFParseErr):
nbesf_data_v1 = v1.nbesf_parser(get_json("v2/subs_export_has_subdup_err.json"))
v1.nbesf_parser(get_json("v2/subs_export_has_subdup_err.json"))
# use v1 parse v2
with pytest.raises(NBESFParseErr):
nbesf_data_v2 = v2.nbesf_parser(get_json("v1/subs_export_has_subdup_err.json"))
v2.nbesf_parser(get_json("v1/subs_export_has_subdup_err.json"))
with pytest.raises(AssertionError):
with pytest.raises(AssertionError): # noqa: PT012
nbesf_data = v2.nbesf_parser(get_json("v2/subs_export_has_subdup_err.json"))
nbesf_data.version = 1
await subscribes_import(nbesf_data)
@ -131,7 +124,7 @@ async def test_subs_import_all_fail(app: App, init_scheduler):
from nonebot_bison.config.subs_io.nbesf_model.v1 import NBESFParseErr
with pytest.raises(NBESFParseErr):
nbesf_data = v1.nbesf_parser(get_json("v1/subs_export_all_illegal.json"))
v1.nbesf_parser(get_json("v1/subs_export_all_illegal.json"))
with pytest.raises(NBESFParseErr):
nbesf_data = v2.nbesf_parser(get_json("v2/subs_export_all_illegal.json"))
v2.nbesf_parser(get_json("v2/subs_export_all_illegal.json"))

View File

@ -6,13 +6,13 @@ path = Path(__file__).parent / "static"
def get_json(file_name: str):
"""返回该目录下static目录下的<file_name>解析得到的json"""
with open(path / file_name, "r", encoding="utf8") as f:
with open(path / file_name, encoding="utf8") as f:
json_ = json.load(f)
return json_
def get_file(file_name: str):
"""返回该目录下static目录下的<file_name>的文件内容"""
with open(path / file_name, "r", encoding="utf8") as f:
with open(path / file_name, encoding="utf8") as f:
file_text = f.read()
return file_text

View File

@ -16,5 +16,5 @@ async def test_http_error(app: App):
await client.get("https://example.com")
assert ctx.gen_req_records() == [
"https://example.com Headers({'host': 'example.com', 'accept': '*/*', 'accept-encoding': 'gzip, deflate', 'connection': 'keep-alive', 'user-agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/51.0.2704.103 Safari/537.36'}) | [403] Headers({'content-length': '15', 'content-type': 'application/json'}) {\"error\": \"gg\"}"
"https://example.com Headers({'host': 'example.com', 'accept': '*/*', 'accept-encoding': 'gzip, deflate', 'connection': 'keep-alive', 'user-agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/51.0.2704.103 Safari/537.36'}) | [403] Headers({'content-length': '15', 'content-type': 'application/json'}) {\"error\": \"gg\"}" # noqa: E501
]

View File

@ -1,13 +1,13 @@
from pathlib import Path
import pytest
import respx
import pytest
from httpx import Response
from nonebot_plugin_saa import Image, MessageSegmentFactory, Text
from nonebug.app import App
from nonebot_plugin_saa import Text, Image, MessageSegmentFactory
@pytest.fixture
@pytest.fixture()
def ms_list():
msg_segments: list[MessageSegmentFactory] = []
msg_segments.append(Text("【Zc】每早合约日替攻略"))
@ -22,9 +22,9 @@ def ms_list():
return msg_segments
@pytest.fixture
@pytest.fixture()
def expected_md():
return "【Zc】每早合约日替攻略<br>![Image](http://i0.hdslb.com/bfs/live/new_room_cover/cf7d4d3b2f336c6dba299644c3af952c5db82612.jpg)\n来源: Bilibili直播 魔法Zc目录<br>详情: https://live.bilibili.com/3044248<br>"
return "【Zc】每早合约日替攻略<br>![Image](http://i0.hdslb.com/bfs/live/new_room_cover/cf7d4d3b2f336c6dba299644c3af952c5db82612.jpg)\n来源: Bilibili直播 魔法Zc目录<br>详情: https://live.bilibili.com/3044248<br>" # noqa: E501
def test_gene_md(app: App, expected_md, ms_list):
@ -40,15 +40,13 @@ def test_gene_md(app: App, expected_md, ms_list):
async def test_gene_pic(app: App, ms_list, expected_md):
from nonebot_bison.post.custom_post import CustomPost
pic_router = respx.get(
"http://i0.hdslb.com/bfs/live/new_room_cover/cf7d4d3b2f336c6dba299644c3af952c5db82612.jpg"
)
pic_router = respx.get("http://i0.hdslb.com/bfs/live/new_room_cover/cf7d4d3b2f336c6dba299644c3af952c5db82612.jpg")
pic_path = Path(__file__).parent / "platforms" / "static" / "custom_post_pic.jpg"
with open(pic_path, mode="rb") as f:
mock_pic = f.read()
pic_router.mock(return_value=Response(200, stream=mock_pic))
pic_router.mock(return_value=Response(200, stream=mock_pic)) # type: ignore
cp = CustomPost(ms_factories=ms_list)
cp_pic_msg_md: str = cp._generate_md()

View File

@ -1,5 +1,4 @@
import typing
from sys import dont_write_bytecode
import pytest
from flaky import flaky
@ -9,7 +8,6 @@ if typing.TYPE_CHECKING:
import sys
sys.path.append("./src/plugins")
import nonebot_bison
merge_source_9 = [
"https://wx1.sinaimg.cn/large/0071VPLMgy1gq0vib7zooj30dx0dxmz5.jpg",
@ -54,12 +52,12 @@ async def download_imgs(url_list: list[str]) -> list[bytes]:
return img_contents
@pytest.fixture
@pytest.fixture()
async def downloaded_resource():
return await download_imgs(merge_source_9)
@pytest.fixture
@pytest.fixture()
async def downloaded_resource_2():
return await download_imgs(merge_source_9_2)
@ -91,9 +89,7 @@ async def test_9_merge_2(app: App, downloaded_resource_2: list[bytes]):
async def test_6_merge(app: App, downloaded_resource: list[bytes]):
from nonebot_bison.post import Post
post = Post(
"", "", "", pics=list(downloaded_resource[0:6] + downloaded_resource[9:])
)
post = Post("", "", "", pics=list(downloaded_resource[0:6] + downloaded_resource[9:]))
await post._pic_merge()
assert len(post.pics) == 5
@ -103,9 +99,7 @@ async def test_6_merge(app: App, downloaded_resource: list[bytes]):
async def test_3_merge(app: App, downloaded_resource: list[bytes]):
from nonebot_bison.post import Post
post = Post(
"", "", "", pics=list(downloaded_resource[0:3] + downloaded_resource[9:])
)
post = Post("", "", "", pics=list(downloaded_resource[0:3] + downloaded_resource[9:]))
await post._pic_merge()
assert len(post.pics) == 5

View File

@ -1,7 +1,4 @@
import typing
import pytest
from httpx import AsyncClient
from nonebug.app import App
@ -10,8 +7,8 @@ from nonebug.app import App
async def test_render(app: App):
from nonebot_plugin_saa import Image
from nonebot_bison.plugin_config import plugin_config
from nonebot_bison.utils import parse_text
from nonebot_bison.plugin_config import plugin_config
plugin_config.bison_use_pic = True
@ -23,6 +20,6 @@ VuePress 由两部分组成:第一部分是一个极简静态网站生成器
(opens new window)它包含由 Vue 驱动的主题系统和插件 API另一个部分是为书写技术文档而优化的默认主题它的诞生初衷是为了支持 Vue 及其子项目的文档需求
每一个由 VuePress 生成的页面都带有预渲染好的 HTML也因此具有非常好的加载性能和搜索引擎优化SEO同时一旦页面被加载Vue 将接管这些静态内容并将其转换成一个完整的单页应用SPA其他的页面则会只在用户浏览到的时候才按需加载
"""
""" # noqa: E501
)
assert isinstance(res, Image)

View File

@ -1,8 +1,6 @@
import asyncio
import typing
import pytest
from flaky import flaky
from nonebug import App
from nonebug_saa import should_send_saa
from pytest_mock.plugin import MockerFixture
@ -11,11 +9,10 @@ from pytest_mock.plugin import MockerFixture
@pytest.mark.asyncio
async def test_send_no_queue(app: App, mocker: MockerFixture):
from nonebot.adapters.onebot.v11.bot import Bot
from nonebot_plugin_saa import MessageFactory, TargetQQGroup, TargetQQPrivate
from nonebot_plugin_saa.utils.auto_select_bot import refresh_bots
from nonebot_plugin_saa import TargetQQGroup, MessageFactory, TargetQQPrivate
from nonebot_bison.plugin_config import plugin_config
from nonebot_bison.send import send_msgs
from nonebot_bison.plugin_config import plugin_config
mocker.patch.object(plugin_config, "bison_use_queue", False)
@ -37,7 +34,7 @@ async def test_send_no_queue(app: App, mocker: MockerFixture):
async def test_send_queue(app: App, mocker: MockerFixture):
import nonebot
from nonebot.adapters.onebot.v11.bot import Bot
from nonebot_plugin_saa import MessageFactory, TargetQQGroup
from nonebot_plugin_saa import TargetQQGroup, MessageFactory
from nonebot_plugin_saa.utils.auto_select_bot import refresh_bots
from nonebot_bison.plugin_config import plugin_config
@ -47,7 +44,7 @@ async def test_send_queue(app: App, mocker: MockerFixture):
async with app.test_api() as ctx:
new_bot = ctx.create_bot(base=Bot)
await refresh_bots()
mocker.patch.object(nonebot, "get_bot", lambda: new_bot)
mocker.patch.object(nonebot, "get_bot", return_value=new_bot)
bot = nonebot.get_bot()
assert isinstance(bot, Bot)
assert bot == new_bot
@ -66,17 +63,11 @@ async def test_send_queue(app: App, mocker: MockerFixture):
@pytest.mark.asyncio
async def test_send_merge_no_queue(app: App):
from nonebot.adapters.onebot.v11.bot import Bot
from nonebot_plugin_saa import (
AggregatedMessageFactory,
Image,
MessageFactory,
TargetQQGroup,
Text,
)
from nonebot_plugin_saa.utils.auto_select_bot import refresh_bots
from nonebot_plugin_saa import Text, Image, TargetQQGroup, MessageFactory, AggregatedMessageFactory
from nonebot_bison.plugin_config import plugin_config
from nonebot_bison.send import send_msgs
from nonebot_bison.plugin_config import plugin_config
plugin_config.bison_use_pic_merge = 1
plugin_config.bison_use_queue = False
@ -117,17 +108,11 @@ async def test_send_merge_no_queue(app: App):
async def test_send_merge2_no_queue(app: App):
from nonebot.adapters.onebot.v11.bot import Bot
from nonebot_plugin_saa import (
AggregatedMessageFactory,
Image,
MessageFactory,
TargetQQGroup,
Text,
)
from nonebot_plugin_saa.utils.auto_select_bot import refresh_bots
from nonebot_plugin_saa import Text, Image, TargetQQGroup, MessageFactory, AggregatedMessageFactory
from nonebot_bison.plugin_config import plugin_config
from nonebot_bison.send import send_msgs
from nonebot_bison.plugin_config import plugin_config
plugin_config.bison_use_pic_merge = 2
plugin_config.bison_use_queue = False

View File

@ -1,6 +1,4 @@
from typing import TYPE_CHECKING, TypedDict
from typing_extensions import Literal
from typing import TYPE_CHECKING, Literal, TypedDict
if TYPE_CHECKING:
from nonebot.adapters.onebot.v11 import GroupMessageEvent, PrivateMessageEvent
@ -12,9 +10,9 @@ class AppReq(TypedDict, total=False):
def fake_group_message_event(**field) -> "GroupMessageEvent":
from nonebot.adapters.onebot.v11 import GroupMessageEvent, Message
from nonebot.adapters.onebot.v11.event import Sender
from pydantic import create_model
from nonebot.adapters.onebot.v11.event import Sender
from nonebot.adapters.onebot.v11 import Message, GroupMessageEvent
_Fake = create_model("_Fake", __base__=GroupMessageEvent)
@ -44,9 +42,9 @@ def fake_group_message_event(**field) -> "GroupMessageEvent":
def fake_private_message_event(**field) -> "PrivateMessageEvent":
from nonebot.adapters.onebot.v11 import Message, PrivateMessageEvent
from nonebot.adapters.onebot.v11.event import Sender
from pydantic import create_model
from nonebot.adapters.onebot.v11.event import Sender
from nonebot.adapters.onebot.v11 import Message, PrivateMessageEvent
_Fake = create_model("_Fake", __base__=PrivateMessageEvent)
@ -75,7 +73,9 @@ from nonebot.adapters.onebot.v11.event import Sender
fake_admin_user = Sender(nickname="test", role="admin")
fake_superuser = Sender(user_id=10001, nickname="superuser")
add_reply_on_id_input_search = "https://nonebot-bison.netlify.app/usage/#%E6%89%80%E6%94%AF%E6%8C%81%E5%B9%B3%E5%8F%B0%E7%9A%84-uid"
add_reply_on_id_input_search = (
"https://nonebot-bison.netlify.app/usage/#%E6%89%80%E6%94%AF%E6%8C%81%E5%B9%B3%E5%8F%B0%E7%9A%84-uid"
)
class BotReply:
@ -84,12 +84,7 @@ class BotReply:
return (
"请输入想要订阅的平台,目前支持,请输入冒号左边的名称:\n"
+ "".join(
[
"{}{}\n".format(
platform_name, platform_manager[platform_name].name
)
for platform_name in common_platform
]
[f"{platform_name}: {platform_manager[platform_name].name}\n" for platform_name in common_platform]
)
+ "要查看全部平台请输入:“全部”\n中止订阅过程请输入:“取消”"
)
@ -97,15 +92,12 @@ class BotReply:
@staticmethod
def add_reply_on_platform_input_allplatform(platform_manager):
return "全部平台\n" + "\n".join(
[
"{}{}".format(platform_name, platform.name)
for platform_name, platform in platform_manager.items()
]
[f"{platform_name}: {platform.name}" for platform_name, platform in platform_manager.items()]
)
@staticmethod
def add_reply_on_id_input_search_ob11():
from nonebot.adapters.onebot.v11 import Message, MessageSegment
from nonebot.adapters.onebot.v11 import MessageSegment
search_title = "Bison所支持的平台UID"
search_content = "查询相关平台的uid格式或获取方式"
@ -124,31 +116,28 @@ class BotReply:
@staticmethod
def add_reply_on_cats(platform_manager, platform: str):
return "请输入要订阅的类别,以空格分隔,支持的类别有:{}".format(
" ".join(list(platform_manager[platform].categories.values()))
return (
f"请输入要订阅的类别,以空格分隔,支持的类别有:{' '.join(list(platform_manager[platform].categories.values()))}" # noqa: E501
)
@staticmethod
def add_reply_on_cats_input_error(cat: str):
return "不支持 {}".format(cat)
return f"不支持 {cat}"
@staticmethod
def add_reply_subscribe_success(name):
return "添加 {} 成功".format(name)
return f"添加 {name} 成功"
@staticmethod
def add_reply_on_id(platform: object) -> str:
target_promot: str = platform.parse_target_promot # type: ignore
base_text = "请输入订阅用户的id\n查询id获取方法请回复:“查询”"
extra_text = (
("1." + platform.parse_target_promot + "\n2.")
if platform.parse_target_promot
else ""
)
extra_text = ("1." + target_promot + "\n2.") if target_promot else ""
return extra_text + base_text
add_reply_on_id_input_error = "id输入错误"
add_reply_on_target_parse_input_error = "不能从你的输入中提取出id请检查你输入的内容是否符合预期"
add_reply_on_platform_input_error = "平台输入错误"
add_reply_on_tags = '请输入要订阅/屏蔽的标签(不含#号)\n多个标签请使用空格隔开\n订阅所有标签输入"全部标签"\n具体规则回复"详情"'
add_reply_on_tags_need_more_info = "订阅标签直接输入标签内容\n屏蔽标签请在标签名称前添加~号\n详见https://nonebot-bison.netlify.app/usage/#%E5%B9%B3%E5%8F%B0%E8%AE%A2%E9%98%85%E6%A0%87%E7%AD%BE-tag"
add_reply_on_tags = '请输入要订阅/屏蔽的标签(不含#号)\n多个标签请使用空格隔开\n订阅所有标签输入"全部标签"\n具体规则回复"详情"' # noqa: E501
add_reply_on_tags_need_more_info = "订阅标签直接输入标签内容\n屏蔽标签请在标签名称前添加~号\n详见https://nonebot-bison.netlify.app/usage/#%E5%B9%B3%E5%8F%B0%E8%AE%A2%E9%98%85%E6%A0%87%E7%AD%BE-tag" # noqa: E501
add_reply_abort = "已中止订阅"