Spaces:
Runtime error
Runtime error
import re | |
import time | |
import asyncio | |
from io import BytesIO | |
from typing import List, Optional | |
import httpx | |
import matplotlib.pyplot as plt | |
import numpy as np | |
import torch | |
import PIL | |
import imagehash | |
from transformers import AutoModelForSeq2SeqLM, AutoTokenizer, CLIPModel, CLIPProcessor | |
from PIL import Image | |
class Translator: | |
def __init__(self, model_id: str, device: Optional[str] = None): | |
self.device = device or ("cuda" if torch.cuda.is_available() else "cpu") | |
self.model_id = model_id | |
self.tokenizer = AutoTokenizer.from_pretrained( | |
model_id) | |
self.model = AutoModelForSeq2SeqLM.from_pretrained(model_id).to(self.device) | |
def _bos_token_attr(self): | |
if hasattr(self.tokenizer, "get_lang_id"): | |
return self.tokenizer.get_lang_id | |
elif hasattr(self.tokenizer, "lang_code_to_id"): | |
return self.tokenizer.lang_code_to_id | |
else: | |
return | |
def _language_code_mapper(self): | |
if "nllb" in self.model_id.lower(): | |
return {"en": "eng_Latn", | |
"es": "spa_Latn", | |
"pt": "por_Latn"} | |
elif "m2m" in self.model_id.lower(): | |
return {"en": "en", | |
"es": "es", | |
"pt": "pt"} | |
else: | |
return {"en": "eng", | |
"es": "spa", | |
"pt": "por"} | |
def translate(self, texts: List[str], src_lang: str, dest_lang: str = "en", max_length: int = 100): | |
self.tokenizer.src_lang = self._language_code_mapper[src_lang] | |
inputs = self.tokenizer(texts, return_tensors="pt").to(self.device) | |
if "opus" in self.model_id.lower(): | |
forced_bos_token_id = None | |
else: | |
forced_bos_token_id = self._bos_token_attr[self._language_code_mapper["en"]] | |
translated_tokens = self.model.generate( | |
**inputs, forced_bos_token_id=forced_bos_token_id, max_length=max_length | |
) | |
return self.tokenizer.batch_decode(translated_tokens, skip_special_tokens=True) | |
class OffTopicDetector: | |
def __init__(self, model_id: str, device: Optional[str] = None, image_size: str = "E", translator: Optional[Translator] = None): | |
self.device = device or ("cuda" if torch.cuda.is_available() else "cpu") | |
self.processor = CLIPProcessor.from_pretrained(model_id) | |
self.model = CLIPModel.from_pretrained(model_id).to(self.device) | |
self.image_size = image_size | |
self.translator = translator | |
def predict_probas(self, images: List[PIL.Image.Image], domain: str, site: str, | |
title: Optional[str] = None, | |
valid_templates: Optional[List[str]] = None, | |
invalid_classes: Optional[List[str]] = None, | |
autocast: bool = True): | |
domain = domain.lower() | |
if valid_templates: | |
valid_classes = [template.format(domain) for template in valid_templates] | |
else: | |
valid_classes = [f"a photo of {domain}", f"brochure with {domain} image", f"instructions for {domain}", f"{domain} diagram"] | |
if title: | |
if site == "CBT": | |
translated_title = title | |
else: | |
if site == "MLB": | |
src_lang = "pt" | |
else: | |
src_lang = "es" | |
translated_title = self.translator.translate(title, src_lang=src_lang, dest_lang="en", max_length=100)[0] | |
valid_classes.append(translated_title.lower()) | |
if not invalid_classes: | |
invalid_classes = ["promotional ad with store information", "promotional text", "google maps screenshot", "business card", "qr code"] | |
n_valid = len(valid_classes) | |
classes = valid_classes + invalid_classes | |
print(f"Valid classes: {valid_classes}", f"Invalid classes: {invalid_classes}", sep="\n") | |
n_classes = len(classes) | |
if self.device == "cuda": | |
torch.cuda.synchronize() | |
start = time.time() | |
inputs = self.processor(text=classes, images=images, return_tensors="pt", padding=True).to(self.device) | |
if self.device == "cpu" and autocast is True: | |
autocast = False | |
with torch.autocast(self.device, enabled=autocast): | |
with torch.no_grad(): | |
outputs = self.model(**inputs) | |
probas = outputs.logits_per_image.softmax(dim=1).cpu().numpy() # we can take the softmax to get the label probabilities | |
if self.device == "cuda": | |
torch.cuda.synchronize() | |
end = time.time() | |
duration = end - start | |
print(f"Model time: {round(duration, 2)} s", | |
f"Model time per image: {round(duration/len(images) * 1000, 0)} ms", | |
sep="\n") | |
valid_probas = probas[:, 0:n_valid].sum(axis=1, keepdims=True) | |
invalid_probas = probas[:, n_valid:n_classes].sum(axis=1, keepdims=True) | |
return probas, valid_probas, invalid_probas | |
def predict_probas_url(self, img_urls: List[str], domain: str, site:str, | |
title: Optional[str] = None, | |
valid_templates: Optional[List[str]] = None, | |
invalid_classes: Optional[List[str]] = None, | |
autocast: bool = True): | |
images = self.get_images(img_urls) | |
dedup_images = self._filter_dups(images) | |
return dedup_images, self.predict_probas(dedup_images, domain, site, title, valid_templates, invalid_classes, autocast) | |
def predict_probas_item(self, url_or_id: str, | |
use_title: bool = False, | |
valid_templates: Optional[List[str]] = None, | |
invalid_classes: Optional[List[str]] = None): | |
images, domain, site, title = self.get_item_data(url_or_id) | |
title = title if use_title else None | |
probas, valid_probas, invalid_probas = self.predict_probas(images, domain, site, title, valid_templates, | |
invalid_classes) | |
return images, domain, probas, valid_probas, invalid_probas | |
def apply_threshold(self, valid_probas: np.ndarray, threshold: float = 0.4): | |
return valid_probas >= threshold | |
def get_item_data(self, url_or_id: str): | |
if url_or_id.startswith("http"): | |
item_id = "".join(url_or_id.split("/")[3].split("-")[:2]) | |
else: | |
item_id = re.sub("-", "", url_or_id) | |
start = time.time() | |
response = httpx.get(f"https://api.mercadolibre.com/items/{item_id}").json() | |
title = response["title"] | |
site, domain = response["domain_id"].split("-") | |
img_urls = [x["url"] for x in response["pictures"]] | |
img_urls = [x.replace("-O.jpg", f"-{self.image_size}.jpg") for x in img_urls] | |
domain_name = httpx.get(f"https://api.mercadolibre.com/catalog_domains/CBT-{domain}").json()["name"] | |
end = time.time() | |
duration = end - start | |
print(f"Items API time: {round(duration * 1000, 0)} ms") | |
images = self.get_images(img_urls) | |
dedup_images = self._filter_dups(images) | |
return dedup_images, domain_name, site, title | |
def _filter_dups(self, images: List): | |
if len(images) > 1: | |
hashes = {} | |
for img in images: | |
hashes.update({str(imagehash.average_hash(img)): img}) | |
dedup_hashes = list(dict.fromkeys(hashes)) | |
dedup_images = [img for hash, img in hashes.items() if hash in dedup_hashes] | |
else: | |
dedup_images = images | |
if (diff := len(images) - len(dedup_images)) > 0: | |
print(f"Filtered {diff} images out of {len(images)} due to matching hashes.") | |
return dedup_images | |
def get_images(self, urls: List[str]): | |
start = time.time() | |
images = asyncio.run(self._gather_download_tasks(urls)) | |
end = time.time() | |
duration = end - start | |
print(f"Download time: {round(duration, 2)} s", | |
f"Download time per image: {round(duration/len(urls) * 1000, 0)} ms", | |
sep="\n") | |
return asyncio.run(self._gather_download_tasks(urls)) | |
async def _gather_download_tasks(self, urls: List[str]): | |
async def _process_download(url: str, client: httpx.AsyncClient): | |
response = await client.get(url) | |
return Image.open(BytesIO(response.content)) | |
async with httpx.AsyncClient() as client: | |
tasks = [_process_download(url, client) for url in urls] | |
return await asyncio.gather(*tasks) | |
def show(self, images: List[PIL.Image.Image], valid_probas: np.ndarray, n_cols: int = 3, | |
title: Optional[str] = None, threshold: Optional[float] = None): | |
if threshold is not None: | |
prediction = self.apply_threshold(valid_probas, threshold) | |
title_scores = [f"Valid: {pred.squeeze()}" for pred in prediction] | |
else: | |
prediction = np.round(valid_probas[:, 0], 2) | |
title_scores = [f"Valid: {pred:.2f}" for pred in prediction] | |
n_images = len(images) | |
n_rows = int(np.ceil(n_images / n_cols)) | |
fig, axes = plt.subplots(n_rows, n_cols, figsize=(16, 16)) | |
for i, ax in enumerate(axes.ravel()): | |
ax.axis("off") | |
try: | |
ax.imshow(images[i]) | |
ax.set_title(title_scores[i]) | |
except IndexError: | |
continue | |
if title: | |
fig.suptitle(title) | |
fig.tight_layout() | |
return |