mirror of
https://github.com/suyiiyii/nonebot-bison.git
synced 2026-05-09 18:27:56 +08:00
unfinish
This commit is contained in:
@@ -1,5 +1,7 @@
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
import os
|
||||
from typing import Union
|
||||
|
||||
from fastapi.staticfiles import StaticFiles
|
||||
from fastapi.templating import Jinja2Templates
|
||||
@@ -31,6 +33,17 @@ TEST_URL = f'{URL_BASE}test'
|
||||
sio = socketio.AsyncServer(async_mode="asgi", cors_allowed_origins="*")
|
||||
socket_app = socketio.ASGIApp(sio, socketio_path="socket")
|
||||
|
||||
class SinglePageApplication(StaticFiles):
|
||||
|
||||
def __init__(self, directory: os.PathLike, index='index.html'):
|
||||
self.index = index
|
||||
super().__init__(directory=directory, packages=None, html=True, check_dir=True)
|
||||
|
||||
async def lookup_path(self, path: str) -> tuple[str, Union[os.stat_result, None]]:
|
||||
full_path, stat_res = await super().lookup_path(path)
|
||||
if stat_res is None:
|
||||
return await super().lookup_path(self.index)
|
||||
return (full_path, stat_res)
|
||||
|
||||
def register_router_fastapi(driver: Driver, socketio):
|
||||
from fastapi.security import OAuth2PasswordBearer
|
||||
@@ -61,7 +74,7 @@ def register_router_fastapi(driver: Driver, socketio):
|
||||
tags: list[str]
|
||||
|
||||
app = driver.server_app
|
||||
static_path = str((Path(__file__).parent / "dist").resolve())
|
||||
static_path = (Path(__file__).parent / "dist").resolve()
|
||||
app.get(TEST_URL)(test)
|
||||
app.get(GLOBAL_CONF_URL)(get_global_conf)
|
||||
app.get(AUTH_URL)(auth)
|
||||
@@ -76,15 +89,12 @@ def register_router_fastapi(driver: Driver, socketio):
|
||||
async def _add_group_subs(groupNumber: str, req: AddSubscribeReq):
|
||||
return await add_group_sub(group_number=groupNumber, platform_name=req.platformName,
|
||||
target=req.target, target_name=req.targetName, cats=req.categories, tags=req.tags)
|
||||
|
||||
@app.delete(SUBSCRIBE_URL, dependencies=[Depends(check_group_permission)])
|
||||
async def _del_group_subs(groupNumber: str, target: str, platformName: str):
|
||||
return await del_group_sub(groupNumber, platformName, target)
|
||||
app.mount(URL_BASE, StaticFiles(directory=static_path, html=True), name="bison")
|
||||
templates = Jinja2Templates(directory=static_path)
|
||||
|
||||
@app.get(f'{URL_BASE}{{rest_path:path}}')
|
||||
async def serve_sap(request: Request, rest_path: str):
|
||||
return templates.TemplateResponse("index.html", {"request": request})
|
||||
app.mount(URL_BASE, SinglePageApplication(directory=static_path), name="bison")
|
||||
|
||||
|
||||
def init():
|
||||
@@ -108,10 +118,6 @@ get_token = on_command('后台管理', rule=to_me(), priority=5)
|
||||
@get_token.handle()
|
||||
async def send_token(bot: "Bot", event: PrivateMessageEvent, state: T_State):
|
||||
driver = nonebot.get_driver()
|
||||
superusers = driver.config.superusers
|
||||
if event.get_user_id() not in superusers:
|
||||
await get_token.finish('你不是管理员')
|
||||
else:
|
||||
token = tm.get_user_token((event.get_user_id(), event.sender.nickname))
|
||||
await get_token.finish(f'请访问: {plugin_config.bison_outer_url}auth/{token}')
|
||||
token = tm.get_user_token((event.get_user_id(), event.sender.nickname))
|
||||
await get_token.finish(f'请访问: {plugin_config.bison_outer_url}auth/{token}')
|
||||
|
||||
|
||||
@@ -20,6 +20,17 @@ async def get_global_conf():
|
||||
}
|
||||
return { 'platformConf': res }
|
||||
|
||||
async def get_admin_groups(qq: int):
|
||||
bot = nonebot.get_bot()
|
||||
groups = await bot.call_api('get_group_list')
|
||||
res = []
|
||||
for group in groups:
|
||||
group_id = group['group_id']
|
||||
users = await bot.call_api('get_group_member_list', group_id=group_id)
|
||||
for user in users:
|
||||
if user['user_id'] == qq and user['role'] in ('owner', 'admin'):
|
||||
res.append({'id': group_id, 'name': group['group_name']})
|
||||
return res
|
||||
|
||||
async def auth(token: str):
|
||||
if qq_tuple := token_manager.get_user(token):
|
||||
@@ -41,6 +52,18 @@ async def auth(token: str):
|
||||
'token': pack_jwt(jwt_obj)
|
||||
}
|
||||
return { 'status': 200, **ret_obj }
|
||||
if admin_groups := await get_admin_groups(int(qq)):
|
||||
jwt_obj = {
|
||||
'id': str(qq),
|
||||
'groups': admin_groups
|
||||
}
|
||||
ret_obj = {
|
||||
'type': 'user',
|
||||
'name': nickname,
|
||||
'id': str(qq),
|
||||
'token': pack_jwt(jwt_obj)
|
||||
}
|
||||
return { 'status': 200, **ret_obj }
|
||||
else:
|
||||
return { 'status': 400, 'type': '', 'name': '', 'id': '', 'token': '' }
|
||||
else:
|
||||
|
||||
Reference in New Issue
Block a user