nonebot-bison/tests/config/test_config_operation.py
Azide 32e3bcc022
🐛 修正项目的代码警告 (#614)
* 🐛 调整ruff的pytest警告

* 🐛 调整导入关系警告

* 🐛 删除奇怪无用的赋值和取值逻辑

*  不同测试部分所用变量应加以区分

* 🐛 subs_io model添加默认值

* 🐛 修完所有的 ruff PT001 警告

* 🔧 按ruff建议修改ruff配置

warning: The top-level linter settings are deprecated in favour of their counterparts in the `lint` section. Please update the following options in `pyproject.toml`:
  - 'ignore' -> 'lint.ignore'
  - 'select' -> 'lint.select'

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
2024-08-17 18:24:20 +08:00

153 lines
5.1 KiB
Python

import pytest
from nonebug.app import App
async def test_add_subscribe(app: App, init_scheduler):
from nonebot_plugin_saa import TargetQQGroup
from sqlalchemy.sql.expression import select
from nonebot_plugin_datastore.db import get_engine
from sqlalchemy.ext.asyncio.session import AsyncSession
from nonebot_bison.config.db_config import config
from nonebot_bison.types import Target as TTarget
from nonebot_bison.config.db_model import User, Target, Subscribe
await config.add_subscribe(
TargetQQGroup(group_id=123),
target=TTarget("weibo_id"),
target_name="weibo_name",
platform_name="weibo",
cats=[],
tags=[],
)
await config.add_subscribe(
TargetQQGroup(group_id=234),
target=TTarget("weibo_id"),
target_name="weibo_name",
platform_name="weibo",
cats=[],
tags=[],
)
confs = await config.list_subscribe(TargetQQGroup(group_id=123))
assert len(confs) == 1
conf: Subscribe = confs[0]
async with AsyncSession(get_engine()) as sess:
related_user_obj = await sess.scalar(select(User).where(User.id == conf.user_id))
related_target_obj = await sess.scalar(select(Target).where(Target.id == conf.target_id))
assert related_user_obj
assert related_target_obj
assert related_user_obj.user_target["group_id"] == 123
assert related_target_obj.target_name == "weibo_name"
assert related_target_obj.target == "weibo_id"
assert conf.target.target == "weibo_id"
assert conf.categories == []
await config.update_subscribe(
TargetQQGroup(group_id=123),
target=TTarget("weibo_id"),
platform_name="weibo",
target_name="weibo_name2",
cats=[1],
tags=["tag"],
)
confs = await config.list_subscribe(TargetQQGroup(group_id=123))
assert len(confs) == 1
conf2: Subscribe = confs[0]
async with AsyncSession(get_engine()) as sess:
related_user_obj = await sess.scalar(select(User).where(User.id == conf2.user_id))
related_target_obj = await sess.scalar(select(Target).where(Target.id == conf2.target_id))
assert related_user_obj
assert related_target_obj
assert related_user_obj.user_target["group_id"] == 123
assert related_target_obj.target_name == "weibo_name2"
assert related_target_obj.target == "weibo_id"
assert conf2.target.target == "weibo_id"
assert conf2.categories == [1]
assert conf2.tags == ["tag"]
async def test_add_dup_sub(init_scheduler):
from nonebot_plugin_saa import TargetQQGroup
from nonebot_bison.types import Target as TTarget
from nonebot_bison.config.db_config import SubscribeDupException, config
await config.add_subscribe(
TargetQQGroup(group_id=123),
target=TTarget("weibo_id"),
target_name="weibo_name",
platform_name="weibo",
cats=[],
tags=[],
)
with pytest.raises(SubscribeDupException):
await config.add_subscribe(
TargetQQGroup(group_id=123),
target=TTarget("weibo_id"),
target_name="weibo_name",
platform_name="weibo",
cats=[],
tags=[],
)
async def test_del_subsribe(init_scheduler):
from sqlalchemy.sql.functions import func
from nonebot_plugin_saa import TargetQQGroup
from sqlalchemy.sql.expression import select
from nonebot_plugin_datastore.db import get_engine
from sqlalchemy.ext.asyncio.session import AsyncSession
from nonebot_bison.config.db_config import config
from nonebot_bison.types import Target as TTarget
from nonebot_bison.config.db_model import Target, Subscribe
await config.add_subscribe(
TargetQQGroup(group_id=123),
target=TTarget("weibo_id"),
target_name="weibo_name",
platform_name="weibo",
cats=[],
tags=[],
)
await config.del_subscribe(
TargetQQGroup(group_id=123),
target=TTarget("weibo_id"),
platform_name="weibo",
)
async with AsyncSession(get_engine()) as sess:
assert (await sess.scalar(select(func.count()).select_from(Subscribe))) == 0
assert (await sess.scalar(select(func.count()).select_from(Target))) == 1
await config.add_subscribe(
TargetQQGroup(group_id=123),
target=TTarget("weibo_id"),
target_name="weibo_name",
platform_name="weibo",
cats=[],
tags=[],
)
await config.add_subscribe(
TargetQQGroup(group_id=124),
target=TTarget("weibo_id"),
target_name="weibo_name_new",
platform_name="weibo",
cats=[],
tags=[],
)
await config.del_subscribe(
TargetQQGroup(group_id=123),
target=TTarget("weibo_id"),
platform_name="weibo",
)
async with AsyncSession(get_engine()) as sess:
assert (await sess.scalar(select(func.count()).select_from(Subscribe))) == 1
assert (await sess.scalar(select(func.count()).select_from(Target))) == 1
target = await sess.scalar(select(Target))
assert target
assert target.target_name == "weibo_name_new"