mirror of
				https://github.com/suyiiyii/nonebot-bison.git
				synced 2025-11-04 13:34:52 +08:00 
			
		
		
		
	Compare commits
	
		
			3 Commits
		
	
	
		
			80f924123d
			...
			75bbbb68e8
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 75bbbb68e8 | |||
| 96573ec86e | |||
| 4c29cf10e4 | 
@ -1,6 +1,8 @@
 | 
			
		||||
import time
 | 
			
		||||
 | 
			
		||||
from fastapi import APIRouter
 | 
			
		||||
from starlette.responses import Response
 | 
			
		||||
from prometheus_client import CONTENT_TYPE_LATEST, Counter, generate_latest
 | 
			
		||||
from prometheus_client import CONTENT_TYPE_LATEST, Gauge, Counter, Histogram, generate_latest
 | 
			
		||||
 | 
			
		||||
# Request counter
 | 
			
		||||
request_counter = Counter(
 | 
			
		||||
@ -10,6 +12,28 @@ request_counter = Counter(
 | 
			
		||||
# Sent counter
 | 
			
		||||
sent_counter = Counter("bison_sent_counter", "The number of sent messages", ["site_name", "platform_name", "target"])
 | 
			
		||||
 | 
			
		||||
cookie_choose_counter = Counter(
 | 
			
		||||
    "bison_cookie_choose_counter", "The number of cookie choose", ["site_name", "target", "cookie_id"]
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
request_histogram = Histogram(
 | 
			
		||||
    "bison_request_histogram",
 | 
			
		||||
    "The time of platform used to request the source",
 | 
			
		||||
    ["site_name", "platform_name"],
 | 
			
		||||
    buckets=[0.1, 0.5, 1, 2, 5, 10, 30, 60],
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
render_histogram = Histogram(
 | 
			
		||||
    "bison_render_histogram",
 | 
			
		||||
    "The time of theme used to render",
 | 
			
		||||
    ["site_name", "platform_name"],
 | 
			
		||||
    buckets=[0.1, 0.5, 1, 2, 5, 10, 30, 60],
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
start_time = Gauge("bison_start_time", "The start time of the program")
 | 
			
		||||
start_time.set(time.time())
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
metrics_router = APIRouter(prefix="/api/metrics", tags=["metrics"])
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -1,12 +1,14 @@
 | 
			
		||||
from dataclasses import dataclass
 | 
			
		||||
from collections import defaultdict
 | 
			
		||||
from collections.abc import Callable
 | 
			
		||||
 | 
			
		||||
from nonebot.log import logger
 | 
			
		||||
from nonebot_plugin_apscheduler import scheduler
 | 
			
		||||
from apscheduler.events import EVENT_JOB_MAX_INSTANCES
 | 
			
		||||
from nonebot_plugin_saa.utils.exceptions import NoBotFound
 | 
			
		||||
 | 
			
		||||
from nonebot_bison.utils import ClientManager
 | 
			
		||||
from nonebot_bison.metrics import sent_counter, request_counter
 | 
			
		||||
from nonebot_bison.metrics import sent_counter, request_counter, render_histogram, request_histogram
 | 
			
		||||
 | 
			
		||||
from ..config import config
 | 
			
		||||
from ..send import send_msgs
 | 
			
		||||
@ -24,6 +26,15 @@ class Schedulable:
 | 
			
		||||
    use_batch: bool = False
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def handle_time_exceeded(event):
    """Log and report a fetch as failed when its job hits the max-instances limit.

    Registered on the scheduler for ``EVENT_JOB_MAX_INSTANCES``. The job's
    callable is expected to be a bound method of the object that owns the
    fetch; that object's ``name`` is logged and its ``metrics_report``
    callback is invoked with ``False``.

    Args:
        event: the apscheduler event; only ``event.job_id`` is read.
    """
    # event.job_id is the job's id inside apscheduler; from it we can get the
    # job's function, and from the function the object it is bound to.
    job = scheduler.get_job(event.job_id)
    if job is None:
        # The job may have been removed between the event firing and this lookup.
        logger.warning(f"job {event.job_id} not found when handling max-instances event")
        return

    owner = getattr(job.func, "__self__", None)
    if owner is None:
        # Not a bound method, so there is no owning object to report on.
        logger.warning(f"job {event.job_id} is not bound to an object, skip metrics report")
        return

    logger.warning(f"{owner.name} 抓取执行超时")

    # metrics_report starts out as None and is only assigned once a fetch has
    # begun, so it must be checked before being called.
    report = getattr(owner, "metrics_report", None)
    if report is not None:
        report(False)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
scheduler.add_listener(handle_time_exceeded, EVENT_JOB_MAX_INSTANCES)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class Scheduler:
 | 
			
		||||
    schedulable_list: list[Schedulable]  # for load weight from db
 | 
			
		||||
    batch_api_target_cache: dict[str, dict[Target, list[Target]]]  # platform_name -> (target -> [target])
 | 
			
		||||
@ -56,6 +67,7 @@ class Scheduler:
 | 
			
		||||
 | 
			
		||||
        self.platform_name_list = platform_name_list
 | 
			
		||||
        self.pre_weight_val = 0  # 轮调度中“本轮”增加权重和的初值
 | 
			
		||||
        self.metrics_report: Callable[[bool], None] | None = None  # 作为函数变量,允许外部调用来上报此次抓取是否成功
 | 
			
		||||
        logger.info(
 | 
			
		||||
            f"register scheduler for {self.name} with "
 | 
			
		||||
            f"{self.scheduler_config.schedule_type} {self.scheduler_config.schedule_setting}"
 | 
			
		||||
@ -97,20 +109,30 @@ class Scheduler:
 | 
			
		||||
 | 
			
		||||
        success_flag = False
 | 
			
		||||
        platform_obj = platform_manager[schedulable.platform_name](context)
 | 
			
		||||
        # 通过闭包的形式,将此次抓取任务的信息保存为函数变量,允许在该任务无法正常结束时由外部上报
 | 
			
		||||
        self.metrics_report = lambda x: request_counter.labels(
 | 
			
		||||
            platform_name=schedulable.platform_name,
 | 
			
		||||
            site_name=platform_obj.site.name,
 | 
			
		||||
            target=schedulable.target,
 | 
			
		||||
            success=x,
 | 
			
		||||
        ).inc()
 | 
			
		||||
        try:
 | 
			
		||||
            if schedulable.use_batch:
 | 
			
		||||
                batch_targets = self.batch_api_target_cache[schedulable.platform_name][schedulable.target]
 | 
			
		||||
                sub_units = []
 | 
			
		||||
                for batch_target in batch_targets:
 | 
			
		||||
                    userinfo = await config.get_platform_target_subscribers(schedulable.platform_name, batch_target)
 | 
			
		||||
                    sub_units.append(SubUnit(batch_target, userinfo))
 | 
			
		||||
                to_send = await platform_obj.do_batch_fetch_new_post(sub_units)
 | 
			
		||||
            else:
 | 
			
		||||
                send_userinfo_list = await config.get_platform_target_subscribers(
 | 
			
		||||
                    schedulable.platform_name, schedulable.target
 | 
			
		||||
                )
 | 
			
		||||
                to_send = await platform_obj.do_fetch_new_post(SubUnit(schedulable.target, send_userinfo_list))
 | 
			
		||||
            success_flag = True
 | 
			
		||||
            with request_histogram.labels(
 | 
			
		||||
                platform_name=schedulable.platform_name, site_name=platform_obj.site.name
 | 
			
		||||
            ).time():
 | 
			
		||||
                if schedulable.use_batch:
 | 
			
		||||
                    batch_targets = self.batch_api_target_cache[schedulable.platform_name][schedulable.target]
 | 
			
		||||
                    sub_units = []
 | 
			
		||||
                    for batch_target in batch_targets:
 | 
			
		||||
                        userinfo = await config.get_platform_target_subscribers(schedulable.platform_name, batch_target)
 | 
			
		||||
                        sub_units.append(SubUnit(batch_target, userinfo))
 | 
			
		||||
                    to_send = await platform_obj.do_batch_fetch_new_post(sub_units)
 | 
			
		||||
                else:
 | 
			
		||||
                    send_userinfo_list = await config.get_platform_target_subscribers(
 | 
			
		||||
                        schedulable.platform_name, schedulable.target
 | 
			
		||||
                    )
 | 
			
		||||
                    to_send = await platform_obj.do_fetch_new_post(SubUnit(schedulable.target, send_userinfo_list))
 | 
			
		||||
                success_flag = True
 | 
			
		||||
        except SkipRequestException as err:
 | 
			
		||||
            logger.debug(f"skip request: {err}")
 | 
			
		||||
        except Exception as err:
 | 
			
		||||
@ -120,27 +142,23 @@ class Scheduler:
 | 
			
		||||
            err.args += (records,)
 | 
			
		||||
            raise
 | 
			
		||||
 | 
			
		||||
        request_counter.labels(
 | 
			
		||||
            platform_name=schedulable.platform_name,
 | 
			
		||||
            site_name=platform_obj.site.name,
 | 
			
		||||
            target=schedulable.target,
 | 
			
		||||
            success=success_flag,
 | 
			
		||||
        ).inc()
 | 
			
		||||
        self.metrics_report(success_flag)
 | 
			
		||||
        if not to_send:
 | 
			
		||||
            return
 | 
			
		||||
        sent_counter.labels(
 | 
			
		||||
            platform_name=schedulable.platform_name, site_name=platform_obj.site.name, target=schedulable.target
 | 
			
		||||
        ).inc()
 | 
			
		||||
        for user, send_list in to_send:
 | 
			
		||||
            for send_post in send_list:
 | 
			
		||||
                logger.info(f"send to {user}: {send_post}")
 | 
			
		||||
                try:
 | 
			
		||||
                    await send_msgs(
 | 
			
		||||
                        user,
 | 
			
		||||
                        await send_post.generate_messages(),
 | 
			
		||||
                    )
 | 
			
		||||
                except NoBotFound:
 | 
			
		||||
                    logger.warning("no bot connected")
 | 
			
		||||
        with render_histogram.labels(platform_name=schedulable.platform_name, site_name=platform_obj.site.name).time():
 | 
			
		||||
            for user, send_list in to_send:
 | 
			
		||||
                for send_post in send_list:
 | 
			
		||||
                    logger.info(f"send to {user}: {send_post}")
 | 
			
		||||
                    try:
 | 
			
		||||
                        await send_msgs(
 | 
			
		||||
                            user,
 | 
			
		||||
                            await send_post.generate_messages(),
 | 
			
		||||
                        )
 | 
			
		||||
                    except NoBotFound:
 | 
			
		||||
                        logger.warning("no bot connected")
 | 
			
		||||
 | 
			
		||||
    def insert_new_schedulable(self, platform_name: str, target: Target):
 | 
			
		||||
        self.pre_weight_val += 1000
 | 
			
		||||
 | 
			
		||||
@ -13,6 +13,7 @@ from ..types import Target
 | 
			
		||||
from ..config import config
 | 
			
		||||
from .http import http_client
 | 
			
		||||
from ..config.db_model import Cookie
 | 
			
		||||
from ..metrics import cookie_choose_counter
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class ClientManager(ABC):
 | 
			
		||||
@ -131,6 +132,7 @@ class CookieClientManager(ClientManager):
 | 
			
		||||
        """获取 client,根据 target 选择 cookie"""
 | 
			
		||||
        client = http_client()
 | 
			
		||||
        cookie = await self._choose_cookie(target)
 | 
			
		||||
        cookie_choose_counter.labels(site_name=self._site_name, target=target, cookie_id=cookie.id).inc()
 | 
			
		||||
        if cookie.is_universal:
 | 
			
		||||
            logger.trace(f"平台 {self._site_name} 未获取到用户cookie, 使用匿名cookie")
 | 
			
		||||
        else:
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user