♻️ 使用更加简约的方法生成mcbbsnews的推送,修改测试用例 (#170)

* change(mcbbsnews):使用更加简约的方法生成mcbbsnews的推送,修改测试用例

test(mcbbsnews):添加测试函数小工具

change(mcbbsnews):优化代码

test(mcbbsnews):调整测试

test(mcbbsnews):完善细节部分

fix(mcbbsnews):修改traceback的import位置

test fix(mcbbsnews):删除错误传参

* fix(mcbbsnews): 更新过时的category名称

feat(platform): 添加新的异常(CategoryNotRecognize), 用以区别已知但不支持的category(CategoryNotSupport)和未知的新增category(CategoryNotRecognize)

chore: 为各处的CategoryNotRecognize和CategoryNotSupport添加异常描述

test(mcbbsnews): 更新测试用文件的过时category名称
This commit is contained in:
AzideCupric
2023-02-05 17:00:11 +08:00
committed by felinae98
parent 15e8dca5f8
commit 7fa31b6060
31 changed files with 343 additions and 13010 deletions
@@ -8,7 +8,7 @@ from nonebot.plugin import require
from ..post import Post
from ..types import Category, RawPost, Target
from ..utils.scheduler_config import SchedulerConfig
from .platform import CategoryNotSupport, NewMessage, StatusChange
from .platform import CategoryNotRecognize, NewMessage, StatusChange
class ArknightsSchedConf(SchedulerConfig):
@@ -79,7 +79,7 @@ class Arknights(NewMessage):
elif pic := soup.find("img", class_="banner-image"):
pics.append(pic["src"]) # type: ignore
else:
raise CategoryNotSupport()
raise CategoryNotRecognize("未找到可渲染部分")
return Post(
"arknights",
text=text,
@@ -12,7 +12,7 @@ from typing_extensions import Self
from ..post import Post
from ..types import ApiError, Category, RawPost, Tag, Target
from ..utils import SchedulerConfig
from .platform import CategoryNotSupport, NewMessage, StatusChange
from .platform import CategoryNotRecognize, CategoryNotSupport, NewMessage, StatusChange
class BilibiliSchedConf(SchedulerConfig):
@@ -121,7 +121,7 @@ class Bilibili(NewMessage):
elif post_type == 1:
# 转发
return Category(5)
raise CategoryNotSupport()
raise CategoryNotRecognize(post_type)
def get_category(self, post: RawPost) -> Category:
post_type = post["desc"]["type"]
@@ -153,7 +153,7 @@ class Bilibili(NewMessage):
text = card["item"]["content"]
pic = []
else:
raise CategoryNotSupport()
raise CategoryNotSupport(post_type)
return text, pic
async def parse(self, raw_post: RawPost) -> Post:
+138 -238
View File
@@ -1,290 +1,190 @@
import re
import time
from typing import Literal, Optional
import traceback
from typing import Literal
import httpx
from bs4 import BeautifulSoup, NavigableString, Tag
from bs4 import BeautifulSoup, Tag
from httpx import AsyncClient
from nonebot.plugin import require
from ..plugin_config import plugin_config
from ..post import Post
from ..types import Category, RawPost, Target
from ..utils import scheduler
from .platform import CategoryNotSupport, NewMessage
from ..utils import SchedulerConfig, http_client
from .platform import CategoryNotRecognize, CategoryNotSupport, NewMessage
def _format_text(rawtext: str, mode: int) -> str:
"""处理BeautifulSoup生成的string中奇怪的回车+连续空格
mode 0:处理标题
mode 1:处理版本资讯类推文
mode 2:处理快讯类推文"""
match mode:
case 0:
ftext = re.sub(r"\n\s*", " ", rawtext)
case 1:
ftext = re.sub(r"[\n\s*]", "", rawtext)
case 2:
ftext = re.sub(r"\r\n", "", rawtext)
return ftext
def _stamp_date(rawdate: str) -> int:
"""将时间转化为时间戳yyyy-mm-dd->timestamp"""
time_stamp = int(time.mktime(time.strptime(rawdate, "%Y-%m-%d")))
return time_stamp
# Scheduler configuration for the mcbbsnews platform; the McbbsNews class
# assigns this class to its `scheduler` attribute.
class McbbsnewsSchedConf(SchedulerConfig):
# Name of this scheduler configuration.
name = "mcbbsnews"
# Interval-type schedule with a 30-minute setting.
# NOTE(review): how SchedulerConfig consumes these fields is not visible here.
schedule_type = "interval"
schedule_setting = {"minutes": 30}
class McbbsNews(NewMessage):
categories = {1: "Java版本资讯", 2: "基岩版本资讯", 3: "快讯", 4: "基岩快讯", 5: "周边消息"}
enable_tag = False
platform_name = "mcbbsnews"
name = "MCBBS幻翼块讯"
enabled = True
is_common = False
scheduler = scheduler("interval", {"hours": 1})
has_target = False
categories: dict[int, str] = {
1: "Java版资讯",
2: "基岩版资讯",
3: "块讯",
4: "基岩块讯",
5: "周边",
6: "主机",
7: "时评",
}
enable_tag: bool = False
platform_name: str = "mcbbsnews"
name: str = "MCBBS幻翼块讯"
enabled: bool = True
is_common: bool = False
scheduler = McbbsnewsSchedConf
has_target: bool = False
_known_cats: dict[int, str] = {
1: "Java版资讯",
2: "基岩版资讯",
3: "块讯",
4: "基岩块讯",
5: "周边",
6: "主机",
7: "时评",
}
@classmethod
async def get_target_name(
cls, client: AsyncClient, target: Target
) -> Optional[str]:
async def get_target_name(cls, client: AsyncClient, target: Target) -> str:
return cls.name
async def get_sub_list(self, _: Target) -> list[RawPost]:
url = "https://www.mcbbs.net/forum-news-1.html"
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/51.0.2704.63 Safari/537.36"
}
url: str = "https://www.mcbbs.net/forum-news-1.html"
async with httpx.AsyncClient() as client:
html = await client.get(url, headers=headers)
soup = BeautifulSoup(html.text, "html.parser")
raw_post_list = soup.find_all(
"tbody", id=re.compile(r"normalthread_[0-9]*")
)
post_list = self._gen_post_list(raw_post_list)
html = await self.client.get(url)
soup = BeautifulSoup(html.text, "html.parser")
raw_post_list = soup.find_all("tbody", id=re.compile(r"normalthread_[0-9]*"))
post_list = self._gen_post_list(raw_post_list)
return post_list
@staticmethod
def _format_text(rawtext: str, mode: int) -> str:
"""处理BeautifulSoup生成的string中奇怪的回车+连续空格
mode 0:处理标题
mode 1:处理版本资讯类推文
mode 2:处理快讯类推文"""
if mode == 0:
ftext = re.sub(r"\n\s*", " ", rawtext)
elif mode == 1:
ftext = re.sub(r"[\n\s*]", "", rawtext)
elif mode == 2:
ftext = re.sub(r"\r\n", "", rawtext)
else:
raise NotImplementedError
return ftext
@staticmethod
def _stamp_date(rawdate: str) -> int:
"""将时间转化为时间戳yyyy-mm-dd->timestamp"""
time_stamp = int(time.mktime(time.strptime(rawdate, "%Y-%m-%d")))
return time_stamp
def _gen_post_list(self, raw_post_list) -> list[RawPost]:
def _gen_post_list(self, raw_post_list: list[Tag]) -> list[RawPost]:
"""解析生成推文列表"""
post_list = []
for raw_post in raw_post_list:
post = {}
post["url"] = raw_post.find("a", class_="s xst")["href"]
post["title"] = self._format_text(
raw_post.find("a", class_="s xst").string, 0
)
url_tag = raw_post.find("a", class_="s xst")
if isinstance(url_tag, Tag):
post["url"] = url_tag.get("href")
title_tag = raw_post.find("a", class_="s xst")
if isinstance(title_tag, Tag):
title_string = title_tag.string
if isinstance(title_string, str):
post["title"] = self._format_text(title_string, "title")
post["category"] = raw_post.select("th em a")[0].string
post["author"] = raw_post.select("td:nth-of-type(2) cite a")[0].string
post["id"] = raw_post["id"]
rawdate = (
raw_date = (
raw_post.select("td:nth-of-type(2) em span span")[0]["title"]
if raw_post.select("td:nth-of-type(2) em span span")
else raw_post.select("td:nth-of-type(2) em span")[0].string
)
post["date"] = self._stamp_date(rawdate)
if isinstance(raw_date, str):
post["date"] = self._stamp_date(raw_date)
post_list.append(post)
return post_list
@staticmethod
def _format_text(raw_text: str, mode: str) -> str:
"""
处理BeautifulSoup生成的string中奇怪的回车+连续空格
参数:
title: 处理标题
"""
match mode:
case "title":
ftext = re.sub(r"\n\s*", " ", raw_text)
case _:
raise NotImplementedError("不支持的处理模式: {mode}")
return ftext
@staticmethod
def _stamp_date(raw_date: str) -> int:
"""
将时间转化为时间戳:
yyyy-mm-dd -> timestamp
"""
time_stamp = int(time.mktime(time.strptime(raw_date, "%Y-%m-%d")))
return time_stamp
def get_id(self, post: RawPost) -> str:
    """Return the value stored under the post's ``"id"`` key."""
    post_id = post["id"]
    return post_id
def get_date(self, post: RawPost) -> int:
def get_date(self, _: RawPost) -> int | None:
# 获取datetime精度只到日期,故暂时舍弃
# return post["date"]
return None
def get_category(self, post: RawPost) -> Category:
if post["category"] == "Java版本资讯":
return Category(1)
elif post["category"] == "基岩版本资讯":
return Category(2)
categoty_name = post["category"]
category_keys = list(self.categories.keys())
category_values = list(self.categories.values())
known_category_values = list(self._known_cats.values())
if categoty_name in category_values:
category_id = category_keys[category_values.index(categoty_name)]
elif categoty_name in known_category_values:
raise CategoryNotSupport("McbbsNews订阅暂不支持 {}".format(categoty_name))
else:
raise CategoryNotSupport("McbbsNews订阅暂不支持 `{}".format(post["category"]))
raise CategoryNotRecognize("Mcbbsnews订阅尚未识别 {}".format(categoty_name))
return category_id
@staticmethod
def _check_str_chinese(check_str: str) -> bool:
"""检测字符串是否含有中文(有一个就算)"""
for ch in check_str:
if "\u4e00" <= ch <= "\u9fff":
return True
return False
async def parse(self, post: RawPost) -> Post:
"""获取并分配正式推文交由相应的函数渲染"""
post_url = "https://www.mcbbs.net/{}".format(post["url"])
async with http_client() as client:
html = await client.get(post_url)
html.raise_for_status()
def _news_parser(self, raw_text: str, news_type: Literal["Java版本资讯", "基岩版本资讯"]):
"""提取Java/Bedrock版本资讯的推送消息"""
raw_soup = BeautifulSoup(raw_text.replace("<br />", ""), "html.parser")
# 获取头图
if news_type == "Java版本资讯":
# 获取头图
pic_tag = raw_soup.find(
"img", file=re.compile(r"https://www.minecraft.net/\S*header.jpg")
)
pic_url: list[str] = (
[pic_tag.get("src", pic_tag.get("file"))] if pic_tag else []
)
# 获取blockquote标签下的内容
soup = raw_soup.find(
"td", id=re.compile(r"postmessage_[0-9]*")
).blockquote.blockquote
elif news_type == "基岩版本资讯":
# 获取头图
pic_tag_0 = raw_soup.find(
"img", file=re.compile(r"https://www.minecraft.net/\S*header.jpg")
)
pic_tag_1 = raw_soup.find(
"img",
file=re.compile(r"https://feedback.minecraft.net/\S*beta\S*.jpg"),
)
pic_url: list[str] = [
pic_tag_0.get("src", pic_tag_0.get("file")) if pic_tag_0 else None,
pic_tag_1.get("src", pic_tag_1.get("file")) if pic_tag_1 else None,
]
# 获取blockquote标签下的内容
soup = (
raw_soup.find("td", id=re.compile(r"postmessage_[0-9]*"))
.select("blockquote:nth-of-type(2)")[0]
.blockquote
)
soup = BeautifulSoup(html.text, "html.parser")
post_body = soup.find("td", id=re.compile(r"postmessage_[0-9]*"))
if isinstance(post_body, Tag):
post_id = post_body.attrs.get("id")
else:
raise CategoryNotSupport(f"该函数不支持处理{news_type}")
# 通用步骤
# 删除无用的div和span段内容
for del_tag in soup.find_all(["div", "span"]):
del_tag.extract()
# 进一步删除无用尾部
# orig_info=soup.select("blockquote > strong")
# orig_info[0].extract()
# 展开所有的a,u和strong标签,展开ul,font标签里的font标签
for unwrap_tag in soup.find_all(["a", "strong", "u", "ul", "font"]):
if unwrap_tag.name in ["a", "strong", "u"]: # 展开所有的a,u和strong标签
unwrap_tag.unwrap()
elif unwrap_tag.name in ["ul", "font"]: # 展开ul,font里的font标签
for font_tag in unwrap_tag.find_all("font"):
font_tag.unwrap()
# 获取所有的中文句子
post_text = ""
last_is_empty_line = True
for element in soup.contents:
if isinstance(element, Tag):
if element.name == "font":
text = ""
for sub in element.contents:
if isinstance(sub, NavigableString):
text += sub
if self._check_str_chinese(text):
post_text += "{}\n".format(self._format_text(text, 1))
last_is_empty_line = False
elif element.name == "ul":
for li_tag in element.find_all("li"):
text = ""
for sub in li_tag.contents:
if isinstance(sub, NavigableString):
text += sub
if self._check_str_chinese(text):
post_text += "{}\n".format(self._format_text(text, 1))
last_is_empty_line = False
else:
continue
elif isinstance(element, NavigableString):
if str(element) == "\n":
if not last_is_empty_line:
post_text += "\n"
last_is_empty_line = True
else:
post_text += "{}\n".format(self._format_text(element, 1))
last_is_empty_line = False
else:
continue
return post_text, pic_url
def _express_parser(self, raw_text: str, news_type: Literal["快讯", "基岩快讯", "周边消息"]):
"""Extract the push text and image URLs from an express-news style post.

raw_text is the HTML of the post page; every "<br />" is removed before
parsing. news_type is not read inside this method.
Returns a tuple (ftext, pic_urls): the cleaned text and the list of image
URLs found in the post body.
"""
raw_soup = BeautifulSoup(raw_text.replace("<br />", ""), "html.parser")
# Get the raw post content (the <td> whose id looks like "postmessage_<digits>")
# NOTE(review): find() may return None when no such cell exists; the
# following soup.find calls would then fail - confirm the page always has it.
soup = raw_soup.find("td", id=re.compile(r"postmessage_[0-9]*"))
# Drop the first <ignore_js_op> element before images are collected
if tag := soup.find("ignore_js_op"):
tag.extract()
# Collect all images, preferring the "file" attribute over "src"
pic_urls = []
for img_tag in soup.find_all("img"):
pic_url = img_tag.get("file") or img_tag.get("src")
pic_urls.append(pic_url)
# Check whether there is a blockquote tag (evaluated before any unwrapping below)
has_bolockquote = soup.find("blockquote")
# Remove unneeded content
# NOTE(review): the original comment mentioned span/div, but the code removes <i> tags
for del_tag in soup.find_all("i"):
del_tag.extract()
# Remove the first element carrying the "attach_nopermission attach_tips" class
if extag := soup.find(class_="attach_nopermission attach_tips"):
extag.extract()
# Unwrap all a and strong tags
for unwrap_tag in soup.find_all(["a", "strong"]):
unwrap_tag.unwrap()
# Unwrap blockquote tags nested inside blockquote tags
for b_tag in soup.find_all("blockquote"):
for unwrap_tag in b_tag.find_all("blockquote"):
unwrap_tag.unwrap()
# Build the post text, one stripped string per line
text = ""
if has_bolockquote:
# With blockquotes present, only text inside blockquotes is kept
for post in soup.find_all("blockquote"):
# post.font.unwrap()
for string in post.stripped_strings:
text += "{}\n".format(string)
else:
# Otherwise take every stripped string of the post body
for string in soup.stripped_strings:
text += "{}\n".format(string)
# Mode 2 is the express-news clean-up mode of _format_text
ftext = self._format_text(text, 2)
return ftext, pic_urls
async def parse(self, raw_post: RawPost) -> Post:
"""获取并分配正式推文交由相应的函数解析"""
post_url = "https://www.mcbbs.net/{}".format(raw_post["url"])
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/51.0.2704.63 Safari/537.36"
}
async with httpx.AsyncClient() as client:
html = await client.get(post_url, headers=headers)
if raw_post["category"] in ["Java版本资讯", "基岩版本资讯"]:
# 事先删除不需要的尾部
raw_text = re.sub(r"【本文排版借助了:[\s\S]*】", "", html.text)
text, pic_urls = self._news_parser(raw_text, raw_post["category"])
elif raw_post["category"] in ["快讯", "基岩快讯", "周边消息"]:
text, pic_urls = self._express_parser(html.text, raw_post["category"])
else:
raise CategoryNotSupport("McbbsNews订阅暂不支持 `{}".format(raw_post["category"]))
post_id = None
pics = await self._news_render(post_url, f"#{post_id}")
return Post(
self.name,
text="{}\n\n{}".format(raw_post["title"], text),
text="{}\n\n└由 {} 发表".format(post["title"], post["author"]),
url=post_url,
pics=pic_urls,
target_name=raw_post["category"],
pics=list(pics),
target_name=post["category"],
)
async def _news_render(self, url: str, selector: str) -> list[bytes]:
    """Render the part of a web page matched by a CSS selector into a picture.

    Note:
        News posts are usually long, so generating the picture can take a
        noticeable amount of time.

    Args:
        url: Address of the page to render.
        selector: CSS selector of the element to capture.

    Returns:
        A one-element list with the captured picture on success. On failure,
        a two-element list: a picture of a short error notice followed by a
        picture of the formatted traceback.
    """
    require("nonebot_plugin_htmlrender")
    from nonebot_plugin_htmlrender import capture_element, text_to_pic

    try:
        # Explicit checks instead of ``assert`` so they still run under ``-O``.
        if not url:
            raise ValueError("url must not be empty")
        pic_data = await capture_element(
            url,
            selector,
            viewport={"width": 1000, "height": 6400},
            device_scale_factor=3,
        )
        if not pic_data:
            raise RuntimeError("capture_element returned no picture data")
    except Exception:
        # ``except Exception`` rather than a bare ``except`` so that task
        # cancellation (asyncio.CancelledError), KeyboardInterrupt and
        # SystemExit propagate instead of being turned into an error picture.
        err_pic0 = await text_to_pic("错误发生!")
        err_pic1 = await text_to_pic(traceback.format_exc())
        return [err_pic0, err_pic1]
    return [pic_data]
+13 -4
View File
@@ -18,7 +18,11 @@ from ..utils import ProcessContext, SchedulerConfig
class CategoryNotSupport(Exception):
"raise in get_category, when post category is not supported"
"raise in get_category, when you know the category of the post but don't want to support it or don't support its parsing yet"
class CategoryNotRecognize(Exception):
"""Raise in get_category when the category of the post is not known at all.

A known category that is deliberately not supported is reported with
CategoryNotSupport, not this exception.
"""
class RegistryMeta(type):
@@ -181,8 +185,9 @@ class Platform(metaclass=PlatformABCMeta, base=True):
if cats and cat not in cats:
continue
if self.enable_tag and tags:
if self.is_banned_post(
self.get_tags(raw_post), *self.tag_separator(tags)
raw_post_tags = self.get_tags(raw_post)
if isinstance(raw_post_tags, Collection) and self.is_banned_post(
raw_post_tags, *self.tag_separator(tags)
):
continue
res.append(raw_post)
@@ -255,7 +260,11 @@ class MessageProcess(Platform, abstract=True):
continue
try:
self.get_category(raw_post)
except CategoryNotSupport:
except CategoryNotSupport as e:
logger.info("未支持解析的推文类别:" + repr(e) + ",忽略")
continue
except CategoryNotRecognize as e:
logger.warning("未知推文类别:" + repr(e))
msgs = self.ctx.gen_req_records()
for m in msgs:
logger.warning(m)