mirror of
https://github.com/suyiiyii/nonebot-bison.git
synced 2026-05-09 18:27:56 +08:00
update
This commit is contained in:
@@ -2,15 +2,23 @@ import importlib
|
||||
from pathlib import Path
|
||||
|
||||
from fastapi.staticfiles import StaticFiles
|
||||
from nonebot import get_driver
|
||||
from nonebot import get_driver, on_command
|
||||
import nonebot
|
||||
from nonebot.adapters.cqhttp.bot import Bot
|
||||
from nonebot.adapters.cqhttp.event import GroupMessageEvent, PrivateMessageEvent
|
||||
from nonebot.drivers.fastapi import Driver
|
||||
from nonebot.log import logger
|
||||
from nonebot.rule import to_me
|
||||
from nonebot.typing import T_State
|
||||
import socketio
|
||||
|
||||
from .api import test, get_global_conf
|
||||
from .api import test, get_global_conf, auth
|
||||
from .token_manager import token_manager as tm
|
||||
from ..plugin_config import plugin_config
|
||||
|
||||
URL_BASE = '/hk_reporter/'
|
||||
GLOBAL_CONF_URL = f'{URL_BASE}api/global_conf'
|
||||
AUTH_URL = f'{URL_BASE}api/auth'
|
||||
TEST_URL = f'{URL_BASE}test'
|
||||
|
||||
sio = socketio.AsyncServer(async_mode="asgi", cors_allowed_origins="*")
|
||||
@@ -21,6 +29,7 @@ def register_router_fastapi(driver: Driver, socketio):
|
||||
static_path = str((Path(__file__).parent / "dist").resolve())
|
||||
app.get(TEST_URL)(test)
|
||||
app.get(GLOBAL_CONF_URL)(get_global_conf)
|
||||
app.get(AUTH_URL)(auth)
|
||||
app.mount(URL_BASE, StaticFiles(directory=static_path, html=True), name="hk_reporter")
|
||||
|
||||
def init():
|
||||
@@ -39,3 +48,15 @@ def init():
|
||||
f"<b><u>http://{host}:{port}{URL_BASE}</u></b>")
|
||||
|
||||
init()
|
||||
|
||||
get_token = on_command('后台管理', rule=to_me(), priority=5)
|
||||
@get_token.handle()
|
||||
async def send_token(bot: "Bot", event: PrivateMessageEvent, state: T_State):
|
||||
driver = nonebot.get_driver()
|
||||
superusers = driver.config.superusers
|
||||
if event.get_user_id() not in superusers:
|
||||
await get_token.finish('你不是管理员')
|
||||
else:
|
||||
token = tm.get_user_token((event.get_user_id(), event.sender.nickname))
|
||||
await get_token.finish(f'请访问: {plugin_config.hk_reporter_outer_url}auth/{token}')
|
||||
|
||||
|
||||
@@ -1,4 +1,9 @@
|
||||
from ..platform import platform_manager
|
||||
from .token_manager import token_manager
|
||||
from .jwt import pack_jwt
|
||||
from ..config import Config
|
||||
import nonebot
|
||||
from nonebot.adapters.cqhttp.bot import Bot
|
||||
|
||||
async def test():
|
||||
return {"status": 200, "text": "test"}
|
||||
@@ -15,3 +20,37 @@ async def get_global_conf():
|
||||
})
|
||||
return { 'platformConf': res }
|
||||
|
||||
|
||||
async def auth(token: str):
|
||||
if qq_tuple := token_manager.get_user(token):
|
||||
qq, nickname = qq_tuple
|
||||
bot = nonebot.get_bot()
|
||||
assert(isinstance(bot, Bot))
|
||||
groups = await bot.call_api('get_group_list')
|
||||
if str(qq) in nonebot.get_driver().config.superusers:
|
||||
jwt_obj = {
|
||||
'id': str(qq),
|
||||
'groups': list(map(
|
||||
lambda info: {'id': info['group_id'], 'name': info['group_name']},
|
||||
groups)),
|
||||
}
|
||||
ret_obj = {
|
||||
'type': 'admin',
|
||||
'name': nickname,
|
||||
'id': str(qq),
|
||||
'token': pack_jwt(jwt_obj)
|
||||
}
|
||||
return { 'status': 200, **ret_obj }
|
||||
else:
|
||||
return { 'status': 400, 'type': '', 'name': '', 'id': '', 'token': '' }
|
||||
else:
|
||||
return { 'status': 400, 'type': '', 'name': '', 'id': '', 'token': '' }
|
||||
|
||||
async def get_subs_info(jwt_obj: dict):
|
||||
groups = jwt_obj['groups']
|
||||
res = {}
|
||||
for group in groups:
|
||||
group_id = group['id']
|
||||
config = Config()
|
||||
|
||||
subs = list(map(lambda sub: {'targetType': sub['target_type'], 'target': sub['target'], 'targetName': sub['target_name'], 'cats': sub['cats'], 'tags': sub['tags']},
|
||||
|
||||
@@ -0,0 +1,20 @@
|
||||
import random
|
||||
import string
|
||||
from typing import Optional
|
||||
import jwt
|
||||
import datetime
|
||||
|
||||
_key = ''.join(random.SystemRandom().choice(string.ascii_letters) for _ in range(16))
|
||||
|
||||
def pack_jwt(obj: dict) -> str:
|
||||
return jwt.encode(
|
||||
{'exp': datetime.datetime.utcnow() + datetime.timedelta(hours=1), **obj},
|
||||
_key, algorithm='HS256'
|
||||
)
|
||||
|
||||
def load_jwt(token: str) -> Optional[dict]:
|
||||
try:
|
||||
return jwt.decode(token, _key, algorithm='HS256')
|
||||
except:
|
||||
return None
|
||||
|
||||
@@ -0,0 +1,24 @@
|
||||
from typing import Optional
|
||||
from expiringdict import ExpiringDict
|
||||
import random
|
||||
import string
|
||||
|
||||
class TokenManager:
|
||||
|
||||
def __init__(self):
|
||||
self.token_manager = ExpiringDict(max_len=100, max_age_seconds=60*10)
|
||||
|
||||
def get_user(self, token: str) -> Optional[tuple]:
|
||||
res = self.token_manager.get(token)
|
||||
assert(res is None or isinstance(res, tuple))
|
||||
return res
|
||||
|
||||
def save_user(self, token: str, qq: tuple) -> None:
|
||||
self.token_manager[token] = qq
|
||||
|
||||
def get_user_token(self, qq: tuple) -> str:
|
||||
token = ''.join(random.choices(string.ascii_letters + string.digits, k=16))
|
||||
self.save_user(token, qq)
|
||||
return token
|
||||
|
||||
token_manager = TokenManager()
|
||||
@@ -64,6 +64,9 @@ class Config(metaclass=Singleton):
|
||||
if user_sub := self.user_target.get((query.user == user) & (query.user_type ==user_type)):
|
||||
return user_sub['subs']
|
||||
return []
|
||||
|
||||
def get_all_subscribe(self):
|
||||
return self.user_target
|
||||
|
||||
def del_subscribe(self, user, user_type, target, target_type):
|
||||
user_query = Query()
|
||||
|
||||
@@ -10,6 +10,7 @@ class PlugConfig(BaseSettings):
|
||||
hk_reporter_use_local: bool = False
|
||||
hk_reporter_browser: str = ''
|
||||
hk_reporter_init_filter: bool = True
|
||||
hk_reporter_outer_url: str = 'http://localhost:8080/hk_reporter/'
|
||||
|
||||
class Config:
|
||||
extra = 'ignore'
|
||||
|
||||
Reference in New Issue
Block a user