|
|
|
|
|
import os
|
|
from PIL import Image
|
|
import numpy as np
|
|
import json
|
|
|
|
# Per Pillow's documentation, setting this limit to None switches off the
# decompression-bomb size check, so images of any pixel count are opened.
# NOTE(review): only appropriate when the input files are trusted — confirm.
Image.MAX_IMAGE_PIXELS = None
|
|
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
from tqdm import tqdm
|
|
|
|
# Largest total pixel count (width * height) an image may keep; resize_image
# scales anything bigger down to this area.
max_pixels = 2048 ** 2

# Largest allowed length, in pixels, of an image's longer side; resize_image
# scales anything longer down to this length.
max_long_size = 4096
|
|
def has_alpha(img: Image.Image):
    """Report whether ``img`` has a band this module treats as transparency.

    The band names are read from ``img.getbands()``. ``'A'`` and ``'a'`` are
    matched, and so is ``'P'``: an image whose bands include ``'P'`` is
    reported as having alpha whether or not it actually carries transparency.

    Args:
        img: Image to inspect.

    Returns:
        ``True`` if any band name is ``'A'``, ``'a'`` or ``'P'``, else
        ``False``.
    """
    transparency_bands = {'A', 'a', 'P'}
    return any(name in transparency_bands for name in img.getbands())
|
|
|
|
def add_white_background(img: Image.Image) -> Image.Image:
    """Composite ``img`` over a white canvas of the same size.

    The input is converted to ``RGBA`` first so that it can be passed to
    ``Image.alpha_composite`` together with the freshly created canvas.

    Args:
        img: Image to flatten; it is not modified in place.

    Returns:
        The image produced by ``Image.alpha_composite(canvas, img)``.
    """
    foreground = img.convert('RGBA')
    white_canvas = Image.new('RGBA', foreground.size, (255, 255, 255))
    return Image.alpha_composite(white_canvas, foreground)
|
|
|
|
def resize_image(image: Image.Image) -> Image.Image:
    """Downscale ``image`` so that it respects the module-level size limits.

    An image is shrunk when its longer side exceeds ``max_long_size`` or its
    pixel count exceeds ``max_pixels``. A single scale factor is applied to
    both sides (the smaller of the two factors needed to satisfy each limit),
    so the aspect ratio is kept up to integer rounding.

    Args:
        image: Image to check and, if necessary, shrink.

    Returns:
        ``image`` itself when it is already within both limits (or has zero
        area), otherwise a new image resized with bicubic resampling.
    """
    width, height = image.size
    max_side = max(width, height)
    current_pixels = width * height

    # A zero-area image has nothing to scale, and would divide by zero below.
    if current_pixels == 0:
        return image

    if max_side <= max_long_size and current_pixels <= max_pixels:
        return image

    scale = min(max_long_size / max_side,
                (max_pixels / current_pixels) ** 0.5)

    # int() rounds down so the result stays within the limits. The clamp to 1
    # matters for very elongated inputs (e.g. 100000x1), whose short side
    # would otherwise truncate to 0 and make resize() fail.
    new_width = max(1, int(width * scale))
    new_height = max(1, int(height * scale))

    return image.resize((new_width, new_height), Image.BICUBIC)
|
|
|
|
def load_image(image_path: str) -> Image.Image:
    """Open one image file and normalise it to a size-limited RGB image.

    The file is fully decoded, downscaled with ``resize_image``, flattened
    onto a white background when ``has_alpha`` reports transparency, and
    finally converted to ``RGB`` if it is in any other mode.

    Args:
        image_path: Path of the image file to read.

    Returns:
        The processed RGB image, or ``None`` when the file could not be
        opened or processed (the failure is logged as a warning).
    """
    # Function-scope import so this block does not depend on a file-level
    # ``import logging``.
    import logging

    try:
        with Image.open(image_path) as img:
            # Force a full decode now so errors surface inside this try block.
            img.load()
            # NOTE(review): the array is discarded; this looks like an extra
            # decode sanity check — confirm it is still needed.
            np.array(img)
            img = resize_image(img)
            if has_alpha(img):
                img = add_white_background(img)
            if img.mode != "RGB":
                img = img.convert("RGB")
            return img
    except Exception as exc:
        # Catch Exception rather than everything, so KeyboardInterrupt and
        # SystemExit still stop a long batch run; bad files are skipped.
        logging.getLogger(__name__).warning(
            "Skipping image %s: %s", image_path, exc)
        return None
|
|
|
|
def get_image_metainfo(img):
    """Build a small size summary for ``img``.

    Args:
        img: Object exposing a ``size`` attribute that unpacks into
            ``(width, height)``, or ``None``.

    Returns:
        ``None`` when ``img`` is ``None``; otherwise a dict with the keys
        ``'width'``, ``'height'`` and ``'pixel_num'`` (width times height).
    """
    if img is None:
        return None

    w, h = img.size
    metainfo = {'width': w, 'height': h}
    metainfo['pixel_num'] = w * h
    return metainfo
|
|
|
|
|
|
def process_image(input_image_path: str, output_image_path: str):
    """Convert one input image to WebP and write a JSON sidecar next to it.

    The image is loaded with ``load_image``; when loading fails nothing is
    written. Otherwise the image is saved to ``output_image_path`` as WebP
    (quality 90) and its metadata from ``get_image_metainfo`` is written to a
    file with the same path but a ``.json`` extension.

    Args:
        input_image_path: Path of the source image.
        output_image_path: Destination path of the WebP file.

    Returns:
        None.
    """
    img = load_image(input_image_path)
    image_metainfo = get_image_metainfo(img)
    if img is None or image_metainfo is None:
        return

    # Swap only the final extension. A plain str.replace(".webp", ".json")
    # would also rewrite ".webp" inside directory names, and would leave the
    # path unchanged for any other extension, so the JSON would overwrite the
    # image that was just saved.
    output_image_json_path = os.path.splitext(output_image_path)[0] + ".json"

    img.save(output_image_path, "WEBP", quality=90)
    with open(output_image_json_path, 'w', encoding="utf-8") as f:
        json.dump(image_metainfo, f, indent=4)
|
|
|
|
def get_image_paths(input_dir, output_dir):
    """Yield ``(input_path, output_path)`` pairs for every image under ``input_dir``.

    Files are matched case-insensitively by extension (.png, .jpg, .jpeg,
    .webp, .bmp). Each output path mirrors the file's location relative to
    ``input_dir`` below ``output_dir``, with the extension replaced by
    ``.webp``. The output directory for a pair is created before the pair is
    yielded.

    Args:
        input_dir: Root directory that is walked recursively.
        output_dir: Root directory under which output paths are built.

    Yields:
        Tuples ``(input_path, output_path)``.
    """
    image_extensions = ('.png', '.jpg', '.jpeg', '.webp', '.bmp')
    output_root = os.path.normcase(os.path.abspath(output_dir))
    created_dirs = set()

    for root, dirs, files in os.walk(input_dir):
        # If the output tree lives inside input_dir, do not descend into it:
        # otherwise earlier results would be picked up as new inputs.
        dirs[:] = [
            d for d in dirs
            if os.path.normcase(os.path.abspath(os.path.join(root, d))) != output_root
        ]

        for file in files:
            if not file.lower().endswith(image_extensions):
                continue

            input_path = os.path.join(root, file)
            rel_path = os.path.relpath(input_path, input_dir)
            output_path = os.path.join(output_dir,
                                       os.path.splitext(rel_path)[0] + '.webp')

            # Create each output directory once instead of once per file.
            output_parent = os.path.dirname(output_path)
            if output_parent not in created_dirs:
                os.makedirs(output_parent, exist_ok=True)
                created_dirs.add(output_parent)

            yield input_path, output_path
|
|
|
|
def process_images_with_thread_pool(input_image_dir: str,
                                    output_image_dir: str,
                                    num_threads=16):
    """Convert every image under ``input_image_dir`` using a thread pool.

    Input/output path pairs come from ``get_image_paths``; each pair is
    handed to ``process_image`` on a ``ThreadPoolExecutor``. A tqdm progress
    bar advances as individual images finish.

    Args:
        input_image_dir: Root directory containing the source images.
        output_image_dir: Root directory for the results; created if missing.
        num_threads: Maximum number of worker threads.

    Raises:
        Exception: Whatever ``process_image`` raised for an image is re-raised
            here when that image's result is collected.
    """
    # Function-scope import so this block does not depend on a file-level
    # import of as_completed.
    from concurrent.futures import as_completed

    os.makedirs(output_image_dir, exist_ok=True)
    image_paths = get_image_paths(input_image_dir, output_image_dir)

    with ThreadPoolExecutor(max_workers=num_threads) as executor:
        futures = [
            executor.submit(process_image, input_path, output_path)
            for input_path, output_path in image_paths
        ]

        # Collect results in the calling thread as they complete. Submitting
        # "wait for this future" tasks to the same pool would queue them
        # behind all the real work, so the bar would not move until the end
        # and worker slots would be spent on waiting.
        for future in tqdm(as_completed(futures),
                           total=len(futures),
                           desc="Processing images"):
            future.result()
|
|
|
|
if __name__ == "__main__":
    # Script entry point: convert the images under the hard-coded dataset
    # "test" folder into the sibling "output" folder using 16 worker threads.
    dataset_input_dir = r"20240808\unsplash-research-dataset-lite-latest\test"
    dataset_output_dir = r"20240808\unsplash-research-dataset-lite-latest\output"

    process_images_with_thread_pool(
        input_image_dir=dataset_input_dir,
        output_image_dir=dataset_output_dir,
        num_threads=16,
    )