Merge remote-tracking branch 'upstream/main' into feat/custom-post

Azide committed 2022-07-10 12:52:00 +08:00
36 changed files with 33576 additions and 341 deletions
+13 -6
@@ -353,7 +353,7 @@ async def send_group_list(bot: Bot, event: PrivateMessageEvent, state: T_State):
     for idx, group in enumerate(groups, 1):
         group_number_idx[idx] = group["group_id"]
         res_text += f'{idx}. {group["group_id"]} - {group["group_name"]}\n'
-    res_text += "请输入左侧序号"
+    res_text += "请输入左侧序号\n中止操作请输入'取消'"
     # await group_manage_matcher.send(res_text)
     state["_prompt"] = res_text
     state["group_number_idx"] = group_number_idx
@@ -365,11 +365,16 @@ async def _parse_group_idx(state: T_State, event_msg: str = EventPlainText()):
     group_number_idx: Optional[dict[int, int]] = state.get("group_number_idx")
     assert group_number_idx
     try:
+        assert event_msg != "取消", "userAbort"
         idx = int(event_msg)
-        assert idx in group_number_idx.keys()
+        assert idx in group_number_idx.keys(), "idxNotInList"
         state["group_idx"] = idx
-    except:
-        await group_manage_matcher.reject("请输入正确序号")
+    except AssertionError as AE:
+        errType = AE.args[0]
+        if errType == "userAbort":
+            await group_manage_matcher.finish("已取消")
+        elif errType == "idxNotInList":
+            await group_manage_matcher.reject("请输入正确序号")


 @group_manage_matcher.got(
@@ -383,13 +388,13 @@ async def do_choose_group_number(state: T_State):


 async def _check_command(event_msg: str = EventPlainText()):
-    if event_msg not in {"添加订阅", "查询订阅", "删除订阅"}:
+    if event_msg not in {"添加订阅", "查询订阅", "删除订阅", "取消"}:
         await group_manage_matcher.reject("请输入正确的命令")
     return


 @group_manage_matcher.got(
-    "command", "请输入需要使用的命令:添加订阅,查询订阅,删除订阅", [Depends(_check_command)]
+    "command", "请输入需要使用的命令:添加订阅,查询订阅,删除订阅,取消", [Depends(_check_command)]
 )
 async def do_dispatch_command(
     bot: Bot,
@@ -398,6 +403,8 @@ async def do_dispatch_command(
     matcher: Matcher,
     command: str = ArgStr(),
 ):
+    if command == "取消":
+        await group_manage_matcher.finish("已取消")
     permission = await matcher.update_permission(bot, event)
     new_matcher = Matcher.new(
         "message",
+71 -1
@@ -5,7 +5,7 @@ from typing import Any, Optional
 from ..post import Post
 from ..types import Category, RawPost, Tag, Target
 from ..utils import http_client
-from .platform import CategoryNotSupport, NewMessage
+from .platform import CategoryNotSupport, NewMessage, StatusChange


 class Bilibili(NewMessage):
@@ -155,3 +155,73 @@ class Bilibili(NewMessage):
         else:
             raise CategoryNotSupport(post_type)
         return Post("bilibili", text=text, url=url, pics=pic, target_name=target_name)
+
+
+class Bilibililive(StatusChange):
+    # Author : Sichongzou
+    # Date : 2022-5-18 8:54
+    # Description : Bilibili live-stream start notification
+    # E-mail : 1557157806@qq.com
+    categories = {}
+    platform_name = "bilibili-live"
+    enable_tag = False
+    enabled = True
+    is_common = True
+    schedule_type = "interval"
+    schedule_kw = {"seconds": 10}
+    name = "Bilibili直播"
+    has_target = True
+
+    async def get_target_name(self, target: Target) -> Optional[str]:
+        async with http_client() as client:
+            res = await client.get(
+                "https://api.bilibili.com/x/space/acc/info", params={"mid": target}
+            )
+            res_data = json.loads(res.text)
+            if res_data["code"]:
+                return None
+            return res_data["data"]["name"]
+
+    async def get_status(self, target: Target):
+        async with http_client() as client:
+            params = {"mid": target}
+            res = await client.get(
+                "https://api.bilibili.com/x/space/acc/info",
+                params=params,
+                timeout=4.0,
+            )
+        res_dict = json.loads(res.text)
+        if res_dict["code"] == 0:
+            info = {}
+            info["uid"] = res_dict["data"]["mid"]
+            info["uname"] = res_dict["data"]["name"]
+            info["live_state"] = res_dict["data"]["live_room"]["liveStatus"]
+            info["room_id"] = res_dict["data"]["live_room"]["roomid"]
+            info["title"] = res_dict["data"]["live_room"]["title"]
+            info["cover"] = res_dict["data"]["live_room"]["cover"]
+            return info
+        else:
+            return []
+
+    def compare_status(self, target: Target, old_status, new_status) -> list[RawPost]:
+        if (
+            new_status["live_state"] != old_status["live_state"]
+            and new_status["live_state"] == 1
+        ):
+            return [new_status]
+        else:
+            return []
+
+    async def parse(self, raw_post: RawPost) -> Post:
+        url = "https://live.bilibili.com/{}".format(raw_post["room_id"])
+        pic = [raw_post["cover"]]
+        target_name = raw_post["uname"]
+        title = raw_post["title"]
+        return Post(
+            self.name,
+            text=title,
+            url=url,
+            pics=pic,
+            target_name=target_name,
+            compress=True,
+        )
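
Note: with a StatusChange platform, get_status polls the current state and compare_status decides what, if anything, to push. Here a post is produced only on the transition into live_state == 1. A minimal runnable check of that logic (should_notify is an illustrative stand-in for compare_status):

def should_notify(old_state: int, new_state: int) -> bool:
    # any transition into live_state == 1 counts as "went live"
    return new_state != old_state and new_state == 1

assert should_notify(0, 1) is True   # stream just started: push
assert should_notify(1, 1) is False  # still live: no duplicate push
assert should_notify(1, 0) is False  # stream ended: stay silent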
@@ -0,0 +1,265 @@
+import re
+import time
+from typing import Literal, Optional
+
+import httpx
+from bs4 import BeautifulSoup, NavigableString, Tag
+
+from ..post import Post
+from ..types import Category, RawPost, Target
+from .platform import CategoryNotSupport, NewMessage
+
+
+class McbbsNews(NewMessage):
+    categories = {1: "Java版本资讯", 2: "基岩版本资讯", 3: "快讯", 4: "基岩快讯", 5: "周边消息"}
+    enable_tag = False
+    platform_name = "mcbbsnews"
+    name = "MCBBS幻翼块讯"
+    enabled = True
+    is_common = False
+    schedule_type = "interval"
+    schedule_kw = {"hours": 1}
+    has_target = False
+
+    async def get_target_name(self, _: Target) -> str:
+        return self.name
+
+    async def get_sub_list(self, _: Target) -> list[RawPost]:
+        url = "https://www.mcbbs.net/forum-news-1.html"
+        headers = {
+            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) "
+            "Chrome/51.0.2704.63 Safari/537.36"
+        }
+        async with httpx.AsyncClient() as client:
+            html = await client.get(url, headers=headers)
+            soup = BeautifulSoup(html.text, "html.parser")
+            raw_post_list = soup.find_all(
+                "tbody", id=re.compile(r"normalthread_[0-9]*")
+            )
+            post_list = self._gen_post_list(raw_post_list)
+        return post_list
+
+    @staticmethod
+    def _format_text(rawtext: str, mode: int) -> str:
+        """Clean up the stray newlines and runs of spaces in strings produced by BeautifulSoup.
+
+        mode 0: titles
+        mode 1: version-news posts
+        mode 2: express-news posts
+        """
+        if mode == 0:
+            ftext = re.sub(r"\n\s*", " ", rawtext)
+        elif mode == 1:
+            ftext = re.sub(r"[\n\s*]", "", rawtext)
+        elif mode == 2:
+            ftext = re.sub(r"\r\n", "", rawtext)
+        else:
+            raise NotImplementedError
+        return ftext
+
+    @staticmethod
+    def _stamp_date(rawdate: str) -> int:
+        """Convert a yyyy-mm-dd date string into a Unix timestamp."""
+        time_stamp = int(time.mktime(time.strptime(rawdate, "%Y-%m-%d")))
+        return time_stamp
+
+    def _gen_post_list(self, raw_post_list) -> list[RawPost]:
+        """Parse the raw thread list into post dicts."""
+        post_list = []
+        for raw_post in raw_post_list:
+            post = {}
+            post["url"] = raw_post.find("a", class_="s xst")["href"]
+            post["title"] = self._format_text(
+                raw_post.find("a", class_="s xst").string, 0
+            )
+            post["category"] = raw_post.select("th em a")[0].string
+            post["author"] = raw_post.select("td:nth-of-type(2) cite a")[0].string
+            post["id"] = raw_post["id"]
+            rawdate = (
+                raw_post.select("td:nth-of-type(2) em span span")[0]["title"]
+                if raw_post.select("td:nth-of-type(2) em span span")
+                else raw_post.select("td:nth-of-type(2) em span")[0].string
+            )
+            post["date"] = self._stamp_date(rawdate)
+            post_list.append(post)
+        return post_list
+
+    def get_id(self, post: RawPost) -> str:
+        return post["id"]
+
+    def get_date(self, post: RawPost) -> Optional[int]:
+        # the source datetime is only day-precise, so it is discarded for now
+        # return post["date"]
+        return None
+
+    def get_category(self, post: RawPost) -> Category:
+        if post["category"] == "Java版本资讯":
+            return Category(1)
+        elif post["category"] == "基岩版本资讯":
+            return Category(2)
+        else:
+            raise CategoryNotSupport("McbbsNews订阅暂不支持 `{}`".format(post["category"]))
+
+    @staticmethod
+    def _check_str_chinese(check_str: str) -> bool:
+        """Check whether the string contains at least one Chinese character."""
+        for ch in check_str:
+            if "\u4e00" <= ch <= "\u9fff":
+                return True
+        return False
+
+    def _news_parser(self, raw_text: str, news_type: Literal["Java版本资讯", "基岩版本资讯"]):
+        """Extract the push message from a Java/Bedrock version-news post."""
+        raw_soup = BeautifulSoup(raw_text.replace("<br />", ""), "html.parser")
+        if news_type == "Java版本资讯":
+            # grab the header image
+            pic_tag = raw_soup.find(
+                "img", file=re.compile(r"https://www.minecraft.net/\S*header.jpg")
+            )
+            pic_url: list[str] = (
+                [pic_tag.get("src", pic_tag.get("file"))] if pic_tag else []
+            )
+            # grab the content under the blockquote tag
+            soup = raw_soup.find(
+                "td", id=re.compile(r"postmessage_[0-9]*")
+            ).blockquote.blockquote
+        elif news_type == "基岩版本资讯":
+            # grab the header image
+            pic_tag_0 = raw_soup.find(
+                "img", file=re.compile(r"https://www.minecraft.net/\S*header.jpg")
+            )
+            pic_tag_1 = raw_soup.find(
+                "img",
+                file=re.compile(r"https://feedback.minecraft.net/\S*beta\S*.jpg"),
+            )
+            pic_url: list[str] = [
+                pic_tag_0.get("src", pic_tag_0.get("file")) if pic_tag_0 else None,
+                pic_tag_1.get("src", pic_tag_1.get("file")) if pic_tag_1 else None,
+            ]
+            # grab the content under the blockquote tag
+            soup = (
+                raw_soup.find("td", id=re.compile(r"postmessage_[0-9]*"))
+                .select("blockquote:nth-of-type(2)")[0]
+                .blockquote
+            )
+        else:
+            raise CategoryNotSupport(f"该函数不支持处理{news_type}")
+
+        # common steps
+        # drop the useless div and span sections
+        for del_tag in soup.find_all(["div", "span"]):
+            del_tag.extract()
+        # further trimming of the useless tail
+        # orig_info = soup.select("blockquote > strong")
+        # orig_info[0].extract()
+        # unwrap all a, u and strong tags; unwrap font tags nested in ul/font tags
+        for unwrap_tag in soup.find_all(["a", "strong", "u", "ul", "font"]):
+            if unwrap_tag.name in ["a", "strong", "u"]:  # unwrap a, u and strong
+                unwrap_tag.unwrap()
+            elif unwrap_tag.name in ["ul", "font"]:  # unwrap font inside ul/font
+                for font_tag in unwrap_tag.find_all("font"):
+                    font_tag.unwrap()
+        # collect every Chinese sentence
+        post_text = ""
+        last_is_empty_line = True
+        for element in soup.contents:
+            if isinstance(element, Tag):
+                if element.name == "font":
+                    text = ""
+                    for sub in element.contents:
+                        if isinstance(sub, NavigableString):
+                            text += sub
+                    if self._check_str_chinese(text):
+                        post_text += "{}\n".format(self._format_text(text, 1))
+                        last_is_empty_line = False
+                elif element.name == "ul":
+                    for li_tag in element.find_all("li"):
+                        text = ""
+                        for sub in li_tag.contents:
+                            if isinstance(sub, NavigableString):
+                                text += sub
+                        if self._check_str_chinese(text):
+                            post_text += "{}\n".format(self._format_text(text, 1))
+                            last_is_empty_line = False
+                else:
+                    continue
+            elif isinstance(element, NavigableString):
+                if str(element) == "\n":
+                    if not last_is_empty_line:
+                        post_text += "\n"
+                    last_is_empty_line = True
+                else:
+                    post_text += "{}\n".format(self._format_text(element, 1))
+                    last_is_empty_line = False
+            else:
+                continue
+        return post_text, pic_url
+
+    def _express_parser(self, raw_text: str, news_type: Literal["快讯", "基岩快讯", "周边消息"]):
+        """Extract the push message from an express / Bedrock-express / merchandise post."""
+        raw_soup = BeautifulSoup(raw_text.replace("<br />", ""), "html.parser")
+        # grab the raw post content
+        soup = raw_soup.find("td", id=re.compile(r"postmessage_[0-9]*"))
+        if tag := soup.find("ignore_js_op"):
+            tag.extract()
+        # collect all images
+        pic_urls = []
+        for img_tag in soup.find_all("img"):
+            pic_url = img_tag.get("file") or img_tag.get("src")
+            pic_urls.append(pic_url)
+        # check whether a blockquote tag is present
+        has_blockquote = soup.find("blockquote")
+        # drop the useless i tags and permission notices
+        for del_tag in soup.find_all("i"):
+            del_tag.extract()
+        if extag := soup.find(class_="attach_nopermission attach_tips"):
+            extag.extract()
+        # unwrap all a and strong tags
+        for unwrap_tag in soup.find_all(["a", "strong"]):
+            unwrap_tag.unwrap()
+        # unwrap blockquote tags nested in blockquote tags
+        for b_tag in soup.find_all("blockquote"):
+            for unwrap_tag in b_tag.find_all("blockquote"):
+                unwrap_tag.unwrap()
+        # extract the text
+        text = ""
+        if has_blockquote:
+            for post in soup.find_all("blockquote"):
+                # post.font.unwrap()
+                for string in post.stripped_strings:
+                    text += "{}\n".format(string)
+        else:
+            for string in soup.stripped_strings:
+                text += "{}\n".format(string)
+        ftext = self._format_text(text, 2)
+        return ftext, pic_urls
+
+    async def parse(self, raw_post: RawPost) -> Post:
+        """Fetch the full post and hand it to the parser for its category."""
+        post_url = "https://www.mcbbs.net/{}".format(raw_post["url"])
+        headers = {
+            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) "
+            "Chrome/51.0.2704.63 Safari/537.36"
+        }
+        async with httpx.AsyncClient() as client:
+            html = await client.get(post_url, headers=headers)
+        if raw_post["category"] in ["Java版本资讯", "基岩版本资讯"]:
+            # strip the unwanted tail beforehand
+            raw_text = re.sub(r"【本文排版借助了:[\s\S]*】", "", html.text)
+            text, pic_urls = self._news_parser(raw_text, raw_post["category"])
+        elif raw_post["category"] in ["快讯", "基岩快讯", "周边消息"]:
+            text, pic_urls = self._express_parser(html.text, raw_post["category"])
+        else:
+            raise CategoryNotSupport("McbbsNews订阅暂不支持 `{}`".format(raw_post["category"]))
+        return Post(
+            self.name,
+            text="{}\n\n{}".format(raw_post["title"], text),
+            url=post_url,
+            pics=pic_urls,
+            target_name=raw_post["category"],
+        )
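
Note: the scraper above leans on two pure helpers, _format_text for whitespace cleanup and _stamp_date for dates. A quick runnable sanity check mirroring _stamp_date's strptime/mktime round-trip (timezone-dependent: mktime interprets the date in local time):

import time

def stamp_date(rawdate: str) -> int:
    # same round-trip as McbbsNews._stamp_date
    return int(time.mktime(time.strptime(rawdate, "%Y-%m-%d")))

ts = stamp_date("2022-07-10")
assert time.strftime("%Y-%m-%d", time.localtime(ts)) == "2022-07-10"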
+61 -74
@@ -1,3 +1,4 @@
+import json
 import ssl
 import time
 from abc import ABC, abstractmethod
@@ -60,6 +61,25 @@ class Platform(metaclass=RegistryABCMeta, base=True):
     ) -> list[tuple[User, list[Post]]]:
         ...

+    async def do_fetch_new_post(
+        self, target: Target, users: list[UserSubInfo]
+    ) -> list[tuple[User, list[Post]]]:
+        try:
+            return await self.fetch_new_post(target, users)
+        except httpx.RequestError as err:
+            logger.warning(
+                "network connection error: {}, url: {}".format(
+                    type(err), err.request.url
+                )
+            )
+            return []
+        except ssl.SSLError as err:
+            logger.warning(f"ssl error: {err}")
+            return []
+        except json.JSONDecodeError as err:
+            logger.warning(f"json error, parsing: {err.doc}")
+            return []
+
     @abstractmethod
     async def parse(self, raw_post: RawPost) -> Post:
         ...
@@ -227,33 +247,22 @@ class NewMessage(MessageProcess, abstract=True):
     async def fetch_new_post(
         self, target: Target, users: list[UserSubInfo]
     ) -> list[tuple[User, list[Post]]]:
-        try:
-            post_list = await self.get_sub_list(target)
-            new_posts = await self.filter_common_with_diff(target, post_list)
-            if not new_posts:
-                return []
-            else:
-                for post in new_posts:
-                    logger.info(
-                        "fetch new post from {} {}: {}".format(
-                            self.platform_name,
-                            target if self.has_target else "-",
-                            self.get_id(post),
-                        )
+        post_list = await self.get_sub_list(target)
+        new_posts = await self.filter_common_with_diff(target, post_list)
+        if not new_posts:
+            return []
+        else:
+            for post in new_posts:
+                logger.info(
+                    "fetch new post from {} {}: {}".format(
+                        self.platform_name,
+                        target if self.has_target else "-",
+                        self.get_id(post),
                     )
-            res = await self.dispatch_user_post(target, new_posts, users)
-            self.parse_cache = {}
-            return res
-        except httpx.RequestError as err:
-            logger.warning(
-                "network connection error: {}, url: {}".format(
-                    type(err), err.request.url
-                )
-            )
-            return []
-        except ssl.SSLError as err:
-            logger.warning(f"ssl error: {err}")
-            return []
+                )
+        res = await self.dispatch_user_post(target, new_posts, users)
+        self.parse_cache = {}
+        return res


 class StatusChange(Platform, abstract=True):
@@ -274,33 +283,22 @@ class StatusChange(Platform, abstract=True):
     async def fetch_new_post(
         self, target: Target, users: list[UserSubInfo]
     ) -> list[tuple[User, list[Post]]]:
-        try:
-            new_status = await self.get_status(target)
-            res = []
-            if old_status := self.get_stored_data(target):
-                diff = self.compare_status(target, old_status, new_status)
-                if diff:
-                    logger.info(
-                        "status changes {} {}: {} -> {}".format(
-                            self.platform_name,
-                            target if self.has_target else "-",
-                            old_status,
-                            new_status,
-                        )
+        new_status = await self.get_status(target)
+        res = []
+        if old_status := self.get_stored_data(target):
+            diff = self.compare_status(target, old_status, new_status)
+            if diff:
+                logger.info(
+                    "status changes {} {}: {} -> {}".format(
+                        self.platform_name,
+                        target if self.has_target else "-",
+                        old_status,
+                        new_status,
                     )
-                    res = await self.dispatch_user_post(target, diff, users)
-            self.set_stored_data(target, new_status)
-            return res
-        except httpx.RequestError as err:
-            logger.warning(
-                "network connection error: {}, url: {}".format(
-                    type(err), err.request.url
-                )
-            )
-            return []
-        except ssl.SSLError as err:
-            logger.warning(f"ssl error: {err}")
-            return []
+                )
+                res = await self.dispatch_user_post(target, diff, users)
+        self.set_stored_data(target, new_status)
+        return res


 class SimplePost(MessageProcess, abstract=True):
@@ -309,32 +307,21 @@ class SimplePost(MessageProcess, abstract=True):
     async def fetch_new_post(
         self, target: Target, users: list[UserSubInfo]
     ) -> list[tuple[User, list[Post]]]:
-        try:
-            new_posts = await self.get_sub_list(target)
-            if not new_posts:
-                return []
-            else:
-                for post in new_posts:
-                    logger.info(
-                        "fetch new post from {} {}: {}".format(
-                            self.platform_name,
-                            target if self.has_target else "-",
-                            self.get_id(post),
-                        )
+        new_posts = await self.get_sub_list(target)
+        if not new_posts:
+            return []
+        else:
+            for post in new_posts:
+                logger.info(
+                    "fetch new post from {} {}: {}".format(
+                        self.platform_name,
+                        target if self.has_target else "-",
+                        self.get_id(post),
                    )
-            res = await self.dispatch_user_post(target, new_posts, users)
-            self.parse_cache = {}
-            return res
-        except httpx.RequestError as err:
-            logger.warning(
-                "network connection error: {}, url: {}".format(
-                    type(err), err.request.url
-                )
-            )
-            return []
-        except ssl.SSLError as err:
-            logger.warning(f"ssl error: {err}")
-            return []
+                )
+        res = await self.dispatch_user_post(target, new_posts, users)
+        self.parse_cache = {}
+        return res


 class NoTargetGroup(Platform, abstract=True):
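
Note: this is the core refactor of the commit. The three fetch_new_post implementations drop their duplicated try/except blocks, and a single do_fetch_new_post wrapper on the base class converts network, SSL and JSON failures into an empty result (and adds the json.JSONDecodeError case the old copies lacked). A standalone sketch of the shape (BasePlatform and its methods are illustrative, not the project's exact classes):

import json
import logging
import ssl

import httpx

logger = logging.getLogger(__name__)


class BasePlatform:
    async def do_fetch_new_post(self) -> list:
        # single choke point: expected failures degrade to "nothing new"
        try:
            return await self.fetch_new_post()
        except httpx.RequestError as err:
            logger.warning("network connection error: %s, url: %s", type(err), err.request.url)
            return []
        except ssl.SSLError as err:
            logger.warning("ssl error: %s", err)
            return []
        except json.JSONDecodeError as err:
            logger.warning("json error, parsing: %s", err.doc)
            return []

    async def fetch_new_post(self) -> list:
        raise NotImplementedError  # subclasses implement only the happy path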
@@ -48,6 +48,10 @@ class Rss(NewMessage):
         soup = bs(raw_post.description, "html.parser")
         text += soup.text.strip()
         pics = list(map(lambda x: x.attrs["src"], soup("img")))
+        if raw_post.get("media_content"):
+            for media in raw_post["media_content"]:
+                if media.get("medium") == "image" and media.get("url"):
+                    pics.append(media.get("url"))
         return Post(
             "rss",
             text=text,
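
Note: this change also collects images from Media RSS enclosures. feedparser exposes <media:content> elements on an entry as media_content, a list of attribute dicts; the runnable snippet below mimics that shape with sample data:

raw_post = {
    "media_content": [
        {"url": "https://example.com/a.jpg", "medium": "image"},
        {"url": "https://example.com/clip.mp4", "medium": "video"},
    ]
}

pics: list[str] = []
if raw_post.get("media_content"):
    for media in raw_post["media_content"]:
        # keep only image entries that actually carry a URL
        if media.get("medium") == "image" and media.get("url"):
            pics.append(media["url"])

assert pics == ["https://example.com/a.jpg"]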
+1 -1
@@ -59,7 +59,7 @@ async def fetch_and_send(target_type: str):
                 send_user_list,
             )
         )
-        to_send = await platform_manager[target_type].fetch_new_post(
+        to_send = await platform_manager[target_type].do_fetch_new_post(
             target, send_userinfo_list
         )
         if not to_send:
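
Note: the scheduler is the only call site, and switching it to the wrapper means one platform's network hiccup now yields an empty list instead of an exception that would abort the shared fetch loop. A sketch of the intended behavior (run_schedule and schedule are hypothetical):

async def run_schedule(platform, schedule) -> None:
    for target, users in schedule:
        # do_fetch_new_post returns [] on network/SSL/JSON errors,
        # so one failing target cannot take down the loop
        to_send = await platform.do_fetch_new_post(target, users)
        if not to_send:
            continue
        ...  # render and deliver the new posts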