From 4db7e7b911e2dcdd7aa506772a6283a81857f374 Mon Sep 17 00:00:00 2001 From: suyiiyii Date: Sun, 8 Sep 2024 18:38:38 +0800 Subject: [PATCH] =?UTF-8?q?:recycles:=20DBConfig=E4=B8=AD=20=E6=9B=BF?= =?UTF-8?q?=E6=8D=A2platform=5Fname=E4=B8=BAsite=5Fname?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- nonebot_bison/config/db_config.py | 248 +++++++++++++++--------------- 1 file changed, 124 insertions(+), 124 deletions(-) diff --git a/nonebot_bison/config/db_config.py b/nonebot_bison/config/db_config.py index 4fdea77..4bb58ea 100644 --- a/nonebot_bison/config/db_config.py +++ b/nonebot_bison/config/db_config.py @@ -259,130 +259,130 @@ class DBConfig: ) return res - # async def get_cookie(self, platform_name: str = None, target: T_Target = None) -> Sequence[Cookie]: - # """根据平台名和订阅名获取 cookie,不会返回匿名cookie""" - # async with create_session() as sess: - # query = select(Cookie).distinct().where(Cookie.is_universal == False) # noqa: E712 - # if platform_name: - # query = query.where(Cookie.platform_name == platform_name) - # query = query.outerjoin(CookieTarget).options(selectinload(Cookie.targets)) - # res = (await sess.scalars(query)).all() - # if target: - # query = select(CookieTarget.cookie_id).join(Target).where(Target.target == target) - # ids = set((await sess.scalars(query)).all()) - # res = [cookie for cookie in res if cookie.id in ids] - # return res - # - # async def get_unviersal_cookie(self, platform_name: str = None) -> Sequence[Cookie]: - # async with create_session() as sess: - # query = select(Cookie).distinct().where(Cookie.is_universal == True) # noqa: E712 - # if platform_name: - # query = query.where(Cookie.platform_name == platform_name) - # res = (await sess.scalars(query)).all() - # return res - # - # async def add_cookie_with_content(self, platform_name: str, content: str) -> int: - # async with create_session() as sess: - # cookie = Cookie(platform_name=platform_name, content=content) - # sess.add(cookie) - # await sess.commit() - # await sess.refresh(cookie) - # return cookie.id - # - # async def add_cookie(self, cookie: Cookie) -> int: - # async with create_session() as sess: - # sess.add(cookie) - # await sess.commit() - # await sess.refresh(cookie) - # return cookie.id - # - # async def update_cookie(self, cookie: Cookie): - # async with create_session() as sess: - # cookie_in_db: Cookie | None = await sess.scalar(select(Cookie).where(Cookie.id == cookie.id)) - # if not cookie_in_db: - # return - # cookie_in_db.content = cookie.content - # cookie_in_db.last_usage = cookie.last_usage - # cookie_in_db.status = cookie.status - # cookie_in_db.tags = cookie.tags - # await sess.commit() - # - # async def delete_cookie_by_id(self, cookie_id: int): - # async with create_session() as sess: - # cookie = await sess.scalar( - # select(Cookie) - # .where(Cookie.id == cookie_id) - # .outerjoin(CookieTarget) - # .options(selectinload(Cookie.targets)) - # ) - # if len(cookie.targets) > 0: - # raise Exception(f"cookie {cookie.id} in use") - # await sess.execute(delete(Cookie).where(Cookie.id == cookie_id)) - # await sess.commit() - # - # async def get_cookie_by_target(self, target: T_Target, platform_name: str) -> Sequence[Cookie]: - # async with create_session() as sess: - # query = ( - # select(Cookie) - # .join(CookieTarget) - # .join(Target) - # .where(Target.platform_name == platform_name, Target.target == target) - # ) - # return (await sess.scalars(query)).all() - # - # async def get_universal_cookie(self, platform_name: str) -> Sequence[Cookie]: - # async with create_session() as sess: - # query = ( - # select(Cookie) - # .where(Cookie.platform_name == platform_name) - # .where(Cookie.is_universal == True) # noqa: E712 - # ) - # return (await sess.scalars(query)).all() - # - # async def add_cookie_target(self, target: T_Target, platform_name: str, cookie_id: int): - # async with create_session() as sess: - # target_obj = await sess.scalar( - # select(Target).where(Target.platform_name == platform_name, Target.target == target) - # ) - # # check if relation exists - # cookie_target = await sess.scalar( - # select(CookieTarget).where(CookieTarget.target == target_obj, CookieTarget.cookie_id == cookie_id) - # ) - # if cookie_target: - # raise DuplicateCookieTargetException() - # cookie_obj = await sess.scalar(select(Cookie).where(Cookie.id == cookie_id)) - # cookie_target = CookieTarget(target=target_obj, cookie=cookie_obj) - # sess.add(cookie_target) - # await sess.commit() - # - # async def delete_cookie_target(self, target: T_Target, platform_name: str, cookie_id: int): - # async with create_session() as sess: - # target_obj = await sess.scalar( - # select(Target).where(Target.platform_name == platform_name, Target.target == target) - # ) - # cookie_obj = await sess.scalar(select(Cookie).where(Cookie.id == cookie_id)) - # await sess.execute( - # delete(CookieTarget).where(CookieTarget.target == target_obj, CookieTarget.cookie == cookie_obj) - # ) - # await sess.commit() - # - # async def delete_cookie_target_by_id(self, cookie_target_id: int): - # async with create_session() as sess: - # await sess.execute(delete(CookieTarget).where(CookieTarget.id == cookie_target_id)) - # await sess.commit() - # - # async def get_cookie_target(self) -> list[CookieTarget]: - # async with create_session() as sess: - # query = ( - # select(CookieTarget) - # .outerjoin(Target) - # .options(selectinload(CookieTarget.target)) - # .outerjoin(Cookie) - # .options(selectinload(CookieTarget.cookie)) - # ) - # res = list((await sess.scalars(query)).all()) - # res.sort(key=lambda x: (x.target.platform_name, x.cookie_id, x.target_id)) - # return res + async def get_cookie(self, site_name: str = None, target: T_Target = None) -> Sequence[Cookie]: + """根据 site_name 和 target 获取 cookie,不会返回匿名cookie""" + async with create_session() as sess: + query = select(Cookie).distinct().where(Cookie.is_universal == False) # noqa: E712 + if site_name: + query = query.where(Cookie.site_name == site_name) + query = query.outerjoin(CookieTarget).options(selectinload(Cookie.targets)) + res = (await sess.scalars(query)).all() + if target: + query = select(CookieTarget.cookie_id).join(Target).where(Target.target == target) + ids = set((await sess.scalars(query)).all()) + res = [cookie for cookie in res if cookie.id in ids] + return res + + async def get_unviersal_cookie(self, site_name: str = None) -> Sequence[Cookie]: + async with create_session() as sess: + query = select(Cookie).distinct().where(Cookie.is_universal == True) # noqa: E712 + if site_name: + query = query.where(Cookie.site_name == site_name) + res = (await sess.scalars(query)).all() + return res + + async def add_cookie_with_content(self, site_name: str, content: str) -> int: + async with create_session() as sess: + cookie = Cookie(site_name=site_name, content=content) + sess.add(cookie) + await sess.commit() + await sess.refresh(cookie) + return cookie.id + + async def add_cookie(self, cookie: Cookie) -> int: + async with create_session() as sess: + sess.add(cookie) + await sess.commit() + await sess.refresh(cookie) + return cookie.id + + async def update_cookie(self, cookie: Cookie): + async with create_session() as sess: + cookie_in_db: Cookie | None = await sess.scalar(select(Cookie).where(Cookie.id == cookie.id)) + if not cookie_in_db: + return + cookie_in_db.content = cookie.content + cookie_in_db.last_usage = cookie.last_usage + cookie_in_db.status = cookie.status + cookie_in_db.tags = cookie.tags + await sess.commit() + + async def delete_cookie_by_id(self, cookie_id: int): + async with create_session() as sess: + cookie = await sess.scalar( + select(Cookie) + .where(Cookie.id == cookie_id) + .outerjoin(CookieTarget) + .options(selectinload(Cookie.targets)) + ) + if len(cookie.targets) > 0: + raise Exception(f"cookie {cookie.id} in use") + await sess.execute(delete(Cookie).where(Cookie.id == cookie_id)) + await sess.commit() + + async def get_cookie_by_target(self, target: T_Target, site_name: str) -> Sequence[Cookie]: + async with create_session() as sess: + query = ( + select(Cookie) + .join(CookieTarget) + .join(Target) + .where(Target.site_name == site_name, Target.target == target) + ) + return (await sess.scalars(query)).all() + + async def get_universal_cookie(self, site_name: str) -> Sequence[Cookie]: + async with create_session() as sess: + query = ( + select(Cookie) + .where(Cookie.site_name == site_name) + .where(Cookie.is_universal == True) # noqa: E712 + ) + return (await sess.scalars(query)).all() + + async def add_cookie_target(self, target: T_Target, site_name: str, cookie_id: int): + async with create_session() as sess: + target_obj = await sess.scalar( + select(Target).where(Target.site_name == site_name, Target.target == target) + ) + # check if relation exists + cookie_target = await sess.scalar( + select(CookieTarget).where(CookieTarget.target == target_obj, CookieTarget.cookie_id == cookie_id) + ) + if cookie_target: + raise DuplicateCookieTargetException() + cookie_obj = await sess.scalar(select(Cookie).where(Cookie.id == cookie_id)) + cookie_target = CookieTarget(target=target_obj, cookie=cookie_obj) + sess.add(cookie_target) + await sess.commit() + + async def delete_cookie_target(self, target: T_Target, site_name: str, cookie_id: int): + async with create_session() as sess: + target_obj = await sess.scalar( + select(Target).where(Target.site_name == site_name, Target.target == target) + ) + cookie_obj = await sess.scalar(select(Cookie).where(Cookie.id == cookie_id)) + await sess.execute( + delete(CookieTarget).where(CookieTarget.target == target_obj, CookieTarget.cookie == cookie_obj) + ) + await sess.commit() + + async def delete_cookie_target_by_id(self, cookie_target_id: int): + async with create_session() as sess: + await sess.execute(delete(CookieTarget).where(CookieTarget.id == cookie_target_id)) + await sess.commit() + + async def get_cookie_target(self) -> list[CookieTarget]: + async with create_session() as sess: + query = ( + select(CookieTarget) + .outerjoin(Target) + .options(selectinload(CookieTarget.target)) + .outerjoin(Cookie) + .options(selectinload(CookieTarget.cookie)) + ) + res = list((await sess.scalars(query)).all()) + res.sort(key=lambda x: (x.target.platform_name, x.cookie_id, x.target_id)) + return res config = DBConfig()