返回功能详情

图片批量处理工具 - 核心示例代码

以下是支持批量格式转换、尺寸压缩、水印添加和特效处理的核心代码,采用多线程提升效率,支持配置化操作:

复制成功!
from PIL import Image, ImageDraw, ImageFont
import os
import glob
import json
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
import argparse

# 配置日志
logging.basicConfig(
    filename='image_processor_log.log',
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    encoding='utf-8'
)

# 支持的图片格式
SUPPORTED_FORMATS = ('.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff')

# 加载处理配置
def load_config(config_path):
    with open(config_path, 'r', encoding='utf-8') as f:
        return json.load(f)

# 获取文件夹内所有图片文件(支持递归)
def get_all_image_files(input_dir, recursive=True):
    image_files = []
    if recursive:
        for root, dirs, files in os.walk(input_dir):
            for file in files:
                if os.path.splitext(file.lower())[1] in SUPPORTED_FORMATS:
                    image_files.append(os.path.join(root, file))
    else:
        for ext in SUPPORTED_FORMATS:
            image_files.extend(glob.glob(os.path.join(input_dir, f'*{ext}'), recursive=False))
    return image_files

# 图片尺寸压缩(按比例或固定尺寸)
def resize_image(image, config):
    width, height = image.size
    resize_type = config.get('resize_type', 'ratio')  # ratio/fixed
    
    if resize_type == 'ratio':
        # 按比例压缩
        ratio = config.get('ratio', 0.8)
        new_width = int(width * ratio)
        new_height = int(height * ratio)
    else:
        # 固定尺寸
        new_width = config.get('width', width)
        new_height = config.get('height', height)
    
    # 保持宽高比(可选)
    if config.get('keep_aspect_ratio', True):
        aspect_ratio = width / height
        new_aspect_ratio = new_width / new_height
        if new_aspect_ratio != aspect_ratio:
            if new_width / width < new_height / height:
                new_height = int(new_width / aspect_ratio)
            else:
                new_width = int(new_height * aspect_ratio)
    
    # 调整图片尺寸(使用抗锯齿)
    resized_image = image.resize((new_width, new_height), Image.Resampling.LANCZOS)
    return resized_image

# 添加文字水印
def add_text_watermark(image, config):
    watermark_text = config.get('text', '')
    if not watermark_text:
        return image
    
    # 水印配置
    font_size = config.get('font_size', 20)
    color = config.get('color', '#ffffff')
    opacity = config.get('opacity', 50)  # 0-100
    position = config.get('position', 'bottom_right')  # top_left/top_right/bottom_left/bottom_right
    margin = config.get('margin', 20)
    
    # 创建透明图层
    watermark_layer = Image.new('RGBA', image.size, (0, 0, 0, 0))
    draw = ImageDraw.Draw(watermark_layer)
    
    # 加载字体(使用默认字体,可指定路径)
    try:
        font = ImageFont.truetype('arial.ttf', font_size)
    except:
        font = ImageFont.load_default(size=font_size)
    
    # 计算文字位置
    text_width, text_height = draw.textbbox((0, 0), watermark_text, font=font)[2:]
    width, height = image.size
    
    if position == 'top_left':
        x, y = margin, margin
    elif position == 'top_right':
        x, y = width - text_width - margin, margin
    elif position == 'bottom_left':
        x, y = margin, height - text_height - margin
    else: # bottom_right
        x, y = width - text_width - margin, height - text_height - margin
    
    # 绘制文字(设置透明度)
    draw.text((x, y), watermark_text, font=font, fill=color + hex(int(opacity * 2.55))[2:].zfill(2))
    
    # 合并图层
    if image.mode != 'RGBA':
        image = image.convert('RGBA')
    combined = Image.alpha_composite(image, watermark_layer)
    return combined.convert('RGB')  # 转换回 RGB 避免透明通道问题

# 添加图片水印
def add_image_watermark(image, config):
    watermark_path = config.get('path', '')
    if not watermark_path or not os.path.exists(watermark_path):
        logging.warning("水印图片不存在")
        return image
    
    # 水印配置
    opacity = config.get('opacity', 50)
    position = config.get('position', 'bottom_right')
    margin = config.get('margin', 20)
    scale = config.get('scale', 0.2)  # 水印相对于原图的比例
    
    # 加载水印图片
    watermark = Image.open(watermark_path).convert('RGBA')
    
    # 调整水印尺寸
    img_width, img_height = image.size
    wm_width = int(img_width * scale)
    wm_height = int(watermark.size[1] * (wm_width / watermark.size[0]))
    watermark = watermark.resize((wm_width, wm_height), Image.Resampling.LANCZOS)
    
    # 设置水印透明度
    alpha = watermark.split()[3]
    alpha = alpha.point(lambda p: p * (opacity / 100))
    watermark.putalpha(alpha)
    
    # 计算水印位置
    wm_width, wm_height = watermark.size
    if position == 'top_left':
        x, y = margin, margin
    elif position == 'top_right':
        x, y = img_width - wm_width - margin, margin
    elif position == 'bottom_left':
        x, y = margin, img_height - wm_height - margin
    else: # bottom_right
        x, y = img_width - wm_width - margin, img_height - wm_height - margin
    
    # 合并水印
    if image.mode != 'RGBA':
        image = image.convert('RGBA')
    image.paste(watermark, (x, y), watermark)
    return image.convert('RGB')

# 图片特效处理(灰度、反转等)
def apply_image_effect(image, config):
    effect_type = config.get('type', '')
    if not effect_type:
        return image
    
    if effect_type == 'grayscale':
        # 灰度处理
        image = image.convert('L').convert('RGB')
    elif effect_type == 'invert':
        # 颜色反转
        image = Image.eval(image, lambda x: 255 - x)
    elif effect_type == 'brightness':
        # 亮度调整
        brightness = config.get('value', 0)  # -100 到 100
        factor = 1 + (brightness / 100)
        image = Image.eval(image, lambda x: int(min(255, max(0, x * factor))))
    elif effect_type == 'contrast':
        # 对比度调整
        contrast = config.get('value', 0)  # -100 到 100
        factor = 1 + (contrast / 100)
        image = Image.eval(image, lambda x: int(min(255, max(0, (x - 128) * factor + 128))))
    
    return image

# 单张图片处理
def process_single_image(input_path, output_dir, config):
    try:
        # 加载图片
        with Image.open(input_path) as img:
            # 1. 尺寸压缩
            if config.get('resize', {}).get('enable', False):
                img = resize_image(img, config['resize'])
            
            # 2. 特效处理
            if config.get('effect', {}).get('enable', False):
                img = apply_image_effect(img, config['effect'])
            
            # 3. 添加水印
            if config.get('watermark', {}).get('enable', False):
                watermark_type = config['watermark'].get('type', 'text')
                if watermark_type == 'text':
                    img = add_text_watermark(img, config['watermark'])
                else:
                    img = add_image_watermark(img, config['watermark'])
            
            # 4. 格式转换与保存
            output_format = config.get('output_format', 'jpg').lower()
            quality = config.get('quality', 85)  # JPG 质量 0-100
            
            # 构建输出路径(保持原文件结构)
            relative_path = os.path.relpath(os.path.dirname(input_path), config['input_dir'])
            output_path = os.path.join(output_dir, relative_path)
            os.makedirs(output_path, exist_ok=True)
            
            # 重命名(可选)
            filename = os.path.basename(input_path)
            name, _ = os.path.splitext(filename)
            output_filename = f"{name}.{output_format}"
            if config.get('rename_prefix', ''):
                output_filename = f"{config['rename_prefix']}_{output_filename}"
            
            final_output_path = os.path.join(output_path, output_filename)
            
            # 保存图片
            if output_format == 'jpg':
                img.save(final_output_path, 'JPEG', quality=quality, optimize=True)
            elif output_format == 'png':
                img.save(final_output_path, 'PNG', optimize=True)
            elif output_format == 'gif':
                img.save(final_output_path, 'GIF')
            else:
                img.save(final_output_path)
            
            logging.info(f"处理成功:{input_path} -> {final_output_path}")
            return True
    except Exception as e:
        logging.error(f"处理失败 {input_path}:{str(e)}")
        return False

# 批量图片处理(多线程)
def batch_process_images(config):
    input_dir = config.get('input_dir', '.')
    output_dir = config.get('output_dir', 'output')
    max_workers = config.get('max_workers', 5)
    recursive = config.get('recursive', True)
    
    # 验证输入目录
    if not os.path.isdir(input_dir):
        logging.error(f"输入目录不存在:{input_dir}")
        return
    
    # 创建输出目录
    os.makedirs(output_dir, exist_ok=True)
    
    # 获取所有图片文件
    image_files = get_all_image_files(input_dir, recursive=recursive)
    if not image_files:
        logging.warning("未找到任何图片文件")
        return
    
    print(f"找到 {len(image_files)} 张图片,开始批量处理...")
    success_count = 0
    fail_count = 0
    
    # 多线程处理
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # 提交所有任务
        future_to_file = {
            executor.submit(process_single_image, file, output_dir, config): file
            for file in image_files
        }
        
        # 处理结果
        for future in as_completed(future_to_file):
            if future.result():
                success_count += 1
            else:
                fail_count += 1
    
    # 输出统计信息
    print("\n批量处理完成!")
    print(f"总数量:{len(image_files)}")
    print(f"成功:{success_count}")
    print(f"失败:{fail_count}")
    print(f"输出目录:{os.path.abspath(output_dir)}")
    logging.info(f"批量处理完成 - 成功:{success_count},失败:{fail_count}")

# 主函数
def main():
    # 解析命令行参数
    parser = argparse.ArgumentParser(description='图片批量处理工具')
    parser.add_argument('--config', type=str, default='config.json', help='配置文件路径')
    args = parser.parse_args()
    
    # 加载配置
    try:
        config = load_config(args.config)
    except FileNotFoundError:
        print(f"配置文件未找到:{args.config}")
        return
    except json.JSONDecodeError:
        print(f"配置文件格式错误:{args.config}")
        return
    
    # 执行批量处理
    batch_process_images(config)

if __name__ == '__main__':
    main()