# --- File-host page residue (not part of the program), kept as comments ---
# LAXMAYDAY's picture
# Upload 2 files
# b6ac2b8 verified
# raw
# history blame
# No virus
# 4.63 kB
# -*- coding: utf-8 -*-
import os
from PIL import Image
import numpy as np
import json
# NOTE(review): per the Pillow documentation, setting MAX_IMAGE_PIXELS to None
# removes the pixel-count limit behind its "decompression bomb" guard, so
# arbitrarily large images can be opened. Confirm the inputs are trusted.
Image.MAX_IMAGE_PIXELS = None
from concurrent.futures import ThreadPoolExecutor
from tqdm import tqdm
# Largest total pixel count (width * height) an image may have before
# resize_image() shrinks it.
max_pixels = 2048 * 2048
# Largest allowed length, in pixels, of an image's longer side before
# resize_image() shrinks it.
max_long_size = 4096
def has_alpha(img:Image.Image):
    """Tell whether *img* should be treated as possibly transparent.

    Returns True when any band name reported by ``img.getbands()`` is
    ``'A'``, ``'a'`` or ``'P'``; otherwise returns False.
    """
    # NOTE(review): 'P' (palette) is presumably listed because palette images
    # can carry transparency information -- confirm this is the intent.
    transparent_bands = {'A', 'a', 'P'}
    return any(name in transparent_bands for name in img.getbands())
def add_white_background(img:Image.Image)->Image.Image:
    """Flatten *img* onto a white canvas of the same size.

    The input is first converted to RGBA, then alpha-composited over a canvas
    created with the colour (255, 255, 255). The composited image is returned;
    this function does not convert it back to RGB.
    """
    # Both layers must be RGBA for alpha_composite.
    rgba_layer = img.convert('RGBA')
    white_canvas = Image.new('RGBA', rgba_layer.size, (255, 255, 255))
    return Image.alpha_composite(white_canvas, rgba_layer)
def resize_image(image:Image.Image)->Image.Image:
    """Downscale *image* so that it satisfies both module-level size limits.

    The image is shrunk, keeping its aspect ratio and using bicubic
    resampling, when its longer side exceeds ``max_long_size`` or its pixel
    count exceeds ``max_pixels``. An image already within both limits is
    returned as-is (the same object, not a copy).

    Returns:
        The resized image, or the original image when no resize is needed.
    """
    width, height = image.size
    max_side = max(width, height)
    current_pixels = width * height

    # Nothing to do when both limits are already respected.
    if max_side <= max_long_size and current_pixels <= max_pixels:
        return image

    # Take the stricter of the two constraints. The pixel budget is an area,
    # hence the square root to turn it into a per-side scale factor.
    scale = min(max_long_size / max_side,
                (max_pixels / current_pixels) ** 0.5)

    # int() truncates, so a very elongated image (e.g. 100000x1) would get a
    # zero-length side, which Image.resize rejects. Clamp each side to 1.
    new_width = max(1, int(width * scale))
    new_height = max(1, int(height * scale))

    return image.resize((new_width, new_height), Image.BICUBIC)
def load_image(image_path:str)->Image.Image:
    """Open one image file and normalise it to a size-limited RGB image.

    Steps: fully decode the file, convert it to a numpy array as an extra
    readability check, downscale it with ``resize_image``, flatten any
    transparency onto white via ``add_white_background``, and finally convert
    to RGB if it is not already in that mode.

    Args:
        image_path: Path of the image file to read.

    Returns:
        The processed RGB image, or ``None`` when any step fails (loading is
        best-effort; the failure is logged as a warning rather than raised).
    """
    import logging  # local import keeps this block self-contained

    try:
        with Image.open(image_path) as img:
            img.load()  # force the pixel data to be read into memory
            np.array(img)  # extra check: the data must convert to an array
            img = resize_image(img)
            if has_alpha(img):
                # Replace transparency with a white background.
                img = add_white_background(img)
            if img.mode != "RGB":
                img = img.convert("RGB")
            return img
    except Exception as exc:
        # `except Exception` (not a bare `except`) lets KeyboardInterrupt and
        # SystemExit propagate, so a long batch run can still be interrupted.
        logging.getLogger(__name__).warning(
            "Skipping image %s: %r", image_path, exc)
        return None
def get_image_metainfo(img):
    """Summarise the dimensions of *img*.

    Returns ``None`` when *img* is ``None``; otherwise a dict with the keys
    ``'width'``, ``'height'`` and ``'pixel_num'`` (width * height), all taken
    from ``img.size``.
    """
    if img is None:
        return None
    w, h = img.size
    info = {
        'width': w,
        'height': h,
        'pixel_num': w * h,
    }
    return info
def process_image(input_image_path:str,output_image_path:str):
    """Convert one image to WEBP and write a JSON sidecar with its dimensions.

    The input is loaded through ``load_image``. When loading fails (it returns
    ``None``) nothing is written. Otherwise the image is saved to
    *output_image_path* as WEBP with quality 90, and its metadata from
    ``get_image_metainfo`` is written next to it under the same path with the
    final extension replaced by ``.json``.

    Args:
        input_image_path: Path of the source image.
        output_image_path: Destination path of the WEBP file.
    """
    img = load_image(input_image_path)
    image_metainfo = get_image_metainfo(img)
    if img is None or image_metainfo is None:
        return

    # Swap only the final extension. A plain str.replace(".webp", ".json")
    # would also rewrite ".webp" inside directory names, and would leave the
    # path unchanged (so the JSON overwrote the image) when it has no ".webp".
    output_image_json_path = os.path.splitext(output_image_path)[0] + ".json"

    img.save(output_image_path, "WEBP", quality=90)  # write the image
    with open(output_image_json_path, 'w', encoding='utf-8') as f:
        json.dump(image_metainfo, f, indent=4)  # write the metadata sidecar
def get_image_paths(input_dir, output_dir):
    """Yield ``(input_path, output_path)`` pairs for every image under *input_dir*.

    The tree is walked recursively. Files whose lower-cased name ends with
    .png, .jpg, .jpeg, .webp or .bmp are selected. Each output path mirrors the
    file's location relative to *input_dir* beneath *output_dir*, with the
    extension replaced by ``.webp``. The output file's parent directory is
    created before the pair is yielded.
    """
    image_suffixes = ('.png', '.jpg', '.jpeg', '.webp', '.bmp')
    for current_dir, _, filenames in os.walk(input_dir):
        for filename in filenames:
            if not filename.lower().endswith(image_suffixes):
                continue
            source_path = os.path.join(current_dir, filename)
            relative_path = os.path.relpath(source_path, input_dir)
            relative_stem = os.path.splitext(relative_path)[0]
            target_path = os.path.join(output_dir, relative_stem + '.webp')
            # Make sure the mirrored sub-directory exists for the caller.
            os.makedirs(os.path.dirname(target_path), exist_ok=True)
            yield source_path, target_path
def process_images_with_thread_pool(input_image_dir:str,
                                    output_image_dir:str,
                                    num_threads=16):
    """Convert every image under *input_image_dir* using a pool of threads.

    The output directory is created if needed. One ``process_image`` task is
    submitted per pair produced by ``get_image_paths``, and a tqdm progress
    bar advances as tasks finish.

    Args:
        input_image_dir: Root directory scanned for source images.
        output_image_dir: Root directory that receives the converted files.
        num_threads: Maximum number of worker threads.

    Raises:
        Any exception raised inside a ``process_image`` task is re-raised here
        when that task's result is collected.
    """
    from concurrent.futures import as_completed  # not imported at file level

    os.makedirs(output_image_dir, exist_ok=True)
    image_paths = get_image_paths(input_image_dir, output_image_dir)
    with ThreadPoolExecutor(max_workers=num_threads) as executor:
        # Submit everything up front so the progress bar knows the total.
        futures = [
            executor.submit(process_image, input_path, output_path)
            for input_path, output_path in image_paths
        ]
        # Wait on the futures from this thread. Mapping f.result() through the
        # same executor would queue a second wave of tasks that only block
        # worker threads while waiting on the first wave.
        for future in tqdm(as_completed(futures),
                           total=len(futures),
                           desc="Processing images"):
            future.result()  # surface any exception raised by the task
if __name__ == "__main__":
    # Single-file usage, kept for reference:
    #   process_image(input_image_path="test.png",
    #                 output_image_path='test.webp')
    # Batch run over the dataset directory (Windows-style relative paths).
    source_dir = r"20240808\unsplash-research-dataset-lite-latest\test"
    target_dir = r"20240808\unsplash-research-dataset-lite-latest\output"
    process_images_with_thread_pool(source_dir, target_dir, num_threads=16)