import re
import json
from datetime import datetime
from typing import Any, override
from urllib.parse import unquote

from yarl import URL
from lxml.etree import HTML
from httpx import AsyncClient
from nonebot.log import logger
from bs4 import BeautifulSoup as bs

from ..post import Post
from .platform import NewMessage
from ..utils import http_client, text_fletten
from ..utils.site import Site, CookieClientManager
from ..types import Tag, Target, RawPost, ApiError, Category

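# Request headers mimicking an Android Chrome session, presumably so that
# m.weibo.cn keeps serving the mobile JSON container API instead of
# redirecting to the desktop site.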
_HEADER = {
    "accept": (
        "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,"
        "*/*;q=0.8,application/signed-exchange;v=b3;q=0.9"
    ),
    "accept-language": "zh-CN,zh;q=0.9",
    "authority": "m.weibo.cn",
    "cache-control": "max-age=0",
    "sec-fetch-dest": "empty",
    "sec-fetch-mode": "same-origin",
    "sec-fetch-site": "same-origin",
    "upgrade-insecure-requests": "1",
    "user-agent": (
        "Mozilla/5.0 (Linux; Android 6.0; Nexus 5 Build/MRA58N) "
        "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.72 "
        "Mobile Safari/537.36"
    ),
}


class WeiboClientManager(CookieClientManager):
    _site_name = "weibo.com"

    async def _get_current_user_name(self, cookies: dict) -> str:
        url = "https://m.weibo.cn/setup/nick/detail"
        async with http_client() as client:
            r = await client.get(url, headers=_HEADER, cookies=cookies)
            data = json.loads(r.text)
            name = data["data"]["user"]["screen_name"]
            return name

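    # `content` is the stored cookie payload as a JSON object string, e.g.
    # '{"SUB": "..."}' (key name is illustrative); it is decoded and sent as
    # request cookies to resolve the account's screen name.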
    @override
    async def get_cookie_name(self, content: str) -> str:
        """Derive a friendly name for a cookie from its content; called when the
        cookie is added, and persisted in the database."""
        name = await self._get_current_user_name(json.loads(content))
        return text_fletten(f"weibo: [{name[:10]}]")


class WeiboSite(Site):
    name = "weibo.com"
    schedule_type = "interval"
    schedule_setting = {"seconds": 3}
    client_mgr = WeiboClientManager


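# Platform adapter: polls each subscribed user's m.weibo.cn feed and turns
# new weibo cards into bison Posts.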
class Weibo(NewMessage):
    categories = {
        1: "转发",  # repost
        2: "视频",  # video
        3: "图文",  # text with images
        4: "文字",  # text only
    }
    enable_tag = True
    platform_name = "weibo"
    name = "新浪微博"
    enabled = True
    is_common = True
    site = WeiboSite
    has_target = True
    # prompt shown when asking for a subscription target:
    # "please enter the link to the user's profile page (containing the numeric UID)"
    parse_target_promot = "请输入用户主页(包含数字UID)的链接"

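    # m.weibo.cn addresses resources via "containerid": the "100505" prefix plus
    # a numeric UID appears to select the user's profile container (queried here
    # for the screen name), while "107603" + UID (see get_sub_list below)
    # selects the user's feed.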
    @classmethod
    async def get_target_name(cls, client: AsyncClient, target: Target) -> str | None:
        param = {"containerid": "100505" + target}
        res = await client.get("https://m.weibo.cn/api/container/getIndex", params=param)
        res_dict = json.loads(res.text)
        if res_dict.get("ok") == 1:
            return res_dict["data"]["userInfo"]["screen_name"]
        else:
            return None

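    # Expected behavior sketch ("1234567890" is a hypothetical UID): both
    # "1234567890" and "https://weibo.com/u/1234567890" yield Target("1234567890");
    # anything else raises ParseTargetException.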
    @classmethod
    async def parse_target(cls, target_text: str) -> Target:
        if re.fullmatch(r"\d+", target_text):
            return Target(target_text)
        elif match := re.match(r"(?:https?://)?weibo\.com/u/(\d+)", target_text):
            # it's year 2202 already, plain http should be long gone, but guard against it anyway
            return Target(match.group(1))
        else:
            # prompt: "correct format: 1. the user's numeric UID 2. https://weibo.com/u/xxxx"
            raise cls.ParseTargetException(prompt="正确格式:\n1. 用户数字UID\n2. https://weibo.com/u/xxxx")

    async def get_sub_list(self, target: Target) -> list[RawPost]:
        client = await self.ctx.get_client(target)
        header = {"Referer": f"https://m.weibo.cn/u/{target}", "MWeibo-Pwa": "1", "X-Requested-With": "XMLHttpRequest"}
        # on obtaining the cookie, see https://docs.rsshub.app/zh/deploy/config#%E5%BE%AE%E5%8D%9A
        params = {"containerid": "107603" + target}
        res = await client.get("https://m.weibo.cn/api/container/getIndex", headers=header, params=params, timeout=4.0)
        res_data = json.loads(res.text)
        # "这里还没有内容" means "no content here yet", which is not an error
        if not res_data["ok"] and res_data["msg"] != "这里还没有内容":
            raise ApiError(res.request.url)

        def custom_filter(d: RawPost) -> bool:
            # card_type 9 marks an actual weibo post card
            return d["card_type"] == 9

        return list(filter(custom_filter, res_data["data"]["cards"]))

    def get_id(self, post: RawPost) -> Any:
        return post["mblog"]["id"]

    def filter_platform_custom(self, raw_post: RawPost) -> bool:
        return raw_post["card_type"] == 9

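    # created_at is an English-locale string like "Tue Sep 20 14:00:00 +0800 2022"
    # (illustrative value); note that %a/%b parsing in strptime is locale-dependent.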
    def get_date(self, raw_post: RawPost) -> float:
        created_time = datetime.strptime(raw_post["mblog"]["created_at"], "%a %b %d %H:%M:%S %z %Y")
        return created_time.timestamp()

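    # Hashtags arrive as <span class="surl-text">#tag#</span> nodes inside the
    # post HTML; super topics additionally carry a marker image whose src
    # contains "timeline_card_small_super_default".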
    def get_tags(self, raw_post: RawPost) -> list[Tag] | None:
        """Return the tag list of the given RawPost."""
        text = raw_post["mblog"]["text"]
        soup = bs(text, "html.parser")
        res = [
            x[1:-1]
            for x in filter(
                lambda s: s.startswith("#") and s.endswith("#"),
                (x.text for x in soup.find_all("span", class_="surl-text")),
            )
        ]
        super_topic_img = soup.find("img", src=re.compile(r"timeline_card_small_super_default"))
        if super_topic_img:
            try:
                # "超话" = super topic
                res.append(super_topic_img.parent.parent.find("span", class_="surl-text").text + "超话")  # type: ignore
            except Exception:
                logger.info(f"super_topic extract error: {text}")
        return res

    def get_category(self, raw_post: RawPost) -> Category:
        if raw_post["mblog"].get("retweeted_status"):
            return Category(1)
        elif raw_post["mblog"].get("page_info") and raw_post["mblog"]["page_info"].get("type") == "video":
            return Category(2)
        elif raw_post["mblog"].get("pics"):
            return Category(3)
        else:
            return Category(4)

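    # Flatten the mobile API's HTML text into plain text: <br> tags become
    # newlines, and for non-hashtag "surl-text" anchors pointing at external or
    # video URLs, the real target is appended after the anchor text.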
    def _get_text(self, raw_text: str) -> str:
        text = raw_text.replace("<br/>", "\n").replace("<br />", "\n")
        selector = HTML(text, parser=None)
        if selector is None:
            return text
        url_elems = selector.xpath("//a[@href]/span[@class='surl-text']")
        for br in selector.xpath("br"):
            br.tail = "\n" + (br.tail or "")
        for elem in url_elems:
            url = elem.getparent().get("href")
            if (
                elem.text
                and not elem.text.startswith("#")
                and not elem.text.endswith("#")
                and (url.startswith("https://weibo.cn/sinaurl?u=") or url.startswith("https://video.weibo.com"))
            ):
                url = unquote(url.replace("https://weibo.cn/sinaurl?u=", ""))
                elem.text = f"{elem.text}( {url} )"
        return selector.xpath("string(.)")

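    # Long weibos arrive truncated in the feed payload; /statuses/show returns
    # the full post. Returns an empty dict on failure so callers can fall back
    # to the truncated text.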
    async def _get_long_weibo(self, weibo_id: str) -> dict:
        try:
            client = await self.ctx.get_client()
            weibo_info = await client.get(
                "https://m.weibo.cn/statuses/show",
                params={"id": weibo_id},
                headers=_HEADER,
            )
            weibo_info = weibo_info.json()
            if not weibo_info or weibo_info["ok"] != 1:
                return {}
            return weibo_info["data"]
        except (KeyError, TimeoutError):
            logger.info(f"detail message error: https://m.weibo.cn/detail/{weibo_id}")
            return {}

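    # Build a Post from one mblog dict: resolve the full text for long posts
    # (the feed payload apparently carries at most nine pics, hence the pic_num
    # check), download image bytes plus the video cover when present, and link
    # to the desktop detail page.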
    async def _parse_weibo(self, info: dict) -> Post:
        if info["isLongText"] or info["pic_num"] > 9:
            # fall back to the truncated feed text if the detail request fails
            if full_info := await self._get_long_weibo(info["mid"]):
                info["text"] = full_info["text"]
        parsed_text = self._get_text(info["text"])
        raw_pics_list = info.get("pics", [])
        pic_urls = [img["large"]["url"] for img in raw_pics_list]
        # video cover
        if "page_info" in info and info["page_info"].get("type") == "video":
            crop_url = info["page_info"]["page_pic"]["url"]
            pic_urls.append(
                f"{URL(crop_url).scheme}://{URL(crop_url).host}/large/{info['page_info']['page_pic']['pid']}"
            )
        pics = []
        for pic_url in pic_urls:
            async with http_client(headers={"referer": "https://weibo.com"}) as client:
                res = await client.get(pic_url)
                res.raise_for_status()
                pics.append(res.content)
        detail_url = f"https://weibo.com/{info['user']['id']}/{info['bid']}"
        return Post(
            self,
            content=parsed_text,
            url=detail_url,
            images=pics,
            nickname=info["user"]["screen_name"],
        )

    async def parse(self, raw_post: RawPost) -> Post:
        info = raw_post["mblog"]
        post = await self._parse_weibo(info)
        if "retweeted_status" in info:
            post.repost = await self._parse_weibo(info["retweeted_status"])
        return post