mirror of
https://github.com/suyiiyii/nonebot-bison.git
synced 2025-09-19 09:42:34 +08:00
Compare commits
3 Commits
80f924123d
...
75bbbb68e8
Author | SHA1 | Date | |
---|---|---|---|
75bbbb68e8 | |||
96573ec86e | |||
4c29cf10e4 |
@ -1,6 +1,8 @@
|
||||
import time
|
||||
|
||||
from fastapi import APIRouter
|
||||
from starlette.responses import Response
|
||||
from prometheus_client import CONTENT_TYPE_LATEST, Counter, generate_latest
|
||||
from prometheus_client import CONTENT_TYPE_LATEST, Gauge, Counter, Histogram, generate_latest
|
||||
|
||||
# Request counter
|
||||
request_counter = Counter(
|
||||
@ -10,6 +12,28 @@ request_counter = Counter(
|
||||
# Sent counter
|
||||
sent_counter = Counter("bison_sent_counter", "The number of sent messages", ["site_name", "platform_name", "target"])
|
||||
|
||||
cookie_choose_counter = Counter(
|
||||
"bison_cookie_choose_counter", "The number of cookie choose", ["site_name", "target", "cookie_id"]
|
||||
)
|
||||
|
||||
request_histogram = Histogram(
|
||||
"bison_request_histogram",
|
||||
"The time of platform used to request the source",
|
||||
["site_name", "platform_name"],
|
||||
buckets=[0.1, 0.5, 1, 2, 5, 10, 30, 60],
|
||||
)
|
||||
|
||||
render_histogram = Histogram(
|
||||
"bison_render_histogram",
|
||||
"The time of theme used to render",
|
||||
["site_name", "platform_name"],
|
||||
buckets=[0.1, 0.5, 1, 2, 5, 10, 30, 60],
|
||||
)
|
||||
|
||||
start_time = Gauge("bison_start_time", "The start time of the program")
|
||||
start_time.set(time.time())
|
||||
|
||||
|
||||
metrics_router = APIRouter(prefix="/api/metrics", tags=["metrics"])
|
||||
|
||||
|
||||
|
@ -1,12 +1,14 @@
|
||||
from dataclasses import dataclass
|
||||
from collections import defaultdict
|
||||
from collections.abc import Callable
|
||||
|
||||
from nonebot.log import logger
|
||||
from nonebot_plugin_apscheduler import scheduler
|
||||
from apscheduler.events import EVENT_JOB_MAX_INSTANCES
|
||||
from nonebot_plugin_saa.utils.exceptions import NoBotFound
|
||||
|
||||
from nonebot_bison.utils import ClientManager
|
||||
from nonebot_bison.metrics import sent_counter, request_counter
|
||||
from nonebot_bison.metrics import sent_counter, request_counter, render_histogram, request_histogram
|
||||
|
||||
from ..config import config
|
||||
from ..send import send_msgs
|
||||
@ -24,6 +26,15 @@ class Schedulable:
|
||||
use_batch: bool = False
|
||||
|
||||
|
||||
def handle_time_exceeded(event):
|
||||
# event.job_id 是该任务在 apscheduler 的 id, 进而可以获得该任务的函数,再获取该函数绑定的对象
|
||||
logger.warning(f"{scheduler.get_job(event.job_id).func.__self__.name} 抓取执行超时")
|
||||
scheduler.get_job(event.job_id).func.__self__.metrics_report(False)
|
||||
|
||||
|
||||
scheduler.add_listener(handle_time_exceeded, EVENT_JOB_MAX_INSTANCES)
|
||||
|
||||
|
||||
class Scheduler:
|
||||
schedulable_list: list[Schedulable] # for load weigth from db
|
||||
batch_api_target_cache: dict[str, dict[Target, list[Target]]] # platform_name -> (target -> [target])
|
||||
@ -56,6 +67,7 @@ class Scheduler:
|
||||
|
||||
self.platform_name_list = platform_name_list
|
||||
self.pre_weight_val = 0 # 轮调度中“本轮”增加权重和的初值
|
||||
self.metrics_report: Callable[[bool], None] | None = None # 作为函数变量,允许外部调用来上报此次抓取是否成功
|
||||
logger.info(
|
||||
f"register scheduler for {self.name} with "
|
||||
f"{self.scheduler_config.schedule_type} {self.scheduler_config.schedule_setting}"
|
||||
@ -97,20 +109,30 @@ class Scheduler:
|
||||
|
||||
success_flag = False
|
||||
platform_obj = platform_manager[schedulable.platform_name](context)
|
||||
# 通过闭包的形式,将此次抓取任务的信息保存为函数变量,允许在该任务无法正常结束时由外部上报
|
||||
self.metrics_report = lambda x: request_counter.labels(
|
||||
platform_name=schedulable.platform_name,
|
||||
site_name=platform_obj.site.name,
|
||||
target=schedulable.target,
|
||||
success=x,
|
||||
).inc()
|
||||
try:
|
||||
if schedulable.use_batch:
|
||||
batch_targets = self.batch_api_target_cache[schedulable.platform_name][schedulable.target]
|
||||
sub_units = []
|
||||
for batch_target in batch_targets:
|
||||
userinfo = await config.get_platform_target_subscribers(schedulable.platform_name, batch_target)
|
||||
sub_units.append(SubUnit(batch_target, userinfo))
|
||||
to_send = await platform_obj.do_batch_fetch_new_post(sub_units)
|
||||
else:
|
||||
send_userinfo_list = await config.get_platform_target_subscribers(
|
||||
schedulable.platform_name, schedulable.target
|
||||
)
|
||||
to_send = await platform_obj.do_fetch_new_post(SubUnit(schedulable.target, send_userinfo_list))
|
||||
success_flag = True
|
||||
with request_histogram.labels(
|
||||
platform_name=schedulable.platform_name, site_name=platform_obj.site.name
|
||||
).time():
|
||||
if schedulable.use_batch:
|
||||
batch_targets = self.batch_api_target_cache[schedulable.platform_name][schedulable.target]
|
||||
sub_units = []
|
||||
for batch_target in batch_targets:
|
||||
userinfo = await config.get_platform_target_subscribers(schedulable.platform_name, batch_target)
|
||||
sub_units.append(SubUnit(batch_target, userinfo))
|
||||
to_send = await platform_obj.do_batch_fetch_new_post(sub_units)
|
||||
else:
|
||||
send_userinfo_list = await config.get_platform_target_subscribers(
|
||||
schedulable.platform_name, schedulable.target
|
||||
)
|
||||
to_send = await platform_obj.do_fetch_new_post(SubUnit(schedulable.target, send_userinfo_list))
|
||||
success_flag = True
|
||||
except SkipRequestException as err:
|
||||
logger.debug(f"skip request: {err}")
|
||||
except Exception as err:
|
||||
@ -120,27 +142,23 @@ class Scheduler:
|
||||
err.args += (records,)
|
||||
raise
|
||||
|
||||
request_counter.labels(
|
||||
platform_name=schedulable.platform_name,
|
||||
site_name=platform_obj.site.name,
|
||||
target=schedulable.target,
|
||||
success=success_flag,
|
||||
).inc()
|
||||
self.metrics_report(success_flag)
|
||||
if not to_send:
|
||||
return
|
||||
sent_counter.labels(
|
||||
platform_name=schedulable.platform_name, site_name=platform_obj.site.name, target=schedulable.target
|
||||
).inc()
|
||||
for user, send_list in to_send:
|
||||
for send_post in send_list:
|
||||
logger.info(f"send to {user}: {send_post}")
|
||||
try:
|
||||
await send_msgs(
|
||||
user,
|
||||
await send_post.generate_messages(),
|
||||
)
|
||||
except NoBotFound:
|
||||
logger.warning("no bot connected")
|
||||
with render_histogram.labels(platform_name=schedulable.platform_name, site_name=platform_obj.site.name).time():
|
||||
for user, send_list in to_send:
|
||||
for send_post in send_list:
|
||||
logger.info(f"send to {user}: {send_post}")
|
||||
try:
|
||||
await send_msgs(
|
||||
user,
|
||||
await send_post.generate_messages(),
|
||||
)
|
||||
except NoBotFound:
|
||||
logger.warning("no bot connected")
|
||||
|
||||
def insert_new_schedulable(self, platform_name: str, target: Target):
|
||||
self.pre_weight_val += 1000
|
||||
|
@ -13,6 +13,7 @@ from ..types import Target
|
||||
from ..config import config
|
||||
from .http import http_client
|
||||
from ..config.db_model import Cookie
|
||||
from ..metrics import cookie_choose_counter
|
||||
|
||||
|
||||
class ClientManager(ABC):
|
||||
@ -131,6 +132,7 @@ class CookieClientManager(ClientManager):
|
||||
"""获取 client,根据 target 选择 cookie"""
|
||||
client = http_client()
|
||||
cookie = await self._choose_cookie(target)
|
||||
cookie_choose_counter.labels(site_name=self._site_name, target=target, cookie_id=cookie.id).inc()
|
||||
if cookie.is_universal:
|
||||
logger.trace(f"平台 {self._site_name} 未获取到用户cookie, 使用匿名cookie")
|
||||
else:
|
||||
|
Loading…
x
Reference in New Issue
Block a user