mirror of
https://github.com/suyiiyii/nonebot-bison.git
synced 2026-05-09 18:27:56 +08:00
new image merge
This commit is contained in:
@@ -34,37 +34,65 @@ class Post:
|
||||
pic_buffer.write(res.content)
|
||||
return Image.open(pic_buffer)
|
||||
|
||||
def _check_image_square(self, size: tuple[int, int]) -> bool:
|
||||
return abs(size[0] - size[1]) / size[0] < 0.01
|
||||
|
||||
async def _pic_merge(self) -> None:
|
||||
if len(self.pics) < 6:
|
||||
return
|
||||
first_image = await self._pic_url_to_image(self.pics[0])
|
||||
if first_image.size[0] != first_image.size[1]:
|
||||
if not self._check_image_square(first_image.size):
|
||||
return
|
||||
pic_size = first_image.size[0]
|
||||
images = [first_image]
|
||||
for pic in self.pics[1:]:
|
||||
cur_image = await self._pic_url_to_image(pic)
|
||||
if cur_image.size[0] != pic_size or cur_image.size[1] != pic_size:
|
||||
break
|
||||
images.append(cur_image)
|
||||
if len(images) == 6:
|
||||
matrix = (3, 2)
|
||||
self.pics = self.pics[6:]
|
||||
elif len(images) >= 9:
|
||||
matrix = (3, 3)
|
||||
self.pics = self.pics[9:]
|
||||
else:
|
||||
images: list[Image.Image] = [first_image]
|
||||
# first row
|
||||
for i in range(1, 3):
|
||||
cur_img = await self._pic_url_to_image(self.pics[i])
|
||||
if not self._check_image_square(cur_img.size):
|
||||
return
|
||||
if cur_img.size[1] != images[0].size[1]: # height not equal
|
||||
return
|
||||
images.append(cur_img)
|
||||
_tmp = 0
|
||||
x_coord = [0]
|
||||
for i in range(3):
|
||||
_tmp += images[i].size[0]
|
||||
x_coord.append(_tmp)
|
||||
y_coord = [0, first_image.size[1]]
|
||||
async def process_row(row: int) -> bool:
|
||||
row_first_img = await self._pic_url_to_image(self.pics[row * 3])
|
||||
if not self._check_image_square(row_first_img.size):
|
||||
return False
|
||||
if row_first_img.size[0] != images[0].size[0]:
|
||||
return False
|
||||
image_row: list[Image.Image] = [row_first_img]
|
||||
for i in range(row * 3 + 1, row * 3 + 3):
|
||||
cur_img = await self._pic_url_to_image(self.pics[i])
|
||||
if not self._check_image_square(cur_img.size):
|
||||
return False
|
||||
if cur_img.size[1] != row_first_img.size[1]:
|
||||
return False
|
||||
if cur_img.size[0] != images[i % 3].size[0]:
|
||||
return False
|
||||
image_row.append(cur_img)
|
||||
images.extend(image_row)
|
||||
y_coord.append(y_coord[-1] + row_first_img.size[1])
|
||||
return True
|
||||
if not await process_row(1):
|
||||
return
|
||||
matrix = (3,2)
|
||||
if await process_row(2):
|
||||
matrix = (3,3)
|
||||
logger.info('trigger merge image')
|
||||
target = Image.new('RGB', (matrix[0] * pic_size, matrix[1] * pic_size))
|
||||
target = Image.new('RGB', (x_coord[-1], y_coord[-1]))
|
||||
for y in range(matrix[1]):
|
||||
for x in range(matrix[0]):
|
||||
target.paste(images[y * matrix[0] + x], (
|
||||
x * pic_size, y * pic_size, (x + 1) * pic_size, (y + 1) * pic_size
|
||||
x_coord[x], y_coord[y], x_coord[x+1], y_coord[y+1]
|
||||
))
|
||||
target_io = BytesIO()
|
||||
target.save(target_io, 'JPEG')
|
||||
b64image = 'base64://' + base64.b64encode(target_io.getvalue()).decode()
|
||||
self.pics = self.pics[matrix[0] * matrix[1]: ]
|
||||
self.pics.insert(0, b64image)
|
||||
|
||||
async def generate_messages(self):
|
||||
|
||||
Reference in New Issue
Block a user