import torch, re, shutil, tempfile, os from transformers import AutoModelForSequenceClassification, AutoTokenizer from torch.nn import Softmax import huggingface_hub from PIL import Image from torchvision import transforms, models from torch import nn from collections import Counter from typing import List, Dict import concurrent.futures class BaseModel: def inference(self, *, image: Image = None, prompt: str = None): pass class ImageRaterModel(BaseModel): """ A class representing an image rating model. This class encapsulates a deep learning model for rating images into predefined categories. It provides methods for loading the model, preprocessing images, and making predictions. Attributes: repo_id (str): The identifier of the Hugging Face repository containing the model. model_id (str): The identifier of the specific model to be loaded. image_transform (torchvision.transforms.Compose): A sequence of image transformations to be applied to input images. num_classes (int): The number of rating classes/categories. class_names (List[str]): A list of human-readable names corresponding to each rating class. device (torch.device): The device (CPU or GPU) on which the model will be loaded and inference will be performed. Methods: __init__: Initializes the image rating model. get_architecture: Returns the architecture name of the loaded model. Currently supports resnet18 and resnet50 preprocess_image_object: Preprocesses an input image for model inference. inference: Performs inference on a single input image and returns the predicted rating class. load_model: Loads the deep learning model from the Hugging Face repository. """ def __init__(self, repo_id: str, model_id: str, image_transform: transforms = transforms.Compose([transforms.Resize((256, 256)), transforms.CenterCrop((224, 224)), transforms.ToTensor(), transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])]), num_classes: int = 5, class_names: List[str] = ["PG", "PG13", "R", "X", "XXX"], device: torch.device = torch.device('cpu'))-> nn.Module: self.repo_id = repo_id self.model_id = model_id self.num_classes = num_classes self.transform = image_transform self.device = device self.model = self.load_model() self.model.to(device) self.class_names = ["PG", "PG13", "R", "X", "XXX"] def get_architecture(self) -> str: """ returns the arictecture of the loaded model as string """ if 'resnet18' in self.model_id.lower(): return 'resnet18' elif 'resnet50' in self.model_id.lower(): return 'resnet50' else: raise ValueError("Unsupported architecture. Please specifiy 'resnet18' or 'resnet50'") def preprocess_image_object(self, imageObject: Image) -> torch.Tensor: """ Does the same preprocessing as the validation dataset for model training NOTE: THIS IS FOR RESNET18_100EPOCHS_MAXV2 """ if imageObject.mode == 'RGBA': imageObject = imageObject.convert("RGB") image = self.transform(imageObject).unsqueeze(0) return image def inference(self, *, image: Image = None, prompt: str = None) -> str: """ Similar to the batch_inference but for a single image object """ if image is None: raise ValueError("Image must be defined") self.model.eval() # Set model to evaluation mode image = self.preprocess_image_object(image) image = image.to(self.device) with torch.no_grad(): # No need to compute gradients during inference output = self.model(image) _, prediction = torch.max(output, 1) predicted_class = self.class_names[prediction.item()] return predicted_class def load_model(self) -> nn.Module: ##Keep load model """ Loads model specific architecture """ dl_file = huggingface_hub.hf_hub_download( repo_id = self.repo_id, filename = 'best_model_params.pt', subfolder = f'models/{self.model_id}' ) tempDir = tempfile.TemporaryDirectory() temp_dir_path = tempDir.name path_to_weights = os.path.join(temp_dir_path, "best_model_params.pt") shutil.copy(dl_file, path_to_weights) if 'resnet18' in self.model_id.lower(): model = models.resnet18(weights = 'IMAGENET1K_V1') elif 'resnet50' in self.model_id.lower(): model = models.resnet50(weights = 'IMAGENET1K_V1') else: raise ValueError("Unsupported architecture. Please specifiy 'resnet18' or 'resnet50'") num_ftrs = model.fc.in_features model.fc = nn.Linear(num_ftrs, self.num_classes) model.load_state_dict(torch.load(path_to_weights, map_location = self.device)) return model class PromptTransformerRaterModel(BaseModel): """ A class representing a transformer-based model for rating prompts into PG, PG13, R, X, and XXX categories This class encapsulates a transformer-based model for rating prompts or text inputs into predefined categories. It provides methods for loading the model, preprocessing text inputs, and making predictions. Attributes: repo_id (str): The identifier of the Hugging Face repository containing the model. model_id (str): The identifier of the specific model to be loaded. device (torch.device): The device (CPU or GPU) on which the model will be loaded and inference will be performed. Methods: __init__: Initializes the transformer-based rating model. load_model: Downloads and loads the pre-trained transformer model from the Hugging Face repository. clean_text: Cleans input text data by removing extraneous characters and spaces. inference: Performs inference on input text data using the transformer model and returns the predicted rating. """ def __init__(self, repo_id: str, model_id: str, model_directory: str|None = None, device: torch.device = torch.device('cpu')): self.repo_id = repo_id self.model_id = model_id if model_directory is None: tempDir = tempfile.TemporaryDirectory() self.model_directory = tempDir.name else: self.model_directory = model_directory self.load_model() self.model = AutoModelForSequenceClassification.from_pretrained( self.model_directory ) self.tokenizer = AutoTokenizer.from_pretrained( self.model_directory ) self.device = device self.model.to(device) self.softmax = Softmax(dim=1) def load_model(self) -> None: """ Downloads the files for the transformer model - may end up neglecting this and creating custom repos on HF for prompt models so we don't need to save files locally """ for file in ['config.json', 'model.safetensors', 'tokenizer_config.json','special_tokens_map.json', 'vocab.txt', 'vocab.json', 'merges.txt', 'tokenizer.json',]: try: dl_file = huggingface_hub.hf_hub_download( repo_id = self.repo_id, filename = file, subfolder = f'models/{self.model_id}' ) shutil.copy(dl_file, os.path.join(self.model_directory,file)) except Exception as e: # raise LookupError(f"file error {file} raised exception {e}") continue return None @staticmethod def clean_text(text: str) -> str: """ This method cleans prompt data, removing extraneous punctuation meant to denote blending, loras, or models without removing names or tags. We also get rid of extraneous spaces or line breaks to reduce tokens and maintain as much semantic logic as possible """ text = str(text) # Remove additional characters: ( ) : < > [ ] cleaned_text = re.sub(r'[():<>[\]]', ' ', text) cleaned_text = cleaned_text.replace('\n', ' ') # Replace multiple spaces with a single space cleaned_text = re.sub(r'\s+', ' ', cleaned_text) cleaned_text = re.sub(r'\s*,\s*', ', ', cleaned_text) return cleaned_text.strip() def inference(self, *, image: Image = None, prompt: str = None) -> str: """ Does inference on prompt data using the transformer model """ if prompt is None: raise ValueError("Prompt must be defined") text = self.clean_text(prompt) tokens = self.tokenizer(text, max_length = 512, truncation = True, padding = 'max_length', return_tensors = 'pt') with torch.no_grad(): for key in tokens: tokens[key] = tokens[key].to(self.device) outputs = self.model(**tokens) logits = outputs.logits probs = self.softmax(logits) _, pred = torch.max(probs,1) pred = pred.item() return self.model.config.id2label[pred] class MovieRaterModel(BaseModel): """ A class representing a movie rating model that combines multiple sub-models. This class combines multiple sub-models, including image-based and text-based rating models, to provide a comprehensive rating system for movies. It allows for the integration of various rating models into a single interface and provides methods for making predictions based on input prompts and images. Attributes: repo_id (str): The identifier of the Hugging Face repository containing the sub-models. models (List[str]): A list of identifiers for the sub-models to be loaded. device (torch.device): The device (CPU or GPU) on which the sub-models will be loaded and inference will be performed. mixtureDict (Dict[str|nn.Module]): A dictionary containing the loaded sub-models. Methods: __init__: Initializes the movie rating model and loads the sub-models. load_model: Loads the sub-models specified in the models list and populates the mixtureDict. inference_voting: Performs voting-based inference to determine the most common prediction among the sub-models. inference: Makes predictions for movie ratings based on input prompts and images using the loaded sub-models. """ def __init__(self, repo_id: str, mixtureDict: dict = {}, models: List[str] = ['baseresNet18', 'baseresNet50', 'bestresNet50', 'promptMovieBert','promptMovieRoberta'], device: torch.device = torch.device('cpu')): self.repo_id = repo_id self.models = models self.device = device self.mixtureDict = mixtureDict self.mixtureDict = self.load_model() def load_model(self) -> Dict[str,nn.Module]: """ Use established classes to load their models and populate the mixtureDict """ for model in self.models: if 'resnet' in model.lower(): self.mixtureDict[model] = ImageRaterModel(self.repo_id, model, device = self.device) elif 'prompt' in model.lower(): self.mixtureDict[model] = PromptTransformerRaterModel(self.repo_id, model, device = self.device) return self.mixtureDict @staticmethod def inference_voting(mylist: List[int]) -> int: """ A function used to determine the most common pred among the N-odd models in cases of tie, returns the most conservative answer """ counter = Counter(mylist) most_common = counter.most_common() most_common_element = sorted(Counter(mylist).most_common(), key = lambda x: (x[1], x[0]))[-1][0] return most_common_element @staticmethod def inference_worker(model, *,image: Image = None, prompt: str = None) -> int: """ Worker function to perform inference using a single model """ if isinstance(model, ImageRaterModel): return model.inference(image = image, prompt = prompt) elif isinstance(model, PromptTransformerRaterModel): return model.inference(image = image, prompt = prompt) def inference(self, *,image: Image = None, prompt: str = None) -> str: """ Uses class specific inference for individual preds and then calls inference_voting to return the most common pred """ if image is None or prompt is None: raise ValueError("Image AND Prompt must be defined") with concurrent.futures.ThreadPoolExecutor() as executor: # Submit inference tasks for all models futures = [executor.submit(self.inference_worker, model, image = image, prompt = prompt) for model in self.mixtureDict.values()] # Get results as they become available results = [future.result() for future in concurrent.futures.as_completed(futures)] preds = results label2id = {} id2label = {} for name, model in self.mixtureDict.items(): if 'prompt' in name.lower() and label2id == {}: label2id = model.model.config.label2id id2label = model.model.config.id2label break return id2label[self.inference_voting([label2id[i] for i in preds])]