reconstruct

This commit is contained in:
felinae98 2021-02-16 17:24:23 +08:00
parent b0ab1e578e
commit 80bd0ae068
No known key found for this signature in database
GPG Key ID: 00C8B010587FF610
13 changed files with 618 additions and 113 deletions

View File

@ -1,7 +1,4 @@
import nonebot
from .plugin_config import PlugConfig
global_config = nonebot.get_driver().config
plugin_config = PlugConfig(**global_config.dict())
from . import config_manager
from . import config

View File

@ -1,9 +1,11 @@
from .utils import Singleton, supported_target_type
from . import plugin_config
from .types import User
from .plugin_config import plugin_config
from os import path
import nonebot
from tinydb import TinyDB, Query
from collections import defaultdict
from typing import DefaultDict
import os
@ -25,23 +27,25 @@ class NoSuchSubscribeException(Exception):
class Config(metaclass=Singleton):
migrate_version = 1
migrate_version = 2
def __init__(self):
self.db = TinyDB(get_config_path(), encoding='utf-8')
self.kv_config = self.db.table('kv')
self.user_target = self.db.table('user_target')
self.target_user_cache = {}
self.target_user_cat_cache = {}
self.target_user_tag_cache = {}
self.target_list = {}
self.next_index = defaultdict(lambda: 0)
self.next_index: DefaultDict[str, int] = defaultdict(lambda: 0)
def add_subscribe(self, user, user_type, target, target_name, target_type):
def add_subscribe(self, user, user_type, target, target_name, target_type, cats, tags):
user_query = Query()
query = (user_query.user == user) & (user_query.user_type == user_type)
if (user_data := self.user_target.get(query)):
# update
subs: list = user_data.get('subs', [])
subs.append({"target": target, "target_type": target_type, 'target_name': target_name})
subs.append({"target": target, "target_type": target_type, 'target_name': target_name, 'cats': cats, 'tags': tags})
self.user_target.update({"subs": subs}, query)
else:
# insert
@ -68,16 +72,28 @@ class Config(metaclass=Singleton):
def update_send_cache(self):
res = {target_type: defaultdict(list) for target_type in supported_target_type}
cat_res = {target_type: defaultdict(lambda: defaultdict(list)) for target_type in supported_target_type}
tag_res = {target_type: defaultdict(lambda: defaultdict(list)) for target_type in supported_target_type}
# res = {target_type: defaultdict(lambda: defaultdict(list)) for target_type in supported_target_type}
for user in self.user_target.all():
for sub in user.get('subs', []):
if not sub.get('target_type') in supported_target_type:
continue
res[sub['target_type']][sub['target']].append({"user": user['user'], "user_type": user['user_type']})
res[sub['target_type']][sub['target']].append(User(user['user'], user['user_type']))
cat_res[sub['target_type']][sub['target']]['{}-{}'.format(user['user_type'], user['user'])] = sub['cats']
tag_res[sub['target_type']][sub['target']]['{}-{}'.format(user['user_type'], user['user'])] = sub['tags']
self.target_user_cache = res
self.target_user_cat_cache = cat_res
self.target_user_tag_cache = tag_res
for target_type in self.target_user_cache:
self.target_list[target_type] = list(self.target_user_cache[target_type].keys())
def get_sub_category(self, target_type, target, user_type, user):
return self.target_user_cat_cache[target_type][target]['{}-{}'.format(user_type, user)]
def get_sub_tags(self, target_type, target, user_type, user):
return self.target_user_tag_cache[target_type][target]['{}-{}'.format(user_type, user)]
def get_next_target(self, target_type):
# FIXME 插入或删除target后对队列的影响但是并不是大问题
if not self.target_list[target_type]:
@ -92,7 +108,19 @@ def start_up():
if not (search_res := config.kv_config.search(Query().name=="version")):
config.kv_config.insert({"name": "version", "value": config.migrate_version})
elif search_res[0].get("value") < config.migrate_version:
pass
query = Query()
version_query = (query.name == 'version')
cur_version = search_res[0].get("value")
if cur_version == 1:
cur_version = 2
for user_conf in config.user_target.all():
conf_id = user_conf.doc_id
subs = user_conf['subs']
for sub in subs:
sub['cats'] = []
sub['tags'] = []
config.user_target.update({'subs': subs}, doc_ids=[conf_id])
config.kv_config.update({"value": config.migrate_version}, version_query)
# do migration
config.update_send_cache()

View File

@ -1,29 +1,102 @@
from nonebot.rule import to_me
from nonebot.typing import T_State
from nonebot.adapters.cqhttp import Bot, Event, GroupMessageEvent
from nonebot.permission import Permission
from nonebot.permission import Permission, SUPERUSER
from nonebot.adapters.cqhttp.permission import GROUP_ADMIN, GROUP_MEMBER, GROUP_OWNER
from nonebot import on_command
from .platform.utils import check_sub_target
from .platform import platform_manager
from .config import Config, NoSuchSubscribeException
from .utils import parse_text
from .send import send_msgs
add_sub = on_command("添加订阅", rule=to_me(), permission=GROUP_ADMIN | GROUP_OWNER, priority=5)
add_sub = on_command("添加订阅", rule=to_me(), permission=GROUP_ADMIN | GROUP_OWNER | SUPERUSER, priority=5)
@add_sub.got('platform', '请输入想要订阅的平台,目前支持:{}'.format(', '.join(platform_manager.keys())))
@add_sub.got('id', '请输入订阅用户的id详情查阅https://github.com/felinae98/nonebot-hk-reporter')
@add_sub.handle()
async def _(bot: Bot, event: Event, state: T_State):
args = str(event.get_message()).strip().split()
if len(args) != 2:
await add_sub.finish("使用方法为: 添加订阅 平台 id")
async def add_sub_handle_id(bot: Bot, event: Event, state: T_State):
if 'id' in state:
return
target_type, target = args
if name := await check_sub_target(target_type, target):
config: Config = Config()
config.add_subscribe(event.group_id, "group", target, name, target_type)
await add_sub.finish("成功添加 {}".format(name))
await bot.send(event=event, message='请输入订阅用户的id详情查阅https://github.com/felinae98/nonebot-hk-reporter')
await add_sub.pause()
@add_sub.handle()
async def add_sub_parse_id(bot: Bot, event: Event, state: T_State):
if 'id' in state:
return
target = str(event.get_message()).strip()
name = await check_sub_target(state['platform'], target)
if not name:
await add_sub.reject('id输入错误')
state['id'] = target
state['name'] = name
async def add_sub_handle_cat(bot: Bot, event: Event, state: T_State):
if not platform_manager[state['platform']].categories:
return
if 'cats' in state:
return
await bot.send(event=event, message='请输入要订阅的类别,以空格分隔,支持的类别有:{}'.format(
','.join(list(platform_manager[state['platform']].categories.values()))
))
await add_sub.pause()
@add_sub.handle()
async def add_sub_parse_cat(bot: Bot, event: Event, state: T_State):
if not platform_manager[state['platform']].categories:
return
if 'cats' in state:
return
res = []
for cat in str(event.get_message()).strip().split():
if cat not in platform_manager[state['platform']].reverse_category:
await add_sub.reject('不支持 {}'.format(cat))
res.append(platform_manager[state['platform']].reverse_category[cat])
state['cat'] = res
@add_sub.handle()
async def add_sub_handle_tag(bot: Bot, event: Event, state: T_State):
if not platform_manager[state['platform']].enable_tag:
return
if 'tags' in state:
return
await bot.send(event=event, message='请输入要订阅的tag订阅所有tag输入"全部标签"')
await add_sub.pause()
@add_sub.handle()
async def add_sub_parse_tag(bot: Bot, event: Event, state: T_State):
if not platform_manager[state['platform']].enable_tag:
return
if 'tags' in state:
return
if str(event.get_message()).strip() == '全部标签':
state['tags'] = []
else:
await add_sub.finish("平台或者id不存在")
state['tags'] = str(event.get_message()).strip().split()
@add_sub.handle()
async def add_sub_process(bot: Bot, event: Event, state: T_State):
config = Config()
config.add_subscribe(event.group_id, user_type='group', target=state['id'],
target_name=state['name'], target_type=state['platform'],
cats=state.get('cats', []), tags=state.get('tags', []))
await add_sub.finish('添加 {} 成功'.format(state['name']))
# @add_sub.handle()
# async def _(bot: Bot, event: Event, state: T_State):
# args = str(event.get_message()).strip().split()
# if len(args) != 2:
# await add_sub.finish("使用方法为: 添加订阅 平台 id")
# return
# target_type, target = args
# if name := await check_sub_target(target_type, target):
# config: Config = Config()
# config.add_subscribe(event.group_id, "group", target, name, target_type)
# await add_sub.finish("成功添加 {}".format(name))
# else:
# await add_sub.finish("平台或者id不存在")
query_sub = on_command("查询订阅", rule=to_me(), priority=5)
@query_sub.handle()

View File

@ -2,3 +2,5 @@ from .bilibili import Bilibili
from .rss import Rss
from .weibo import Weibo
from .utils import check_sub_target
from .platform import PlatformNoTarget
from .utils import platform_manager

View File

@ -1,3 +1,6 @@
from typing import Any, Optional
from ..types import Category, RawPost, Tag, Target
from ..utils import Singleton
from ..post import Post
from collections import defaultdict
@ -5,8 +8,9 @@ from nonebot import logger
import httpx
import json
import time
from .platform import Platform, CategoryNotSupport
class Bilibili(metaclass=Singleton):
class Bilibili_(metaclass=Singleton):
def __init__(self):
self.exists_posts = defaultdict(set)
@ -37,7 +41,6 @@ class Bilibili(metaclass=Singleton):
res.remove(None)
return res
def parse(self, card, target) -> Post:
card_content = json.loads(card['card'])
dynamic_id = card['desc']['dynamic_id']
@ -89,3 +92,86 @@ async def get_user_info(mid):
if res_data['code']:
return None
return res_data['data']['name']
class Bilibili(Platform):
categories = {
1: "一般动态",
2: "专栏文章",
3: "视频",
4: "纯文字",
# 5: "短视频"
}
platform_name = 'bilibili'
enable_tag = False
@staticmethod
async def get_account_name(target: Target) -> Optional[str]:
async with httpx.AsyncClient() as client:
res = await client.get('https://api.bilibili.com/x/space/acc/info', params={'mid': target})
res_data = json.loads(res.text)
if res_data['code']:
return None
return res_data['data']['name']
async def get_sub_list(self, target: Target) -> list[RawPost]:
async with httpx.AsyncClient() as client:
params = {'host_uid': target, 'offset': 0, 'need_top': 0}
res = await client.get('https://api.vc.bilibili.com/dynamic_svr/v1/dynamic_svr/space_history', params=params, timeout=4.0)
res_dict = json.loads(res.text)
if res_dict['code'] == 0:
return res_dict['data']['cards']
else:
return []
def get_id(self, post: RawPost) -> Any:
return post['desc']['dynamic_id']
def get_date(self, post: RawPost) -> int:
return post['desc']['timestamp']
def get_category(self, post: RawPost) -> Category:
post_type = post['desc']['type']
if post_type == 2:
return Category(1)
elif post_type == 64:
return Category(2)
elif post_type == 8:
return Category(3)
elif post_type == 4:
return Category(4)
elif post_type == 1:
# 转发
raise CategoryNotSupport()
raise CategoryNotSupport()
def get_tags(self, raw_post: RawPost) -> list[Tag]:
return []
async def parse(self, raw_post: RawPost) -> Post:
card_content = json.loads(raw_post['card'])
post_type = self.get_category(raw_post)
if post_type == 1:
# 一般动态
text = card_content['item']['description']
url = 'https://t.bilibili.com/{}'.format(raw_post['desc']['dynamic_id'])
pic = [img['img_src'] for img in card_content['item']['pictures']]
elif post_type == 2:
# 专栏文章
text = '{} {}'.format(card_content['title'], card_content['summary'])
url = 'https://www.bilibili.com/read/cv{}'.format(raw_post['desc']['rid'])
pic = card_content['image_urls']
elif post_type == 3:
# 视频
text = card_content['dynamic']
url = 'https://www.bilibili.com/video/{}'.format(raw_post['desc']['bvid'])
pic = [card_content['pic']]
elif post_type == 4:
# 纯文字
text = card_content['item']['content']
url = 'https://t.bilibili.com/{}'.format(raw_post['desc']['dynamic_id'])
pic = []
else:
raise CategoryNotSupport(post_type)
return Post('bilibili', text, url, pic)

View File

@ -0,0 +1,275 @@
import time
from collections import defaultdict
from typing import Any, Literal, Optional, Protocol
from nonebot import logger
from ..config import Config
from ..post import Post
from ..utils import Singleton
from ..types import Category, Tag, RawPost, Target, User
class CategoryNotSupport(Exception):
"raise in get_category, when post category is not supported"
pass
class PlatformProto(metaclass=Singleton):
categories: dict[Category, str]
reverse_category: dict[str, Category]
has_target: bool
platform_name: str
enable_tag: bool
async def fetch_new_post(self, target: Target, users: list[User]) -> list[tuple[User, list[Post]]]:
...
@staticmethod
async def get_account_name(target: Target) -> Optional[str]:
...
class Platform(PlatformProto):
"platform with target(account), like weibo, bilibili"
categories: dict[Category, str]
has_target: bool = True
platform_name: str
enable_tag: bool
def __init__(self):
self.exists_posts = defaultdict(set)
self.inited = dict()
self.reverse_category = {}
self.cache: dict[Any, Post] = {}
for key, val in self.categories.items():
self.reverse_category[val] = key
@staticmethod
async def get_account_name(target: Target) -> Optional[str]:
"Given a tareget, return the username(name) of the target"
raise NotImplementedError()
async def get_sub_list(self, target: Target) -> list[RawPost]:
"Get post list of the given target"
raise NotImplementedError()
def get_id(self, post: RawPost) -> Any:
"Get post id of given RawPost"
raise NotImplementedError()
def get_date(self, post: RawPost) -> Optional[int]:
"Get post timestamp and return, return None if can't get the time"
raise NotImplementedError()
def get_category(self, post: RawPost) -> Optional[Category]:
"Return category of given Rawpost"
raise NotImplementedError()
def get_tags(self, raw_post: RawPost) -> Optional[list[Tag]]:
"Return Tag list of given RawPost"
raise NotImplementedError()
async def parse(self, raw_post: RawPost) -> Post:
"parse RawPost into post"
raise NotImplementedError()
def filter_platform_custom(self, post: RawPost) -> bool:
raise NotImplementedError()
async def _parse_with_cache(self, post: RawPost) -> Post:
post_id = self.get_id(post)
if post_id not in self.cache:
self.cache[post_id] = await self.parse(post)
return self.cache[post_id]
async def filter_common(self, target: Target, raw_post_list: list[RawPost]) -> list[RawPost]:
if False and not self.inited.get(target, False):
# target not init
for raw_post in raw_post_list:
post_id = self.get_id(raw_post)
self.exists_posts[target].add(post_id)
logger.info('init {}-{} with {}'.format(self.platform_name, target, self.exists_posts[target]))
self.inited[target] = True
return []
res: list[RawPost] = []
for raw_post in raw_post_list:
post_id = self.get_id(raw_post)
if post_id in self.exists_posts[target]:
continue
# if (post_time := self.get_date(raw_post)) and time.time() - post_time > 2 * 60 * 60:
# continue
try:
if not self.filter_platform_custom(raw_post):
continue
except NotImplementedError:
pass
try:
self.get_category(raw_post)
except CategoryNotSupport:
continue
except NotImplementedError:
pass
res.append(raw_post)
self.exists_posts[target].add(post_id)
return res
async def filter_user_custom(self, raw_post_list: list[RawPost], cats: list[Category], tags: list[Tag]) -> list[RawPost]:
res: list[RawPost] = []
for raw_post in raw_post_list:
if self.categories:
cat = self.get_category(raw_post)
if cats and cat not in cats:
continue
if self.enable_tag and tags:
flag = False
post_tags = self.get_tags(raw_post)
for tag in post_tags:
if tag in tags:
flag = True
break
if not flag:
continue
res.append(raw_post)
return res
async def fetch_new_post(self, target: Target, users: list[User]) -> list[tuple[User, list[Post]]]:
config = Config()
post_list = await self.get_sub_list(target)
new_posts = await self.filter_common(target, post_list)
res: list[tuple[User, list[Post]]] = []
if not new_posts:
return []
else:
for post in new_posts:
logger.info('fetch new post from {} {}: {}'.format(self.platform_name, target, self.get_id(post)))
for user in users:
required_tags = config.get_sub_tags(self.platform_name, target, user.user_type, user.user) if self.enable_tag else []
cats = config.get_sub_category(self.platform_name, target, user.user_type, user.user)
user_raw_post = await self.filter_user_custom(new_posts, cats, required_tags)
user_post: list[Post] = []
for raw_post in user_raw_post:
user_post.append(await self._parse_with_cache(raw_post))
res.append((user, user_post))
self.cache = {}
return res
class PlatformNoTarget(PlatformProto):
categories: dict[Category, str]
has_target = False
platform_name: str
enable_tag: bool
def __init__(self):
self.exists_posts = set()
self.inited = False
self.reverse_category = {}
self.cache: dict[Any, Post] = {}
for key, val in self.categories.items():
self.reverse_category[val] = key
@staticmethod
async def get_account_name() -> Optional[str]:
"return the username(name) of the target"
raise NotImplementedError()
async def get_sub_list(self) -> list[RawPost]:
"Get post list of the given target"
raise NotImplementedError()
def get_id(self, post: RawPost) -> Any:
"Get post id of given RawPost"
raise NotImplementedError()
def get_date(self, post: RawPost) -> Optional[int]:
"Get post timestamp and return, return None if can't get the time"
raise NotImplementedError()
def get_category(self, post: RawPost) -> Optional[Category]:
"Return category of given Rawpost"
raise NotImplementedError()
def get_tags(self, raw_post: RawPost) -> Optional[list[Tag]]:
"Return Tag list of given RawPost"
raise NotImplementedError()
async def parse(self, raw_post: RawPost) -> Post:
"parse RawPost into post"
raise NotImplementedError()
def filter_platform_custom(self, post: RawPost) -> bool:
raise NotImplementedError()
async def _parse_with_cache(self, post: RawPost) -> Post:
post_id = self.get_id(post)
if post_id not in self.cache:
self.cache[post_id] = await self.parse(post)
return self.cache[post_id]
async def filter_common(self, raw_post_list: list[RawPost]) -> list[RawPost]:
if not self.inited:
# target not init
for raw_post in raw_post_list:
post_id = self.get_id(raw_post)
self.exists_posts.add(post_id)
logger.info('init {} with {}'.format(self.platform_name, self.exists_posts))
self.inited = True
return []
res: list[RawPost] = []
for raw_post in raw_post_list:
post_id = self.get_id(raw_post)
if post_id in self.exists_posts:
continue
if (post_time := self.get_date(raw_post)) and time.time() - post_time > 2 * 60 * 60:
continue
try:
if not self.filter_platform_custom(raw_post):
continue
except NotImplementedError:
pass
try:
self.get_category(raw_post)
except CategoryNotSupport:
continue
res.append(raw_post)
self.exists_posts.add(post_id)
return res
async def filter_user_custom(self, raw_post_list: list[RawPost], cats: list[Category], tags: list[Tag]) -> list[RawPost]:
res: list[RawPost] = []
for raw_post in raw_post_list:
if self.categories:
cat = self.get_category(raw_post)
if cats and cat not in cats:
continue
if self.enable_tag and tags:
flag = False
post_tags = self.get_tags(raw_post)
for tag in post_tags:
if tag in tags:
flag = True
break
if not flag:
continue
res.append(raw_post)
return res
async def fetch_new_post(self, users: list[User]) -> list[tuple[User, list[Post]]]:
config = Config()
post_list = await self.get_sub_list()
new_posts = await self.filter_common(post_list)
res: list[tuple[User, list[Post]]] = []
if not new_posts:
return []
else:
for post in new_posts:
logger.info('fetch new post from {}: {}'.format(self.platform_name, self.get_id(post)))
for user in users:
required_tags = config.get_sub_tags(self.platform_name, 'default', user.user_type, user.user) if self.enable_tag else []
cats = config.get_sub_category(self.platform_name, 'default', user.user_type, user.user)
user_raw_post = await self.filter_user_custom(new_posts, cats, required_tags)
user_post: list[Post] = []
for raw_post in user_raw_post:
user_post.append(await self._parse_with_cache(raw_post))
res.append((user, user_post))
self.cache = {}
return res

View File

@ -1,5 +1,8 @@
from typing import Any, Optional
from ..types import RawPost, Target
from ..utils import Singleton
from ..post import Post
from .platform import Platform
from collections import defaultdict
from bs4 import BeautifulSoup as bs
from nonebot import logger
@ -8,56 +11,33 @@ import httpx
import time
import calendar
async def get_rss_raw_data(url) -> str:
async with httpx.AsyncClient() as client:
res = await client.get(url, timeout=10.0)
return res.text
class Rss(Platform):
async def get_rss_info(url) -> str:
data = await get_rss_raw_data(url)
feed = feedparser.parse(data)
return feed.feed.title
categories = {}
enable_tag = False
platform_name = 'rss'
class Rss(metaclass=Singleton):
@staticmethod
async def get_account_name(target: Target) -> Optional[str]:
async with httpx.AsyncClient() as client:
res = await client.get(target, timeout=10.0)
feed = feedparser.parse(res.text)
return feed['feed']['title']
def __init__(self):
self.exists_posts = defaultdict(set)
self.inited = defaultdict(lambda: False)
def get_date(self, post: RawPost) -> int:
return calendar.timegm(post.published_parsed)
def filter(self, data, target, init=False) -> list[Post]:
feed = feedparser.parse(data)
entries = feed.entries
res = []
for entry in entries:
entry_id = entry.id
if init:
self.exists_posts[target].add(entry_id)
continue
if entry_id in self.exists_posts[target]:
continue
# if time.time() - calendar.timegm(entry.published_parsed) > 2 * 60 * 60:
# continue
res.append(self.parse(entry, target))
return res
def get_id(self, post: RawPost) -> Any:
return post.id
def parse(self, entry, target) -> Post:
soup = bs(entry.description, 'html.parser')
async def get_sub_list(self, target: Target) -> list[RawPost]:
async with httpx.AsyncClient() as client:
res = await client.get(target, timeout=10.0)
feed = feedparser.parse(res)
return feed.entries
async def parse(self, raw_post: RawPost) -> Post:
soup = bs(raw_post.description, 'html.parser')
text = soup.text
pics = list(map(lambda x: x.attrs['src'], soup('img')))
self.exists_posts[target].add(entry.id)
return Post('rss', text, entry.link, pics)
async def fetch_new_post(self, target) -> list[Post]:
try:
raw_data = await get_rss_raw_data(target)
if self.inited[target]:
return self.filter(raw_data, target)
else:
self.filter(raw_data, target, True)
logger.info('rss init {} success'.format(target))
logger.info('post list: {}'.format(self.exists_posts[target]))
self.inited[target] = True
return []
except httpx.RequestError as err:
logger.warning("network connection error: {}, url: {}".format(type(err), err.request.url))
return []
return Post('rss', text, raw_post.link, pics)

View File

@ -1,55 +1,39 @@
import time
import asyncio
import nonebot
from nonebot import logger
from collections import defaultdict
from .weibo import Weibo, get_user_info as weibo_user_info
from .bilibili import Bilibili, get_user_info as bilibili_user_info
from .rss import Rss, get_rss_info as rss_info
from typing import Type
from .weibo import Weibo
from .bilibili import Bilibili
from .rss import Rss
from .platform import PlatformProto
from ..config import Config
from ..post import Post
from ..send import send_msgs
async def check_sub_target(target_type, target):
if target_type == 'weibo':
return await weibo_user_info(target)
elif target_type == 'bilibili':
return await bilibili_user_info(target)
elif target_type == 'rss':
return await rss_info(target)
else:
return None
return platform_manager[target_type].get_account_name(target)
scheduler_last_run = defaultdict(lambda: 0)
async def scheduler(fun, target_type):
platform_interval = {
'weibo': 3
}
if (wait_time := time.time() - scheduler_last_run[target_type]) < platform_interval[target_type]:
await asyncio.sleep(wait_time)
await fun()
platform_manager: dict[str, PlatformProto] = {
'bilibili': Bilibili(),
'weibo': Weibo(),
'rss': Rss()
}
async def fetch_and_send(target_type: str):
config = Config()
platform_manager = {
'bilibili': Bilibili(),
'weibo': Weibo(),
'rss': Rss()
}
target = config.get_next_target(target_type)
if not target:
return
logger.debug('try to fecth new posts from {}, target: {}'.format(target_type, target))
new_posts: list[Post] = await platform_manager[target_type].fetch_new_post(target)
send_list = config.target_user_cache[target_type][target]
bot_list = list(nonebot.get_bots().values())
bot = bot_list[0] if bot_list else None
for new_post in new_posts:
logger.warning('get new {} dynamic: {}'.format(target_type, new_post.url))
logger.warning(new_post)
if not bot:
logger.warning('no bot connected')
else:
for to_send in send_list:
send_msgs(bot, to_send['user'], to_send['user_type'], await new_post.generate_messages())
if target_type == 'rss':
to_send = await platform_manager[target_type].fetch_new_post(target, send_list)
for user, send_list in to_send:
for send_post in send_list:
logger.debug('send to {}: {}'.format(user, send_post))
if not bot:
logger.warning('no bot connected')
else:
send_msgs(bot, user.user, user.user_type, await send_post.generate_messages())

View File

@ -5,11 +5,14 @@ from collections import defaultdict
from bs4 import BeautifulSoup as bs
from datetime import datetime
from nonebot import logger
from typing import Any, Optional
from ..utils import Singleton
from ..post import Post
from ..types import *
from .platform import Platform
class Weibo(metaclass=Singleton):
class Weibo_(metaclass=Singleton):
def __init__(self):
self.exists_posts = defaultdict(set)
@ -72,3 +75,63 @@ async def get_user_info(id):
return res_dict['data']['userInfo']['screen_name']
else:
return None
class Weibo(Platform):
categories = {
1: '转发',
2: '视频',
3: '图文'
}
enable_tag = False
platform_name = 'weibo'
@staticmethod
async def get_account_name(target: Target) -> Optional[str]:
async with httpx.AsyncClient() as client:
param = {'containerid': '100505' + target}
res = await client.get('https://m.weibo.cn/api/container/getIndex', params=param)
res_dict = json.loads(res.text)
if res_dict.get('ok') == 1:
return res_dict['data']['userInfo']['screen_name']
else:
return None
async def get_sub_list(self, target: Target) -> list[RawPost]:
async with httpx.AsyncClient() as client:
params = { 'containerid': '107603' + target}
res = await client.get('https://m.weibo.cn/api/container/getIndex?', params=params, timeout=4.0)
res_data = json.loads(res.text)
if not res_data['ok']:
return []
return res_data['data']['cards']
def get_id(self, post: RawPost) -> Any:
return post['mblog']['id']
def filter_platform_custom(self, raw_post: RawPost) -> bool:
return raw_post['card_type'] == 9
def get_date(self, raw_post: RawPost) -> float:
created_time = datetime.strptime(raw_post['mblog']['created_at'], '%a %b %d %H:%M:%S %z %Y')
return created_time.timestamp()
def get_tags(self, raw_post: RawPost) -> Optional[list[Tag]]:
"Return Tag list of given RawPost"
return None
def get_category(self, raw_post: RawPost) -> Category:
if raw_post['mblog'].get('retweeted_status'):
return Category(1)
elif raw_post['mblog'].get('page_info') and raw_post['mblog']['page_info'].get('type') == 'video':
return Category(2)
else:
return Category(3)
async def parse(self, raw_post: RawPost) -> Post:
info = raw_post['mblog']
parsed_text = bs(info['text'], 'html.parser').text
pic_urls = [img['large']['url'] for img in info.get('pics', [])]
detail_url = 'https://weibo.com/{}/{}'.format(info['user']['id'], info['bid'])
# return parsed_text, detail_url, pic_urls
return Post('weibo', parsed_text, detail_url, pic_urls)

View File

@ -1,4 +1,5 @@
from pydantic import BaseSettings
import nonebot
class PlugConfig(BaseSettings):
@ -8,3 +9,6 @@ class PlugConfig(BaseSettings):
class Config:
extra = 'ignore'
global_config = nonebot.get_driver().config
plugin_config = PlugConfig(**global_config.dict())

View File

@ -1,5 +1,5 @@
from dataclasses import dataclass, field
from . import plugin_config
from .plugin_config import plugin_config
from .utils import parse_text
@dataclass

View File

@ -0,0 +1,12 @@
from typing import Any, NewType
from dataclasses import dataclass
RawPost = NewType('RawPost', Any)
Target = NewType('Target', str)
Category = NewType('Category', int)
Tag = NewType('Tag', str)
@dataclass
class User:
user: str
user_type: str

View File

@ -1,5 +1,6 @@
import os
import asyncio
from typing import Optional
import nonebot
from nonebot import logger
import base64
@ -7,7 +8,7 @@ from pyppeteer import launch
from html import escape
from hashlib import sha256
from . import plugin_config
from .plugin_config import plugin_config
class Singleton(type):
_instances = {}
@ -23,7 +24,7 @@ class Render(metaclass=Singleton):
def __init__(self):
self.lock = asyncio.Lock()
async def render(self, url: str, viewport: dict = None, target: str = None) -> str:
async def render(self, url: str, viewport: Optional[dict] = None, target: Optional[str] = None) -> str:
async with self.lock:
if plugin_config.hk_reporter_use_local:
browser = await launch(executablePath='/usr/bin/chromium', args=['--no-sandbox'])
@ -32,7 +33,7 @@ class Render(metaclass=Singleton):
page = await browser.newPage()
await page.goto(url)
if viewport:
await page.setViewport(target)
await page.setViewport(viewport)
if target:
target_ele = await page.querySelector(target)
data = await target_ele.screenshot(type='jpeg', encoding='base64')
@ -40,7 +41,7 @@ class Render(metaclass=Singleton):
data = await page.screenshot(type='jpeg', encoding='base64')
await page.close()
await browser.close()
return data
return str(data)
async def text_to_pic(self, text: str) -> str:
hash_text = sha256(text.encode()).hexdigest()[:20]