move files

This commit is contained in:
felinae98
2021-11-17 15:59:19 +08:00
parent 7f46d87c3f
commit e9de860058
17 changed files with 0 additions and 0 deletions
@@ -0,0 +1,28 @@
from collections import defaultdict
from .platform import Platform, NoTargetGroup
from pkgutil import iter_modules
from pathlib import Path
from importlib import import_module
# Import every module found in this package's directory at import time.
# Presumably this is what makes the platform classes defined in those
# modules exist (and get registered) before the registry is read below.
_package_dir = str(Path(__file__).resolve().parent)
for (_, module_name, _) in iter_modules([_package_dir]):
    import_module('.'.join((__name__, module_name)))
async def check_sub_target(target_type, target):
    """Look up the platform registered as ``target_type`` and return the
    result of its ``get_target_name(target)``.

    Raises ``KeyError`` if ``target_type`` is not a key of ``platform_manager``.
    """
    platform = platform_manager[target_type]
    return await platform.get_target_name(target)
# Group every enabled, registered platform class by its ``platform_name``.
_platform_list = defaultdict(list)
for _platform in Platform.registory:
    if _platform.enabled:
        _platform_list[_platform.platform_name].append(_platform)

# One manager entry per platform name: a lone class is instantiated directly,
# several classes sharing a name are wrapped in a single NoTargetGroup.
platform_manager: dict[str, Platform] = dict()
for name, platform_list in _platform_list.items():
    platform_manager[name] = (
        platform_list[0]()
        if len(platform_list) == 1
        else NoTargetGroup([_platform() for _platform in platform_list])
    )
@@ -0,0 +1,147 @@
import json
from typing import Any
from bs4 import BeautifulSoup as bs
import httpx
from ..post import Post
from ..types import Category, RawPost, Target
from ..utils import Render
from .platform import CategoryNotSupport, NewMessage, NoTargetMixin, StatusChange
class Arknights(NewMessage, NoTargetMixin):
    """Arknights in-game announcements (category 1 of the 'arknights' platform)."""

    categories = {1: '游戏公告'}
    platform_name = 'arknights'
    name = '明日方舟游戏信息'
    enable_tag = False
    enabled = True
    is_common = False
    schedule_type = 'interval'
    schedule_kw = {'seconds': 30}

    async def get_target_name(self, _: Target) -> str:
        """Return the fixed display name; the target argument is ignored."""
        return '明日方舟游戏信息'

    async def get_sub_list(self, _) -> list[RawPost]:
        """Download the announcement meta JSON and return its ``announceList``."""
        async with httpx.AsyncClient() as client:
            raw_data = await client.get('https://ak-conf.hypergryph.com/config/prod/announce_meta/IOS/announcement.meta.json')
        return json.loads(raw_data.text)['announceList']

    def get_id(self, post: RawPost) -> Any:
        """Return the announcement's ``announceId``."""
        return post['announceId']

    def get_date(self, _: RawPost) -> None:
        """No timestamp is extracted for announcements; always ``None``."""
        return None

    def get_category(self, _) -> Category:
        """Every post of this class belongs to category 1."""
        return Category(1)

    async def parse(self, raw_post: RawPost) -> Post:
        """Build a Post from the announcement page referenced by ``webUrl``.

        A page containing ``div.standerd-container`` is rendered to an image;
        a page containing ``img.banner-image`` contributes that image's URL.

        Raises:
            CategoryNotSupport: the page matches neither known layout.
        """
        announce_url = raw_post['webUrl']
        text = ''
        async with httpx.AsyncClient() as client:
            raw_html = await client.get(announce_url)
        # Pass the decoded body explicitly instead of the response object,
        # which BeautifulSoup only accepted because it happens to have .read().
        soup = bs(raw_html.text, 'html.parser')
        pics = []
        if soup.find("div", class_="standerd-container"):
            # Announcement with mixed text and pictures: render the page.
            render = Render()
            viewport = {'width': 320, 'height': 6400, 'deviceScaleFactor': 3}
            pic_data = await render.render(announce_url, viewport=viewport, target='div.main')
            if pic_data:
                pics.append(pic_data)
            else:
                # Rendering produced nothing; send a failure notice as text.
                text = '图片渲染失败'
        elif (pic := soup.find('img', class_='banner-image')):
            pics.append(pic['src'])
        else:
            raise CategoryNotSupport()
        return Post('arknights', text=text, url='', target_name="明日方舟游戏内公告", pics=pics, compress=True, override_use_pic=False)
class AkVersion(NoTargetMixin, StatusChange):
    """Arknights version / maintenance status watcher (category 2)."""

    categories = {2: '更新信息'}
    platform_name = 'arknights'
    name = '明日方舟游戏信息'
    enable_tag = False
    enabled = True
    is_common = False
    schedule_type = 'interval'
    schedule_kw = {'seconds': 30}

    async def get_target_name(self, _: Target) -> str:
        """Return the fixed display name; the target argument is ignored."""
        return '明日方舟游戏信息'

    async def get_status(self, _):
        """Fetch the version JSON and the pre-announcement JSON and return
        them merged into one dict (pre-announcement keys win on conflict)."""
        async with httpx.AsyncClient() as client:
            version_resp = await client.get('https://ak-conf.hypergryph.com/config/prod/official/IOS/version')
            preannounce_resp = await client.get('https://ak-conf.hypergryph.com/config/prod/announce_meta/IOS/preannouncement.meta.json')
        status = version_resp.json()
        status.update(preannounce_resp.json())
        return status

    def compare_status(self, _, old_status, new_status):
        """Return the Posts describing what changed between two statuses.

        Checks the ``preAnnounceType`` transition (2 -> 0 or 0 -> 2) and
        whether ``clientVersion`` / ``resVersion`` differ.
        """
        def notice(text):
            return Post('arknights', text=text, target_name='明日方舟更新信息')

        posts = []
        transition = (old_status.get('preAnnounceType'), new_status.get('preAnnounceType'))
        if transition == (2, 0):
            posts.append(notice('登录界面维护公告上线(大概是开始维护了)'))
        elif transition == (0, 2):
            posts.append(notice('登录界面维护公告下线(大概是开服了,冲!)'))

        version_notices = (
            ('clientVersion', '游戏本体更新(大更新)'),
            ('resVersion', '游戏资源更新(小更新)'),
        )
        for key, text in version_notices:
            if old_status.get(key) != new_status.get(key):
                posts.append(notice(text))
        return posts

    def get_category(self, _):
        """Every post of this class belongs to category 2."""
        return Category(2)

    async def parse(self, raw_post):
        """``compare_status`` already yields Posts, so pass them through."""
        return raw_post
class MonsterSiren(NewMessage, NoTargetMixin):
    """Monster Siren news feed (category 3 of the 'arknights' platform)."""

    categories = {3: '塞壬唱片新闻'}
    platform_name = 'arknights'
    name = '明日方舟游戏信息'
    enable_tag = False
    enabled = True
    is_common = False
    schedule_type = 'interval'
    schedule_kw = {'seconds': 30}

    async def get_target_name(self, _: Target) -> str:
        """Return the fixed display name; the target argument is ignored."""
        return '明日方舟游戏信息'

    async def get_sub_list(self, _) -> list[RawPost]:
        """Return ``data.list`` from the news API response."""
        async with httpx.AsyncClient() as client:
            response = await client.get('https://monster-siren.hypergryph.com/api/news')
        return response.json()['data']['list']

    def get_id(self, post: RawPost) -> Any:
        """Return the news item's ``cid``."""
        return post['cid']

    def get_date(self, _) -> None:
        """No timestamp is extracted for news items; always ``None``."""
        return None

    def get_category(self, _) -> Category:
        """Every post of this class belongs to category 3."""
        return Category(3)

    async def parse(self, raw_post: RawPost) -> Post:
        """Fetch the news detail and turn its HTML content into a Post.

        The text is the item title followed by the stripped plain text of the
        content; pictures are the ``src`` of every ``<img>`` in the content.
        """
        cid = raw_post["cid"]
        url = f'https://monster-siren.hypergryph.com/info/{raw_post["cid"]}'
        async with httpx.AsyncClient() as client:
            detail = await client.get(f'https://monster-siren.hypergryph.com/api/news/{cid}')
        # A newline after each paragraph keeps paragraphs apart in plain text.
        html = detail.json()['data']['content'].replace('</p>', '</p>\n')
        soup = bs(html, 'html.parser')
        imgs = [img['src'] for img in soup('img')]
        body = soup.text.strip()
        text = f'{raw_post["title"]}\n{body}'
        return Post('monster-siren', text=text, pics=imgs,
                    url=url, target_name="塞壬唱片新闻", compress=True,
                    override_use_pic=False)
@@ -0,0 +1,126 @@
import json
from typing import Any, Optional
import httpx
from ..post import Post
from ..types import Category, RawPost, Tag, Target
from .platform import NewMessage, TargetMixin, CategoryNotSupport
class Bilibili(NewMessage, TargetMixin):
    """Bilibili user dynamics, keyed by the user's uid as target."""

    categories = {
        1: "一般动态",
        2: "专栏文章",
        3: "视频",
        4: "纯文字",
        5: "转发"
        # 5: short video (entry disabled in the original)
    }
    platform_name = 'bilibili'
    enable_tag = True
    enabled = True
    is_common = True
    schedule_type = 'interval'
    schedule_kw = {'seconds': 10}
    name = 'B站'

    async def get_target_name(self, target: Target) -> Optional[str]:
        """Return the account name for ``target``, or ``None`` when the API
        answers with a non-zero ``code``."""
        async with httpx.AsyncClient() as client:
            res = await client.get('https://api.bilibili.com/x/space/acc/info', params={'mid': target})
        res_data = json.loads(res.text)
        if res_data['code']:
            return None
        return res_data['data']['name']

    async def get_sub_list(self, target: Target) -> list[RawPost]:
        """Return the ``cards`` of the user's space history, or ``[]`` when
        the API answers with a non-zero ``code``."""
        async with httpx.AsyncClient() as client:
            params = {'host_uid': target, 'offset': 0, 'need_top': 0}
            res = await client.get('https://api.vc.bilibili.com/dynamic_svr/v1/dynamic_svr/space_history', params=params, timeout=4.0)
        res_dict = json.loads(res.text)
        if res_dict['code'] == 0:
            return res_dict['data']['cards']
        return []

    def get_id(self, post: RawPost) -> Any:
        """Return the dynamic id of the post."""
        return post['desc']['dynamic_id']

    def get_date(self, post: RawPost) -> int:
        """Return the post's ``desc.timestamp``."""
        return post['desc']['timestamp']

    def _do_get_category(self, post_type: int) -> Category:
        """Map a raw ``desc.type`` value to one of ``categories``.

        Raises:
            CategoryNotSupport: the type has no mapping.
        """
        if post_type == 2:
            return Category(1)
        elif post_type == 64:
            return Category(2)
        elif post_type == 8:
            return Category(3)
        elif post_type == 4:
            return Category(4)
        elif post_type == 1:
            # Repost.
            return Category(5)
        raise CategoryNotSupport()

    def get_category(self, post: RawPost) -> Category:
        """Return the category derived from the post's ``desc.type``."""
        post_type = post['desc']['type']
        return self._do_get_category(post_type)

    def get_tags(self, raw_post: RawPost) -> list[Tag]:
        """Return the topic names attached to the post.

        Posts without ``display.topic_info.topic_details`` yield an empty
        list instead of raising ``KeyError``, so tag filtering simply does
        not match them.
        """
        topic_info = (raw_post.get('display') or {}).get('topic_info') or {}
        return [topic['topic_name'] for topic in topic_info.get('topic_details') or []]

    def _get_info(self, post_type: Category, card) -> tuple[str, list]:
        """Extract ``(text, picture_urls)`` from a decoded card of categories 1-4.

        Raises:
            CategoryNotSupport: ``post_type`` is not one of 1-4.
        """
        if post_type == 1:
            # Ordinary dynamic.
            text = card['item']['description']
            pic = [img['img_src'] for img in card['item']['pictures']]
        elif post_type == 2:
            # Column article.
            text = '{} {}'.format(card['title'], card['summary'])
            pic = card['image_urls']
        elif post_type == 3:
            # Video.
            text = card['dynamic']
            pic = [card['pic']]
        elif post_type == 4:
            # Plain text.
            text = card['item']['content']
            pic = []
        else:
            raise CategoryNotSupport()
        return text, pic

    async def parse(self, raw_post: RawPost) -> Post:
        """Turn a raw card into a Post.

        For reposts (category 5) the text is the repost comment followed by a
        separator and the original post's text; no pictures are attached.

        Raises:
            CategoryNotSupport: the post, or the original of a repost, has an
                unsupported type.
        """
        card_content = json.loads(raw_post['card'])
        post_type = self.get_category(raw_post)
        target_name = raw_post['desc']['user_profile']['info']['uname']
        if post_type >= 1 and post_type < 5:
            url = ''
            if post_type == 1:
                # Ordinary dynamic.
                url = 'https://t.bilibili.com/{}'.format(raw_post['desc']['dynamic_id_str'])
            elif post_type == 2:
                # Column article.
                url = 'https://www.bilibili.com/read/cv{}'.format(raw_post['desc']['rid'])
            elif post_type == 3:
                # Video.
                url = 'https://www.bilibili.com/video/{}'.format(raw_post['desc']['bvid'])
            elif post_type == 4:
                # Plain text.
                url = 'https://t.bilibili.com/{}'.format(raw_post['desc']['dynamic_id_str'])
            text, pic = self._get_info(post_type, card_content)
        elif post_type == 5:
            # Repost: the original card is itself JSON-encoded in 'origin'.
            url = 'https://t.bilibili.com/{}'.format(raw_post['desc']['dynamic_id_str'])
            text = card_content['item']['content']
            orig_type = card_content['item']['orig_type']
            orig = json.loads(card_content['origin'])
            orig_text, _ = self._get_info(self._do_get_category(orig_type), orig)
            text += '\n--------------\n'
            text += orig_text
            pic = []
        else:
            raise CategoryNotSupport(post_type)
        return Post('bilibili', text=text, url=url, pics=pic, target_name=target_name)
@@ -0,0 +1,53 @@
from typing import Any, Optional
import httpx
from ..post import Post
from ..types import RawPost, Target
from .platform import TargetMixin, NewMessage
class NcmArtist(TargetMixin, NewMessage):
    """NetEase Cloud Music artist albums, keyed by artist id as target."""

    categories = {}
    platform_name = 'ncm-artist'
    enable_tag = False
    enabled = True
    is_common = True
    schedule_type = 'interval'
    schedule_kw = {'minutes': 10}
    name = "网易云-歌手"

    async def _fetch_artist_albums(self, target: Target):
        """Request the artist albums endpoint and return the decoded JSON."""
        async with httpx.AsyncClient() as client:
            response = await client.get(
                "https://music.163.com/api/artist/albums/{}".format(target),
                headers={'Referer': 'https://music.163.com/'}
            )
        return response.json()

    async def get_target_name(self, target: Target) -> Optional[str]:
        """Return the artist's name, or ``None`` when ``code`` is not 200."""
        payload = await self._fetch_artist_albums(target)
        if payload['code'] == 200:
            return payload['artist']['name']
        return None

    async def get_sub_list(self, target: Target) -> list[RawPost]:
        """Return the artist's ``hotAlbums``, or ``[]`` when ``code`` is not 200."""
        payload = await self._fetch_artist_albums(target)
        return payload['hotAlbums'] if payload['code'] == 200 else []

    def get_id(self, post: RawPost) -> Any:
        """Return the album id."""
        return post['id']

    def get_date(self, post: RawPost) -> int:
        """Return ``publishTime`` floor-divided by 1000."""
        return post['publishTime'] // 1000

    async def parse(self, raw_post: RawPost) -> Post:
        """Build the new-album Post: title text, cover picture and album URL."""
        album_id = raw_post['id']
        return Post(
            'ncm-artist',
            text='新专辑发布:{}'.format(raw_post['name']),
            url="https://music.163.com/#/album?id={}".format(album_id),
            pics=[raw_post['picUrl']],
            target_name=raw_post['artist']['name'],
        )
@@ -0,0 +1,384 @@
from abc import abstractmethod, ABC
from collections import defaultdict
from dataclasses import dataclass
from functools import reduce
import time
from typing import Any, Collection, Optional, Literal
import httpx
from nonebot import logger
from ..plugin_config import plugin_config
from ..post import Post
from ..types import Category, RawPost, Tag, Target, User, UserSubInfo
class CategoryNotSupport(Exception):
    """Raised from ``get_category`` when a post's category is not supported."""
class RegistryMeta(type):
    """Metaclass that records concrete classes in a shared ``registory`` list.

    Class keywords:
        base=True      -- the class starts a new, empty ``registory``.
        abstract=True  -- the class is not recorded.
    A class created with neither keyword is appended to the ``registory``
    it inherits.
    """

    def __new__(cls, name, bases, namespace, **kwargs):
        # The class keywords are handled in __init__ only; they are
        # deliberately not forwarded to type.__new__.
        return super().__new__(cls, name, bases, namespace)

    def __init__(cls, name, bases, namespace, **kwargs):
        starts_registry = kwargs.get('base')
        is_abstract = kwargs.get('abstract')
        if starts_registry:
            # Root of a hierarchy: own a fresh list.
            cls.registory = []
        elif not is_abstract:
            # Concrete subclass: record it in the inherited list.
            cls.registory.append(cls)
        super().__init__(name, bases, namespace, **kwargs)
class RegistryABCMeta(RegistryMeta, ABC):
    """Metaclass used by the mixins in this module; adds nothing of its own
    to ``RegistryMeta``.

    NOTE(review): this derives from ``ABC`` rather than ``ABCMeta`` -- confirm
    whether ``@abstractmethod`` is meant to be enforced for classes that use
    this metaclass.
    """
class StorageMixinProto(metaclass=RegistryABCMeta, abstract=True):
    """Interface for keeping one piece of data per target."""

    # True in TargetMixin, False in NoTargetMixin.
    has_target: bool

    @abstractmethod
    def get_stored_data(self, target: Target) -> Any:
        """Return the data stored for ``target``."""

    @abstractmethod
    def set_stored_data(self, target: Target, data: Any):
        """Store ``data`` for ``target``."""
class TargetMixin(StorageMixinProto, abstract=True):
    """Storage for platforms with targets: one stored value per target."""

    has_target = True

    def __init__(self):
        super().__init__()
        self.store: dict[Target, Any] = {}

    def get_stored_data(self, target: Target) -> Any:
        """Return the value stored for ``target``, or ``None`` if absent."""
        return self.store.get(target)

    def set_stored_data(self, target: Target, data: Any):
        """Store ``data`` under ``target``, replacing any previous value."""
        self.store[target] = data
class NoTargetMixin(StorageMixinProto, abstract=True):
    """Storage for platforms without targets: a single stored value."""

    has_target = False

    def __init__(self):
        super().__init__()
        self.store = None

    def get_stored_data(self, _: Target) -> Any:
        """Return the single stored value; the target argument is ignored."""
        return self.store

    def set_stored_data(self, _: Target, data: Any):
        """Replace the single stored value; the target argument is ignored."""
        self.store = data
class PlatformNameMixin(metaclass=RegistryABCMeta, abstract=True):
    """Declares the ``platform_name`` attribute that subclasses provide."""

    platform_name: str
class CategoryMixin(metaclass=RegistryABCMeta, abstract=True):
    """Interface for classifying a raw post into a category."""

    @abstractmethod
    def get_category(self, post: RawPost) -> Optional[Category]:
        """Return the category of the given RawPost.

        This default raises ``NotImplementedError``.
        """
        raise NotImplementedError()
class ParsePostMixin(metaclass=RegistryABCMeta, abstract=True):
    """Interface for converting a raw post into a ``Post``."""

    @abstractmethod
    async def parse(self, raw_post: RawPost) -> Post:
        """Parse a RawPost into a Post."""
class MessageProcessMixin(PlatformNameMixin, CategoryMixin, ParsePostMixin, abstract=True):
    """General message processing: fetch, parse and filter posts."""

    def __init__(self):
        super().__init__()
        # Parsed posts keyed by post id, filled by _parse_with_cache.
        self.parse_cache: dict[Any, Post] = {}

    @abstractmethod
    def get_id(self, post: RawPost) -> Any:
        """Return the id of the given RawPost."""

    async def _parse_with_cache(self, raw_post: RawPost) -> Post:
        """Return the parsed Post for ``raw_post``, parsing it at most once.

        ``parse`` is attempted up to three times; the exception of the third
        failed attempt is propagated.
        """
        post_id = self.get_id(raw_post)
        if post_id not in self.parse_cache:
            for attempts_left in (2, 1, 0):
                try:
                    self.parse_cache[post_id] = await self.parse(raw_post)
                except Exception:
                    if not attempts_left:
                        raise
                else:
                    break
        return self.parse_cache[post_id]

    @abstractmethod
    async def get_sub_list(self, target: Target) -> list[RawPost]:
        """Return the list of raw posts for the given target."""

    @abstractmethod
    def get_date(self, post: RawPost) -> Optional[int]:
        """Return the post's timestamp, or ``None`` if it cannot be determined."""

    async def filter_common(self, raw_post_list: list[RawPost]) -> list[RawPost]:
        """Drop posts that are too old or of an unsupported category.

        A post is dropped when it has a timestamp more than two hours in the
        past and ``plugin_config.hk_reporter_init_filter`` is truthy, or when
        ``get_category`` raises ``CategoryNotSupport``. A
        ``NotImplementedError`` from ``get_category`` is ignored.
        """
        max_age_seconds = 2 * 60 * 60
        kept = []
        for raw_post in raw_post_list:
            post_time = self.get_date(raw_post)
            too_old = bool(post_time) and time.time() - post_time > max_age_seconds
            if too_old and plugin_config.hk_reporter_init_filter:
                continue
            try:
                self.get_category(raw_post)
            except CategoryNotSupport:
                continue
            except NotImplementedError:
                # No category support on this platform: keep the post.
                pass
            kept.append(raw_post)
        return kept
class NewMessageProcessMixin(StorageMixinProto, MessageProcessMixin, abstract=True):
    """General message processing that returns only posts not seen before."""

    @dataclass
    class MessageStorage():
        # Whether the first fetch has already seeded ``exists_posts``.
        inited: bool
        # Ids of all posts seen so far.
        exists_posts: set[Any]

    async def filter_common_with_diff(self, target: Target, raw_post_list: list[RawPost]) -> list[RawPost]:
        """Apply ``filter_common`` and return the posts whose id is new.

        On the first call for a target, when
        ``plugin_config.hk_reporter_init_filter`` is truthy, all ids are only
        recorded and nothing is returned.
        """
        candidates = await self.filter_common(raw_post_list)
        store = self.get_stored_data(target) or self.MessageStorage(False, set())
        fresh = []
        if not store.inited and plugin_config.hk_reporter_init_filter:
            # First fetch for this target: seed the seen-set, report nothing.
            store.exists_posts.update(self.get_id(raw_post) for raw_post in candidates)
            logger.info('init {}-{} with {}'.format(self.platform_name, target, store.exists_posts))
            store.inited = True
        else:
            for raw_post in candidates:
                post_id = self.get_id(raw_post)
                if post_id not in store.exists_posts:
                    fresh.append(raw_post)
                    store.exists_posts.add(post_id)
        self.set_stored_data(target, store)
        return fresh
class UserCustomFilterMixin(CategoryMixin, ParsePostMixin, abstract=True):
    """Per-user filtering of posts by category and tag, and dispatching."""

    # Category id -> display name.
    categories: dict[Category, str]
    # Whether tag filtering applies to this platform.
    enable_tag: bool

    def __init__(self):
        super().__init__()
        # Display name -> category id, the inverse of ``categories``.
        self.reverse_category = {label: cat for cat, label in self.categories.items()}

    @abstractmethod
    def get_tags(self, raw_post: RawPost) -> Optional[Collection[Tag]]:
        """Return the tags of the given RawPost."""

    async def filter_user_custom(self, raw_post_list: list[RawPost], cats: list[Category], tags: list[Tag]) -> list[RawPost]:
        """Return the posts matching the requested categories and tags.

        An empty ``cats`` or ``tags`` disables the respective filter. The tag
        filter keeps a post when at least one of its tags is in ``tags``.
        """
        selected: list[RawPost] = []
        for raw_post in raw_post_list:
            if self.categories:
                cat = self.get_category(raw_post)
                if cats and cat not in cats:
                    continue
            if self.enable_tag and tags:
                post_tags = self.get_tags(raw_post) or []
                if not any(tag in tags for tag in post_tags):
                    continue
            selected.append(raw_post)
        return selected

    async def dispatch_user_post(self, target: Target, new_posts: list[RawPost], users: list[UserSubInfo]) -> list[tuple[User, list[Post]]]:
        """Filter ``new_posts`` for every user and parse what remains.

        Returns one ``(user, posts)`` pair per entry of ``users``; ``posts``
        may be empty.
        """
        # Platforms with a parse cache go through it; others parse directly.
        if isinstance(self, MessageProcessMixin):
            to_post = self._parse_with_cache
        else:
            to_post = self.parse

        dispatched: list[tuple[User, list[Post]]] = []
        for user, category_getter, tag_getter in users:
            required_tags = tag_getter(target) if self.enable_tag else []
            cats = category_getter(target)
            user_raw_post = await self.filter_user_custom(new_posts, cats, required_tags)
            user_post: list[Post] = []
            for raw_post in user_raw_post:
                user_post.append(await to_post(raw_post))
            dispatched.append((user, user_post))
        return dispatched
class Platform(PlatformNameMixin, UserCustomFilterMixin, base=True):
    """Root of the platform hierarchy; ``base=True`` makes it own the
    ``registory`` list that concrete subclasses are appended to."""

    # schedule_interval: int
    # Scheduler trigger type and the keyword arguments that go with it.
    schedule_type: Literal['date', 'interval', 'cron']
    schedule_kw: dict
    is_common: bool
    # Classes with a falsy ``enabled`` are skipped when platforms are set up.
    enabled: bool
    # Display name of the platform.
    name: str

    @abstractmethod
    async def get_target_name(self, target: Target) -> Optional[str]:
        """Return the display name of ``target``, or ``None``."""

    @abstractmethod
    async def fetch_new_post(self, target: Target, users: list[UserSubInfo]) -> list[tuple[User, list[Post]]]:
        """Return the posts to deliver, as ``(user, posts)`` pairs."""
class NewMessage(
        Platform,
        NewMessageProcessMixin,
        UserCustomFilterMixin,
        abstract=True
):
    """Fetch a list of messages, keep the new ones, dispatch them to users."""

    async def fetch_new_post(self, target: Target, users: list[UserSubInfo]) -> list[tuple[User, list[Post]]]:
        """Return the new posts for ``target`` as ``(user, posts)`` pairs.

        Returns ``[]`` when nothing is new or when an ``httpx.RequestError``
        occurs (which is logged as a warning).
        """
        try:
            post_list = await self.get_sub_list(target)
            new_posts = await self.filter_common_with_diff(target, post_list)
            if not new_posts:
                return []
            shown_target = target if self.has_target else '-'
            for post in new_posts:
                logger.info('fetch new post from {} {}: {}'.format(
                    self.platform_name,
                    shown_target,
                    self.get_id(post)))
            dispatched = await self.dispatch_user_post(target, new_posts, users)
            # The parse cache only needs to live for one dispatch round.
            self.parse_cache = {}
            return dispatched
        except httpx.RequestError as err:
            logger.warning("network connection error: {}, url: {}".format(type(err), err.request.url))
            return []
class StatusChange(
        Platform,
        StorageMixinProto,
        UserCustomFilterMixin,
        abstract=True
):
    """Watch a status and fire posts when it changes."""

    @abstractmethod
    async def get_status(self, target: Target) -> Any:
        """Return the current status for ``target``."""

    @abstractmethod
    def compare_status(self, target: Target, old_status, new_status) -> list[RawPost]:
        """Return the raw posts describing the change from old to new."""

    @abstractmethod
    async def parse(self, raw_post: RawPost) -> Post:
        """Parse a RawPost into a Post."""

    async def fetch_new_post(self, target: Target, users: list[UserSubInfo]) -> list[tuple[User, list[Post]]]:
        """Compare the current status with the stored one and dispatch the
        differences; the current status is then stored.

        Nothing is compared while no (truthy) previous status is stored.
        Returns ``[]`` on ``httpx.RequestError`` (logged as a warning).
        """
        try:
            new_status = await self.get_status(target)
            dispatched = []
            old_status = self.get_stored_data(target)
            if old_status:
                diff = self.compare_status(target, old_status, new_status)
                if diff:
                    shown_target = target if self.has_target else '-'
                    logger.info("status changes {} {}: {} -> {}".format(
                        self.platform_name,
                        shown_target,
                        old_status, new_status
                    ))
                    dispatched = await self.dispatch_user_post(target, diff, users)
            self.set_stored_data(target, new_status)
            return dispatched
        except httpx.RequestError as err:
            logger.warning("network connection error: {}, url: {}".format(type(err), err.request.url))
            return []
class SimplePost(
        Platform,
        MessageProcessMixin,
        UserCustomFilterMixin,
        StorageMixinProto,
        abstract=True
):
    """Fetch a list of messages and dispatch all of them to users."""

    async def fetch_new_post(self, target: Target, users: list[UserSubInfo]) -> list[tuple[User, list[Post]]]:
        """Return every fetched post for ``target`` as ``(user, posts)`` pairs.

        Returns ``[]`` when the fetched list is empty or when an
        ``httpx.RequestError`` occurs (which is logged as a warning).
        """
        try:
            new_posts = await self.get_sub_list(target)
            if not new_posts:
                return []
            shown_target = target if self.has_target else '-'
            for post in new_posts:
                logger.info('fetch new post from {} {}: {}'.format(
                    self.platform_name,
                    shown_target,
                    self.get_id(post)))
            dispatched = await self.dispatch_user_post(target, new_posts, users)
            # The parse cache only needs to live for one dispatch round.
            self.parse_cache = {}
            return dispatched
        except httpx.RequestError as err:
            logger.warning("network connection error: {}, url: {}".format(type(err), err.request.url))
            return []
class NoTargetGroup(
        Platform,
        NoTargetMixin,
        UserCustomFilterMixin,
        abstract=True
):
    """Presents several target-less platforms as a single platform.

    The group's categories are the union of the members' categories, and
    ``fetch_new_post`` merges the members' results per user.
    """

    enable_tag = False
    DUMMY_STR = '_DUMMY'
    enabled = True

    class PlatformProto(Platform, NoTargetMixin, UserCustomFilterMixin, abstract=True):
        ...

    def __init__(self, platform_list: list[PlatformProto]):
        """Validate the member platforms and adopt their shared settings.

        Raises:
            RuntimeError: members disagree on ``name``, on the scheduler
                settings, or declare overlapping category keys.
        """
        self.platform_list = platform_list
        # The group never defined ``platform_name`` itself, yet the error
        # messages below (and inherited logging) read it; take it from the
        # first member so those paths no longer fail with AttributeError.
        self.platform_name = platform_list[0].platform_name
        name = self.DUMMY_STR
        self.categories = {}
        categories_keys = set()
        self.schedule_type = platform_list[0].schedule_type
        self.schedule_kw = platform_list[0].schedule_kw
        for platform in platform_list:
            if name == self.DUMMY_STR:
                name = platform.name
            elif name != platform.name:
                raise RuntimeError('Platform name for {} not fit'.format(self.platform_name))
            platform_category_key_set = set(platform.categories.keys())
            if platform_category_key_set & categories_keys:
                raise RuntimeError('Platform categories for {} duplicate'.format(self.platform_name))
            categories_keys |= platform_category_key_set
            self.categories.update(platform.categories)
            if platform.schedule_kw != self.schedule_kw or platform.schedule_type != self.schedule_type:
                raise RuntimeError('Platform scheduler for {} not fit'.format(self.platform_name))
        self.name = name
        self.is_common = platform_list[0].is_common
        # Runs last so that the parent initialisers see ``self.categories``.
        super().__init__()

    def __str__(self):
        return '[' + ' '.join(platform.name for platform in self.platform_list) + ']'

    async def get_target_name(self, _):
        """Delegate to the first member platform."""
        return await self.platform_list[0].get_target_name(_)

    async def fetch_new_post(self, target, users):
        """Fetch from every member in order and merge the posts per user.

        Returns a list of ``[user, posts]`` pairs.
        """
        res = defaultdict(list)
        for platform in self.platform_list:
            platform_res = await platform.fetch_new_post(target=target, users=users)
            for user, posts in platform_res:
                res[user].extend(posts)
        return [[key, val] for key, val in res.items()]
+49
View File
@@ -0,0 +1,49 @@
import calendar
from typing import Any, Optional
from bs4 import BeautifulSoup as bs
import feedparser
import httpx
from ..post import Post
from ..types import RawPost, Target
from .platform import NewMessage, TargetMixin
class Rss(NewMessage, TargetMixin):
    """RSS/Atom feeds; the target is the feed URL."""

    categories = {}
    enable_tag = False
    platform_name = 'rss'
    name = "Rss"
    enabled = True
    is_common = True
    schedule_type = 'interval'
    schedule_kw = {'seconds': 30}

    async def get_target_name(self, target: Target) -> Optional[str]:
        """Download the feed at ``target`` and return its title."""
        async with httpx.AsyncClient() as client:
            res = await client.get(target, timeout=10.0)
        feed = feedparser.parse(res.text)
        return feed['feed']['title']

    def get_date(self, post: RawPost) -> Optional[int]:
        """Return the entry's publication time as a UTC timestamp.

        Falls back to the update time when the entry carries no publication
        date, and returns ``None`` when it has neither, instead of raising.
        """
        parsed_time = post.get('published_parsed') or post.get('updated_parsed')
        if not parsed_time:
            return None
        return calendar.timegm(parsed_time)

    def get_id(self, post: RawPost) -> Any:
        """Return the entry's id."""
        return post.id

    async def get_sub_list(self, target: Target) -> list[RawPost]:
        """Download the feed at ``target`` and return its entries, each
        tagged with the feed title under ``_target_name``."""
        async with httpx.AsyncClient() as client:
            res = await client.get(target, timeout=10.0)
        # The response object itself is handed to feedparser (unchanged from
        # the original); presumably feedparser reads the raw body from it.
        feed = feedparser.parse(res)
        entries = feed.entries
        for entry in entries:
            entry['_target_name'] = feed.feed.title
        return feed.entries

    async def parse(self, raw_post: RawPost) -> Post:
        """Build a Post from an entry: optional title line, the plain text of
        the description, and the ``src`` of every image in the description."""
        text = raw_post.get('title', '') + '\n' if raw_post.get('title') else ''
        soup = bs(raw_post.description, 'html.parser')
        text += soup.text.strip()
        pics = [img.attrs['src'] for img in soup('img')]
        return Post('rss', text=text, url=raw_post.link, pics=pics, target_name=raw_post['_target_name'])
@@ -0,0 +1,78 @@
from datetime import datetime
import hashlib
import json
import re
from typing import Any, Optional
from bs4 import BeautifulSoup as bs
import httpx
from ..post import Post
from ..types import *
# from .platform import Platform
# class Wechat(Platform):
# categories = {}
# enable_tag = False
# platform_name = 'wechat'
# enabled = False
# is_common = False
# name = '微信公众号'
# @classmethod
# def _get_query_url(cls, target: Target):
# return 'https://weixin.sogou.com/weixin?type=1&s_from=input&query={}&ie=utf8&_sug_=n&_sug_type_='.format(target)
# @classmethod
# async def _get_target_soup(cls, target: Target) -> Optional[bs]:
# target_url = cls._get_query_url(target)
# async with httpx.AsyncClient() as client:
# res = await client.get(target_url)
# soup = bs(res.text, 'html.parser')
# blocks = soup.find(class_='news-list2').find_all('li',recursive=False)
# for block in blocks:
# if block.find(string=[target]):
# return block
# @classmethod
# async def get_account_name(cls, target: Target) -> Optional[str]:
# if not (block := await cls._get_target_soup(target)):
# return None
# return block.find('p', class_='tit').find('a').text
# async def get_sub_list(self, target: Target) -> list[RawPost]:
# block = await self._get_target_soup(target)
# if (last_post_dt := block.find('dt', string='最近文章:')):
# post = {
# 'title': last_post_dt.find_parent().find('a').text,
# 'target': target,
# 'page_url': self._get_query_url(target),
# 'name': block.find('p', class_='tit').find('a').text
# }
# return [post]
# else:
# return []
# def get_id(self, post: RawPost) -> Any:
# return post['title']
# def get_date(self, post: RawPost):
# return None
# def get_tags(self, post: RawPost):
# return None
# def get_category(self, post: RawPost):
# return None
# async def parse(self, raw_post: RawPost) -> Post:
# # TODO get content of post
# return Post(target_type='wechat',
# text='{}\n详细内容请自行查看公众号'.format(raw_post['title']),
# target_name=raw_post['name'],
# pics=[],
# url=''
# )
+120
View File
@@ -0,0 +1,120 @@
from datetime import datetime
import json
import re
from typing import Any, Optional
from bs4 import BeautifulSoup as bs
import httpx
from nonebot import logger
from ..post import Post
from ..types import *
from .platform import NewMessage, TargetMixin
class Weibo(NewMessage, TargetMixin):
    """Sina Weibo user timelines; the target is the user id as a string."""

    categories = {
        1: '转发',
        2: '视频',
        3: '图文',
        4: '文字',
    }
    enable_tag = True
    platform_name = 'weibo'
    name = '新浪微博'
    enabled = True
    is_common = True
    schedule_type = 'interval'
    schedule_kw = {'seconds': 3}

    async def get_target_name(self, target: Target) -> Optional[str]:
        """Return the user's screen name, or ``None`` when the API response
        does not report ``ok == 1``."""
        async with httpx.AsyncClient() as client:
            param = {'containerid': '100505' + target}
            res = await client.get('https://m.weibo.cn/api/container/getIndex', params=param)
        res_dict = json.loads(res.text)
        if res_dict.get('ok') == 1:
            return res_dict['data']['userInfo']['screen_name']
        return None

    async def get_sub_list(self, target: Target) -> list[RawPost]:
        """Return the user's timeline cards whose ``card_type`` is 9, or
        ``[]`` when the API response has a falsy ``ok``."""
        async with httpx.AsyncClient() as client:
            params = {'containerid': '107603' + target}
            res = await client.get('https://m.weibo.cn/api/container/getIndex?', params=params, timeout=4.0)
        res_data = json.loads(res.text)
        if not res_data['ok']:
            return []
        custom_filter: Callable[[RawPost], bool] = lambda d: d['card_type'] == 9
        return list(filter(custom_filter, res_data['data']['cards']))

    def get_id(self, post: RawPost) -> Any:
        """Return the id of the card's ``mblog``."""
        return post['mblog']['id']

    def filter_platform_custom(self, raw_post: RawPost) -> bool:
        """Return whether the card's ``card_type`` is 9."""
        return raw_post['card_type'] == 9

    def get_date(self, raw_post: RawPost) -> float:
        """Return the post's ``created_at`` as a POSIX timestamp."""
        created_time = datetime.strptime(raw_post['mblog']['created_at'], '%a %b %d %H:%M:%S %z %Y')
        return created_time.timestamp()

    def get_tags(self, raw_post: RawPost) -> Optional[list[Tag]]:
        """Return the tags of the given RawPost.

        Tags are the ``#...#`` link texts of the post (without the hashes),
        plus the super-topic name suffixed with '超话' when a super-topic
        icon is present.
        """
        text = raw_post['mblog']['text']
        soup = bs(text, 'html.parser')
        link_texts = (span.text for span in soup.find_all('span', class_='surl-text'))
        # startswith/endswith instead of s[0]/s[-1]: an empty link text must
        # not raise IndexError.
        res = [s[1:-1] for s in link_texts if s.startswith('#') and s.endswith('#')]
        super_topic_img = soup.find('img', src=re.compile(r'timeline_card_small_super_default'))
        if super_topic_img:
            try:
                res.append(super_topic_img.parent.parent.find('span', class_='surl-text').text + '超话')
            except Exception:
                # Best effort: an unexpected markup layout only costs the tag.
                logger.info('super_topic extract error: {}'.format(text))
        return res

    def get_category(self, raw_post: RawPost) -> Category:
        """Classify the post: repost (1), video (2), with pictures (3),
        otherwise text only (4)."""
        if raw_post['mblog'].get('retweeted_status'):
            return Category(1)
        elif raw_post['mblog'].get('page_info') and raw_post['mblog']['page_info'].get('type') == 'video':
            return Category(2)
        elif raw_post['mblog'].get('pics'):
            return Category(3)
        else:
            return Category(4)

    def _get_text(self, raw_text: str) -> str:
        """Convert the post HTML to plain text, keeping ``<br />`` line breaks."""
        text = raw_text.replace('<br />', '\n')
        return bs(text, 'html.parser').text

    async def parse(self, raw_post: RawPost) -> Post:
        """Build a Post from a timeline card.

        For long texts or more than nine pictures the detail page is fetched
        and its embedded status JSON is used; if that cannot be extracted,
        the card's own data is used instead.
        """
        header = {
            'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
            'accept-language': 'zh-CN,zh;q=0.9',
            'authority': 'm.weibo.cn',
            'cache-control': 'max-age=0',
            'sec-fetch-dest': 'empty',
            'sec-fetch-mode': 'same-origin',
            'sec-fetch-site': 'same-origin',
            'upgrade-insecure-requests': '1',
            'user-agent': 'Mozilla/5.0 (Linux; Android 6.0; Nexus 5 Build/MRA58N) '
                          'AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.72 '
                          'Mobile Safari/537.36'}
        info = raw_post['mblog']
        if info['isLongText'] or info['pic_num'] > 9:
            async with httpx.AsyncClient() as client:
                res = await client.get('https://m.weibo.cn/detail/{}'.format(info['mid']), headers=header)
            try:
                full_json_text = re.search(r'"status": ([\s\S]+),\s+"hotScheme"', res.text).group(1)
                info = json.loads(full_json_text)
            except Exception:
                # Best effort: keep the card's own (possibly truncated) data.
                logger.info('detail message error: https://m.weibo.cn/detail/{}'.format(info['mid']))
        parsed_text = self._get_text(info['text'])
        pic_urls = [img['large']['url'] for img in info.get('pics', [])]
        detail_url = 'https://weibo.com/{}/{}'.format(info['user']['id'], info['bid'])
        # return parsed_text, detail_url, pic_urls
        return Post('weibo', text=parsed_text, url=detail_url, pics=pic_urls, target_name=info['user']['screen_name'])