"""Judgement services for brand-protection checks on product listings.

Each public ``ApiService`` method receives ``url_data`` (the listing being
checked) and ``basic_data`` (the brand owner's reference data) as dicts and
returns ``True``/``False`` or a message string when no verdict is possible.
"""
import logging

import json5
import pandas as pd

from agent.agent import Agent
from db import MongoDao
from model import BrandModelInference, BrandCompareModelInference
from utils.utils import load_image_from_url, load_image_from_cos

logger = logging.getLogger(__name__)

license_dao = MongoDao("ProductStandard")
# A single-brand model, BrandModelInference('全棉时代'), was used here previously.
license_infernece = BrandCompareModelInference()


class ApiService:
    """Static judgement helpers that share one ``Agent`` instance."""

    agent = Agent()

    # Logo names accepted in addition to the protected brand name itself.
    ACCEPTED_LOGO_ALIASES = ('Purcotton',)
    # Minimum top similarity at which a listing image counts as a match.
    LICENSED_SIMILARITY_THRESHOLD = 90.0
    # Listing images are resized to this size before similarity scoring.
    MODEL_INPUT_SIZE = (512, 512)

    @staticmethod
    def _shares_product_image(listing_images, reference_images):
        """Return True if the agent judges any listing/reference image pair similar."""
        for listing_image in listing_images:
            for reference_image in reference_images:
                verdict = json5.loads(
                    ApiService.agent.product_image_similarity_judgement(
                        listing_image, reference_image
                    )
                )
                if verdict['is_similarity_product']:
                    return True
        return False

    @staticmethod
    def keyword_judgement(url_data, basic_data):
        """Judge whether the listing uses the brand name as a traffic-diversion keyword.

        Args:
            url_data: Listing data; reads ``title``, ``brand_name`` and
                ``product_images``.
            basic_data: Reference data; reads ``brand_name`` and
                ``product_images``.

        Returns:
            ``True`` when the title contains the brand name but the listing's
            brand differs, when a detected logo is neither the brand name nor
            an accepted alias, or when no listing image reaches the similarity
            threshold. ``False`` when a listing image reaches the threshold.
            ``'信息不全,无法判定'`` when required fields are missing, and
            ``"无法判定"`` when the title does not contain the brand name or no
            image produced a similarity result.
        """
        # NOTE(review): the stored file had lost its indentation; the nesting
        # below is reconstructed from the statement order - confirm.
        if (
            url_data['title'] is None
            or url_data['brand_name'] is None
            # An empty brand name is a substring of every title, so treat it
            # as missing rather than letting it match everything.
            or not basic_data['brand_name']
            or not url_data['product_images']
        ):
            return '信息不全,无法判定'

        title = url_data['title']
        listing_brand = url_data['brand_name']
        brand = basic_data['brand_name']

        if brand not in title:
            return "无法判定"

        # The title carries the protected brand name but the detail page
        # declares another brand: keyword traffic diversion.
        if listing_brand != brand:
            return True

        # Title and declared brand both match the protected brand, so fall
        # back to image-based checks. First: logos detected in the images.
        logos_info = json5.loads(
            ApiService.agent.get_log_from_product_images(url_data['product_images'])
        )
        if logos_info['is_contained_logo']:
            accepted_logos = {brand, *ApiService.ACCEPTED_LOGO_ALIASES}
            for logo_name in logos_info['logo_infos']:
                if logo_name.strip() not in accepted_logos:
                    logger.debug("brand_name:%s", brand)
                    logger.debug("log_name:%s", logo_name)
                    return True

        # Second: compare every listing image with the reference images.
        # load_image_from_cos is the alternative loader for COS-hosted images.
        any_scored = False
        for image_url in url_data['product_images']:
            product_image = load_image_from_url(image_url).resize(
                ApiService.MODEL_INPUT_SIZE
            )
            similarity_map = license_infernece.calculate_similarity(
                product_image, basic_data['product_images']
            )
            # len() rather than truthiness so array-like results are safe too.
            if len(similarity_map) == 0:
                continue
            any_scored = True
            if similarity_map[0]['similarity'] >= ApiService.LICENSED_SIMILARITY_THRESHOLD:
                return False

        # Without a single similarity result there is nothing to base a
        # verdict on (this path used to raise IndexError).
        return True if any_scored else "无法判定"

    @staticmethod
    def similarity_logo_judgement(url_data, basic_data):
        """Judge infringement through look-alike trademark names.

        Args:
            url_data: Listing data; reads ``title`` and ``brand_name``
                (``None`` is treated as an empty string). The dict is not
                modified.
            basic_data: Reference data; reads ``similarity_logos`` and
                ``brand_name``.

        Returns:
            ``True`` when any look-alike name occurs in the title or listing
            brand, ``False`` when the genuine brand name occurs there instead,
            ``'信息不全,无法判断'`` when no look-alike names are supplied, and
            ``'无法判定'`` otherwise.
        """
        similar_logos = basic_data['similarity_logos']
        if not similar_logos:
            return '信息不全,无法判断'

        # Work on local copies so the caller's dict is left untouched.
        title = '' if url_data['title'] is None else url_data['title']
        listing_brand = '' if url_data['brand_name'] is None else url_data['brand_name']

        for similar_logo in similar_logos:
            # Skip empty entries: '' is contained in every string.
            if similar_logo and (similar_logo in title or similar_logo in listing_brand):
                return True

        brand = basic_data['brand_name']
        if not brand:
            return '无法判定'
        # The genuine brand name appears in the title or the listing brand.
        if brand in title or brand in listing_brand:
            return False
        return '无法判定'

    @staticmethod
    def logo_1to1_judgement(url_data, basic_data):
        """Judge one-to-one trademark infringement using a price floor.

        The price floor is ``base_price * price_percent``. The price is only
        compared when the listing title equals the reference product name
        (after stripping) or some listing image is judged similar to a
        reference image.

        Args:
            url_data: Listing data; reads ``title``, ``price`` and
                ``product_images``.
            basic_data: Reference data; reads ``base_price``,
                ``price_percent``, ``product_name`` and ``product_images``.

        Returns:
            ``True`` when the product matches and is priced below the floor,
            ``False`` when it matches at or above the floor or no image
            matches, and ``'信息不全,无法判定'`` when required data is missing.
        """
        if (
            basic_data['base_price'] is None
            or basic_data['price_percent'] is None
            or url_data['title'] is None
            or basic_data['product_name'] is None
        ):
            return '信息不全,无法判定'

        price_floor = basic_data['base_price'] * basic_data['price_percent']

        if url_data['title'].strip() != basic_data['product_name'].strip():
            # Names differ, so the product has to be identified by image.
            if not url_data['product_images'] or not basic_data['product_images']:
                return '信息不全,无法判定'
            if not ApiService._shares_product_image(
                url_data['product_images'], basic_data['product_images']
            ):
                return False

        price = url_data['price']
        if price is None:
            # Comparing None with a number would raise TypeError.
            return '信息不全,无法判定'
        return bool(price < price_floor)

    @staticmethod
    def low_price_judgement(url_data, basic_data):
        """Judge whether a matching product is sold below the base price.

        Args:
            url_data: Listing data; reads ``price``, ``title`` and
                ``product_images``.
            basic_data: Reference data; reads ``base_price``,
                ``product_name`` and ``product_images``.

        Returns:
            ``True`` when the product matches (equal title, or a similar
            image) and its price is below ``base_price``; ``False`` when it
            matches at or above that price or no image matches;
            ``'价格信息缺失,无法判定'`` when a price is missing; and
            ``'信息不全,无法判定'`` when images are needed but missing.
        """
        if url_data['price'] is None or basic_data['base_price'] is None:
            return '价格信息缺失,无法判定'

        if url_data['title'] != basic_data['product_name']:
            # Titles differ, so the product has to be identified by image.
            if not url_data['product_images'] or not basic_data['product_images']:
                return '信息不全,无法判定'
            if not ApiService._shares_product_image(
                url_data['product_images'], basic_data['product_images']
            ):
                return False

        return bool(url_data['price'] < basic_data['base_price'])

    @staticmethod
    def get_license_list(brand_name):
        """Return the stripped product names stored for ``brand_name``.

        Records are fetched from ``license_dao`` with the query
        ``{"brand_name": brand_name}``.
        """
        records = license_dao.get_records_by_query({"brand_name": brand_name})
        return [record['product_name'].strip() for record in records]

    @staticmethod
    def license_judgement(url_data, basic_data):
        """Judge the listing title against the brand's stored product list.

        Args:
            url_data: Listing data; reads ``title``.
            basic_data: Reference data; reads ``brand_name``.

        Returns:
            The ``in_list`` field of the agent's parsed answer,
            ``'信息不全,无法判定'`` when the brand name is ``None``, or
            ``'授权列表空,无法判定'`` when no products are stored for the brand.
        """
        if basic_data['brand_name'] is None:
            return '信息不全,无法判定'

        license_list = ApiService.get_license_list(basic_data['brand_name'])
        if not license_list:
            return '授权列表空,无法判定'

        result = json5.loads(
            ApiService.agent.license_product_judgement(url_data['title'], license_list)
        )
        return result['in_list']


def main():
    """Run a manual smoke check against sample listing data."""
    url_data = {
        'title': '休闲短裤女士运动潮流系列夏季女装裤子梭织运动裤',
        'brand_name': '李宁',
        'product_images': [
            'http://h2.appsimg.com/a.appsimg.com/upload/merchandise/pdcvis/613214/2024/0902/118/27466cf6-fb28-4580-9009-95a3763e06bf.jpg',
            'http://h2.appsimg.com/a.appsimg.com/upload/merchandise/pdcvis/613214/2024/1120/169/8ca15632-9cb9-40e7-8915-e6773e17a05e.jpg',
        ],
        'price': 199,
    }
    basic_data = {
        'product_name': '休闲短裤女士运动潮流系列夏季女装裤子梭织运动裤',
        'brand_name': '李宁',
        'similarity_logos': ['李宇', '李柠'],
        'product_images': [
            'http://h2.appsimg.com/a.appsimg.com/upload/merchandise/pdcvis/613214/2024/1120/169/8ca15632-9cb9-40e7-8915-e6773e17a05e.jpg',
            'http://h2.appsimg.com/a.appsimg.com/upload/merchandise/pdcvis/613214/2024/1120/169/8ca15632-9cb9-40e7-8915-e6773e17a05e.jpg',
        ],
        'base_price': 200,
        'price_percent': 0.9,
    }
    result = ApiService.low_price_judgement(url_data, basic_data)
    print(result)

    product_list = [
        "https://gw.alicdn.com/imgextra/O1CN01SRLgJ11JdrhxIPXGd_!!3378851052.jpg_q95.jpg_.webp",
        "https://img.alicdn.com/imgextra/i2/3378851052/O1CN01UpWwyY1JdrUq6MJvu_!!3378851052.jpg_q75.jpg_.webp",
        "https://img.alicdn.com/imgextra/i2/3378851052/O1CN01qRzu8W1JdraPoEnyK_!!3378851052.jpg_q75.jpg_.webp",
        "https://img.alicdn.com/imgextra/i2/3378851052/O1CN0150gjbG1JdrUplO3bP_!!3378851052.jpg_q75.jpg_.webp",
        "https://img.alicdn.com/imgextra/i4/3378851052/O1CN01Cw6aaG1Jdra8AwqW7_!!3378851052.jpg_q75.jpg_.webp",
        "https://img.alicdn.com/imgextra/i1/3378851052/O1CN01PMPzQN1JdrUoKrsqo_!!3378851052.jpg_q75.jpg_.webp",
    ]
    basic_product_list = [
        "https://gw.alicdn.com/imgextra/O1CN01EFpxoy1JdrhyyD8Gp_!!3378851052.jpg_q95.jpg_.webp",
        "https://img.alicdn.com/imgextra/i2/3378851052/O1CN01N1VBKz1JdriDQ7v3s_!!3378851052.jpg_q75.jpg_.webp",
        "https://img.alicdn.com/imgextra/i4/3378851052/O1CN01d1T16h1JdriBxQW8j_!!3378851052.jpg_q75.jpg_.webp",
        "https://img.alicdn.com/imgextra/i4/3378851052/O1CN01O5Gb861JdriAyGLo0_!!3378851052.jpg_q75.jpg_.webp",
        "https://img.alicdn.com/imgextra/i1/3378851052/O1CN01WT8Kg81JdriDTZ4lq_!!3378851052.jpg_q75.jpg_.webp",
        "https://img.alicdn.com/imgextra/i2/3378851052/O1CN01v4KL0D1JdriCrG3SH_!!3378851052.jpg_q75.jpg_.webp",
    ]
    response = ApiService.agent.multi_products_images_similarity_judgement(
        product_list, basic_product_list
    )
    print(response)


if __name__ == '__main__':
    main()