nonebot-bison/tests/config/test_config_operation.py
uy/sun 8da8f66fcf
💥 适配最新的 DataStore 插件,并切换模型为 SQLModel (#178)
* 使用 SQLModel

* 处理数据库迁移

* 与之前的模型相匹配

* sqlmodel 和 sqlalchemy 的导入移入测试函数内

并且使用 init_db 且测试前加载插件

* 重命名 alembic_version 表之前先检查是否存在且 version_num 属于插件

* 降级应该是把名称重新命名回去而不是删掉

* 不再设置 arbitrary_types_allowed 为 True
2023-01-30 22:52:11 +08:00

160 lines
4.9 KiB
Python

import pytest
from nonebug.app import App
async def test_add_subscribe(app: App, init_scheduler):
from nonebot_bison.config.db_config import config
from nonebot_bison.config.db_model import Subscribe, Target, User
from nonebot_bison.types import Target as TTarget
from nonebot_plugin_datastore.db import get_engine
from sqlalchemy.ext.asyncio.session import AsyncSession
from sqlmodel.sql.expression import select
await config.add_subscribe(
user=123,
user_type="group",
target=TTarget("weibo_id"),
target_name="weibo_name",
platform_name="weibo",
cats=[],
tags=[],
)
await config.add_subscribe(
user=234,
user_type="group",
target=TTarget("weibo_id"),
target_name="weibo_name",
platform_name="weibo",
cats=[],
tags=[],
)
confs = await config.list_subscribe(123, "group")
assert len(confs) == 1
conf: Subscribe = confs[0]
async with AsyncSession(get_engine()) as sess:
related_user_obj = await sess.scalar(
select(User).where(User.id == conf.user_id)
)
related_target_obj = await sess.scalar(
select(Target).where(Target.id == conf.target_id)
)
assert related_user_obj.uid == 123
assert related_target_obj.target_name == "weibo_name"
assert related_target_obj.target == "weibo_id"
assert conf.target.target == "weibo_id"
assert conf.categories == []
await config.update_subscribe(
user=123,
user_type="group",
target=TTarget("weibo_id"),
platform_name="weibo",
target_name="weibo_name2",
cats=[1],
tags=["tag"],
)
confs = await config.list_subscribe(123, "group")
assert len(confs) == 1
conf: Subscribe = confs[0]
async with AsyncSession(get_engine()) as sess:
related_user_obj = await sess.scalar(
select(User).where(User.id == conf.user_id)
)
related_target_obj = await sess.scalar(
select(Target).where(Target.id == conf.target_id)
)
assert related_user_obj.uid == 123
assert related_target_obj.target_name == "weibo_name2"
assert related_target_obj.target == "weibo_id"
assert conf.target.target == "weibo_id"
assert conf.categories == [1]
assert conf.tags == ["tag"]
async def test_add_dup_sub(init_scheduler):
from nonebot_bison.config.db_config import SubscribeDupException, config
from nonebot_bison.types import Target as TTarget
await config.add_subscribe(
user=123,
user_type="group",
target=TTarget("weibo_id"),
target_name="weibo_name",
platform_name="weibo",
cats=[],
tags=[],
)
with pytest.raises(SubscribeDupException):
await config.add_subscribe(
user=123,
user_type="group",
target=TTarget("weibo_id"),
target_name="weibo_name",
platform_name="weibo",
cats=[],
tags=[],
)
async def test_del_subsribe(init_scheduler):
from nonebot_bison.config.db_config import config
from nonebot_bison.config.db_model import Subscribe, Target
from nonebot_bison.types import Target as TTarget
from nonebot_plugin_datastore.db import get_engine
from sqlalchemy.ext.asyncio.session import AsyncSession
from sqlalchemy.sql.functions import func
from sqlmodel.sql.expression import select
await config.add_subscribe(
user=123,
user_type="group",
target=TTarget("weibo_id"),
target_name="weibo_name",
platform_name="weibo",
cats=[],
tags=[],
)
await config.del_subscribe(
user=123,
user_type="group",
target=TTarget("weibo_id"),
platform_name="weibo",
)
async with AsyncSession(get_engine()) as sess:
assert (await sess.scalar(select(func.count()).select_from(Subscribe))) == 0
assert (await sess.scalar(select(func.count()).select_from(Target))) == 1
await config.add_subscribe(
user=123,
user_type="group",
target=TTarget("weibo_id"),
target_name="weibo_name",
platform_name="weibo",
cats=[],
tags=[],
)
await config.add_subscribe(
user=124,
user_type="group",
target=TTarget("weibo_id"),
target_name="weibo_name_new",
platform_name="weibo",
cats=[],
tags=[],
)
await config.del_subscribe(
user=123,
user_type="group",
target=TTarget("weibo_id"),
platform_name="weibo",
)
async with AsyncSession(get_engine()) as sess:
assert (await sess.scalar(select(func.count()).select_from(Subscribe))) == 1
assert (await sess.scalar(select(func.count()).select_from(Target))) == 1
target: Target = await sess.scalar(select(Target))
assert target.target_name == "weibo_name_new"