nonebot-bison/tests/subs_io/test_subs_io.py
AzideCupric 4e304a43b1
通过 nb-cli 实现数据库一键导入导出 (#210)
* feat: 实现导出存储的订阅信息的功能

* test: 编写导出功能测试

* test: 使用tmp_path

* feat: 实现导入订阅文件功能

* refactor: 将订阅导入导出部分独立出来

* fix: 修复一些拼写错误
test: 完成import的第一个测试

* feat: 将订阅导入导出函数加入nb script

test: 添加cli测试

* test: 完善subs import测试

* 🐛 fix nb cli entrypoint name error

* fix: 修改错误的entry_point, 关闭yaml导出时对键名的排序

* fix: 使用更简短的命令名

* 🚚 将subs_io迁移到config下

* ♻️ 不再使用抛出异常的方式创建目录

* refactor: 将subscribe_export类转为函数

* refactor: 将subscribe_import类转为函数

* refactor: 根据重写的subs_io重新调整cli

* test: 调整重写subs_io后的test

* chore: 清理未使用的import内容

* feat(cli): 将--yaml更改为--format

* test: 调整测试

* fix(cli): 为import添加不支持格式的报错

*  improve export performace

* feat: subscribes_import函数不再需要传入参数函数,而是指定为add_subscribes

fix: nbesf_parser在传入str时将调用parse_raw, 否则调用parse_obj

* feat: subscribes_import现在会根据nbesf_data的版本选择合适的导入方式

* fix(test): 调整测试

* feat: nb bison export命令不再将文件导出到data目录,而是当前工作目录

* docs: 增添相关文档

* fix(test): 修复错误的变量名

---------

Co-authored-by: felinae98 <731499577@qq.com>
2023-03-19 16:29:05 +08:00

92 lines
2.7 KiB
Python

from pathlib import Path
import pytest
from nonebug.app import App
from .utils import get_file, get_json
async def test_subs_export(app: App, init_scheduler):
import time
from nonebot_bison.config.db_config import config
from nonebot_bison.config.db_model import User
from nonebot_bison.config.subs_io import subscribes_export
from nonebot_bison.types import Target as TTarget
await config.add_subscribe(
user=123,
user_type="group",
target=TTarget("weibo_id"),
target_name="weibo_name",
platform_name="weibo",
cats=[],
tags=[],
)
await config.add_subscribe(
user=234,
user_type="group",
target=TTarget("weibo_id"),
target_name="weibo_name",
platform_name="weibo",
cats=[],
tags=["kaltsit", "amiya"],
)
await config.add_subscribe(
user=234,
user_type="group",
target=TTarget("bilibili_id"),
target_name="bilibili_name",
platform_name="bilibili",
cats=[1, 2],
tags=[],
)
data = await config.list_subs_with_all_info()
assert len(data) == 3
nbesf_data = await subscribes_export(lambda x: x)
assert nbesf_data.dict() == get_json("subs_export.json")
nbesf_data_user_234 = await subscribes_export(
lambda stmt: stmt.where(User.uid == 234, User.type == "group")
)
assert len(nbesf_data_user_234.groups) == 1
assert len(nbesf_data_user_234.groups[0].subs) == 2
assert nbesf_data_user_234.groups[0].user.dict() == {"uid": 234, "type": "group"}
async def test_subs_import(app: App, init_scheduler):
from nonebot_bison.config.db_config import config
from nonebot_bison.config.subs_io import nbesf_parser, subscribes_import
nbesf_data = nbesf_parser(get_json("subs_export.json"))
await subscribes_import(nbesf_data)
data = await config.list_subs_with_all_info()
assert len(data) == 3
async def test_subs_import_dup_err(app: App, init_scheduler):
from nonebot_bison.config.db_config import config
from nonebot_bison.config.subs_io import nbesf_parser, subscribes_import
nbesf_data = nbesf_parser(get_json("subs_export_has_subdup_err.json"))
await subscribes_import(nbesf_data)
data = await config.list_subs_with_all_info()
assert len(data) == 4
async def test_subs_import_all_fail(app: App, init_scheduler):
"""只要文件格式有任何一个错误, 都不会进行订阅"""
from nonebot_bison.config.subs_io import nbesf_parser
from nonebot_bison.config.subs_io.nbesf_model import NBESFParseErr
with pytest.raises(NBESFParseErr):
nbesf_data = nbesf_parser(get_json("subs_export_all_illegal.json"))