"""Extract chemical safety label fields from an image via an OCR HTTP service."""

import base64
import json
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from io import BytesIO

import requests
from PIL import Image, ImageEnhance, ImageFilter

# NOTE(review): MODEL_PATH and QwenOcr are not referenced in this module; they
# are kept because other code may rely on them being imported/re-exported here.
from config import (  # noqa: F401
    MODEL_PATH,
    PROMPT_EXTRACT_COMPONENTS,
    PROMPT_EXTRACT_ICON,
    PROMPT_EXTRACT_KEYWORD,
    PROMPT_EXTRACT_NAME,
    PROMPT_EXTRACT_PREVENTION,
    PROMPT_EXTRACT_SUPPLIER,
)
from model import QwenOcr  # noqa: F401

DEFAULT_OCR_URL = "http://127.0.0.1:8000/api/v1/ocr"
# Generous default so slow model inference still completes, while a dead
# service can no longer block a worker thread forever.
DEFAULT_TIMEOUT_SECONDS = 300

# Image modes Pillow's JPEG writer accepts directly; anything else (RGBA, P,
# LA, ...) must be converted before saving as JPEG.
_JPEG_SAVE_MODES = frozenset({"1", "L", "RGB", "RGBX", "CMYK", "YCbCr"})


def image_to_base64(pil_image, image_format="JPEG"):
    """Encode a PIL image as a Base64 string.

    Args:
        pil_image: The ``PIL.Image.Image`` to encode.
        image_format: Pillow format name used to serialize the image.

    Returns:
        The serialized image bytes, Base64-encoded and decoded as UTF-8 text.

    Note:
        When saving as JPEG, images in a mode the JPEG writer cannot handle
        (for example RGBA or palette images) are converted to RGB first, which
        discards any alpha channel.
    """
    if image_format.upper() == "JPEG" and pil_image.mode not in _JPEG_SAVE_MODES:
        pil_image = pil_image.convert("RGB")

    buffer = BytesIO()
    pil_image.save(buffer, format=image_format)
    return base64.b64encode(buffer.getvalue()).decode("utf-8")


def resize_image(image, max_size=512):
    """Shrink an image so its longest side is at most ``max_size`` pixels.

    Images that already fit are returned unchanged (same object). Images that
    are shrunk are additionally sharpened and given a slight contrast boost to
    compensate for detail lost in downscaling.

    Args:
        image: The ``PIL.Image.Image`` to resize.
        max_size: Maximum allowed length, in pixels, of the longest side.

    Returns:
        The original image if no shrinking is needed, otherwise a new image.

    Raises:
        ValueError: If ``max_size`` is not positive.
    """
    if max_size <= 0:
        raise ValueError(f"max_size must be positive, got {max_size}")

    width, height = image.size
    longest_side = max(width, height)

    # Nothing to do when the image already fits.
    if longest_side <= max_size:
        return image

    scale = max_size / longest_side
    # Clamp to 1 px: truncation can yield 0 for extreme aspect ratios, which
    # Pillow rejects.
    new_size = (max(1, int(width * scale)), max(1, int(height * scale)))

    # High-quality LANCZOS downscale.
    resized = image.resize(new_size, Image.Resampling.LANCZOS)

    # Unsharp mask to compensate for softness introduced by scaling.
    resized = resized.filter(
        ImageFilter.UnsharpMask(radius=1, percent=120, threshold=3)
    )

    # Slight contrast boost intended to help text recognition.
    return ImageEnhance.Contrast(resized).enhance(1.1)


class OcrAgent:
    """Client that sends an image plus a prompt to an OCR HTTP endpoint."""

    def __init__(self, url=DEFAULT_OCR_URL, timeout=DEFAULT_TIMEOUT_SECONDS):
        """Create an agent.

        Args:
            url: Endpoint that accepts the OCR POST requests.
            timeout: Per-request timeout in seconds passed to ``requests.post``.
        """
        self._url = url
        self._timeout = timeout

    def extract_part_info(self, image_base64, prompt):
        """Run one prompt against the OCR service and return the parsed result.

        The response body is expected to be JSON with a ``data`` list whose
        first element is itself a JSON-encoded string.

        Args:
            image_base64: Base64-encoded image data.
            prompt: Prompt text sent alongside the image.

        Returns:
            The value obtained by JSON-decoding ``response["data"][0]``.

        Raises:
            requests.RequestException: On connection errors, timeouts, or a
                non-success HTTP status.
            KeyError, IndexError, ValueError: If the response does not have the
                expected shape or is not valid JSON.
        """
        response = requests.post(
            self._url,
            json={
                "image": image_base64,
                "text": prompt,
            },
            timeout=self._timeout,
        )
        # Fail with a clear HTTP error instead of a confusing parse error.
        response.raise_for_status()
        payload = response.json()
        return json.loads(payload["data"][0])

    def agent_ocr(self, image):
        """Extract chemical safety label information from an image.

        The image is resized, encoded, and sent to the OCR service once per
        extraction prompt; the requests run concurrently in a thread pool.
        Extraction is best-effort: a task that fails is reported on stdout and
        its fields are returned as ``None`` rather than aborting the whole call.

        Args:
            image: The ``PIL.Image.Image`` of the label.

        Returns:
            A dict with keys ``tag`` (containing ``name_cn``, ``name_en``,
            ``cf_list``), ``tag_images``, ``key_word``, ``risk_notice``,
            ``pre_notice``, ``supplier`` and ``acc_tel``.
        """
        image = resize_image(image, max_size=1024)
        image_base64 = image_to_base64(image)

        start_time = time.perf_counter()

        # Extraction tasks to run in parallel, keyed by result name.
        tasks = {
            'icon': PROMPT_EXTRACT_ICON,
            'name': PROMPT_EXTRACT_NAME,
            'tag': PROMPT_EXTRACT_COMPONENTS,
            'risk_notice': PROMPT_EXTRACT_KEYWORD,
            'pre_notice': PROMPT_EXTRACT_PREVENTION,
            'suppliers': PROMPT_EXTRACT_SUPPLIER,
        }

        results = {}
        # One worker per task so all requests are in flight at once.
        with ThreadPoolExecutor(max_workers=len(tasks)) as executor:
            future_to_task = {
                executor.submit(self.extract_part_info, image_base64, prompt): task_name
                for task_name, prompt in tasks.items()
            }

            for future in as_completed(future_to_task):
                task_name = future_to_task[future]
                try:
                    parsed = future.result()
                    # The fields below are looked up by key, so anything other
                    # than a JSON object is treated as a failed task.
                    if not isinstance(parsed, dict):
                        raise TypeError(
                            f"expected a JSON object, got {type(parsed).__name__}"
                        )
                    results[task_name] = parsed
                except Exception as e:
                    print(f"任务 {task_name} 执行失败: {e}")
                    results[task_name] = {}

        icon = results.get('icon', {})
        name = results.get('name', {})
        tag = results.get('tag', {})
        risk_notice = results.get('risk_notice', {})
        pre_notice = results.get('pre_notice', {})
        suppliers = results.get('suppliers', {})

        end_time = time.perf_counter()
        elapsed_time = end_time - start_time
        print(f"推理时间: {elapsed_time:.6f} 秒")

        # Use .get() so a failed or incomplete task yields None for its fields
        # instead of raising KeyError and discarding the successful ones.
        result = {
            "tag": {
                "name_cn": name.get("name_cn"),
                "name_en": name.get("name_en"),
                "cf_list": tag.get("cf_list"),
            },
            "tag_images": icon.get("tag_images"),
            "key_word": risk_notice.get("key_word"),
            "risk_notice": risk_notice.get("risk_notice"),
            "pre_notice": pre_notice.get("pre_notice"),
            "supplier": suppliers.get("supplier"),
            "acc_tel": suppliers.get("acc_tel"),
        }
        return result


if __name__ == "__main__":
    image = Image.open("./test1.jpg").convert("RGB")
    agent = OcrAgent()
    agent.agent_ocr(image)