以下是支持批量格式转换、尺寸压缩、水印添加和特效处理的核心代码，采用多线程提升效率，支持配置化操作：
from PIL import Image, ImageDraw, ImageFont
import os
import glob
import json
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
import argparse
# Logging setup: records at INFO level and above are written to a
# UTF-8 encoded log file in the current working directory.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    filename='image_processor_log.log',
    encoding='utf-8',
)
# Lower-case file suffixes (including the dot) that are treated as images.
SUPPORTED_FORMATS = (
    '.jpg',
    '.jpeg',
    '.png',
    '.gif',
    '.bmp',
    '.tiff',
)
# Load the processing configuration
def load_config(config_path):
    """Parse the JSON file at ``config_path`` and return the decoded object.

    The file is read as UTF-8. Errors raised by ``open`` (for example
    ``FileNotFoundError``) and by the JSON parser
    (``json.JSONDecodeError``) are not handled here and reach the caller.
    """
    with open(config_path, encoding='utf-8') as config_file:
        raw_text = config_file.read()
    return json.loads(raw_text)
# Collect every image file in a folder (optionally recursing into sub-folders)
def get_all_image_files(input_dir, recursive=True, extensions=None):
    """Return the paths of the image files found in ``input_dir``.

    A file counts as an image when its suffix, compared case-insensitively,
    is one of ``extensions``. Both scan modes use the same check, so
    ``photo.JPG`` is found whether or not the scan is recursive.

    Args:
        input_dir: Directory to scan.
        recursive: When true the whole tree below ``input_dir`` is walked;
            otherwise only the direct children are inspected.
        extensions: Optional iterable of suffixes including the leading dot.
            Defaults to the module-level ``SUPPORTED_FORMATS``.

    Returns:
        A list of paths, each built by joining the containing directory
        (as reached from ``input_dir``) with the file name. The list is
        empty when the directory cannot be read.
    """
    if extensions is None:
        extensions = SUPPORTED_FORMATS
    suffixes = tuple(ext.lower() for ext in extensions)

    def is_image(file_name):
        return os.path.splitext(file_name.lower())[1] in suffixes

    image_files = []
    if recursive:
        for root, _dirs, files in os.walk(input_dir):
            image_files.extend(
                os.path.join(root, file_name)
                for file_name in files
                if is_image(file_name)
            )
        return image_files

    # A plain directory listing (instead of glob patterns) keeps the match
    # case-insensitive and is not confused by glob metacharacters such as
    # "[" in the directory name.
    try:
        entries = sorted(os.listdir(input_dir))
    except OSError:
        # Mirror os.walk, which yields nothing for an unreadable directory.
        entries = []
    for entry in entries:
        candidate = os.path.join(input_dir, entry)
        # Skip directories that merely look like images (e.g. "album.jpg/").
        if is_image(entry) and os.path.isfile(candidate):
            image_files.append(candidate)
    return image_files
# Resize an image (by ratio or to a fixed size)
def resize_image(image, config, resample=None):
    """Return a resized copy of ``image`` according to ``config``.

    Config keys read:
        resize_type: ``'ratio'`` (default) scales both sides by ``ratio``;
            any other value uses the ``width`` / ``height`` keys.
        ratio: Scale factor for ratio mode (default 0.8).
        width, height: Target size for fixed mode; each defaults to the
            corresponding side of the source image.
        keep_aspect_ratio: When true (default) the target size is shrunk on
            one side so that it keeps the source aspect ratio.

    Args:
        image: Object exposing ``size`` and ``resize(size, resample)``.
        config: Mapping with the keys described above.
        resample: Resampling filter passed to ``image.resize``. Defaults to
            ``Image.Resampling.LANCZOS``.

    Raises:
        ValueError: If ``ratio`` or a configured ``width`` / ``height`` is
            not positive.
    """
    width, height = image.size
    resize_type = config.get('resize_type', 'ratio')
    if resize_type == 'ratio':
        ratio = config.get('ratio', 0.8)
        if ratio <= 0:
            raise ValueError(f"缩放比例必须大于 0:{ratio!r}")
        new_width = int(width * ratio)
        new_height = int(height * ratio)
    else:
        new_width = int(config.get('width', width))
        new_height = int(config.get('height', height))
        if new_width <= 0 or new_height <= 0:
            raise ValueError(f"目标尺寸必须大于 0:{new_width}x{new_height}")

    # Truncation can round a side of a small image down to 0, which would
    # break both the aspect-ratio maths below and the resize call itself.
    new_width = max(1, new_width)
    new_height = max(1, new_height)

    if config.get('keep_aspect_ratio', True):
        aspect_ratio = width / height
        if new_width / new_height != aspect_ratio:
            # Shrink whichever side would otherwise exceed the target box.
            if new_width / width < new_height / height:
                new_height = max(1, int(new_width / aspect_ratio))
            else:
                new_width = max(1, int(new_height * aspect_ratio))

    if resample is None:
        resample = Image.Resampling.LANCZOS
    return image.resize((new_width, new_height), resample)
# Add a text watermark
def add_text_watermark(image, config):
    """Draw a semi-transparent text watermark and return the result as RGB.

    Config keys read:
        text: Watermark text; when empty the input image is returned as is.
        font_size: Font size (default 20).
        font_path: TrueType font file to use (default ``'arial.ttf'``).
        color: Any colour string accepted by ``ImageColor.getrgb``
            (default ``'#ffffff'``).
        opacity: 0-100, clamped to that range (default 50).
        position: ``top_left`` / ``top_right`` / ``bottom_left``; any other
            value means bottom right (default ``bottom_right``).
        margin: Distance from the chosen corner in pixels (default 20).

    Returns:
        The unchanged ``image`` when no text is configured, otherwise a new
        RGB image with the watermark composited on top.
    """
    watermark_text = config.get('text', '')
    if not watermark_text:
        return image

    # Imported locally: the module header only imports Image, ImageDraw and
    # ImageFont from PIL.
    from PIL import ImageColor

    font_size = config.get('font_size', 20)
    position = config.get('position', 'bottom_right')
    margin = config.get('margin', 20)

    # Build the fill as an RGBA tuple rather than by string concatenation so
    # that named colours and short hex forms work, and so that opacity 100
    # maps to exactly 255.
    opacity = min(100, max(0, config.get('opacity', 50)))
    red, green, blue = ImageColor.getrgb(config.get('color', '#ffffff'))[:3]
    fill = (red, green, blue, round(opacity * 255 / 100))

    # Transparent layer that receives the text.
    watermark_layer = Image.new('RGBA', image.size, (0, 0, 0, 0))
    draw = ImageDraw.Draw(watermark_layer)

    try:
        font = ImageFont.truetype(config.get('font_path', 'arial.ttf'), font_size)
    except OSError:
        # Font file missing or unreadable: fall back to the built-in font.
        font = ImageFont.load_default(size=font_size)

    # The right/bottom edges of the bounding box measured from the origin
    # are used as the text extent, so the far edge of the drawn text stays
    # inside the margin.
    _left, _top, text_width, text_height = draw.textbbox((0, 0), watermark_text, font=font)
    width, height = image.size
    if position == 'top_left':
        x, y = margin, margin
    elif position == 'top_right':
        x, y = width - text_width - margin, margin
    elif position == 'bottom_left':
        x, y = margin, height - text_height - margin
    else:  # bottom_right
        x, y = width - text_width - margin, height - text_height - margin

    draw.text((x, y), watermark_text, font=font, fill=fill)

    base = image if image.mode == 'RGBA' else image.convert('RGBA')
    combined = Image.alpha_composite(base, watermark_layer)
    # Convert back to RGB so formats without an alpha channel can be saved.
    return combined.convert('RGB')
# Add an image watermark
def add_image_watermark(image, config):
    """Paste a scaled, semi-transparent watermark image and return RGB.

    Config keys read:
        path: Watermark image file. When missing or not a file, a warning is
            logged and the input image is returned unchanged.
        opacity: 0-100, clamped to that range (default 50).
        position: ``top_left`` / ``top_right`` / ``bottom_left``; any other
            value means bottom right (default ``bottom_right``).
        margin: Distance from the chosen corner in pixels (default 20).
        scale: Watermark width as a fraction of the image width
            (default 0.2).

    Returns:
        The unchanged ``image`` when no watermark file is available,
        otherwise a new RGB image. The input image is not modified.
    """
    watermark_path = config.get('path', '')
    if not watermark_path or not os.path.isfile(watermark_path):
        logging.warning("水印图片不存在")
        return image

    opacity = min(100, max(0, config.get('opacity', 50)))
    position = config.get('position', 'bottom_right')
    margin = config.get('margin', 20)
    scale = config.get('scale', 0.2)

    # Close the watermark file as soon as its pixels have been converted.
    with Image.open(watermark_path) as source:
        watermark = source.convert('RGBA')

    # Scale the watermark relative to the image width, keeping its own
    # aspect ratio; never let a side collapse to 0 pixels.
    img_width, img_height = image.size
    wm_width = max(1, int(img_width * scale))
    wm_height = max(1, int(watermark.size[1] * (wm_width / watermark.size[0])))
    watermark = watermark.resize((wm_width, wm_height), Image.Resampling.LANCZOS)

    # Scale the watermark's own alpha channel by the requested opacity.
    alpha = watermark.split()[3]
    alpha = alpha.point(lambda p: p * (opacity / 100))
    watermark.putalpha(alpha)

    if position == 'top_left':
        x, y = margin, margin
    elif position == 'top_right':
        x, y = img_width - wm_width - margin, margin
    elif position == 'bottom_left':
        x, y = margin, img_height - wm_height - margin
    else:  # bottom_right
        x, y = img_width - wm_width - margin, img_height - wm_height - margin

    # paste() works in place, so operate on a private RGBA copy instead of
    # the caller's image.
    canvas = image.copy() if image.mode == 'RGBA' else image.convert('RGBA')
    canvas.paste(watermark, (x, y), watermark)
    return canvas.convert('RGB')
# Image effects (grayscale, invert, brightness, contrast)
def _map_color_channels(image, func):
    """Apply ``func`` to every colour value while leaving alpha untouched."""
    has_alpha = image.mode in ('RGBA', 'LA', 'PA') or 'transparency' in image.info
    if has_alpha:
        rgba = image if image.mode == 'RGBA' else image.convert('RGBA')
        red, green, blue, alpha = rgba.split()
        return Image.merge(
            'RGBA',
            (red.point(func), green.point(func), blue.point(func), alpha),
        )
    if image.mode not in ('L', 'RGB'):
        # Palette and other modes store something other than plain 8-bit
        # colour values per pixel, so convert before mapping.
        image = image.convert('RGB')
    return image.point(func)


def apply_image_effect(image, config):
    """Apply the effect selected by ``config['type']`` and return the image.

    Supported types:
        grayscale: Convert to gray and back to RGB.
        invert: Replace every colour value ``x`` with ``255 - x``.
        brightness: Multiply colour values by ``1 + value / 100``.
        contrast: Scale the distance from 128 by ``1 + value / 100``.

    ``value`` defaults to 0 and is documented in the config as -100..100.
    When ``type`` is empty or missing the image is returned unchanged; an
    unrecognised type logs a warning and also returns the image unchanged.
    Invert, brightness and contrast only touch colour channels, so any
    alpha channel is preserved.
    """
    effect_type = config.get('type', '')
    if not effect_type:
        return image

    if effect_type == 'grayscale':
        return image.convert('L').convert('RGB')
    if effect_type == 'invert':
        return _map_color_channels(image, lambda x: 255 - x)
    if effect_type == 'brightness':
        factor = 1 + (config.get('value', 0) / 100)
        return _map_color_channels(
            image, lambda x: int(min(255, max(0, x * factor)))
        )
    if effect_type == 'contrast':
        factor = 1 + (config.get('value', 0) / 100)
        return _map_color_channels(
            image, lambda x: int(min(255, max(0, (x - 128) * factor + 128)))
        )

    logging.warning("未知的特效类型:%s", effect_type)
    return image
# Process a single image
def process_single_image(input_path, output_dir, config):
    """Run the configured pipeline on one image and save the result.

    Steps, each enabled by its ``enable`` flag in the matching config
    section: resize (``resize``), effect (``effect``), watermark
    (``watermark``, text or image depending on its ``type``). The result is
    written below ``output_dir``, mirroring the file's directory relative to
    ``config['input_dir']`` (default ``'.'``).

    Other config keys read:
        output_format: Target extension, case-insensitive, leading dot
            allowed (default ``'jpg'``).
        quality: JPEG quality (default 85).
        rename_prefix: Optional prefix joined to the file name with ``_``.

    Returns:
        ``True`` on success. Any exception is logged and reported as
        ``False`` so that one bad file does not abort a batch.
    """
    try:
        with Image.open(input_path) as img:
            # 1. Resize
            resize_config = config.get('resize', {})
            if resize_config.get('enable', False):
                img = resize_image(img, resize_config)
            # 2. Effect
            effect_config = config.get('effect', {})
            if effect_config.get('enable', False):
                img = apply_image_effect(img, effect_config)
            # 3. Watermark
            watermark_config = config.get('watermark', {})
            if watermark_config.get('enable', False):
                if watermark_config.get('type', 'text') == 'text':
                    img = add_text_watermark(img, watermark_config)
                else:
                    img = add_image_watermark(img, watermark_config)

            # 4. Format conversion and save
            output_format = config.get('output_format', 'jpg').lower().lstrip('.')
            quality = config.get('quality', 85)

            # Mirror the source directory layout below output_dir. The same
            # default as batch_process_images is used for input_dir so a
            # config without that key does not fail every file.
            input_root = config.get('input_dir', '.')
            relative_path = os.path.relpath(os.path.dirname(input_path), input_root)
            output_path = os.path.join(output_dir, relative_path)
            os.makedirs(output_path, exist_ok=True)

            name, _ = os.path.splitext(os.path.basename(input_path))
            output_filename = f"{name}.{output_format}"
            if config.get('rename_prefix', ''):
                output_filename = f"{config['rename_prefix']}_{output_filename}"
            final_output_path = os.path.join(output_path, output_filename)

            if output_format in ('jpg', 'jpeg'):
                # JPEG cannot store alpha or palette data; convert such
                # images instead of failing at save time.
                if img.mode not in ('RGB', 'L', 'CMYK'):
                    img = img.convert('RGB')
                img.save(final_output_path, 'JPEG', quality=quality, optimize=True)
            elif output_format == 'png':
                img.save(final_output_path, 'PNG', optimize=True)
            elif output_format == 'gif':
                img.save(final_output_path, 'GIF')
            else:
                # Let Pillow infer the format from the file extension.
                img.save(final_output_path)
        logging.info(f"处理成功:{input_path} -> {final_output_path}")
        return True
    except Exception as e:
        logging.error(f"处理失败 {input_path}:{str(e)}")
        return False
# Batch processing (multi-threaded)
def batch_process_images(config):
    """Process every image below the input directory with a thread pool.

    Config keys read:
        input_dir: Directory to scan (default ``'.'``).
        output_dir: Directory receiving the results (default ``'output'``).
        max_workers: Thread pool size (default 5); values below 1 are
            raised to 1, ``None`` leaves the choice to the executor.
        recursive: Whether sub-directories are scanned (default true).

    The whole ``config`` is forwarded to ``process_single_image`` with
    ``input_dir`` filled in. Progress and a final summary are printed, and
    the totals are logged. Returns ``None``; the function returns early when
    the input directory does not exist or contains no images.
    """
    input_dir = config.get('input_dir', '.')
    output_dir = config.get('output_dir', 'output')
    max_workers = config.get('max_workers', 5)
    recursive = config.get('recursive', True)
    if max_workers is not None:
        # ThreadPoolExecutor rejects a pool size of zero or less.
        max_workers = max(1, int(max_workers))

    if not os.path.isdir(input_dir):
        logging.error(f"输入目录不存在:{input_dir}")
        return

    os.makedirs(output_dir, exist_ok=True)

    image_files = get_all_image_files(input_dir, recursive=recursive)

    # With the defaults the output directory lives inside the input tree.
    # Skip anything already under it so a rerun does not reprocess earlier
    # results into output/output/...
    input_root = os.path.normcase(os.path.abspath(input_dir))
    output_root = os.path.normcase(os.path.abspath(output_dir))
    if output_root != input_root:
        output_prefix = output_root + os.sep
        image_files = [
            path
            for path in image_files
            if not os.path.normcase(os.path.abspath(path)).startswith(output_prefix)
        ]

    if not image_files:
        logging.warning("未找到任何图片文件")
        return
    print(f"找到 {len(image_files)} 张图片,开始批量处理...")

    # Workers compute output paths relative to input_dir, so make sure the
    # key is present even when the caller relied on the default.
    job_config = {**config, 'input_dir': input_dir}

    success_count = 0
    fail_count = 0
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [
            executor.submit(process_single_image, file, output_dir, job_config)
            for file in image_files
        ]
        for future in as_completed(futures):
            if future.result():
                success_count += 1
            else:
                fail_count += 1

    print("\n批量处理完成!")
    print(f"总数量:{len(image_files)}")
    print(f"成功:{success_count}")
    print(f"失败:{fail_count}")
    print(f"输出目录:{os.path.abspath(output_dir)}")
    logging.info(f"批量处理完成 - 成功:{success_count},失败:{fail_count}")
# Command-line entry point
def main():
    """Read the ``--config`` option, load that JSON file and run the batch.

    ``--config`` defaults to ``config.json``. A missing file or malformed
    JSON prints a message and returns without processing anything.
    """
    cli = argparse.ArgumentParser(description='图片批量处理工具')
    cli.add_argument('--config', type=str, default='config.json', help='配置文件路径')
    options = cli.parse_args()

    try:
        settings = load_config(options.config)
    except FileNotFoundError:
        print(f"配置文件未找到:{options.config}")
        return
    except json.JSONDecodeError:
        print(f"配置文件格式错误:{options.config}")
        return

    # Only reached when the configuration was loaded successfully.
    batch_process_images(settings)


if __name__ == '__main__':
    main()