Merge branch 'dev'

This commit is contained in:
felinae98 2021-06-29 19:43:26 +08:00
commit 122e6875a5
No known key found for this signature in database
GPG Key ID: 00C8B010587FF610
19 changed files with 873 additions and 235 deletions

View File

@ -45,3 +45,8 @@ build-backend = "poetry.masonry.api"
name = "aliyun"
url = "https://mirrors.aliyun.com/pypi/simple/"
default = true
[tool.pytest.ini_options]
markers = [
"compare: compare fetching result with rsshub"
]

View File

@ -1,4 +1,4 @@
from .platform import PlatformProto
from .platform import Platform
from pkgutil import iter_modules
from pathlib import Path
from importlib import import_module
@ -9,9 +9,9 @@ for (_, module_name, _) in iter_modules([_package_dir]):
async def check_sub_target(target_type, target):
return await platform_manager[target_type].get_account_name(target)
return await platform_manager[target_type].get_target_name(target)
platform_manager: dict[str, PlatformProto] = {
platform_manager: dict[str, Platform] = {
obj.platform_name: obj() for obj in \
filter(lambda platform: platform.enabled, PlatformProto.registory)
filter(lambda platform: platform.enabled, Platform.registory)
}

View File

@ -1,21 +1,17 @@
from typing import Any
import httpx
import json
import time
from collections import defaultdict
from bs4 import BeautifulSoup as bs
from datetime import datetime
from nonebot import logger
from ..types import Category, RawPost, Tag, Target
from ..types import RawPost, Target
from .platform import PlatformNoTarget, CategoryNotSupport
from .platform import NewMessage, NoTargetMixin, CategoryNotSupport
from ..utils import Singleton, Render
from ..utils import Render
from ..post import Post
class Arknights(PlatformNoTarget):
class Arknights(NewMessage, NoTargetMixin):
categories = {}
platform_name = 'arknights'
@ -23,13 +19,14 @@ class Arknights(PlatformNoTarget):
enable_tag = False
enabled = True
is_common = False
schedule_interval = 30
schedule_type = 'interval'
schedule_kw = {'seconds': 30}
@staticmethod
async def get_account_name(_: Target) -> str:
async def get_target_name(_: Target) -> str:
return '明日方舟游戏内公告'
async def get_sub_list(self) -> list[RawPost]:
async def get_sub_list(self, _) -> list[RawPost]:
async with httpx.AsyncClient() as client:
raw_data = await client.get('http://ak-fs.hypergryph.com/announce/IOS/announcement.meta.json')
return json.loads(raw_data.text)['announceList']
@ -37,7 +34,7 @@ class Arknights(PlatformNoTarget):
def get_id(self, post: RawPost) -> Any:
return post['announceId']
def get_date(self, post: RawPost) -> None:
def get_date(self, _: RawPost) -> None:
return None
async def parse(self, raw_post: RawPost) -> Post:

View File

@ -5,9 +5,9 @@ import httpx
from ..post import Post
from ..types import Category, RawPost, Tag, Target
from .platform import CategoryNotSupport, Platform
from .platform import NewMessage, TargetMixin, CategoryNotSupport
class Bilibili(Platform):
class Bilibili(NewMessage, TargetMixin):
categories = {
1: "一般动态",
@ -20,11 +20,12 @@ class Bilibili(Platform):
enable_tag = True
enabled = True
is_common = True
schedule_interval = 10
schedule_type = 'interval'
schedule_kw = {'seconds': 10}
name = 'B站'
@staticmethod
async def get_account_name(target: Target) -> Optional[str]:
async def get_target_name(target: Target) -> Optional[str]:
async with httpx.AsyncClient() as client:
res = await client.get('https://api.bilibili.com/x/space/acc/info', params={'mid': target})
res_data = json.loads(res.text)

View File

@ -1,27 +1,26 @@
from typing import Any
import httpx
import json
from .platform import PlatformNoTarget
from ..utils import Singleton
from .platform import NewMessage, NoTargetMixin
from ..types import RawPost
from ..post import Post
class MonsterSiren(PlatformNoTarget):
class MonsterSiren(NewMessage, NoTargetMixin):
categories = {}
platform_name = 'monster-siren'
enable_tag = False
enabled = True
is_common = False
schedule_interval = 30
schedule_type = 'interval'
schedule_kw = {'seconds': 30}
name = '塞壬唱片官网新闻'
@staticmethod
async def get_account_name(_) -> str:
async def get_target_name(_) -> str:
return '塞壬唱片新闻'
async def get_sub_list(self) -> list[RawPost]:
async def get_sub_list(self, _) -> list[RawPost]:
async with httpx.AsyncClient() as client:
raw_data = await client.get('https://monster-siren.hypergryph.com/api/news')
return raw_data.json()['data']['list']

View File

@ -1,7 +1,7 @@
from abc import abstractmethod
from abc import abstractmethod, ABC
from dataclasses import dataclass
import time
from collections import defaultdict
from typing import Any, Collection, Optional
from typing import Any, Collection, Optional, Literal
import httpx
from nonebot import logger
@ -18,98 +18,122 @@ class CategoryNotSupport(Exception):
class RegistryMeta(type):
def __new__(cls, name, bases, namespace, **kwargs):
if name not in ['PlatformProto', 'Platform', 'PlatformNoTarget'] and \
'platform_name' not in namespace:
raise TypeError('Platform has no `platform_name`')
return super().__new__(cls, name, bases, namespace, **kwargs)
return super().__new__(cls, name, bases, namespace)
def __init__(cls, name, bases, namespace, **kwargs):
if not hasattr(cls, 'registory'):
if kwargs.get('base'):
# this is the base class
cls.registory = []
elif name not in ['Platform', 'PlatformNoTarget']:
elif not kwargs.get('abstract'):
# this is the subclass
cls.registory.append(cls)
super().__init__(name, bases, namespace, **kwargs)
class RegistryABCMeta(RegistryMeta, ABC):
...
class PlatformProto(metaclass=RegistryMeta):
categories: dict[Category, str]
reverse_category: dict[str, Category]
class StorageMixinProto(metaclass=RegistryABCMeta, abstract=True):
has_target: bool
platform_name: str
name: str
enable_tag: bool
cache: dict[Any, Post]
enabled: bool
is_common: bool
schedule_interval: int
@abstractmethod
async def fetch_new_post(self, target: Target, users: list[UserSubInfo]) -> list[tuple[User, list[Post]]]:
def get_stored_data(self, target: Target) -> Any:
...
@staticmethod
@abstractmethod
async def get_account_name(target: Target) -> Optional[str]:
"return the username(name) of the target"
def set_stored_data(self, target: Target, data: Any):
...
@abstractmethod
def get_id(self, post: RawPost) -> Any:
"Get post id of given RawPost"
class TargetMixin(StorageMixinProto, abstract=True):
@abstractmethod
def get_date(self, post: RawPost) -> Optional[int]:
"Get post timestamp and return, return None if can't get the time"
has_target = True
def __init__(self):
super().__init__()
self.store: dict[Target, Any] = dict()
def get_stored_data(self, target: Target) -> Any:
return self.store.get(target)
def set_stored_data(self, target: Target, data: Any):
self.store[target] = data
class NoTargetMixin(StorageMixinProto, abstract=True):
has_target = False
def __init__(self):
super().__init__()
self.store = None
def get_stored_data(self, _: Target) -> Any:
return self.store
def set_stored_data(self, _: Target, data: Any):
self.store = data
class PlaformNameMixin(metaclass=RegistryABCMeta, abstract=True):
platform_name: str
class CategoryMixin(metaclass=RegistryABCMeta, abstract=True):
@abstractmethod
def get_category(self, post: RawPost) -> Optional[Category]:
"Return category of given Rawpost"
raise NotImplementedError()
@abstractmethod
def get_tags(self, raw_post: RawPost) -> Optional[Collection[Tag]]:
"Return Tag list of given RawPost"
class ParsePostMixin(metaclass=RegistryABCMeta, abstract=True):
@abstractmethod
async def parse(self, raw_post: RawPost) -> Post:
"parse RawPost into post"
...
class MessageProcessMixin(PlaformNameMixin, CategoryMixin, ParsePostMixin, abstract=True):
"General message process fetch, parse, filter progress"
def __init__(self):
super().__init__()
self.parse_cache: dict[Any, Post] = dict()
@abstractmethod
def filter_platform_custom(self, post: RawPost) -> bool:
"a customed filter"
raise NotImplementedError()
def get_id(self, post: RawPost) -> Any:
"Get post id of given RawPost"
async def _parse_with_cache(self, post: RawPost) -> Post:
post_id = self.get_id(post)
if post_id not in self.cache:
async def _parse_with_cache(self, raw_post: RawPost) -> Post:
post_id = self.get_id(raw_post)
if post_id not in self.parse_cache:
retry_times = 3
while retry_times:
try:
self.cache[post_id] = await self.parse(post)
self.parse_cache[post_id] = await self.parse(raw_post)
break
except Exception as err:
if not retry_times:
raise err
retry_times -= 1
return self.cache[post_id]
return self.parse_cache[post_id]
def _do_filter_common(self, raw_post_list: list[RawPost], exists_posts_set: set) -> list[RawPost]:
@abstractmethod
async def get_sub_list(self, target: Target) -> list[RawPost]:
"Get post list of the given target"
@abstractmethod
def get_date(self, post: RawPost) -> Optional[int]:
"Get post timestamp and return, return None if can't get the time"
async def filter_common(self, raw_post_list: list[RawPost]) -> list[RawPost]:
res = []
for raw_post in raw_post_list:
post_id = self.get_id(raw_post)
if post_id in exists_posts_set:
continue
# post_id = self.get_id(raw_post)
# if post_id in exists_posts_set:
# continue
if (post_time := self.get_date(raw_post)) and time.time() - post_time > 2 * 60 * 60 and \
plugin_config.hk_reporter_init_filter:
continue
try:
if not self.filter_platform_custom(raw_post):
continue
except NotImplementedError:
pass
try:
self.get_category(raw_post)
except CategoryNotSupport:
@ -117,9 +141,45 @@ class PlatformProto(metaclass=RegistryMeta):
except NotImplementedError:
pass
res.append(raw_post)
exists_posts_set.add(post_id)
return res
class NewMessageProcessMixin(StorageMixinProto, MessageProcessMixin, abstract=True):
"General message process, fetch, parse, filter, and only returns the new Post"
@dataclass
class MessageStorage():
inited: bool
exists_posts: set[Any]
async def filter_common_with_diff(self, target: Target, raw_post_list: list[RawPost]) -> list[RawPost]:
filtered_post = await self.filter_common(raw_post_list)
store = self.get_stored_data(target) or self.MessageStorage(False, set())
res = []
if not store.inited and plugin_config.hk_reporter_init_filter:
# target not init
for raw_post in filtered_post:
post_id = self.get_id(raw_post)
store.exists_posts.add(post_id)
logger.info('init {}-{} with {}'.format(self.platform_name, target, store.exists_posts))
store.inited = True
else:
for raw_post in filtered_post:
post_id = self.get_id(raw_post)
if post_id in store.exists_posts:
continue
res.append(raw_post)
self.set_stored_data(target, store)
return res
class UserCustomFilterMixin(CategoryMixin, ParsePostMixin, abstract=True):
categories: dict[Category, str]
enable_tag: bool
@abstractmethod
def get_tags(self, raw_post: RawPost) -> Optional[Collection[Tag]]:
"Return Tag list of given RawPost"
async def filter_user_custom(self, raw_post_list: list[RawPost], cats: list[Category], tags: list[Tag]) -> list[RawPost]:
res: list[RawPost] = []
for raw_post in raw_post_list:
@ -139,112 +199,96 @@ class PlatformProto(metaclass=RegistryMeta):
res.append(raw_post)
return res
async def dispatch_user_post(self, target: Target, new_posts: list[RawPost], users: list[UserSubInfo]) -> list[tuple[User, list[Post]]]:
res: list[tuple[User, list[Post]]] = []
for user, category_getter, tag_getter in users:
required_tags = tag_getter(target) if self.enable_tag else []
cats = category_getter(target)
user_raw_post = await self.filter_user_custom(new_posts, cats, required_tags)
user_post: list[Post] = []
for raw_post in user_raw_post:
if isinstance(self, MessageProcessMixin):
user_post.append(await self._parse_with_cache(raw_post))
else:
user_post.append(await self.parse(raw_post))
res.append((user, user_post))
return res
class Platform(PlatformProto):
"platform with target(account), like weibo, bilibili"
class Platform(metaclass=RegistryABCMeta, base=True):
# schedule_interval: int
schedule_type: Literal['date', 'interval', 'cron']
schedule_kw: dict
is_common: bool
enabled: bool
name: str
categories: dict[Category, str]
has_target: bool = True
platform_name: str
enable_tag: bool
def __init__(self):
self.exists_posts = defaultdict(set)
self.inited = dict()
self.reverse_category = {}
self.cache: dict[Any, Post] = {}
for key, val in self.categories.items():
self.reverse_category[val] = key
@staticmethod
@abstractmethod
async def get_target_name(target: Target) -> Optional[str]:
...
@abstractmethod
async def get_sub_list(self, target: Target) -> list[RawPost]:
"Get post list of the given target"
async def filter_common(self, target: Target, raw_post_list: list[RawPost]) -> list[RawPost]:
if not self.inited.get(target, False) and plugin_config.hk_reporter_init_filter:
# target not init
for raw_post in raw_post_list:
post_id = self.get_id(raw_post)
self.exists_posts[target].add(post_id)
logger.info('init {}-{} with {}'.format(self.platform_name, target, self.exists_posts[target]))
self.inited[target] = True
return []
return self._do_filter_common(raw_post_list, self.exists_posts[target])
async def fetch_new_post(self, target: Target, users: list[UserSubInfo]) -> list[tuple[User, list[Post]]]:
...
class NewMessage(
Platform,
NewMessageProcessMixin,
UserCustomFilterMixin,
abstract=True
):
"Fetch a list of messages, filter the new messages, dispatch it to different users"
async def fetch_new_post(self, target: Target, users: list[UserSubInfo]) -> list[tuple[User, list[Post]]]:
try:
post_list = await self.get_sub_list(target)
new_posts = await self.filter_common(target, post_list)
res: list[tuple[User, list[Post]]] = []
new_posts = await self.filter_common_with_diff(target, post_list)
if not new_posts:
return []
else:
for post in new_posts:
logger.info('fetch new post from {} {}: {}'.format(self.platform_name, target, self.get_id(post)))
for user, category_getter, tag_getter in users:
required_tags = tag_getter(target) if self.enable_tag else []
cats = category_getter(target)
user_raw_post = await self.filter_user_custom(new_posts, cats, required_tags)
user_post: list[Post] = []
for raw_post in user_raw_post:
user_post.append(await self._parse_with_cache(raw_post))
res.append((user, user_post))
self.cache = {}
logger.info('fetch new post from {} {}: {}'.format(
self.platform_name,
target if self.has_target else '-',
self.get_id(post)))
res = await self.dispatch_user_post(target, new_posts, users)
self.parse_cache = {}
return res
except httpx.RequestError as err:
logger.warning("network connection error: {}, url: {}".format(type(err), err.request.url))
return []
class StatusChange(
Platform,
StorageMixinProto,
PlaformNameMixin,
UserCustomFilterMixin,
abstract=True
):
"Watch a status, and fire a post when status changes"
class PlatformNoTarget(PlatformProto):
@abstractmethod
async def get_status(self, target: Target) -> Any:
...
categories: dict[Category, str]
has_target = False
platform_name: str
enable_tag: bool
@abstractmethod
def compare_status(self, target: Target, old_status, new_status) -> Optional[RawPost]:
...
async def get_sub_list(self) -> list[RawPost]:
"Get post list of the given target"
raise NotImplementedError()
@abstractmethod
async def parse(self, raw_post: RawPost) -> Post:
...
def __init__(self):
self.exists_posts = set()
self.inited = False
self.reverse_category = {}
self.cache: dict[Any, Post] = {}
for key, val in self.categories.items():
self.reverse_category[val] = key
async def filter_common(self, raw_post_list: list[RawPost]) -> list[RawPost]:
if not self.inited and plugin_config.hk_reporter_init_filter:
# target not init
for raw_post in raw_post_list:
post_id = self.get_id(raw_post)
self.exists_posts.add(post_id)
logger.info('init {} with {}'.format(self.platform_name, self.exists_posts))
self.inited = True
return []
return self._do_filter_common(raw_post_list, self.exists_posts)
async def fetch_new_post(self, _: Target, users: list[UserSubInfo]) -> list[tuple[User, list[Post]]]:
async def fetch_new_post(self, target: Target, users: list[UserSubInfo]) -> list[tuple[User, list[Post]]]:
try:
post_list = await self.get_sub_list()
new_posts = await self.filter_common(post_list)
res: list[tuple[User, list[Post]]] = []
if not new_posts:
return []
else:
for post in new_posts:
logger.info('fetch new post from {}: {}'.format(self.platform_name, self.get_id(post)))
for user, category_getter, tag_getter in users:
required_tags = tag_getter(Target('default'))
cats = category_getter(Target('default'))
user_raw_post = await self.filter_user_custom(new_posts, cats, required_tags)
user_post: list[Post] = []
for raw_post in user_raw_post:
user_post.append(await self._parse_with_cache(raw_post))
res.append((user, user_post))
self.cache = {}
new_status = await self.get_status(target)
res = []
if old_status := self.get_stored_data(target):
diff = self.compare_status(target, old_status, new_status)
if diff:
res = await self.dispatch_user_post(target, [diff], users)
self.set_stored_data(target, new_status)
return res
except httpx.RequestError as err:
logger.warning("network connection error: {}, url: {}".format(type(err), err.request.url))

View File

@ -7,9 +7,9 @@ import httpx
from ..post import Post
from ..types import RawPost, Target
from .platform import Platform
from .platform import NewMessage, TargetMixin
class Rss(Platform):
class Rss(NewMessage, TargetMixin):
categories = {}
enable_tag = False
@ -17,10 +17,11 @@ class Rss(Platform):
name = "Rss"
enabled = True
is_common = True
schedule_interval = 30
schedule_type = 'interval'
schedule_kw = {'seconds': 30}
@staticmethod
async def get_account_name(target: Target) -> Optional[str]:
async def get_target_name(target: Target) -> Optional[str]:
async with httpx.AsyncClient() as client:
res = await client.get(target, timeout=10.0)
feed = feedparser.parse(res.text)

View File

@ -9,70 +9,70 @@ import httpx
from ..post import Post
from ..types import *
from .platform import Platform
# from .platform import Platform
class Wechat(Platform):
# class Wechat(Platform):
categories = {}
enable_tag = False
platform_name = 'wechat'
enabled = False
is_common = False
name = '微信公众号'
# categories = {}
# enable_tag = False
# platform_name = 'wechat'
# enabled = False
# is_common = False
# name = '微信公众号'
@classmethod
def _get_query_url(cls, target: Target):
return 'https://weixin.sogou.com/weixin?type=1&s_from=input&query={}&ie=utf8&_sug_=n&_sug_type_='.format(target)
# @classmethod
# def _get_query_url(cls, target: Target):
# return 'https://weixin.sogou.com/weixin?type=1&s_from=input&query={}&ie=utf8&_sug_=n&_sug_type_='.format(target)
@classmethod
async def _get_target_soup(cls, target: Target) -> Optional[bs]:
target_url = cls._get_query_url(target)
async with httpx.AsyncClient() as client:
res = await client.get(target_url)
soup = bs(res.text, 'html.parser')
blocks = soup.find(class_='news-list2').find_all('li',recursive=False)
for block in blocks:
if block.find(string=[target]):
return block
# @classmethod
# async def _get_target_soup(cls, target: Target) -> Optional[bs]:
# target_url = cls._get_query_url(target)
# async with httpx.AsyncClient() as client:
# res = await client.get(target_url)
# soup = bs(res.text, 'html.parser')
# blocks = soup.find(class_='news-list2').find_all('li',recursive=False)
# for block in blocks:
# if block.find(string=[target]):
# return block
@classmethod
async def get_account_name(cls, target: Target) -> Optional[str]:
if not (block := await cls._get_target_soup(target)):
return None
return block.find('p', class_='tit').find('a').text
# @classmethod
# async def get_account_name(cls, target: Target) -> Optional[str]:
# if not (block := await cls._get_target_soup(target)):
# return None
# return block.find('p', class_='tit').find('a').text
async def get_sub_list(self, target: Target) -> list[RawPost]:
block = await self._get_target_soup(target)
if (last_post_dt := block.find('dt', string='最近文章:')):
post = {
'title': last_post_dt.find_parent().find('a').text,
'target': target,
'page_url': self._get_query_url(target),
'name': block.find('p', class_='tit').find('a').text
}
return [post]
else:
return []
# async def get_sub_list(self, target: Target) -> list[RawPost]:
# block = await self._get_target_soup(target)
# if (last_post_dt := block.find('dt', string='最近文章:')):
# post = {
# 'title': last_post_dt.find_parent().find('a').text,
# 'target': target,
# 'page_url': self._get_query_url(target),
# 'name': block.find('p', class_='tit').find('a').text
# }
# return [post]
# else:
# return []
def get_id(self, post: RawPost) -> Any:
return post['title']
# def get_id(self, post: RawPost) -> Any:
# return post['title']
def get_date(self, post: RawPost):
return None
# def get_date(self, post: RawPost):
# return None
def get_tags(self, post: RawPost):
return None
# def get_tags(self, post: RawPost):
# return None
def get_category(self, post: RawPost):
return None
# def get_category(self, post: RawPost):
# return None
async def parse(self, raw_post: RawPost) -> Post:
# TODO get content of post
return Post(target_type='wechat',
text='{}\n详细内容请自行查看公众号'.format(raw_post['title']),
target_name=raw_post['name'],
pics=[],
url=''
)
# async def parse(self, raw_post: RawPost) -> Post:
# # TODO get content of post
# return Post(target_type='wechat',
# text='{}\n详细内容请自行查看公众号'.format(raw_post['title']),
# target_name=raw_post['name'],
# pics=[],
# url=''
# )

View File

@ -9,9 +9,9 @@ from nonebot import logger
from ..post import Post
from ..types import *
from .platform import Platform
from .platform import NewMessage, TargetMixin
class Weibo(Platform):
class Weibo(NewMessage, TargetMixin):
categories = {
1: '转发',
@ -23,14 +23,11 @@ class Weibo(Platform):
name = '新浪微博'
enabled = True
is_common = True
schedule_interval = 10
def __init__(self):
self.top : dict[Target, RawPost] = dict()
super().__init__()
schedule_type = 'interval'
schedule_kw = {'seconds': 10}
@staticmethod
async def get_account_name(target: Target) -> Optional[str]:
async def get_target_name(target: Target) -> Optional[str]:
async with httpx.AsyncClient() as client:
param = {'containerid': '100505' + target}
res = await client.get('https://m.weibo.cn/api/container/getIndex', params=param)
@ -47,7 +44,8 @@ class Weibo(Platform):
res_data = json.loads(res.text)
if not res_data['ok']:
return []
return res_data['data']['cards']
custom_filter: Callable[[RawPost], bool] = lambda d: d['card_type'] == 9
return list(filter(custom_filter, res_data['data']['cards']))
def get_id(self, post: RawPost) -> Any:
return post['mblog']['id']

View File

@ -10,11 +10,12 @@ from .types import UserSubInfo
scheduler = AsyncIOScheduler()
@get_driver().on_startup
async def _start():
scheduler.configure({"apscheduler.timezone": "Asia/Shanghai"})
scheduler.start()
get_driver().on_startup(_start)
# get_driver().on_startup(_start)
async def fetch_and_send(target_type: str):
config = Config()
@ -41,10 +42,10 @@ async def fetch_and_send(target_type: str):
send_msgs(bot, user.user, user.user_type, await send_post.generate_messages())
for platform_name, platform in platform_manager.items():
if isinstance(platform.schedule_interval, int):
logger.info(f'start scheduler for {platform_name} with interval {platform.schedule_interval}')
if platform.schedule_type in ['cron', 'interval', 'date']:
logger.info(f'start scheduler for {platform_name} with {platform.schedule_type} {platform.schedule_kw}')
scheduler.add_job(
fetch_and_send, 'interval', seconds=platform.schedule_interval,
fetch_and_send, platform.schedule_type, **platform.schedule_kw,
args=(platform_name,))
scheduler.add_job(do_send_msgs, 'interval', seconds=0.3)

View File

@ -0,0 +1,24 @@
<!doctype html>
<html>
<head>
<meta charset="utf-8" />
<meta http-equiv="X-UA-Compatible" content="IE=edge" />
<meta name="viewport" content="width=device-width, initial-scale=1, minimum-scale=1" />
<meta name="keywords" content="明日方舟,明日方舟官网,明日方舟手游,二次元,明日方舟Arknights,魔物娘,战棋,策略,塔防,塔防RPG,Arknights,人外,Monster" />
<meta name="description" content="《明日方舟》是一款魔物主题的策略手游。在游戏中,玩家将管理一艘满载“ 魔物干员”的方舟,为调查来源神秘的矿石灾难而踏上旅途。在这个宽广而危机四伏的世界中,你或许会看到废土中的城市废墟,或许会看到仿若幻境的亚人国度,或许会遭遇无法解读的神秘,或许参与无比残酷的战争。在有关幻想与异种生命的世界中,体验史诗与想象,情感与牵绊!" />
<link rel="icon" href="data:;base64,=" />
<title>公告</title>
<link rel="stylesheet" href="../../assets/css/announcement.css" />
</head>
<body>
<div class="main">
<div class="container">
<div class="banner-image-container cover">
<a class="cover-jumper" href="uniwebview://move?target=recruit&amp;param1=NORM_19_0_4">
<img class="banner-image" src="https://ak-fs.hypergryph.com/announce/images/20210623/e6f49aeb9547a2278678368a43b95b07.jpg" />
</a>
</div>
</div>
</div>
</body>
</html>

View File

@ -0,0 +1,108 @@
{
"focusAnnounceId": "677",
"announceList": [
{
"announceId": "677",
"title": "联锁竞赛预告\n「荷谟伊智境」",
"isWebUrl": true,
"webUrl": "https://ak-fs.hypergryph.com/announce/IOS/announcement/677.html",
"day": 28,
"month": 6,
"group": "ACTIVITY"
},
{
"announceId": "676",
"title": "「制作组通讯」\n#12期",
"isWebUrl": true,
"webUrl": "https://ak-fs.hypergryph.com/announce/IOS/announcement/676.html",
"day": 23,
"month": 6,
"group": "SYSTEM"
},
{
"announceId": "672",
"title": "时代系列\n复刻限时上架",
"isWebUrl": true,
"webUrl": "https://ak-fs.hypergryph.com/announce/IOS/announcement/672.html",
"day": 17,
"month": 6,
"group": "ACTIVITY"
},
{
"announceId": "671",
"title": "生命之地系列\n新装限时上架",
"isWebUrl": true,
"webUrl": "https://ak-fs.hypergryph.com/announce/IOS/announcement/671.html",
"day": 17,
"month": 6,
"group": "ACTIVITY"
},
{
"announceId": "670",
"title": "【君影轻灵】\n复刻寻访开启",
"isWebUrl": true,
"webUrl": "https://ak-fs.hypergryph.com/announce/IOS/announcement/670.html",
"day": 17,
"month": 6,
"group": "ACTIVITY"
},
{
"announceId": "667",
"title": "沃伦姆德的薄暮\n限时复刻开启",
"isWebUrl": true,
"webUrl": "https://ak-fs.hypergryph.com/announce/IOS/announcement/667.html",
"day": 17,
"month": 6,
"group": "ACTIVITY"
},
{
"announceId": "97",
"title": "新人寻访特惠\n必得六星干员",
"isWebUrl": true,
"webUrl": "https://ak-fs.hypergryph.com/announce/IOS/announcement/97.html",
"day": 30,
"month": 4,
"group": "ACTIVITY"
},
{
"announceId": "95",
"title": "通关特定关卡\n赠送专属时装",
"isWebUrl": true,
"webUrl": "https://ak-fs.hypergryph.com/announce/IOS/announcement/95.html",
"day": 30,
"month": 4,
"group": "ACTIVITY"
},
{
"announceId": "192",
"title": "《明日方舟》\n公测开启说明",
"isWebUrl": true,
"webUrl": "https://ak-fs.hypergryph.com/announce/IOS/announcement/192.html",
"day": 30,
"month": 4,
"group": "SYSTEM"
},
{
"announceId": "98",
"title": "《明日方舟》\n公平运营申明",
"isWebUrl": true,
"webUrl": "https://ak-fs.hypergryph.com/announce/IOS/announcement/98.html",
"day": 30,
"month": 4,
"group": "SYSTEM"
},
{
"announceId": "94",
"title": "常驻活动介绍",
"isWebUrl": true,
"webUrl": "https://ak-fs.hypergryph.com/announce/IOS/announcement/94.html",
"day": 30,
"month": 4,
"group": "ACTIVITY"
}
],
"extra": {
"enable": false,
"name": "额外活动"
}
}

View File

@ -0,0 +1 @@
{"focusAnnounceId":"677","announceList":[{"announceId":"677","title":"联锁竞赛预告\n「荷谟伊智境」","isWebUrl":true,"webUrl":"https://ak-fs.hypergryph.com/announce/IOS/announcement/677.html","day":28,"month":6,"group":"ACTIVITY"},{"announceId":"675","title":"特定干员\n限时出率上升","isWebUrl":true,"webUrl":"https://ak-fs.hypergryph.com/announce/IOS/announcement/675.html","day":24,"month":6,"group":"ACTIVITY"},{"announceId":"676","title":"「制作组通讯」\n#12期","isWebUrl":true,"webUrl":"https://ak-fs.hypergryph.com/announce/IOS/announcement/676.html","day":23,"month":6,"group":"SYSTEM"},{"announceId":"672","title":"时代系列\n复刻限时上架","isWebUrl":true,"webUrl":"https://ak-fs.hypergryph.com/announce/IOS/announcement/672.html","day":17,"month":6,"group":"ACTIVITY"},{"announceId":"671","title":"生命之地系列\n新装限时上架","isWebUrl":true,"webUrl":"https://ak-fs.hypergryph.com/announce/IOS/announcement/671.html","day":17,"month":6,"group":"ACTIVITY"},{"announceId":"670","title":"【君影轻灵】\n复刻寻访开启","isWebUrl":true,"webUrl":"https://ak-fs.hypergryph.com/announce/IOS/announcement/670.html","day":17,"month":6,"group":"ACTIVITY"},{"announceId":"667","title":"沃伦姆德的薄暮\n限时复刻开启","isWebUrl":true,"webUrl":"https://ak-fs.hypergryph.com/announce/IOS/announcement/667.html","day":17,"month":6,"group":"ACTIVITY"},{"announceId":"97","title":"新人寻访特惠\n必得六星干员","isWebUrl":true,"webUrl":"https://ak-fs.hypergryph.com/announce/IOS/announcement/97.html","day":30,"month":4,"group":"ACTIVITY"},{"announceId":"95","title":"通关特定关卡\n赠送专属时装","isWebUrl":true,"webUrl":"https://ak-fs.hypergryph.com/announce/IOS/announcement/95.html","day":30,"month":4,"group":"ACTIVITY"},{"announceId":"192","title":"《明日方舟》\n公测开启说明","isWebUrl":true,"webUrl":"https://ak-fs.hypergryph.com/announce/IOS/announcement/192.html","day":30,"month":4,"group":"SYSTEM"},{"announceId":"98","title":"《明日方舟》\n公平运营申明","isWebUrl":true,"webUrl":"https://ak-fs.hypergryph.com/announce/IOS/announcement/98.html","day":30,"month":4,"group":"SYSTEM"},{"announceId":"94","title":"常驻活动介绍","isWebUrl":true,"webUrl":"https://ak-fs.hypergryph.com/announce/IOS/announcement/94.html","day":30,"month":4,"group":"ACTIVITY"}],"extra":{"enable":false,"name":"额外活动"}}

View File

@ -0,0 +1,63 @@
{
"code": 0,
"msg": "",
"data": {
"list": [
{
"cid": "114091",
"title": "#AUS小屋",
"cate": 8,
"date": "2021-06-23"
},
{
"cid": "027726",
"title": "「音律联觉原声EP」正式上架",
"cate": 1,
"date": "2021-06-12"
},
{
"cid": "750459",
"title": "「ManiFesto:」MV正式公开",
"cate": 1,
"date": "2021-06-08"
},
{
"cid": "241304",
"title": "「Real Me」正式上架",
"cate": 1,
"date": "2021-06-01"
},
{
"cid": "578835",
"title": "#D.D.D.PHOTO",
"cate": 8,
"date": "2021-05-24"
},
{
"cid": "489188",
"title": "#AUS小屋",
"cate": 8,
"date": "2021-05-19"
},
{
"cid": "992677",
"title": "「Immutable」正式上架",
"cate": 1,
"date": "2021-05-02"
},
{
"cid": "605962",
"title": "「Voices」正式上架",
"cate": 1,
"date": "2021-05-01"
},
{
"cid": "336213",
"title": "#D.D.D.PHOTO",
"cate": 8,
"date": "2021-04-28"
}
],
"end": false
}
}

View File

@ -0,0 +1 @@
{"code":0,"msg":"","data":{"list":[{"cid":"241303","title":"#D.D.D.PHOTO","cate":8,"date":"2021-06-29"},{"cid":"114091","title":"#AUS小屋","cate":8,"date":"2021-06-23"},{"cid":"027726","title":"「音律联觉原声EP」正式上架","cate":1,"date":"2021-06-12"},{"cid":"750459","title":"「ManiFesto:」MV正式公开","cate":1,"date":"2021-06-08"},{"cid":"241304","title":"「Real Me」正式上架","cate":1,"date":"2021-06-01"},{"cid":"578835","title":"#D.D.D.PHOTO","cate":8,"date":"2021-05-24"},{"cid":"489188","title":"#AUS小屋","cate":8,"date":"2021-05-19"},{"cid":"992677","title":"「Immutable」正式上架","cate":1,"date":"2021-05-02"},{"cid":"605962","title":"「Voices」正式上架","cate":1,"date":"2021-05-01"},{"cid":"336213","title":"#D.D.D.PHOTO","cate":8,"date":"2021-04-28"}],"end":false}}

View File

@ -0,0 +1,49 @@
import pytest
import typing
import respx
from httpx import Response
import feedparser
if typing.TYPE_CHECKING:
import sys
sys.path.append('./src/plugins')
import nonebot_hk_reporter
from .utils import get_json, get_file
@pytest.fixture
def arknights(plugin_module: 'nonebot_hk_reporter'):
return plugin_module.platform.platform_manager['arknights']
@pytest.fixture(scope='module')
def arknights_list_0():
return get_json('arknights_list_0.json')
@pytest.fixture(scope='module')
def arknights_list_1():
return get_json('arknights_list_1.json')
@pytest.mark.asyncio
@respx.mock
async def test_fetch_new(arknights, dummy_user_subinfo, arknights_list_0, arknights_list_1):
ak_list_router = respx.get("http://ak-fs.hypergryph.com/announce/IOS/announcement.meta.json")
detail_router = respx.get("https://ak-fs.hypergryph.com/announce/IOS/announcement/675.html")
ak_list_router.mock(return_value=Response(200, json=arknights_list_0))
detail_router.mock(return_value=Response(200, text=get_file('arknights-detail-675.html')))
target = ''
res = await arknights.fetch_new_post(target, [dummy_user_subinfo])
assert(ak_list_router.called)
assert(len(res) == 0)
assert(not detail_router.called)
mock_data = arknights_list_1
ak_list_router.mock(return_value=Response(200, json=mock_data))
res3 = await arknights.fetch_new_post(target, [dummy_user_subinfo])
assert(len(res3[0][1]) == 1)
assert(detail_router.called)
post = res3[0][1][0]
assert(post.target_type == 'arknights')
assert(post.text == '')
assert(post.url == '')
assert(post.target_name == '明日方舟游戏内公告')
assert(len(post.pics) == 1)
assert(post.pics == ['https://ak-fs.hypergryph.com/announce/images/20210623/e6f49aeb9547a2278678368a43b95b07.jpg'])

View File

@ -0,0 +1,44 @@
import pytest
import typing
import respx
from httpx import Response
import feedparser
if typing.TYPE_CHECKING:
import sys
sys.path.append('./src/plugins')
import nonebot_hk_reporter
from .utils import get_json, get_file
@pytest.fixture
def monster_siren(plugin_module: 'nonebot_hk_reporter'):
return plugin_module.platform.platform_manager['monster-siren']
@pytest.fixture(scope='module')
def monster_siren_list_0():
return get_json('monster-siren_list_0.json')
@pytest.fixture(scope='module')
def monster_siren_list_1():
return get_json('monster-siren_list_1.json')
@pytest.mark.asyncio
@respx.mock
async def test_fetch_new(monster_siren, dummy_user_subinfo, monster_siren_list_0, monster_siren_list_1):
ak_list_router = respx.get("https://monster-siren.hypergryph.com/api/news")
ak_list_router.mock(return_value=Response(200, json=monster_siren_list_0))
target = ''
res = await monster_siren.fetch_new_post(target, [dummy_user_subinfo])
assert(ak_list_router.called)
assert(len(res) == 0)
mock_data = monster_siren_list_1
ak_list_router.mock(return_value=Response(200, json=mock_data))
res3 = await monster_siren.fetch_new_post(target, [dummy_user_subinfo])
assert(len(res3[0][1]) == 1)
post = res3[0][1][0]
assert(post.target_type == 'monster-siren')
assert(post.text == '#D.D.D.PHOTO')
assert(post.url == 'https://monster-siren.hypergryph.com/info/241303')
assert(post.target_name == '塞壬唱片新闻')
assert(len(post.pics) == 0)

View File

@ -0,0 +1,300 @@
import sys
import typing
from typing import Any, Optional
import pytest
if typing.TYPE_CHECKING:
import sys
sys.path.append('./src/plugins')
import nonebot_hk_reporter
from nonebot_hk_reporter.types import *
from nonebot_hk_reporter.post import Post
from time import time
now = time()
passed = now - 3 * 60 * 60
raw_post_list_1 = [
{'id': 1, 'text': 'p1', 'date': now, 'tags': ['tag1'], 'category': 1}
]
raw_post_list_2 = raw_post_list_1 + [
{'id': 2, 'text': 'p2', 'date': now, 'tags': ['tag1'], 'category': 1},
{'id': 3, 'text': 'p3', 'date': now, 'tags': ['tag2'], 'category': 2},
{'id': 4, 'text': 'p4', 'date': now, 'tags': ['tag2'], 'category': 3}
]
@pytest.fixture
def dummy_user(plugin_module: 'nonebot_hk_reporter'):
user = plugin_module.types.User('123', 'group')
return user
@pytest.fixture
def user_info_factory(plugin_module: 'nonebot_hk_reporter', dummy_user):
def _user_info(category_getter, tag_getter):
return plugin_module.types.UserSubInfo(dummy_user, category_getter, tag_getter)
return _user_info
@pytest.fixture
def mock_platform_without_cats_tags(plugin_module: 'nonebot_hk_reporter'):
class MockPlatform(plugin_module.platform.platform.NewMessage,
plugin_module.platform.platform.TargetMixin):
platform_name = 'mock_platform'
name = 'Mock Platform'
enabled = True
is_common = True
schedule_interval = 10
enable_tag = False
categories = {}
def __init__(self):
self.sub_index = 0
super().__init__()
@staticmethod
async def get_target_name(_: 'Target'):
return 'MockPlatform'
def get_id(self, post: 'RawPost') -> Any:
return post['id']
def get_date(self, raw_post: 'RawPost') -> float:
return raw_post['date']
async def parse(self, raw_post: 'RawPost') -> 'Post':
return plugin_module.post.Post('mock_platform', raw_post['text'], 'http://t.tt/' + str(self.get_id(raw_post)), target_name='Mock')
async def get_sub_list(self, _: 'Target'):
if self.sub_index == 0:
self.sub_index += 1
return raw_post_list_1
else:
return raw_post_list_2
return MockPlatform()
@pytest.fixture
def mock_platform(plugin_module: 'nonebot_hk_reporter'):
class MockPlatform(plugin_module.platform.platform.NewMessage,
plugin_module.platform.platform.TargetMixin):
platform_name = 'mock_platform'
name = 'Mock Platform'
enabled = True
is_common = True
schedule_interval = 10
enable_tag = True
categories = {
1: '转发',
2: '视频',
}
def __init__(self):
self.sub_index = 0
super().__init__()
@staticmethod
async def get_target_name(_: 'Target'):
return 'MockPlatform'
def get_id(self, post: 'RawPost') -> Any:
return post['id']
def get_date(self, raw_post: 'RawPost') -> float:
return raw_post['date']
def get_tags(self, raw_post: 'RawPost') -> list['Tag']:
return raw_post['tags']
def get_category(self, raw_post: 'RawPost') -> 'Category':
return raw_post['category']
async def parse(self, raw_post: 'RawPost') -> 'Post':
return plugin_module.post.Post('mock_platform', raw_post['text'], 'http://t.tt/' + str(self.get_id(raw_post)), target_name='Mock')
async def get_sub_list(self, _: 'Target'):
if self.sub_index == 0:
self.sub_index += 1
return raw_post_list_1
else:
return raw_post_list_2
return MockPlatform()
@pytest.fixture
def mock_platform_no_target(plugin_module: 'nonebot_hk_reporter'):
class MockPlatform(plugin_module.platform.platform.NewMessage,
plugin_module.platform.platform.NoTargetMixin):
platform_name = 'mock_platform'
name = 'Mock Platform'
enabled = True
is_common = True
schedule_interval = 10
enable_tag = True
categories = {
1: '转发',
2: '视频',
3: '不支持'
}
def __init__(self):
self.sub_index = 0
super().__init__()
@staticmethod
async def get_target_name(_: 'Target'):
return 'MockPlatform'
def get_id(self, post: 'RawPost') -> Any:
return post['id']
def get_date(self, raw_post: 'RawPost') -> float:
return raw_post['date']
def get_tags(self, raw_post: 'RawPost') -> list['Tag']:
return raw_post['tags']
def get_category(self, raw_post: 'RawPost') -> 'Category':
if raw_post['category'] == 3:
raise plugin_module.platform.platform.CategoryNotSupport()
return raw_post['category']
async def parse(self, raw_post: 'RawPost') -> 'Post':
return plugin_module.post.Post('mock_platform', raw_post['text'], 'http://t.tt/' + str(self.get_id(raw_post)), target_name='Mock')
async def get_sub_list(self, _: 'Target'):
if self.sub_index == 0:
self.sub_index += 1
return raw_post_list_1
else:
return raw_post_list_2
return MockPlatform()
@pytest.fixture
def mock_status_change(plugin_module: 'nonebot_hk_reporter'):
class MockPlatform(plugin_module.platform.platform.StatusChange,
plugin_module.platform.platform.NoTargetMixin):
platform_name = 'mock_platform'
name = 'Mock Platform'
enabled = True
is_common = True
enable_tag = False
schedule_type = 'interval'
schedule_kw = {'seconds': 10}
categories = {
1: '转发',
2: '视频',
}
def __init__(self):
self.sub_index = 0
super().__init__()
async def get_status(self, _: 'Target'):
if self.sub_index == 0:
self.sub_index += 1
return {'s': False}
elif self.sub_index == 1:
self.sub_index += 1
return {'s': True}
else:
return {'s': False}
def compare_status(self, target, old_status, new_status) -> Optional['RawPost']:
if old_status['s'] == False and new_status['s'] == True:
return {'text': 'on', 'cat': 1}
elif old_status['s'] == True and new_status['s'] == False:
return {'text': 'off', 'cat': 2}
return None
async def parse(self, raw_post) -> 'Post':
return plugin_module.post.Post('mock_status', raw_post['text'], '')
def get_category(self, raw_post):
return raw_post['cat']
return MockPlatform()
@pytest.mark.asyncio
async def test_new_message_target_without_cats_tags(mock_platform_without_cats_tags, user_info_factory):
res1 = await mock_platform_without_cats_tags.fetch_new_post('dummy', [user_info_factory(lambda _: [1,2], lambda _: [])])
assert(len(res1) == 0)
res2 = await mock_platform_without_cats_tags.fetch_new_post('dummy', [
user_info_factory(lambda _: [], lambda _: []),
])
assert(len(res2) == 1)
posts_1 = res2[0][1]
assert(len(posts_1) == 3)
id_set_1 = set(map(lambda x: x.text, posts_1))
assert('p2' in id_set_1 and 'p3' in id_set_1 and 'p4' in id_set_1)
@pytest.mark.asyncio
async def test_new_message_target(mock_platform, user_info_factory):
res1 = await mock_platform.fetch_new_post('dummy', [user_info_factory(lambda _: [1,2], lambda _: [])])
assert(len(res1) == 0)
res2 = await mock_platform.fetch_new_post('dummy', [
user_info_factory(lambda _: [1,2], lambda _: []),
user_info_factory(lambda _: [1], lambda _: []),
user_info_factory(lambda _: [1,2], lambda _: ['tag1'])
])
assert(len(res2) == 3)
posts_1 = res2[0][1]
posts_2 = res2[1][1]
posts_3 = res2[2][1]
assert(len(posts_1) == 2)
assert(len(posts_2) == 1)
assert(len(posts_3) == 1)
id_set_1 = set(map(lambda x: x.text, posts_1))
id_set_2 = set(map(lambda x: x.text, posts_2))
id_set_3 = set(map(lambda x: x.text, posts_3))
assert('p2' in id_set_1 and 'p3' in id_set_1)
assert('p2' in id_set_2)
assert('p2' in id_set_3)
@pytest.mark.asyncio
async def test_new_message_no_target(mock_platform_no_target, user_info_factory):
res1 = await mock_platform_no_target.fetch_new_post('dummy', [user_info_factory(lambda _: [1,2], lambda _: [])])
assert(len(res1) == 0)
res2 = await mock_platform_no_target.fetch_new_post('dummy', [
user_info_factory(lambda _: [1,2], lambda _: []),
user_info_factory(lambda _: [1], lambda _: []),
user_info_factory(lambda _: [1,2], lambda _: ['tag1'])
])
assert(len(res2) == 3)
posts_1 = res2[0][1]
posts_2 = res2[1][1]
posts_3 = res2[2][1]
assert(len(posts_1) == 2)
assert(len(posts_2) == 1)
assert(len(posts_3) == 1)
id_set_1 = set(map(lambda x: x.text, posts_1))
id_set_2 = set(map(lambda x: x.text, posts_2))
id_set_3 = set(map(lambda x: x.text, posts_3))
assert('p2' in id_set_1 and 'p3' in id_set_1)
assert('p2' in id_set_2)
assert('p2' in id_set_3)
@pytest.mark.asyncio
async def test_status_change(mock_status_change, user_info_factory):
res1 = await mock_status_change.fetch_new_post('dummy', [user_info_factory(lambda _: [1,2], lambda _: [])])
assert(len(res1) == 0)
res2 = await mock_status_change.fetch_new_post('dummy', [
user_info_factory(lambda _: [1,2], lambda _:[])
])
assert(len(res2) == 1)
posts = res2[0][1]
assert(len(posts) == 1)
assert(posts[0].text == 'on')
res3 = await mock_status_change.fetch_new_post('dummy', [
user_info_factory(lambda _: [1,2], lambda _: []),
user_info_factory(lambda _: [1], lambda _: []),
])
assert(len(res3) == 2)
assert(len(res3[0][1]) == 1)
assert(res3[0][1][0].text == 'off')
assert(len(res3[1][1]) == 0)
res4 = await mock_status_change.fetch_new_post('dummy', [user_info_factory(lambda _: [1,2], lambda _: [])])
assert(len(res4) == 0)

View File

@ -23,7 +23,7 @@ def weibo_ak_list_1():
@pytest.mark.asyncio
async def test_get_name(weibo):
name = await weibo.get_account_name('6279793937')
name = await weibo.get_target_name('6279793937')
assert(name == "明日方舟Arknights")
@pytest.mark.asyncio
@ -40,6 +40,7 @@ async def test_fetch_new(weibo, dummy_user_subinfo):
assert(not detail_router.called)
mock_data = get_json('weibo_ak_list_1.json')
ak_list_router.mock(return_value=Response(200, json=mock_data))
# import ipdb; ipdb.set_trace()
res2 = await weibo.fetch_new_post(target, [dummy_user_subinfo])
assert(len(res2) == 0)
mock_data['data']['cards'][1]['mblog']['created_at'] = \
@ -80,7 +81,8 @@ def test_tag(weibo, weibo_ak_list_1):
assert(weibo.get_tags(raw_post) == ['明日方舟', '音律联觉'])
@pytest.mark.asyncio
async def test_rsshub_compare(weibo, dummy_user_subinfo):
@pytest.mark.compare
async def test_rsshub_compare(weibo):
target = '6279793937'
raw_posts = filter(weibo.filter_platform_custom, await weibo.get_sub_list(target))
posts = []