big is comming!

This commit is contained in:
felinae98 2022-01-19 11:45:06 +08:00
parent 35424085a5
commit bd02367772
No known key found for this signature in database
GPG Key ID: 00C8B010587FF610
11 changed files with 115 additions and 161 deletions

View File

@ -1,7 +1,7 @@
from collections import defaultdict from collections import defaultdict
from os import path from os import path
import os import os
from typing import DefaultDict, Mapping from typing import DefaultDict, Literal, Mapping, TypedDict
import nonebot import nonebot
from nonebot import logger from nonebot import logger
@ -34,6 +34,18 @@ class NoSuchUserException(Exception):
class NoSuchSubscribeException(Exception): class NoSuchSubscribeException(Exception):
pass pass
class SubscribeContent(TypedDict):
target: str
target_type: str
target_name: str
cats: list[int]
tags: list[str]
class ConfigContent(TypedDict):
user: str
user_type: Literal["group", "private"]
subs: list[SubscribeContent]
class Config(metaclass=Singleton): class Config(metaclass=Singleton):
migrate_version = 2 migrate_version = 2
@ -64,7 +76,7 @@ class Config(metaclass=Singleton):
}) })
self.update_send_cache() self.update_send_cache()
def list_subscribe(self, user, user_type): def list_subscribe(self, user, user_type) -> list[SubscribeContent]:
query = Query() query = Query()
if user_sub := self.user_target.get((query.user == user) & (query.user_type ==user_type)): if user_sub := self.user_target.get((query.user == user) & (query.user_type ==user_type)):
return user_sub['subs'] return user_sub['subs']

View File

@ -14,7 +14,7 @@ async def check_sub_target(target_type, target):
return await platform_manager[target_type].get_target_name(target) return await platform_manager[target_type].get_target_name(target)
_platform_list = defaultdict(list) _platform_list = defaultdict(list)
for _platform in Platform.registory: for _platform in Platform.registry:
if not _platform.enabled: if not _platform.enabled:
continue continue
_platform_list[_platform.platform_name].append(_platform) _platform_list[_platform.platform_name].append(_platform)

View File

@ -7,10 +7,10 @@ import httpx
from ..post import Post from ..post import Post
from ..types import Category, RawPost, Target from ..types import Category, RawPost, Target
from ..utils import Render from ..utils import Render
from .platform import CategoryNotSupport, NewMessage, NoTargetMixin, StatusChange from .platform import CategoryNotSupport, NewMessage, StatusChange
class Arknights(NewMessage, NoTargetMixin): class Arknights(NewMessage):
categories = {1: '游戏公告'} categories = {1: '游戏公告'}
platform_name = 'arknights' platform_name = 'arknights'
@ -20,6 +20,7 @@ class Arknights(NewMessage, NoTargetMixin):
is_common = False is_common = False
schedule_type = 'interval' schedule_type = 'interval'
schedule_kw = {'seconds': 30} schedule_kw = {'seconds': 30}
has_target = False
async def get_target_name(self, _: Target) -> str: async def get_target_name(self, _: Target) -> str:
return '明日方舟游戏信息' return '明日方舟游戏信息'
@ -60,7 +61,7 @@ class Arknights(NewMessage, NoTargetMixin):
raise CategoryNotSupport() raise CategoryNotSupport()
return Post('arknights', text=text, url='', target_name="明日方舟游戏内公告", pics=pics, compress=True, override_use_pic=False) return Post('arknights', text=text, url='', target_name="明日方舟游戏内公告", pics=pics, compress=True, override_use_pic=False)
class AkVersion(NoTargetMixin, StatusChange): class AkVersion(StatusChange):
categories = {2: '更新信息'} categories = {2: '更新信息'}
platform_name = 'arknights' platform_name = 'arknights'
@ -70,6 +71,7 @@ class AkVersion(NoTargetMixin, StatusChange):
is_common = False is_common = False
schedule_type = 'interval' schedule_type = 'interval'
schedule_kw = {'seconds': 30} schedule_kw = {'seconds': 30}
has_target = False
async def get_target_name(self, _: Target) -> str: async def get_target_name(self, _: Target) -> str:
return '明日方舟游戏信息' return '明日方舟游戏信息'
@ -104,7 +106,7 @@ class AkVersion(NoTargetMixin, StatusChange):
async def parse(self, raw_post): async def parse(self, raw_post):
return raw_post return raw_post
class MonsterSiren(NewMessage, NoTargetMixin): class MonsterSiren(NewMessage):
categories = {3: '塞壬唱片新闻'} categories = {3: '塞壬唱片新闻'}
platform_name = 'arknights' platform_name = 'arknights'
@ -114,6 +116,7 @@ class MonsterSiren(NewMessage, NoTargetMixin):
is_common = False is_common = False
schedule_type = 'interval' schedule_type = 'interval'
schedule_kw = {'seconds': 30} schedule_kw = {'seconds': 30}
has_target = False
async def get_target_name(self, _: Target) -> str: async def get_target_name(self, _: Target) -> str:
return '明日方舟游戏信息' return '明日方舟游戏信息'

View File

@ -5,9 +5,9 @@ import httpx
from ..post import Post from ..post import Post
from ..types import Category, RawPost, Tag, Target from ..types import Category, RawPost, Tag, Target
from .platform import NewMessage, TargetMixin, CategoryNotSupport from .platform import NewMessage, CategoryNotSupport
class Bilibili(NewMessage, TargetMixin): class Bilibili(NewMessage):
categories = { categories = {
1: "一般动态", 1: "一般动态",
@ -24,6 +24,7 @@ class Bilibili(NewMessage, TargetMixin):
schedule_type = 'interval' schedule_type = 'interval'
schedule_kw = {'seconds': 10} schedule_kw = {'seconds': 10}
name = 'B站' name = 'B站'
has_target = True
async def get_target_name(self, target: Target) -> Optional[str]: async def get_target_name(self, target: Target) -> Optional[str]:
async with httpx.AsyncClient() as client: async with httpx.AsyncClient() as client:

View File

@ -3,9 +3,9 @@ from typing import Any, Optional
import httpx import httpx
from ..post import Post from ..post import Post
from ..types import RawPost, Target from ..types import RawPost, Target
from .platform import TargetMixin, NewMessage from .platform import NewMessage
class NcmArtist(TargetMixin, NewMessage): class NcmArtist(NewMessage):
categories = {} categories = {}
platform_name = 'ncm-artist' platform_name = 'ncm-artist'
@ -15,6 +15,7 @@ class NcmArtist(TargetMixin, NewMessage):
schedule_type = 'interval' schedule_type = 'interval'
schedule_kw = {'minutes': 1} schedule_kw = {'minutes': 1}
name = "网易云-歌手" name = "网易云-歌手"
has_target = True
async def get_target_name(self, target: Target) -> Optional[str]: async def get_target_name(self, target: Target) -> Optional[str]:
async with httpx.AsyncClient() as client: async with httpx.AsyncClient() as client:

View File

@ -3,9 +3,9 @@ from typing import Any, Optional
import httpx import httpx
from ..post import Post from ..post import Post
from ..types import RawPost, Target from ..types import RawPost, Target
from .platform import TargetMixin, NewMessage from .platform import NewMessage
class NcmRadio(TargetMixin, NewMessage): class NcmRadio(NewMessage):
categories = {} categories = {}
platform_name = 'ncm-radio' platform_name = 'ncm-radio'
@ -15,6 +15,7 @@ class NcmRadio(TargetMixin, NewMessage):
schedule_type = 'interval' schedule_type = 'interval'
schedule_kw = {'minutes': 10} schedule_kw = {'minutes': 10}
name = "网易云-电台" name = "网易云-电台"
has_target = True
async def get_target_name(self, target: Target) -> Optional[str]: async def get_target_name(self, target: Target) -> Optional[str]:
async with httpx.AsyncClient() as client: async with httpx.AsyncClient() as client:

View File

@ -24,35 +24,55 @@ class RegistryMeta(type):
def __init__(cls, name, bases, namespace, **kwargs): def __init__(cls, name, bases, namespace, **kwargs):
if kwargs.get('base'): if kwargs.get('base'):
# this is the base class # this is the base class
cls.registory = [] cls.registry = []
elif not kwargs.get('abstract'): elif not kwargs.get('abstract'):
# this is the subclass # this is the subclass
cls.registory.append(cls) cls.registry.append(cls)
super().__init__(name, bases, namespace, **kwargs) super().__init__(name, bases, namespace, **kwargs)
class RegistryABCMeta(RegistryMeta, ABC): class RegistryABCMeta(RegistryMeta, ABC):
... ...
class StorageMixinProto(metaclass=RegistryABCMeta, abstract=True): class Platform(metaclass=RegistryABCMeta, base=True):
schedule_type: Literal['date', 'interval', 'cron']
schedule_kw: dict
is_common: bool
enabled: bool
name: str
has_target: bool has_target: bool
categories: dict[Category, str]
enable_tag: bool
store: dict[Target, Any]
platform_name: str
@abstractmethod @abstractmethod
def get_stored_data(self, target: Target) -> Any: async def get_target_name(self, target: Target) -> Optional[str]:
... ...
@abstractmethod @abstractmethod
def set_stored_data(self, target: Target, data: Any): async def fetch_new_post(self, target: Target, users: list[UserSubInfo]) -> list[tuple[User, list[Post]]]:
... ...
class TargetMixin(StorageMixinProto, abstract=True): @abstractmethod
async def parse(self, raw_post: RawPost) -> Post:
...
has_target = True async def do_parse(self, raw_post: RawPost) -> Post:
"actually function called"
return await self.parse(raw_post)
def __init__(self): def __init__(self):
super().__init__() super().__init__()
self.store: dict[Target, Any] = dict() self.reverse_category = {}
for key, val in self.categories.items():
self.reverse_category[val] = key
self.store = dict()
@abstractmethod
def get_tags(self, raw_post: RawPost) -> Optional[Collection[Tag]]:
"Return Tag list of given RawPost"
def get_stored_data(self, target: Target) -> Any: def get_stored_data(self, target: Target) -> Any:
return self.store.get(target) return self.store.get(target)
@ -60,39 +80,43 @@ class TargetMixin(StorageMixinProto, abstract=True):
def set_stored_data(self, target: Target, data: Any): def set_stored_data(self, target: Target, data: Any):
self.store[target] = data self.store[target] = data
async def filter_user_custom(self, raw_post_list: list[RawPost], cats: list[Category], tags: list[Tag]) -> list[RawPost]:
res: list[RawPost] = []
for raw_post in raw_post_list:
if self.categories:
cat = self.get_category(raw_post)
if cats and cat not in cats:
continue
if self.enable_tag and tags:
flag = False
post_tags = self.get_tags(raw_post)
for tag in post_tags or []:
if tag in tags:
flag = True
break
if not flag:
continue
res.append(raw_post)
return res
class NoTargetMixin(StorageMixinProto, abstract=True): async def dispatch_user_post(self, target: Target, new_posts: list[RawPost], users: list[UserSubInfo]) -> list[tuple[User, list[Post]]]:
res: list[tuple[User, list[Post]]] = []
has_target = False for user, category_getter, tag_getter in users:
required_tags = tag_getter(target) if self.enable_tag else []
def __init__(self): cats = category_getter(target)
super().__init__() user_raw_post = await self.filter_user_custom(new_posts, cats, required_tags)
self.store = None user_post: list[Post] = []
for raw_post in user_raw_post:
def get_stored_data(self, _: Target) -> Any: user_post.append(await self.do_parse(raw_post))
return self.store res.append((user, user_post))
return res
def set_stored_data(self, _: Target, data: Any):
self.store = data
class PlatformNameMixin(metaclass=RegistryABCMeta, abstract=True):
platform_name: str
class CategoryMixin(metaclass=RegistryABCMeta, abstract=True):
@abstractmethod @abstractmethod
def get_category(self, post: RawPost) -> Optional[Category]: def get_category(self, post: RawPost) -> Optional[Category]:
"Return category of given Rawpost" "Return category of given Rawpost"
raise NotImplementedError() raise NotImplementedError()
class ParsePostMixin(metaclass=RegistryABCMeta, abstract=True): class MessageProcess(Platform, abstract=True):
@abstractmethod
async def parse(self, raw_post: RawPost) -> Post:
"parse RawPost into post"
...
class MessageProcessMixin(PlatformNameMixin, CategoryMixin, ParsePostMixin, abstract=True):
"General message process fetch, parse, filter progress" "General message process fetch, parse, filter progress"
def __init__(self): def __init__(self):
@ -104,7 +128,7 @@ class MessageProcessMixin(PlatformNameMixin, CategoryMixin, ParsePostMixin, abst
"Get post id of given RawPost" "Get post id of given RawPost"
async def _parse_with_cache(self, raw_post: RawPost) -> Post: async def do_parse(self, raw_post: RawPost) -> Post:
post_id = self.get_id(raw_post) post_id = self.get_id(raw_post)
if post_id not in self.parse_cache: if post_id not in self.parse_cache:
retry_times = 3 retry_times = 3
@ -144,8 +168,8 @@ class MessageProcessMixin(PlatformNameMixin, CategoryMixin, ParsePostMixin, abst
res.append(raw_post) res.append(raw_post)
return res return res
class NewMessageProcessMixin(StorageMixinProto, MessageProcessMixin, abstract=True): class NewMessage(MessageProcess, abstract=True):
"General message process, fetch, parse, filter, and only returns the new Post" "Fetch a list of messages, filter the new messages, dispatch it to different users"
@dataclass @dataclass
class MessageStorage(): class MessageStorage():
@ -173,79 +197,6 @@ class NewMessageProcessMixin(StorageMixinProto, MessageProcessMixin, abstract=Tr
self.set_stored_data(target, store) self.set_stored_data(target, store)
return res return res
class UserCustomFilterMixin(CategoryMixin, ParsePostMixin, abstract=True):
categories: dict[Category, str]
enable_tag: bool
def __init__(self):
super().__init__()
self.reverse_category = {}
for key, val in self.categories.items():
self.reverse_category[val] = key
@abstractmethod
def get_tags(self, raw_post: RawPost) -> Optional[Collection[Tag]]:
"Return Tag list of given RawPost"
async def filter_user_custom(self, raw_post_list: list[RawPost], cats: list[Category], tags: list[Tag]) -> list[RawPost]:
res: list[RawPost] = []
for raw_post in raw_post_list:
if self.categories:
cat = self.get_category(raw_post)
if cats and cat not in cats:
continue
if self.enable_tag and tags:
flag = False
post_tags = self.get_tags(raw_post)
for tag in post_tags or []:
if tag in tags:
flag = True
break
if not flag:
continue
res.append(raw_post)
return res
async def dispatch_user_post(self, target: Target, new_posts: list[RawPost], users: list[UserSubInfo]) -> list[tuple[User, list[Post]]]:
res: list[tuple[User, list[Post]]] = []
for user, category_getter, tag_getter in users:
required_tags = tag_getter(target) if self.enable_tag else []
cats = category_getter(target)
user_raw_post = await self.filter_user_custom(new_posts, cats, required_tags)
user_post: list[Post] = []
for raw_post in user_raw_post:
if isinstance(self, MessageProcessMixin):
user_post.append(await self._parse_with_cache(raw_post))
else:
user_post.append(await self.parse(raw_post))
res.append((user, user_post))
return res
class Platform(PlatformNameMixin, UserCustomFilterMixin, base=True):
# schedule_interval: int
schedule_type: Literal['date', 'interval', 'cron']
schedule_kw: dict
is_common: bool
enabled: bool
name: str
@abstractmethod
async def get_target_name(self, target: Target) -> Optional[str]:
...
@abstractmethod
async def fetch_new_post(self, target: Target, users: list[UserSubInfo]) -> list[tuple[User, list[Post]]]:
...
class NewMessage(
Platform,
NewMessageProcessMixin,
UserCustomFilterMixin,
abstract=True
):
"Fetch a list of messages, filter the new messages, dispatch it to different users"
async def fetch_new_post(self, target: Target, users: list[UserSubInfo]) -> list[tuple[User, list[Post]]]: async def fetch_new_post(self, target: Target, users: list[UserSubInfo]) -> list[tuple[User, list[Post]]]:
try: try:
@ -266,12 +217,7 @@ class NewMessage(
logger.warning("network connection error: {}, url: {}".format(type(err), err.request.url)) logger.warning("network connection error: {}, url: {}".format(type(err), err.request.url))
return [] return []
class StatusChange( class StatusChange(Platform, abstract=True):
Platform,
StorageMixinProto,
UserCustomFilterMixin,
abstract=True
):
"Watch a status, and fire a post when status changes" "Watch a status, and fire a post when status changes"
@abstractmethod @abstractmethod
@ -305,13 +251,7 @@ class StatusChange(
logger.warning("network connection error: {}, url: {}".format(type(err), err.request.url)) logger.warning("network connection error: {}, url: {}".format(type(err), err.request.url))
return [] return []
class SimplePost( class SimplePost(MessageProcess, abstract=True):
Platform,
MessageProcessMixin,
UserCustomFilterMixin,
StorageMixinProto,
abstract=True
):
"Fetch a list of messages, dispatch it to different users" "Fetch a list of messages, dispatch it to different users"
async def fetch_new_post(self, target: Target, users: list[UserSubInfo]) -> list[tuple[User, list[Post]]]: async def fetch_new_post(self, target: Target, users: list[UserSubInfo]) -> list[tuple[User, list[Post]]]:
@ -332,20 +272,12 @@ class SimplePost(
logger.warning("network connection error: {}, url: {}".format(type(err), err.request.url)) logger.warning("network connection error: {}, url: {}".format(type(err), err.request.url))
return [] return []
class NoTargetGroup( class NoTargetGroup(Platform, abstract=True):
Platform,
NoTargetMixin,
UserCustomFilterMixin,
abstract=True
):
enable_tag = False enable_tag = False
DUMMY_STR = '_DUMMY' DUMMY_STR = '_DUMMY'
enabled = True enabled = True
class PlatformProto(Platform, NoTargetMixin, UserCustomFilterMixin, abstract=True): def __init__(self, platform_list: list[Platform]):
...
def __init__(self, platform_list: list[PlatformProto]):
self.platform_list = platform_list self.platform_list = platform_list
name = self.DUMMY_STR name = self.DUMMY_STR
self.categories = {} self.categories = {}
@ -353,6 +285,8 @@ class NoTargetGroup(
self.schedule_type = platform_list[0].schedule_type self.schedule_type = platform_list[0].schedule_type
self.schedule_kw = platform_list[0].schedule_kw self.schedule_kw = platform_list[0].schedule_kw
for platform in platform_list: for platform in platform_list:
if platform.has_target:
raise RuntimeError('Platform {} should have no target'.format(platform.name))
if name == self.DUMMY_STR: if name == self.DUMMY_STR:
name = platform.name name = platform.name
elif name != platform.name: elif name != platform.name:
@ -381,3 +315,4 @@ class NoTargetGroup(
for user, posts in platform_res: for user, posts in platform_res:
res[user].extend(posts) res[user].extend(posts)
return [[key, val] for key, val in res.items()] return [[key, val] for key, val in res.items()]

View File

@ -7,9 +7,9 @@ import httpx
from ..post import Post from ..post import Post
from ..types import RawPost, Target from ..types import RawPost, Target
from .platform import NewMessage, TargetMixin from .platform import NewMessage
class Rss(NewMessage, TargetMixin): class Rss(NewMessage):
categories = {} categories = {}
enable_tag = False enable_tag = False
@ -19,6 +19,7 @@ class Rss(NewMessage, TargetMixin):
is_common = True is_common = True
schedule_type = 'interval' schedule_type = 'interval'
schedule_kw = {'seconds': 30} schedule_kw = {'seconds': 30}
has_target = True
async def get_target_name(self, target: Target) -> Optional[str]: async def get_target_name(self, target: Target) -> Optional[str]:
async with httpx.AsyncClient() as client: async with httpx.AsyncClient() as client:

View File

@ -9,9 +9,9 @@ from nonebot import logger
from ..post import Post from ..post import Post
from ..types import * from ..types import *
from .platform import NewMessage, TargetMixin from .platform import NewMessage
class Weibo(NewMessage, TargetMixin): class Weibo(NewMessage):
categories = { categories = {
1: '转发', 1: '转发',
@ -26,6 +26,7 @@ class Weibo(NewMessage, TargetMixin):
is_common = True is_common = True
schedule_type = 'interval' schedule_type = 'interval'
schedule_kw = {'seconds': 3} schedule_kw = {'seconds': 3}
has_target = True
async def get_target_name(self, target: Target) -> Optional[str]: async def get_target_name(self, target: Target) -> Optional[str]:
async with httpx.AsyncClient() as client: async with httpx.AsyncClient() as client:

View File

@ -2,7 +2,6 @@ import pytest
import typing import typing
import respx import respx
from httpx import Response from httpx import Response
import feedparser
if typing.TYPE_CHECKING: if typing.TYPE_CHECKING:
import sys import sys

View File

@ -38,8 +38,7 @@ def user_info_factory(plugin_module: 'nonebot_bison', dummy_user):
@pytest.fixture @pytest.fixture
def mock_platform_without_cats_tags(plugin_module: 'nonebot_bison'): def mock_platform_without_cats_tags(plugin_module: 'nonebot_bison'):
class MockPlatform(plugin_module.platform.platform.NewMessage, class MockPlatform(plugin_module.platform.platform.NewMessage):
plugin_module.platform.platform.TargetMixin):
platform_name = 'mock_platform' platform_name = 'mock_platform'
name = 'Mock Platform' name = 'Mock Platform'
@ -48,6 +47,7 @@ def mock_platform_without_cats_tags(plugin_module: 'nonebot_bison'):
schedule_interval = 10 schedule_interval = 10
enable_tag = False enable_tag = False
categories = {} categories = {}
has_target = True
def __init__(self): def __init__(self):
self.sub_index = 0 self.sub_index = 0
@ -77,8 +77,7 @@ def mock_platform_without_cats_tags(plugin_module: 'nonebot_bison'):
@pytest.fixture @pytest.fixture
def mock_platform(plugin_module: 'nonebot_bison'): def mock_platform(plugin_module: 'nonebot_bison'):
class MockPlatform(plugin_module.platform.platform.NewMessage, class MockPlatform(plugin_module.platform.platform.NewMessage):
plugin_module.platform.platform.TargetMixin):
platform_name = 'mock_platform' platform_name = 'mock_platform'
name = 'Mock Platform' name = 'Mock Platform'
@ -86,6 +85,7 @@ def mock_platform(plugin_module: 'nonebot_bison'):
is_common = True is_common = True
schedule_interval = 10 schedule_interval = 10
enable_tag = True enable_tag = True
has_target = True
categories = { categories = {
1: '转发', 1: '转发',
2: '视频', 2: '视频',
@ -124,8 +124,7 @@ def mock_platform(plugin_module: 'nonebot_bison'):
@pytest.fixture @pytest.fixture
def mock_platform_no_target(plugin_module: 'nonebot_bison'): def mock_platform_no_target(plugin_module: 'nonebot_bison'):
class MockPlatform(plugin_module.platform.platform.NewMessage, class MockPlatform(plugin_module.platform.platform.NewMessage):
plugin_module.platform.platform.NoTargetMixin):
platform_name = 'mock_platform' platform_name = 'mock_platform'
name = 'Mock Platform' name = 'Mock Platform'
@ -134,6 +133,7 @@ def mock_platform_no_target(plugin_module: 'nonebot_bison'):
schedule_type = 'interval' schedule_type = 'interval'
schedule_kw = {'seconds': 30} schedule_kw = {'seconds': 30}
enable_tag = True enable_tag = True
has_target = False
categories = { categories = {
1: '转发', 1: '转发',
2: '视频', 2: '视频',
@ -175,8 +175,7 @@ def mock_platform_no_target(plugin_module: 'nonebot_bison'):
@pytest.fixture @pytest.fixture
def mock_platform_no_target_2(plugin_module: 'nonebot_bison'): def mock_platform_no_target_2(plugin_module: 'nonebot_bison'):
class MockPlatform(plugin_module.platform.platform.NewMessage, class MockPlatform(plugin_module.platform.platform.NewMessage):
plugin_module.platform.platform.NoTargetMixin):
platform_name = 'mock_platform' platform_name = 'mock_platform'
name = 'Mock Platform' name = 'Mock Platform'
@ -185,6 +184,7 @@ def mock_platform_no_target_2(plugin_module: 'nonebot_bison'):
schedule_kw = {'seconds': 30} schedule_kw = {'seconds': 30}
is_common = True is_common = True
enable_tag = True enable_tag = True
has_target = False
categories = { categories = {
4: 'leixing4', 4: 'leixing4',
5: 'leixing5', 5: 'leixing5',
@ -231,8 +231,7 @@ def mock_platform_no_target_2(plugin_module: 'nonebot_bison'):
@pytest.fixture @pytest.fixture
def mock_status_change(plugin_module: 'nonebot_bison'): def mock_status_change(plugin_module: 'nonebot_bison'):
class MockPlatform(plugin_module.platform.platform.StatusChange, class MockPlatform(plugin_module.platform.platform.StatusChange):
plugin_module.platform.platform.NoTargetMixin):
platform_name = 'mock_platform' platform_name = 'mock_platform'
name = 'Mock Platform' name = 'Mock Platform'
@ -241,6 +240,7 @@ def mock_status_change(plugin_module: 'nonebot_bison'):
enable_tag = False enable_tag = False
schedule_type = 'interval' schedule_type = 'interval'
schedule_kw = {'seconds': 10} schedule_kw = {'seconds': 10}
has_target = False
categories = { categories = {
1: '转发', 1: '转发',
2: '视频', 2: '视频',