7 Commits

Author SHA1 Message Date
suyiiyii 75bbbb68e8 支持上报超时的任务 2024-12-16 21:48:22 +08:00
suyiiyii 96573ec86e 添加 request_histogram 和 render_histogram 2024-12-12 19:43:44 +08:00
suyiiyii 4c29cf10e4 添加 cookie_choose_counter 2024-12-12 14:54:12 +08:00
suyiiyii 80f924123d 使用 label 简化 request_counter 2024-12-12 13:08:27 +08:00
suyiiyii 073bd314fc ♻️ 整理代码结构 2024-12-12 11:28:32 +08:00
suyiiyii 63f59ada3c 初步添加 metrics 2024-12-12 11:28:28 +08:00
suyiiyii 9fef8028c5 添加 NewMessage 类型 Platform 的抓取数量日志提示 2024-12-12 11:27:50 +08:00
7 changed files with 122 additions and 26 deletions
+2
View File
@@ -12,6 +12,7 @@ from nonebot.adapters.onebot.v11.event import PrivateMessageEvent
from .api import router as api_router from .api import router as api_router
from ..plugin_config import plugin_config from ..plugin_config import plugin_config
from .token_manager import token_manager as tm from .token_manager import token_manager as tm
from ..metrics import metrics_router as metrics_router
if TYPE_CHECKING: if TYPE_CHECKING:
from nonebot.drivers.fastapi import Driver from nonebot.drivers.fastapi import Driver
@@ -46,6 +47,7 @@ def init_fastapi(driver: "Driver"):
description="nonebot-bison webui and api", description="nonebot-bison webui and api",
) )
nonebot_app.include_router(api_router) nonebot_app.include_router(api_router)
nonebot_app.include_router(metrics_router)
nonebot_app.mount("/", SinglePageApplication(directory=static_path), name="bison-frontend") nonebot_app.mount("/", SinglePageApplication(directory=static_path), name="bison-frontend")
app = driver.server_app app = driver.server_app
+42
View File
@@ -0,0 +1,42 @@
import time
from fastapi import APIRouter
from starlette.responses import Response
from prometheus_client import CONTENT_TYPE_LATEST, Gauge, Counter, Histogram, generate_latest
# Request counter
# Latency buckets (seconds) shared by both duration histograms below.
_DURATION_BUCKETS = [0.1, 0.5, 1, 2, 5, 10, 30, 60]

# Counts every fetch attempt, partitioned by site/platform/target and whether
# the fetch completed (`success` label carries the boolean as a string).
request_counter = Counter(
    "bison_request_counter", "The number of requests", ["site_name", "platform_name", "target", "success"]
)
# Counts fetch rounds that produced at least one message to deliver.
sent_counter = Counter("bison_sent_counter", "The number of sent messages", ["site_name", "platform_name", "target"])
# Counts how often each cookie is selected for a given site/target.
cookie_choose_counter = Counter(
    "bison_cookie_choose_counter", "The number of cookie choose", ["site_name", "target", "cookie_id"]
)
# Observes how long a platform takes to fetch from its upstream source.
request_histogram = Histogram(
    "bison_request_histogram",
    "The time of platform used to request the source",
    ["site_name", "platform_name"],
    buckets=_DURATION_BUCKETS,
)
# Observes how long a theme takes to render fetched posts.
render_histogram = Histogram(
    "bison_render_histogram",
    "The time of theme used to render",
    ["site_name", "platform_name"],
    buckets=_DURATION_BUCKETS,
)
# Process start timestamp (unix seconds), set once at import time so uptime
# can be derived in dashboards.
start_time = Gauge("bison_start_time", "The start time of the program")
start_time.set(time.time())
metrics_router = APIRouter(prefix="/api/metrics", tags=["metrics"])


@metrics_router.get("")
async def metrics():
    """Expose every registered Prometheus metric in the text exposition format."""
    payload = generate_latest()
    return Response(content=payload, media_type=CONTENT_TYPE_LATEST)
+1
View File
@@ -315,6 +315,7 @@ class NewMessage(MessageProcess, abstract=True):
res.append(raw_post) res.append(raw_post)
store.exists_posts.add(post_id) store.exists_posts.add(post_id)
self.set_stored_data(target, store) self.set_stored_data(target, store)
logger.trace(f"本次抓取 {len(raw_post_list)} 条,过滤后 {len(filtered_post)} 条,新消息 {len(res)}")
return res return res
async def _handle_new_post( async def _handle_new_post(
+31 -2
View File
@@ -1,11 +1,14 @@
from dataclasses import dataclass from dataclasses import dataclass
from collections import defaultdict from collections import defaultdict
from collections.abc import Callable
from nonebot.log import logger from nonebot.log import logger
from nonebot_plugin_apscheduler import scheduler from nonebot_plugin_apscheduler import scheduler
from apscheduler.events import EVENT_JOB_MAX_INSTANCES
from nonebot_plugin_saa.utils.exceptions import NoBotFound from nonebot_plugin_saa.utils.exceptions import NoBotFound
from nonebot_bison.utils import ClientManager from nonebot_bison.utils import ClientManager
from nonebot_bison.metrics import sent_counter, request_counter, render_histogram, request_histogram
from ..config import config from ..config import config
from ..send import send_msgs from ..send import send_msgs
@@ -23,6 +26,15 @@ class Schedulable:
use_batch: bool = False use_batch: bool = False
def handle_time_exceeded(event):
    """APScheduler listener for EVENT_JOB_MAX_INSTANCES.

    Fires when a fetch job is still running as its next trigger arrives,
    i.e. the previous run exceeded its time slot. ``event.job_id`` is the
    job's id in apscheduler; the job's bound function gives us back the
    owning Scheduler instance via ``func.__self__`` so we can log the miss
    and report the fetch as failed.
    """
    owner = scheduler.get_job(event.job_id).func.__self__
    logger.warning(f"{owner.name} 抓取执行超时")
    # metrics_report is None until the first fetch of a round installs the
    # reporting closure — guard so an early timeout cannot raise TypeError.
    if owner.metrics_report is not None:
        owner.metrics_report(False)


scheduler.add_listener(handle_time_exceeded, EVENT_JOB_MAX_INSTANCES)
class Scheduler: class Scheduler:
schedulable_list: list[Schedulable] # for load weight from db schedulable_list: list[Schedulable] # for load weight from db
batch_api_target_cache: dict[str, dict[Target, list[Target]]] # platform_name -> (target -> [target]) batch_api_target_cache: dict[str, dict[Target, list[Target]]] # platform_name -> (target -> [target])
@@ -55,6 +67,7 @@ class Scheduler:
self.platform_name_list = platform_name_list self.platform_name_list = platform_name_list
self.pre_weight_val = 0 # 轮调度中“本轮”增加权重和的初值 self.pre_weight_val = 0 # 轮调度中“本轮”增加权重和的初值
self.metrics_report: Callable[[bool], None] | None = None # 作为函数变量,允许外部调用来上报此次抓取是否成功
logger.info( logger.info(
f"register scheduler for {self.name} with " f"register scheduler for {self.name} with "
f"{self.scheduler_config.schedule_type} {self.scheduler_config.schedule_setting}" f"{self.scheduler_config.schedule_type} {self.scheduler_config.schedule_setting}"
@@ -94,8 +107,19 @@ class Scheduler:
context = ProcessContext(self.client_mgr) context = ProcessContext(self.client_mgr)
try: success_flag = False
platform_obj = platform_manager[schedulable.platform_name](context) platform_obj = platform_manager[schedulable.platform_name](context)
# 通过闭包的形式,将此次抓取任务的信息保存为函数变量,允许在该任务无法正常结束时由外部上报
self.metrics_report = lambda x: request_counter.labels(
platform_name=schedulable.platform_name,
site_name=platform_obj.site.name,
target=schedulable.target,
success=x,
).inc()
try:
with request_histogram.labels(
platform_name=schedulable.platform_name, site_name=platform_obj.site.name
).time():
if schedulable.use_batch: if schedulable.use_batch:
batch_targets = self.batch_api_target_cache[schedulable.platform_name][schedulable.target] batch_targets = self.batch_api_target_cache[schedulable.platform_name][schedulable.target]
sub_units = [] sub_units = []
@@ -108,6 +132,7 @@ class Scheduler:
schedulable.platform_name, schedulable.target schedulable.platform_name, schedulable.target
) )
to_send = await platform_obj.do_fetch_new_post(SubUnit(schedulable.target, send_userinfo_list)) to_send = await platform_obj.do_fetch_new_post(SubUnit(schedulable.target, send_userinfo_list))
success_flag = True
except SkipRequestException as err: except SkipRequestException as err:
logger.debug(f"skip request: {err}") logger.debug(f"skip request: {err}")
except Exception as err: except Exception as err:
@@ -117,9 +142,13 @@ class Scheduler:
err.args += (records,) err.args += (records,)
raise raise
self.metrics_report(success_flag)
if not to_send: if not to_send:
return return
sent_counter.labels(
platform_name=schedulable.platform_name, site_name=platform_obj.site.name, target=schedulable.target
).inc()
with render_histogram.labels(platform_name=schedulable.platform_name, site_name=platform_obj.site.name).time():
for user, send_list in to_send: for user, send_list in to_send:
for send_post in send_list: for send_post in send_list:
logger.info(f"send to {user}: {send_post}") logger.info(f"send to {user}: {send_post}")
+2
View File
@@ -13,6 +13,7 @@ from ..types import Target
from ..config import config from ..config import config
from .http import http_client from .http import http_client
from ..config.db_model import Cookie from ..config.db_model import Cookie
from ..metrics import cookie_choose_counter
class ClientManager(ABC): class ClientManager(ABC):
@@ -131,6 +132,7 @@ class CookieClientManager(ClientManager):
"""获取 client,根据 target 选择 cookie""" """获取 client,根据 target 选择 cookie"""
client = http_client() client = http_client()
cookie = await self._choose_cookie(target) cookie = await self._choose_cookie(target)
cookie_choose_counter.labels(site_name=self._site_name, target=target, cookie_id=cookie.id).inc()
if cookie.is_universal: if cookie.is_universal:
logger.trace(f"平台 {self._site_name} 未获取到用户cookie, 使用匿名cookie") logger.trace(f"平台 {self._site_name} 未获取到用户cookie, 使用匿名cookie")
else: else:
Generated
+21 -2
View File
@@ -1,4 +1,4 @@
# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand. # This file is automatically @generated by Poetry 1.8.2 and should not be changed by hand.
[[package]] [[package]]
name = "aiodns" name = "aiodns"
@@ -3207,6 +3207,25 @@ type = "legacy"
url = "https://pypi.org/simple" url = "https://pypi.org/simple"
reference = "offical-source" reference = "offical-source"
[[package]]
name = "prometheus-client"
version = "0.21.0"
description = "Python client for the Prometheus monitoring system."
optional = false
python-versions = ">=3.8"
files = [
{file = "prometheus_client-0.21.0-py3-none-any.whl", hash = "sha256:4fa6b4dd0ac16d58bb587c04b1caae65b8c5043e85f778f42f5f632f6af2e166"},
{file = "prometheus_client-0.21.0.tar.gz", hash = "sha256:96c83c606b71ff2b0a433c98889d275f51ffec6c5e267de37c7a2b5c9aa9233e"},
]
[package.extras]
twisted = ["twisted"]
[package.source]
type = "legacy"
url = "https://pypi.org/simple"
reference = "offical-source"
[[package]] [[package]]
name = "prompt-toolkit" name = "prompt-toolkit"
version = "3.0.47" version = "3.0.47"
@@ -5166,4 +5185,4 @@ yaml = []
[metadata] [metadata]
lock-version = "2.0" lock-version = "2.0"
python-versions = ">=3.10,<4.0.0" python-versions = ">=3.10,<4.0.0"
content-hash = "3d3bd947b91b8053fc5fed4873b6d0ed4017a5be118611cd93d30ffa265e04fb" content-hash = "5e4ea27ea11e18451d1ad0d4bbf3b44da9334c20728889041a4c908addbfdda8"
+1
View File
@@ -42,6 +42,7 @@ yarl = ">=1.11.1"
hishel = "^0.0.30" hishel = "^0.0.30"
expiringdictx = "^1.1.0" expiringdictx = "^1.1.0"
rapidfuzz = "^3.9.7" rapidfuzz = "^3.9.7"
prometheus-client = "^0.21.0"
[tool.poetry.group.dev.dependencies] [tool.poetry.group.dev.dependencies]
black = ">=24.8.0,<25.0" black = ">=24.8.0,<25.0"