seperate post and abstract post

This commit is contained in:
felinae98 2022-05-25 20:55:13 +08:00
parent fcf8be38f0
commit 29574d28cf
No known key found for this signature in database
GPG Key ID: 00C8B010587FF610
3 changed files with 95 additions and 37 deletions

View File

@ -0,0 +1,3 @@
from .post import Post
__all__ = ["Post"]

View File

@ -0,0 +1,50 @@
from abc import abstractmethod
from dataclasses import dataclass, field
from functools import reduce
from typing import Optional
from nonebot.adapters.onebot.v11.message import Message, MessageSegment
from ..plugin_config import plugin_config
@dataclass
class BasePost:
@abstractmethod
async def generate_text_messages(self) -> list[MessageSegment]:
"Generate Message list from this instance"
...
@abstractmethod
async def generate_pic_messages(self) -> list[MessageSegment]:
"Generate Message list from this instance with `use_pic`"
...
@dataclass
class OptionalMixin:
# Because of https://stackoverflow.com/questions/51575931/class-inheritance-in-python-3-7-dataclasses
override_use_pic: Optional[bool] = None
compress: bool = False
extra_msg: list[Message] = field(default_factory=list)
def _use_pic(self):
if not self.override_use_pic is None:
return self.override_use_pic
return plugin_config.bison_use_pic
@dataclass
class AbstractPost(OptionalMixin, BasePost):
async def generate_messages(self) -> list[Message]:
if self._use_pic():
msg_segments = await self.generate_pic_messages()
else:
msg_segments = await self.generate_text_messages()
if self.compress:
msgs = [reduce(lambda x, y: x.append(y), msg_segments, Message())]
else:
msgs = list(map(lambda msg_segment: Message([msg_segment]), msg_segments))
msgs.extend(self.extra_msg)
return msgs

View File

@ -7,28 +7,21 @@ from nonebot.adapters.onebot.v11.message import Message, MessageSegment
from nonebot.log import logger
from PIL import Image
from .plugin_config import plugin_config
from .utils import http_client, parse_text
from ..utils import http_client, parse_text
from .abstract_post import BasePost, OptionalMixin
@dataclass
class Post:
class _Post(BasePost):
target_type: str
text: str
url: Optional[str] = None
target_name: Optional[str] = None
compress: bool = False
override_use_pic: Optional[bool] = None
pics: list[Union[str, bytes]] = field(default_factory=list)
extra_msg: list[Message] = field(default_factory=list)
_message: Optional[list[Message]] = None
def _use_pic(self):
if not self.override_use_pic is None:
return self.override_use_pic
return plugin_config.bison_use_pic
_message: Optional[list[MessageSegment]] = None
_pic_message: Optional[list[MessageSegment]] = None
async def _pic_url_to_image(self, data: Union[str, bytes]) -> Image.Image:
pic_buffer = BytesIO()
@ -106,42 +99,49 @@ class Post:
self.pics = self.pics[matrix[0] * matrix[1] :]
self.pics.insert(0, target_io.getvalue())
async def generate_messages(self) -> list[Message]:
async def generate_text_messages(self) -> list[MessageSegment]:
if self._message is None:
await self._pic_merge()
msg_segments: list[MessageSegment] = []
text = ""
if self.text:
if self._use_pic():
text += "{}".format(self.text)
else:
text += "{}".format(
self.text if len(self.text) < 500 else self.text[:500] + "..."
)
text += "\n来源: {}".format(self.target_type)
text += "{}".format(
self.text if len(self.text) < 500 else self.text[:500] + "..."
)
if text:
text += "\n"
text += "来源: {}".format(self.target_type)
if self.target_name:
text += " {}".format(self.target_name)
if self._use_pic():
msg_segments.append(await parse_text(text))
if not self.target_type == "rss" and self.url:
msg_segments.append(MessageSegment.text(self.url))
else:
if self.url:
text += " \n详情: {}".format(self.url)
msg_segments.append(MessageSegment.text(text))
if self.url:
text += " \n详情: {}".format(self.url)
msg_segments.append(MessageSegment.text(text))
for pic in self.pics:
msg_segments.append(MessageSegment.image(pic))
if self.compress:
msgs = [reduce(lambda x, y: x.append(y), msg_segments, Message())]
else:
msgs = list(
map(lambda msg_segment: Message([msg_segment]), msg_segments)
)
msgs.extend(self.extra_msg)
self._message = msgs
assert len(self._message) > 0, f"message list empty, {self}"
self._message = msg_segments
return self._message
async def generate_pic_messages(self) -> list[MessageSegment]:
if self._pic_message is None:
await self._pic_merge()
msg_segments: list[MessageSegment] = []
text = ""
if self.text:
text += "{}".format(self.text)
text += "\n"
text += "来源: {}".format(self.target_type)
if self.target_name:
text += " {}".format(self.target_name)
msg_segments.append(await parse_text(text))
if not self.target_type == "rss" and self.url:
msg_segments.append(MessageSegment.text(self.url))
for pic in self.pics:
msg_segments.append(MessageSegment.image(pic))
self._pic_message = msg_segments
return self._pic_message
def __str__(self):
return "type: {}\nfrom: {}\ntext: {}\nurl: {}\npic: {}".format(
self.target_type,
@ -157,3 +157,8 @@ class Post:
)
),
)
@dataclass
class Post(OptionalMixin, _Post):
pass