Merge branch 'main' into arknights

This commit is contained in:
felinae98
2021-02-26 21:44:43 +08:00
19 changed files with 67 additions and 23 deletions
@@ -0,0 +1,7 @@
import nonebot
from . import config_manager
from . import config
from . import scheduler
from . import send
+132
View File
@@ -0,0 +1,132 @@
from collections import defaultdict
from os import path
import os
from typing import DefaultDict
import nonebot
from tinydb import Query, TinyDB
from .plugin_config import plugin_config
from .types import User
from .utils import Singleton, supported_target_type
def get_config_path() -> str:
    """Return the path of the TinyDB JSON file, creating its directory if needed.

    The directory is ``plugin_config.hk_reporter_config_path`` when that
    setting is non-empty, otherwise ``<current working directory>/data``.
    """
    if plugin_config.hk_reporter_config_path:
        data_dir = plugin_config.hk_reporter_config_path
    else:
        data_dir = path.join(os.getcwd(), 'data')
    # exist_ok avoids the check-then-create race of isdir() followed by makedirs().
    os.makedirs(data_dir, exist_ok=True)
    return path.join(data_dir, 'hk_reporter.json')
class NoSuchUserException(Exception):
    """Raised by Config.del_subscribe when no record exists for the user."""
class NoSuchSubscribeException(Exception):
    """Raised by Config.del_subscribe when the user has no matching subscription."""
class Config(metaclass=Singleton):
    """Singleton wrapper around the TinyDB subscription store and its in-memory caches."""

    # Schema version stored in the 'kv' table; start_up() compares against it.
    migrate_version = 2

    def __init__(self):
        self.db = TinyDB(get_config_path(), encoding='utf-8')
        self.kv_config = self.db.table('kv')
        self.user_target = self.db.table('user_target')
        # target_type -> target -> [User]
        self.target_user_cache = {}
        # target_type -> target -> '<user_type>-<user>' -> subscribed categories
        self.target_user_cat_cache = {}
        # target_type -> target -> '<user_type>-<user>' -> subscribed tags
        self.target_user_tag_cache = {}
        # target_type -> [target], walked round-robin by get_next_target()
        self.target_list = {}
        self.next_index: DefaultDict[str, int] = defaultdict(lambda: 0)

    def add_subscribe(self, user, user_type, target, target_name, target_type, cats, tags):
        """Store a subscription for (user, user_type) and rebuild the send cache.

        Subscribing again to the same (target, target_type) replaces the
        previous entry instead of adding a duplicate, which would otherwise
        list the user twice for that target in the send cache.
        """
        user_query = Query()
        query = (user_query.user == user) & (user_query.user_type == user_type)
        new_sub = {"target": target, "target_type": target_type, 'target_name': target_name, 'cats': cats, 'tags': tags}
        if (user_data := self.user_target.get(query)):
            # update
            subs = [
                sub for sub in user_data.get('subs', [])
                if not (sub.get('target') == target and sub.get('target_type') == target_type)
            ]
            subs.append(new_sub)
            self.user_target.update({"subs": subs}, query)
        else:
            # insert
            self.user_target.insert({'user': user, 'user_type': user_type, 'subs': [new_sub]})
        self.update_send_cache()

    def list_subscribe(self, user, user_type):
        """Return the stored subscription dicts of the user, or [] when there is no record."""
        query = Query()
        record = self.user_target.get((query.user == user) & (query.user_type == user_type))
        if not record:
            return []
        return record.get('subs', [])

    def del_subscribe(self, user, user_type, target, target_type):
        """Remove the first subscription matching (target, target_type).

        Raises:
            NoSuchUserException: the user has no stored record.
            NoSuchSubscribeException: the record holds no matching subscription.
        """
        user_query = Query()
        query = (user_query.user == user) & (user_query.user_type == user_type)
        if not (query_res := self.user_target.get(query)):
            raise NoSuchUserException()
        subs = query_res.get('subs', [])
        for idx, sub in enumerate(subs):
            if sub.get('target') == target and sub.get('target_type') == target_type:
                # Safe to pop while enumerating because we return right after.
                subs.pop(idx)
                self.user_target.update({'subs': subs}, query)
                self.update_send_cache()
                return
        raise NoSuchSubscribeException()

    def update_send_cache(self):
        """Rebuild every in-memory cache from the 'user_target' table."""
        res = {target_type: defaultdict(list) for target_type in supported_target_type}
        cat_res = {target_type: defaultdict(lambda: defaultdict(list)) for target_type in supported_target_type}
        tag_res = {target_type: defaultdict(lambda: defaultdict(list)) for target_type in supported_target_type}
        for user in self.user_target.all():
            user_key = '{}-{}'.format(user['user_type'], user['user'])
            for sub in user.get('subs', []):
                # Subscriptions of unknown platform types are skipped.
                if sub.get('target_type') not in supported_target_type:
                    continue
                res[sub['target_type']][sub['target']].append(User(user['user'], user['user_type']))
                cat_res[sub['target_type']][sub['target']][user_key] = sub['cats']
                tag_res[sub['target_type']][sub['target']][user_key] = sub['tags']
        self.target_user_cache = res
        self.target_user_cat_cache = cat_res
        self.target_user_tag_cache = tag_res
        for target_type in self.target_user_cache:
            self.target_list[target_type] = list(self.target_user_cache[target_type].keys())

    def get_sub_category(self, target_type, target, user_type, user):
        """Return the categories the user subscribed to for this target."""
        return self.target_user_cat_cache[target_type][target]['{}-{}'.format(user_type, user)]

    def get_sub_tags(self, target_type, target, user_type, user):
        """Return the tags the user subscribed to for this target."""
        return self.target_user_tag_cache[target_type][target]['{}-{}'.format(user_type, user)]

    def get_next_target(self, target_type):
        """Return the next target of target_type in round-robin order, or None if there is none."""
        # FIXME: inserting or deleting a target shifts the queue (not a big problem).
        # .get() also covers a target_type the cache has not been built for yet.
        targets = self.target_list.get(target_type)
        if not targets:
            return None
        self.next_index[target_type] %= len(targets)
        res = targets[self.next_index[target_type]]
        self.next_index[target_type] += 1
        return res
def start_up():
    """Initialise or migrate the stored schema version, then rebuild the send cache."""
    config = Config()
    version_rows = config.kv_config.search(Query().name == "version")
    if not version_rows:
        # Fresh database: record the current schema version.
        config.kv_config.insert({"name": "version", "value": config.migrate_version})
    elif version_rows[0].get("value") < config.migrate_version:
        stored_version = version_rows[0].get("value")
        if stored_version == 1:
            # Version 1 -> 2: every subscription gains empty 'cats' and 'tags' lists.
            for user_conf in config.user_target.all():
                subs = user_conf['subs']
                for sub in subs:
                    sub['cats'] = []
                    sub['tags'] = []
                config.user_target.update({'subs': subs}, doc_ids=[user_conf.doc_id])
        config.kv_config.update({"value": config.migrate_version}, Query().name == 'version')
    config.update_send_cache()
# Register start_up with the nonebot driver's startup hook.
nonebot.get_driver().on_startup(start_up)
@@ -0,0 +1,145 @@
from nonebot import logger, on_command
from nonebot.adapters.cqhttp import Bot, Event, GroupMessageEvent
from nonebot.adapters.cqhttp.message import Message
from nonebot.adapters.cqhttp.permission import GROUP_ADMIN, GROUP_MEMBER, GROUP_OWNER
from nonebot.permission import Permission, SUPERUSER
from nonebot.rule import to_me
from nonebot.typing import T_State
from .config import Config, NoSuchSubscribeException
from .platform import platform_manager
from .platform.utils import check_sub_target
from .send import send_msgs
from .utils import parse_text
from .types import Target
help_match = on_command('help', rule=to_me(), priority=5)


@help_match.handle()
async def send_help(bot: Bot, event: Event, state: T_State):
    """Reply with the usage text and finish the session."""
    usage = '使用方法:\n@bot 添加订阅(仅管理员)\n@bot 查询订阅\n@bot 删除订阅(仅管理员)'
    rendered = await parse_text(usage)
    await help_match.finish(Message(rendered))
add_sub = on_command("添加订阅", rule=to_me(), permission=GROUP_ADMIN | GROUP_OWNER | SUPERUSER, priority=5)


@add_sub.got('platform', '请输入想要订阅的平台,目前支持:{}'.format(', '.join(platform_manager.keys())))
@add_sub.handle()
async def add_sub_handle_id(bot: Bot, event: Event, state: T_State):
    """Prompt for the target id when the chosen platform has targets and none is set yet."""
    platform = platform_manager[state['platform']]
    needs_id = platform.has_target and 'id' not in state
    if not needs_id:
        return
    await bot.send(event=event, message='请输入订阅用户的id,详情查阅https://github.com/felinae98/nonebot-hk-reporter')
    await add_sub.pause()


@add_sub.handle()
async def add_sub_parse_id(bot: Bot, event: Event, state: T_State):
    """Validate the id the user sent and store it with the resolved account name."""
    platform = platform_manager[state['platform']]
    needs_id = platform.has_target and 'id' not in state
    if not needs_id:
        return
    target = str(event.get_message()).strip()
    account_name = await check_sub_target(state['platform'], target)
    if not account_name:
        # A falsy name means the platform could not resolve the id; ask again.
        await add_sub.reject('id输入错误')
    state['id'] = target
    state['name'] = account_name
@add_sub.handle()
async def add_sub_handle_cat(bot: Bot, event: Event, state: T_State):
    """Prompt for categories when the platform defines any and none are set yet."""
    platform = platform_manager[state['platform']]
    if not platform.categories or 'cats' in state:
        return
    prompt = '请输入要订阅的类别,以空格分隔,支持的类别有:{}'.format(
        ','.join(platform.categories.values())
    )
    await bot.send(event=event, message=prompt)
    await add_sub.pause()


@add_sub.handle()
async def add_sub_parse_cat(bot: Bot, event: Event, state: T_State):
    """Translate the whitespace-separated category names the user sent into category ids."""
    platform = platform_manager[state['platform']]
    if not platform.categories or 'cats' in state:
        return
    chosen = []
    for cat_name in str(event.get_message()).strip().split():
        if cat_name not in platform.reverse_category:
            await add_sub.reject('不支持 {}'.format(cat_name))
        chosen.append(platform.reverse_category[cat_name])
    state['cats'] = chosen
@add_sub.handle()
async def add_sub_handle_tag(bot: Bot, event: Event, state: T_State):
    """Prompt for tags when the platform supports them and none are set yet."""
    platform = platform_manager[state['platform']]
    if not platform.enable_tag or 'tags' in state:
        return
    await bot.send(event=event, message='请输入要订阅的tag,订阅所有tag输入"全部标签"')
    await add_sub.pause()


@add_sub.handle()
async def add_sub_parse_tag(bot: Bot, event: Event, state: T_State):
    """Store the tags the user sent; the literal '全部标签' is stored as an empty list."""
    platform = platform_manager[state['platform']]
    if not platform.enable_tag or 'tags' in state:
        return
    answer = str(event.get_message()).strip()
    state['tags'] = [] if answer == '全部标签' else answer.split()
@add_sub.handle()
async def add_sub_process(bot: Bot, event: GroupMessageEvent, state: T_State):
    """Persist the collected subscription for the group and confirm to the user."""
    platform = platform_manager[state['platform']]
    if not platform.has_target:
        # Target-less platforms are stored under the fixed target 'default'.
        state['name'] = await platform.get_account_name(Target(''))
        state['id'] = 'default'
    Config().add_subscribe(
        event.group_id,
        user_type='group',
        target=state['id'],
        target_name=state['name'],
        target_type=state['platform'],
        cats=state.get('cats', []),
        tags=state.get('tags', []),
    )
    await add_sub.finish('添加 {} 成功'.format(state['name']))
query_sub = on_command("查询订阅", rule=to_me(), priority=5)


@query_sub.handle()
async def _(bot: Bot, event: GroupMessageEvent, state: T_State):
    """Reply with one line per subscription of the group."""
    config: Config = Config()
    rows = []
    for sub in config.list_subscribe(event.group_id, "group"):
        row = '{} {} {}'.format(sub['target_type'], sub['target_name'], sub['target'])
        platform = platform_manager[sub['target_type']]
        if platform.categories:
            # Show category display names rather than the stored ids.
            row += ' [{}]'.format(', '.join(platform.categories[cat] for cat in sub['cats']))
        if platform.enable_tag:
            row += ' {}'.format(', '.join(sub['tags']))
        rows.append(row + '\n')
    res = '订阅的帐号为:\n' + ''.join(rows)
    await query_sub.finish(Message(await parse_text(res)))
del_sub = on_command("删除订阅", rule=to_me(), permission=GROUP_ADMIN | GROUP_OWNER, priority=5)


@del_sub.handle()
async def send_list(bot: Bot, event: GroupMessageEvent, state: T_State):
    """Send the group's numbered subscription list and ask which entry to delete.

    Stores ``state['sub_table']``: a mapping from the 1-based number shown to
    the user to the ``target_type``/``target`` keyword arguments that the
    follow-up handler passes to ``Config.del_subscribe``.
    """
    config: Config = Config()
    sub_list = config.list_subscribe(event.group_id, "group")
    res = '订阅的帐号为:\n'
    state['sub_table'] = {}
    for index, sub in enumerate(sub_list, 1):
        state['sub_table'][index] = {'target_type': sub['target_type'], 'target': sub['target']}
        # No newline here: categories/tags belong on the same row, and the
        # row is terminated once below (same layout as the query command).
        res += '{} {} {} {}'.format(index, sub['target_type'], sub['target_name'], sub['target'])
        platform = platform_manager[sub['target_type']]
        if platform.categories:
            res += ' [{}]'.format(', '.join(map(lambda x: platform.categories[x], sub['cats'])))
        if platform.enable_tag:
            res += ' {}'.format(', '.join(sub['tags']))
        res += '\n'
    res += '请输入要删除的订阅的序号'
    await bot.send(event=event, message=Message(await parse_text(res)))
@del_sub.receive()
async def do_del(bot, event: GroupMessageEvent, state: T_State):
    """Delete the subscription whose list number the user replied with.

    Any failure (non-numeric reply, unknown number, missing user or
    subscription) is logged and the user is asked to answer again.
    """
    try:
        index = int(str(event.get_message()).strip())
        config = Config()
        config.del_subscribe(event.group_id, 'group', **state['sub_table'][index])
    except Exception as e:
        # Log first: reject() leaves the handler by raising, so a statement
        # placed after it would never run.
        logger.warning(e)
        await del_sub.reject('删除错误')
    else:
        await del_sub.finish('删除成功')
@@ -0,0 +1,6 @@
from .bilibili import Bilibili
from .rss import Rss
from .weibo import Weibo
from .utils import check_sub_target
from .platform import PlatformNoTarget
from .utils import platform_manager
@@ -0,0 +1,55 @@
from typing import Any
import httpx
import json
import time
from collections import defaultdict
from bs4 import BeautifulSoup as bs
from datetime import datetime
from nonebot import logger
from ..types import Category, RawPost, Tag, Target
from .platform import PlatformNoTarget, CategoryNotSupport
from ..utils import Singleton, Render
from ..post import Post
class Arknights(PlatformNoTarget):
    """Target-less platform that reports Arknights in-game announcements."""

    categories = {}
    platform_name = 'arknights'
    enable_tag = False

    @staticmethod
    async def get_account_name(_: Target) -> str:
        """Return the fixed display name of this source."""
        return '明日方舟游戏内公告'

    async def get_sub_list(self) -> list[RawPost]:
        """Fetch the announcement index and return its 'announceList' entries."""
        async with httpx.AsyncClient() as client:
            response = await client.get('http://ak-fs.hypergryph.com/announce/IOS/announcement.meta.json')
        return json.loads(response.text)['announceList']

    def get_id(self, post: RawPost) -> Any:
        """Return the announcement id."""
        return post['announceId']

    def get_date(self, post: RawPost) -> None:
        """Always return None: no timestamp is read from the announcement."""
        return None

    async def parse(self, raw_post: RawPost) -> Post:
        """Build a picture-only Post from the announcement page.

        Pages containing a ``div.standerd-container`` are rendered to a
        screenshot; pages with an ``img.banner-image`` use that image URL.

        Raises:
            CategoryNotSupport: the page matches neither layout.
        """
        announce_url = raw_post['webUrl']
        async with httpx.AsyncClient() as client:
            raw_html = await client.get(announce_url)
        soup = bs(raw_html, 'html.parser')
        if soup.find("div", class_="standerd-container"):
            # Text-and-picture announcement: screenshot the main container.
            viewport = {'width': 320, 'height': 6400, 'deviceScaleFactor': 3}
            pic_data = await Render().render(announce_url, viewport=viewport, target='div.main')
            pics = ['base64://{}'.format(pic_data)]
        else:
            banner = soup.find('img', class_='banner-image')
            if not banner:
                raise CategoryNotSupport()
            pics = [banner['src']]
        return Post('arknights', text='', url='', target_name="明日方舟游戏内公告", pics=pics, compress=True, override_use_pic=False)
@@ -0,0 +1,93 @@
from collections import defaultdict
import json
from typing import Any, Optional
import httpx
from ..post import Post
from ..types import Category, RawPost, Tag, Target
from .platform import CategoryNotSupport, Platform
class Bilibili(Platform):
    """Platform implementation for bilibili user dynamics."""

    categories = {
        1: "一般动态",
        2: "专栏文章",
        3: "视频",
        4: "纯文字",
        # 5: "短视频"
    }
    platform_name = 'bilibili'
    enable_tag = False

    # desc.type value of a dynamic -> key in ``categories``.
    # Type 1 (repost) and any unlisted type are unsupported.
    _CATEGORY_BY_DYNAMIC_TYPE = {2: 1, 64: 2, 8: 3, 4: 4}

    @staticmethod
    async def get_account_name(target: Target) -> Optional[str]:
        """Return the user name for uid ``target``, or None when the API reports an error code."""
        async with httpx.AsyncClient() as client:
            res = await client.get('https://api.bilibili.com/x/space/acc/info', params={'mid': target})
            res_data = json.loads(res.text)
            if res_data['code']:
                return None
            return res_data['data']['name']

    async def get_sub_list(self, target: Target) -> list[RawPost]:
        """Return the dynamic cards of uid ``target``; [] on an API error or when none exist."""
        async with httpx.AsyncClient() as client:
            params = {'host_uid': target, 'offset': 0, 'need_top': 0}
            res = await client.get('https://api.vc.bilibili.com/dynamic_svr/v1/dynamic_svr/space_history', params=params, timeout=4.0)
            res_dict = json.loads(res.text)
            if res_dict['code'] != 0:
                return []
            # A successful response may carry no 'cards' entry; indexing it
            # directly would raise KeyError, so treat absence as "no posts".
            return res_dict['data'].get('cards') or []

    def get_id(self, post: RawPost) -> Any:
        """Return the dynamic id."""
        return post['desc']['dynamic_id']

    def get_date(self, post: RawPost) -> int:
        """Return the timestamp stored in the dynamic's 'desc'."""
        return post['desc']['timestamp']

    def get_category(self, post: RawPost) -> Category:
        """Map the dynamic's type to a category.

        Raises:
            CategoryNotSupport: the type (e.g. 1, a repost) has no category.
        """
        category = self._CATEGORY_BY_DYNAMIC_TYPE.get(post['desc']['type'])
        if category is None:
            raise CategoryNotSupport()
        return Category(category)

    def get_tags(self, raw_post: RawPost) -> list[Tag]:
        """Return an empty list; tags are not used for this platform."""
        return []

    async def parse(self, raw_post: RawPost) -> Post:
        """Convert a dynamic card into a Post according to its category."""
        card_content = json.loads(raw_post['card'])
        post_type = self.get_category(raw_post)
        target_name = raw_post['desc']['user_profile']['info']['uname']
        if post_type == 1:
            # ordinary dynamic
            text = card_content['item']['description']
            url = 'https://t.bilibili.com/{}'.format(raw_post['desc']['dynamic_id'])
            pic = [img['img_src'] for img in card_content['item']['pictures']]
        elif post_type == 2:
            # column article
            text = '{} {}'.format(card_content['title'], card_content['summary'])
            url = 'https://www.bilibili.com/read/cv{}'.format(raw_post['desc']['rid'])
            pic = card_content['image_urls']
        elif post_type == 3:
            # video
            text = card_content['dynamic']
            url = 'https://www.bilibili.com/video/{}'.format(raw_post['desc']['bvid'])
            pic = [card_content['pic']]
        elif post_type == 4:
            # text only
            text = card_content['item']['content']
            url = 'https://t.bilibili.com/{}'.format(raw_post['desc']['dynamic_id'])
            pic = []
        else:
            raise CategoryNotSupport(post_type)
        return Post('bilibili', text=text, url=url, pics=pic, target_name=target_name)
@@ -0,0 +1,287 @@
import time
from collections import defaultdict
from typing import Any, Optional
import httpx
from nonebot import logger
from ..config import Config
from ..post import Post
from ..types import Category, RawPost, Tag, Target, User
from ..utils import Singleton
class CategoryNotSupport(Exception):
    """Raised from get_category (or parse) when a post's category is not supported."""
class PlatformProto(metaclass=Singleton):
    """Interface shared by every platform class; instances are singletons."""

    # category id -> display name
    categories: dict[Category, str]
    # display name -> category id
    reverse_category: dict[str, Category]
    # whether subscriptions name a target (account)
    has_target: bool
    platform_name: str
    enable_tag: bool

    async def fetch_new_post(self, target: Target, users: list[User]) -> list[tuple[User, list[Post]]]:
        """Return, per user, the new posts to send. Stub: returns None here."""

    @staticmethod
    async def get_account_name(target: Target) -> Optional[str]:
        """Return the display name for ``target``. Stub: returns None here."""
class Platform(PlatformProto):
    """Platform with targets (accounts), like weibo or bilibili.

    Subclasses set ``categories``, ``platform_name`` and ``enable_tag`` and
    override the hooks below that raise NotImplementedError.
    """
    categories: dict[Category, str]
    has_target: bool = True
    platform_name: str
    enable_tag: bool

    def __init__(self):
        # target -> ids of posts already seen for that target
        self.exists_posts = defaultdict(set)
        # target -> True once the first fetch has seeded exists_posts
        self.inited = dict()
        # display name -> category id (inverse of ``categories``)
        self.reverse_category = {val: key for key, val in self.categories.items()}
        # post id -> parsed Post; only valid during one fetch_new_post() call
        self.cache: dict[Any, Post] = {}

    @staticmethod
    async def get_account_name(target: Target) -> Optional[str]:
        "Given a target, return the username(name) of the target"
        raise NotImplementedError()

    async def get_sub_list(self, target: Target) -> list[RawPost]:
        "Get post list of the given target"
        raise NotImplementedError()

    def get_id(self, post: RawPost) -> Any:
        "Get post id of given RawPost"
        raise NotImplementedError()

    def get_date(self, post: RawPost) -> Optional[int]:
        "Get post timestamp and return, return None if can't get the time"
        raise NotImplementedError()

    def get_category(self, post: RawPost) -> Optional[Category]:
        "Return category of given Rawpost"
        raise NotImplementedError()

    def get_tags(self, raw_post: RawPost) -> Optional[list[Tag]]:
        "Return Tag list of given RawPost"
        raise NotImplementedError()

    async def parse(self, raw_post: RawPost) -> Post:
        "parse RawPost into post"
        raise NotImplementedError()

    def filter_platform_custom(self, post: RawPost) -> bool:
        "Optional hook: return False to drop a raw post"
        raise NotImplementedError()

    async def _parse_with_cache(self, post: RawPost) -> Post:
        "Parse a raw post once per fetch, reusing the result for every user"
        post_id = self.get_id(post)
        if post_id not in self.cache:
            self.cache[post_id] = await self.parse(post)
        return self.cache[post_id]

    async def filter_common(self, target: Target, raw_post_list: list[RawPost]) -> list[RawPost]:
        """Return the raw posts of ``target`` that have not been reported yet.

        The first call for a target only records the ids it sees and returns
        []. Later calls skip posts that were already seen, are older than two
        hours, are rejected by filter_platform_custom, or whose category is
        unsupported; everything returned is recorded as seen.
        """
        if not self.inited.get(target, False):
            # target not init
            for raw_post in raw_post_list:
                self.exists_posts[target].add(self.get_id(raw_post))
            logger.info('init {}-{} with {}'.format(self.platform_name, target, self.exists_posts[target]))
            self.inited[target] = True
            return []
        res: list[RawPost] = []
        for raw_post in raw_post_list:
            post_id = self.get_id(raw_post)
            if post_id in self.exists_posts[target]:
                continue
            if (post_time := self.get_date(raw_post)) and time.time() - post_time > 2 * 60 * 60:
                continue
            try:
                if not self.filter_platform_custom(raw_post):
                    continue
            except NotImplementedError:
                # Hook not overridden: no platform-specific filtering.
                pass
            try:
                self.get_category(raw_post)
            except CategoryNotSupport:
                continue
            except NotImplementedError:
                pass
            res.append(raw_post)
            self.exists_posts[target].add(post_id)
        return res

    async def filter_user_custom(self, raw_post_list: list[RawPost], cats: list[Category], tags: list[Tag]) -> list[RawPost]:
        """Return the raw posts matching one user's category and tag selection.

        An empty ``cats`` or ``tags`` list means "no restriction".
        """
        res: list[RawPost] = []
        for raw_post in raw_post_list:
            if self.categories:
                cat = self.get_category(raw_post)
                if cats and cat not in cats:
                    continue
            if self.enable_tag and tags:
                # get_tags is declared Optional; treat None as "no tags".
                post_tags = self.get_tags(raw_post) or []
                if not any(tag in tags for tag in post_tags):
                    continue
            res.append(raw_post)
        return res

    async def fetch_new_post(self, target: Target, users: list[User]) -> list[tuple[User, list[Post]]]:
        """Fetch ``target`` and return, for each user, the new posts to send.

        Returns [] when nothing is new or an httpx.RequestError occurs.
        """
        try:
            config = Config()
            post_list = await self.get_sub_list(target)
            new_posts = await self.filter_common(target, post_list)
            if not new_posts:
                return []
            for post in new_posts:
                logger.info('fetch new post from {} {}: {}'.format(self.platform_name, target, self.get_id(post)))
            res: list[tuple[User, list[Post]]] = []
            for user in users:
                required_tags = config.get_sub_tags(self.platform_name, target, user.user_type, user.user) if self.enable_tag else []
                cats = config.get_sub_category(self.platform_name, target, user.user_type, user.user)
                user_raw_post = await self.filter_user_custom(new_posts, cats, required_tags)
                user_post: list[Post] = []
                for raw_post in user_raw_post:
                    user_post.append(await self._parse_with_cache(raw_post))
                res.append((user, user_post))
            return res
        except httpx.RequestError as err:
            logger.warning("network connection error: {}, url: {}".format(type(err), err.request.url))
            return []
        finally:
            # Always drop the per-fetch cache, also when parsing raised, so
            # parsed posts never outlive the fetch that produced them.
            self.cache = {}
class PlatformNoTarget(PlatformProto):
    """Platform with a single implicit source instead of per-account targets.

    Subclasses set ``categories``, ``platform_name`` and ``enable_tag`` and
    override the hooks below that raise NotImplementedError. Subscriptions
    are looked up under the fixed target ``'default'``.
    """
    categories: dict[Category, str]
    has_target = False
    platform_name: str
    enable_tag: bool

    def __init__(self):
        # ids of posts already seen
        self.exists_posts = set()
        # True once the first fetch has seeded exists_posts
        self.inited = False
        # display name -> category id (inverse of ``categories``)
        self.reverse_category = {val: key for key, val in self.categories.items()}
        # post id -> parsed Post; only valid during one fetch_new_post() call
        self.cache: dict[Any, Post] = {}

    @staticmethod
    async def get_account_name(target: Target) -> Optional[str]:
        "return the username(name) of the target"
        raise NotImplementedError()

    async def get_sub_list(self) -> list[RawPost]:
        "Get the post list of this platform"
        raise NotImplementedError()

    def get_id(self, post: RawPost) -> Any:
        "Get post id of given RawPost"
        raise NotImplementedError()

    def get_date(self, post: RawPost) -> Optional[int]:
        "Get post timestamp and return, return None if can't get the time"
        raise NotImplementedError()

    def get_category(self, post: RawPost) -> Optional[Category]:
        "Return category of given Rawpost"
        raise NotImplementedError()

    def get_tags(self, raw_post: RawPost) -> Optional[list[Tag]]:
        "Return Tag list of given RawPost"
        raise NotImplementedError()

    async def parse(self, raw_post: RawPost) -> Post:
        "parse RawPost into post"
        raise NotImplementedError()

    def filter_platform_custom(self, post: RawPost) -> bool:
        "Optional hook: return False to drop a raw post"
        raise NotImplementedError()

    async def _parse_with_cache(self, post: RawPost) -> Post:
        "Parse a raw post once per fetch, reusing the result for every user"
        post_id = self.get_id(post)
        if post_id not in self.cache:
            self.cache[post_id] = await self.parse(post)
        return self.cache[post_id]

    async def filter_common(self, raw_post_list: list[RawPost]) -> list[RawPost]:
        """Return the raw posts that have not been reported yet.

        The first call only records the ids it sees and returns []. Later
        calls skip posts that were already seen, are older than two hours,
        are rejected by filter_platform_custom, or whose category is
        unsupported; everything returned is recorded as seen.
        """
        if not self.inited:
            # target not init
            for raw_post in raw_post_list:
                self.exists_posts.add(self.get_id(raw_post))
            logger.info('init {} with {}'.format(self.platform_name, self.exists_posts))
            self.inited = True
            return []
        res: list[RawPost] = []
        for raw_post in raw_post_list:
            post_id = self.get_id(raw_post)
            if post_id in self.exists_posts:
                continue
            if (post_time := self.get_date(raw_post)) and time.time() - post_time > 2 * 60 * 60:
                continue
            try:
                if not self.filter_platform_custom(raw_post):
                    continue
            except NotImplementedError:
                # Hook not overridden: no platform-specific filtering.
                pass
            try:
                self.get_category(raw_post)
            except CategoryNotSupport:
                continue
            except NotImplementedError:
                pass
            res.append(raw_post)
            self.exists_posts.add(post_id)
        return res

    async def filter_user_custom(self, raw_post_list: list[RawPost], cats: list[Category], tags: list[Tag]) -> list[RawPost]:
        """Return the raw posts matching one user's category and tag selection.

        An empty ``cats`` or ``tags`` list means "no restriction".
        """
        res: list[RawPost] = []
        for raw_post in raw_post_list:
            if self.categories:
                cat = self.get_category(raw_post)
                if cats and cat not in cats:
                    continue
            if self.enable_tag and tags:
                # get_tags is declared Optional; treat None as "no tags".
                post_tags = self.get_tags(raw_post) or []
                if not any(tag in tags for tag in post_tags):
                    continue
            res.append(raw_post)
        return res

    async def fetch_new_post(self, _: Target, users: list[User]) -> list[tuple[User, list[Post]]]:
        """Fetch the platform and return, for each user, the new posts to send.

        The first argument is ignored. Returns [] when nothing is new or an
        httpx.RequestError occurs.
        """
        try:
            config = Config()
            post_list = await self.get_sub_list()
            new_posts = await self.filter_common(post_list)
            if not new_posts:
                return []
            for post in new_posts:
                logger.info('fetch new post from {}: {}'.format(self.platform_name, self.get_id(post)))
            res: list[tuple[User, list[Post]]] = []
            for user in users:
                required_tags = config.get_sub_tags(self.platform_name, 'default', user.user_type, user.user) if self.enable_tag else []
                cats = config.get_sub_category(self.platform_name, 'default', user.user_type, user.user)
                user_raw_post = await self.filter_user_custom(new_posts, cats, required_tags)
                user_post: list[Post] = []
                for raw_post in user_raw_post:
                    user_post.append(await self._parse_with_cache(raw_post))
                res.append((user, user_post))
            return res
        except httpx.RequestError as err:
            logger.warning("network connection error: {}, url: {}".format(type(err), err.request.url))
            return []
        finally:
            # Always drop the per-fetch cache, also when parsing raised, so
            # parsed posts never outlive the fetch that produced them.
            self.cache = {}
@@ -0,0 +1,41 @@
import calendar
from typing import Any, Optional
from bs4 import BeautifulSoup as bs
import feedparser
import httpx
from ..post import Post
from ..types import RawPost, Target
from .platform import Platform
class Rss(Platform):
    """Platform implementation for RSS/Atom feeds; the target is the feed URL."""

    categories = {}
    enable_tag = False
    platform_name = 'rss'

    @staticmethod
    async def get_account_name(target: Target) -> Optional[str]:
        """Return the feed title, or None when the URL cannot be fetched or has no title.

        The target is typed in by a user, so an invalid URL or unreachable
        host is reported as "not found" (None) instead of raising.
        """
        try:
            async with httpx.AsyncClient() as client:
                res = await client.get(target, timeout=10.0)
        except (httpx.HTTPError, httpx.InvalidURL):
            return None
        feed = feedparser.parse(res.text)
        return feed['feed'].get('title')

    def get_date(self, post: RawPost) -> Optional[int]:
        """Return the entry's publication time as a UTC timestamp, or None if it has none."""
        published = getattr(post, 'published_parsed', None)
        if published is None:
            return None
        return calendar.timegm(published)

    def get_id(self, post: RawPost) -> Any:
        """Return the entry id."""
        return post.id

    async def get_sub_list(self, target: Target) -> list[RawPost]:
        """Fetch the feed at ``target`` and return its entries."""
        async with httpx.AsyncClient() as client:
            res = await client.get(target, timeout=10.0)
            feed = feedparser.parse(res)
            return feed.entries

    async def parse(self, raw_post: RawPost) -> Post:
        """Build a Post from the entry's HTML description: plain text plus image URLs."""
        soup = bs(raw_post.description, 'html.parser')
        text = soup.text
        # Skip <img> tags without a src attribute instead of raising KeyError.
        pics = [img['src'] for img in soup('img') if img.has_attr('src')]
        return Post('rss', text=text, url=raw_post.link, pics=pics)
@@ -0,0 +1,40 @@
import nonebot
from nonebot import logger
from collections import defaultdict
from typing import Type
from .arkninghts import Arknights
from .weibo import Weibo
from .bilibili import Bilibili
from .rss import Rss
from .platform import PlatformProto
from ..config import Config
from ..post import Post
from ..send import send_msgs
async def check_sub_target(target_type, target):
    """Return the account name the platform ``target_type`` resolves for ``target``."""
    platform = platform_manager[target_type]
    account_name = await platform.get_account_name(target)
    return account_name
# Registry of platform instances keyed by platform name; the keys are what
# subscriptions store as their target_type.
platform_manager: dict[str, PlatformProto] = {
'bilibili': Bilibili(),
'weibo': Weibo(),
'rss': Rss(),
'arknights': Arknights()
}
async def fetch_and_send(target_type: str):
    """Fetch the next target of ``target_type`` and queue its new posts for sending."""
    config = Config()
    target = config.get_next_target(target_type)
    if not target:
        return
    logger.debug('try to fecth new posts from {}, target: {}'.format(target_type, target))
    subscribers = config.target_user_cache[target_type][target]
    bots = list(nonebot.get_bots().values())
    # Use the first connected bot, if any.
    bot = bots[0] if bots else None
    to_send = await platform_manager[target_type].fetch_new_post(target, subscribers)
    for user, posts in to_send:
        for send_post in posts:
            logger.debug('send to {}: {}'.format(user, send_post))
            if bot:
                send_msgs(bot, user.user, user.user_type, await send_post.generate_messages())
            else:
                logger.warning('no bot connected')
@@ -0,0 +1,84 @@
from collections import defaultdict
from datetime import datetime
import json
import re
import time
from typing import Any, Optional
from bs4 import BeautifulSoup as bs
import httpx
from nonebot import logger
from ..post import Post
from ..types import *
from ..utils import Singleton
from .platform import Platform
class Weibo(Platform):
    """Platform implementation for weibo accounts (m.weibo.cn API)."""

    categories = {
        1: '转发',
        2: '视频',
        3: '图文'
    }
    enable_tag = False
    platform_name = 'weibo'

    @staticmethod
    async def get_account_name(target: Target) -> Optional[str]:
        """Return the screen name for uid ``target``, or None when the API does not report ok == 1."""
        async with httpx.AsyncClient() as client:
            param = {'containerid': '100505' + target}
            res = await client.get('https://m.weibo.cn/api/container/getIndex', params=param)
            res_dict = json.loads(res.text)
            if res_dict.get('ok') == 1:
                return res_dict['data']['userInfo']['screen_name']
            else:
                return None

    async def get_sub_list(self, target: Target) -> list[RawPost]:
        """Return the status cards of uid ``target``; [] when the API does not report ok."""
        async with httpx.AsyncClient() as client:
            params = { 'containerid': '107603' + target}
            res = await client.get('https://m.weibo.cn/api/container/getIndex?', params=params, timeout=4.0)
            res_data = json.loads(res.text)
            if not res_data['ok']:
                return []
            # Keep only card_type 9 (the type filter_platform_custom accepts).
            # get_id() indexes card['mblog'] on every returned card, including
            # during first-fetch seeding, before filter_platform_custom runs.
            # NOTE(review): assumes other card types may lack 'mblog' - confirm.
            return [card for card in res_data['data']['cards'] if card.get('card_type') == 9]

    def get_id(self, post: RawPost) -> Any:
        """Return the status id."""
        return post['mblog']['id']

    def filter_platform_custom(self, raw_post: RawPost) -> bool:
        """Accept only cards of type 9."""
        return raw_post['card_type'] == 9

    def get_date(self, raw_post: RawPost) -> float:
        """Return the status creation time as a timestamp."""
        created_time = datetime.strptime(raw_post['mblog']['created_at'], '%a %b %d %H:%M:%S %z %Y')
        return created_time.timestamp()

    def get_tags(self, raw_post: RawPost) -> Optional[list[Tag]]:
        "Return Tag list of given RawPost"
        return None

    def get_category(self, raw_post: RawPost) -> Category:
        """Return 1 for a repost, 2 for a video status, 3 otherwise."""
        if raw_post['mblog'].get('retweeted_status'):
            return Category(1)
        elif raw_post['mblog'].get('page_info') and raw_post['mblog']['page_info'].get('type') == 'video':
            return Category(2)
        else:
            return Category(3)

    def _get_text(self, raw_text: str) -> str:
        """Strip HTML from the status text, turning ``<br />`` into newlines."""
        text = raw_text.replace('<br />', '\n')
        return bs(text, 'html.parser').text

    async def parse(self, raw_post: RawPost) -> Post:
        """Build a Post from a status card.

        Long statuses and statuses with more than 9 pictures are re-read from
        the detail page, because the list entry is truncated. If the detail
        page cannot be parsed, the truncated list entry is used.
        """
        info = raw_post['mblog']
        if info['isLongText'] or info['pic_num'] > 9:
            async with httpx.AsyncClient() as client:
                res = await client.get('https://m.weibo.cn/detail/{}'.format(info['mid']))
            match = re.search(r'"status": ([\s\S]+),\s+"hotScheme"', res.text)
            if match:
                info = json.loads(match.group(1))
            else:
                # The page layout did not match; keep the list entry rather
                # than failing on a None match object.
                logger.warning('weibo detail page of {} not recognized, using summary'.format(info['mid']))
        parsed_text = self._get_text(info['text'])
        pic_urls = [img['large']['url'] for img in info.get('pics', [])]
        detail_url = 'https://weibo.com/{}/{}'.format(info['user']['id'], info['bid'])
        return Post('weibo', text=parsed_text, url=detail_url, pics=pic_urls, target_name=info['user']['screen_name'])
@@ -0,0 +1,14 @@
from pydantic import BaseSettings
import nonebot
class PlugConfig(BaseSettings):
    """Settings of this plugin; keys it does not declare are ignored."""

    # Directory of the data file; empty means '<cwd>/data' (see get_config_path).
    hk_reporter_config_path: str = ""
    # Render message text as a picture (see parse_text).
    hk_reporter_use_pic: bool = False
    # Use the locally installed chromium instead of a downloaded one.
    hk_reporter_use_local: bool = False

    class Config:
        extra = 'ignore'
# Build the plugin settings from the nonebot driver's global config.
global_config = nonebot.get_driver().config
plugin_config = PlugConfig(**global_config.dict())
+97
View File
@@ -0,0 +1,97 @@
import base64
from dataclasses import dataclass, field
from io import BytesIO
from typing import NoReturn, Optional
from nonebot import logger
import httpx
from PIL import Image
from .plugin_config import plugin_config
from .utils import parse_text
@dataclass
class Post:
    """A parsed post ready to be turned into chat messages."""

    target_type: str
    text: str
    url: Optional[str]
    target_name: Optional[str] = None
    # When True, generate_messages() joins everything into one message.
    compress: bool = False
    # When not None, overrides plugin_config.hk_reporter_use_pic for this post.
    override_use_pic: Optional[bool] = None
    # Picture URLs or 'base64://...' payloads.
    pics: list[str] = field(default_factory=list)

    def _use_pic(self):
        """Return whether the text should be rendered as a picture."""
        if self.override_use_pic is not None:
            return self.override_use_pic
        return plugin_config.hk_reporter_use_pic

    async def _pic_url_to_image(self, url: str) -> Image.Image:
        """Download ``url`` and open it as a PIL image."""
        async with httpx.AsyncClient() as client:
            res = await client.get(url)
        return Image.open(BytesIO(res.content))

    async def _pic_merge(self) -> None:
        """Merge the leading run of equally sized square pictures into one grid image.

        Nothing happens with fewer than 6 pictures or a non-square first
        picture. Exactly 6 matching pictures become a 3x2 grid, 9 or more a
        3x3 grid; the merged pictures are replaced in ``self.pics`` by one
        base64 JPEG at the front.
        """
        if len(self.pics) < 6:
            return
        first_image = await self._pic_url_to_image(self.pics[0])
        if first_image.size[0] != first_image.size[1]:
            return
        pic_size = first_image.size[0]
        images = [first_image]
        for pic in self.pics[1:]:
            if len(images) == 9:
                # 3x3 is the largest grid; further downloads would be unused.
                break
            cur_image = await self._pic_url_to_image(pic)
            if cur_image.size[0] != pic_size or cur_image.size[1] != pic_size:
                break
            images.append(cur_image)
        if len(images) == 6:
            matrix = (3, 2)
            self.pics = self.pics[6:]
        elif len(images) >= 9:
            matrix = (3, 3)
            self.pics = self.pics[9:]
        else:
            return
        logger.info('trigger merge image')
        target = Image.new('RGB', (matrix[0] * pic_size, matrix[1] * pic_size))
        for y in range(matrix[1]):
            for x in range(matrix[0]):
                target.paste(images[y * matrix[0] + x], (
                    x * pic_size, y * pic_size, (x + 1) * pic_size, (y + 1) * pic_size
                ))
        target_io = BytesIO()
        target.save(target_io, 'JPEG')
        b64image = 'base64://' + base64.b64encode(target_io.getvalue()).decode()
        self.pics.insert(0, b64image)

    async def generate_messages(self):
        """Return the list of message strings (text, link, picture CQ codes) for this post."""
        await self._pic_merge()
        msgs = []
        text = '来源: {}'.format(self.target_type)
        if self.target_name:
            text += ' {}'.format(self.target_name)
        if self.text:
            text += ' \n{}'.format(self.text)
        if self._use_pic():
            msgs.append(await parse_text(text))
            # In picture mode the link goes into its own message (not for rss).
            if not self.target_type == 'rss' and self.url:
                msgs.append(self.url)
        else:
            if self.url:
                text += ' \n详情: {}'.format(self.url)
            msgs.append(text)
        for pic in self.pics:
            msgs.append("[CQ:image,file={url}]".format(url=pic))
        if self.compress:
            msgs = [''.join(msgs)]
        return msgs

    def __str__(self):
        return 'type: {}\nfrom: {}\ntext: {}\nurl: {}\npic: {}'.format(
            self.target_type,
            self.target_name,
            self.text,
            self.url,
            # Abbreviate inline base64 payloads to keep the output readable.
            ', '.join(map(lambda x: 'b64img' if x.startswith('base64') else x, self.pics))
        )
@@ -0,0 +1,27 @@
from nonebot import require
from .send import do_send_msgs
from .platform.utils import fetch_and_send
from apscheduler.schedulers.asyncio import AsyncIOScheduler
scheduler: AsyncIOScheduler = require('nonebot_plugin_apscheduler').scheduler
@scheduler.scheduled_job('interval', seconds=10)
async def weibo_check():
    """Every 10 seconds: process the next weibo target."""
    await fetch_and_send('weibo')


@scheduler.scheduled_job('interval', seconds=10)
async def bilibili_check():
    """Every 10 seconds: process the next bilibili target."""
    await fetch_and_send('bilibili')


@scheduler.scheduled_job('interval', seconds=30)
async def rss_check():
    """Every 30 seconds: process the next rss target."""
    await fetch_and_send('rss')


@scheduler.scheduled_job('interval', seconds=30)
async def arknights_check():
    """Every 30 seconds: process the arknights source."""
    await fetch_and_send('arknights')


@scheduler.scheduled_job('interval', seconds=1)
async def _():
    """Every second: give the send queue a chance to send one message."""
    await do_send_msgs()
+33
View File
@@ -0,0 +1,33 @@
from nonebot.adapters.cqhttp import Bot
import nonebot
from nonebot import logger
import time
import asyncio
# Pending messages as (bot, user, user_type, msg, retries_left) tuples, oldest first.
QUEUE = []
# Time of the last send attempt; used to throttle do_send_msgs().
LAST_SEND_TIME = time.time()


async def do_send_msgs():
    """Send at most one queued message, no more often than every 1.5 seconds.

    A message whose send raises is put back at the front of the queue until
    its retries are used up, after which it is dropped with a warning.
    """
    global LAST_SEND_TIME
    if time.time() - LAST_SEND_TIME < 1.5:
        return
    if not QUEUE:
        return
    bot, user, user_type, msg, retry_time = QUEUE.pop(0)
    try:
        if user_type == 'group':
            await bot.call_api('send_group_msg', group_id=user, message=msg)
        elif user_type == 'private':
            await bot.call_api('send_private_msg', user_id=user, message=msg)
    except Exception as err:
        # Not a bare except: task cancellation and KeyboardInterrupt must
        # propagate instead of being counted as a failed send.
        if retry_time > 0:
            QUEUE.insert(0, (bot, user, user_type, msg, retry_time - 1))
        else:
            logger.warning('send msg err {}: {!r}'.format(msg, err))
    LAST_SEND_TIME = time.time()


def send_msgs(bot, user, user_type, msgs):
    """Queue every message in ``msgs`` for ``user``, each with 2 retries."""
    for msg in msgs:
        QUEUE.append((bot, user, user_type, msg, 2))
+12
View File
@@ -0,0 +1,12 @@
from typing import Any, NewType
from dataclasses import dataclass
# Distinct static types for the values passed between platform code and config.
# Raw, platform-specific post payload.
RawPost = NewType('RawPost', Any)
# Subscription target, stored as a string.
Target = NewType('Target', str)
# Category id: a key of a platform's ``categories`` mapping.
Category = NewType('Category', int)
# Tag text.
Tag = NewType('Tag', str)
@dataclass
class User:
    """A message recipient: an id plus the kind of chat it refers to."""

    # recipient id
    user: str
    # 'group' or 'private' are the kinds the send queue handles
    user_type: str
+76
View File
@@ -0,0 +1,76 @@
import os
import asyncio
from typing import Optional
import nonebot
from nonebot import logger
import base64
from pyppeteer import launch
from pyppeteer.chromium_downloader import check_chromium, download_chromium
from html import escape
from hashlib import sha256
from tempfile import NamedTemporaryFile
from .plugin_config import plugin_config
class Singleton(type):
    """Metaclass that creates each class's instance once and returns it on every call."""

    # class -> its single instance
    _instances = {}

    def __call__(cls, *args, **kwargs):
        registry = cls._instances
        if cls in registry:
            # Later calls ignore their arguments and reuse the first instance.
            return registry[cls]
        registry[cls] = super().__call__(*args, **kwargs)
        return registry[cls]
# Platform names whose subscriptions are loaded into the send cache.
supported_target_type = ('weibo', 'bilibili', 'rss', 'arknights')

# Download chromium at import time unless a local browser is configured or
# pyppeteer already has one.
if not (plugin_config.hk_reporter_use_local or check_chromium()):
    os.environ['PYPPETEER_DOWNLOAD_HOST'] = 'http://npm.taobao.org/mirrors'
    download_chromium()
class Render(metaclass=Singleton):
    """Singleton that renders pages or text to JPEG screenshots with a headless browser."""

    def __init__(self):
        # Serialises render() calls so only one browser runs at a time.
        self.lock = asyncio.Lock()

    async def render(self, url: str, viewport: Optional[dict] = None, target: Optional[str] = None) -> str:
        """Open ``url`` and return a base64-encoded JPEG screenshot.

        Args:
            url: page to open.
            viewport: passed to ``page.setViewport`` when given.
            target: CSS selector; when given only that element is captured,
                otherwise the page is.
        """
        async with self.lock:
            if plugin_config.hk_reporter_use_local:
                browser = await launch(executablePath='/usr/bin/chromium', args=['--no-sandbox'])
            else:
                browser = await launch(args=['--no-sandbox'])
            try:
                page = await browser.newPage()
                await page.goto(url)
                if viewport:
                    await page.setViewport(viewport)
                if target:
                    target_ele = await page.querySelector(target)
                    data = await target_ele.screenshot(type='jpeg', encoding='base64')
                else:
                    data = await page.screenshot(type='jpeg', encoding='base64')
                await page.close()
            finally:
                # Close the browser even when navigation or the screenshot
                # fails; otherwise the chromium process is left running.
                await browser.close()
            return str(data)

    async def text_to_pic(self, text: str) -> str:
        """Render ``text`` (one paragraph per line, HTML-escaped) and return the base64 JPEG."""
        lines = text.split('\n')
        parsed_lines = list(map(lambda x: '<p>{}</p>'.format(escape(x)), lines))
        html_text = '<div style="width:17em;padding:1em">{}</div>'.format(''.join(parsed_lines))
        with NamedTemporaryFile('wt', suffix='.html', delete=False) as tmp:
            tmp_path = tmp.name
            tmp.write(html_text)
        try:
            data = await self.render('file://{}'.format(tmp_path), target='div')
        finally:
            # delete=False above, so remove the file ourselves, also on failure.
            os.remove(tmp_path)
        return data

    async def text_to_pic_cqcode(self, text:str) -> str:
        """Render ``text`` and wrap the picture in an image CQ code."""
        data = await self.text_to_pic(text)
        code = '[CQ:image,file=base64://{}]'.format(data)
        return code
async def parse_text(text: str) -> str:
    """Return ``text`` as is, or an image CQ code rendered from it when picture mode is on."""
    if not plugin_config.hk_reporter_use_pic:
        return text
    return await Render().text_to_pic_cqcode(text)