import argparse import torch import os import json import random import numpy as np from tqdm import tqdm import shortuuid from tinyllava.utils import * from tinyllava.data import * from tinyllava.model import * from PIL import Image import math def split_list(lst, n): """Split a list into n (roughly) equal-sized chunks""" chunk_size = math.ceil(len(lst) / n) # integer division return [lst[i : i + chunk_size] for i in range(0, len(lst), chunk_size)] def get_chunk(lst, n, k): chunks = split_list(lst, n) return chunks[k] def parse_multi_choice_response(response, all_choices, index2ans): """ Parse the prediction from the generated response. Return the predicted index e.g., A, B, C, D. """ for char in [",", ".", "!", "?", ";", ":", "'"]: response = response.strip(char) response = " " + response + " " # add space to avoid partial match index_ans = True ans_with_brack = False candidates = [] for choice in all_choices: # e.g., (A) (B) (C) (D) if f"({choice})" in response: candidates.append(choice) ans_with_brack = True if len(candidates) == 0: for choice in all_choices: # e.g., A B C D if f" {choice} " in response: candidates.append(choice) # if all above doesn't get candidates, check if the content is larger than 5 tokens and try to parse the example if len(candidates) == 0 and len(response.split()) > 5: for index, ans in index2ans.items(): if ans.lower() in response.lower(): candidates.append(index) index_ans = False # it's content ans. if len(candidates) == 0: # still not get answer, randomly choose one. pred_index = random.choice(all_choices) elif len(candidates) > 1: start_indexes = [] if index_ans: if ans_with_brack: for can in candidates: index = response.rfind(f"({can})") start_indexes.append(index) # -1 will be ignored anyway # start_indexes = [generated_response.index(f'({can})') for can in candidates] else: for can in candidates: index = response.rfind(f" {can} ") start_indexes.append(index) else: for can in candidates: index = response.lower().rfind(index2ans[can].lower()) start_indexes.append(index) # get the last one pred_index = candidates[np.argmax(start_indexes)] else: # if only one candidate, use it. pred_index = candidates[0] return pred_index def eval_model(args): # Model disable_torch_init() model_path = os.path.expanduser(args.model_path) model, tokenizer, image_processor, context_len = load_pretrained_model(model_path) text_processor = TextPreprocess(tokenizer, args.conv_mode) data_args = model.config image_processor = ImagePreprocess(image_processor, data_args) questions = json.load(open(os.path.expanduser(args.question_file), "r")) questions = get_chunk(questions, args.num_chunks, args.chunk_idx) answers_file = os.path.expanduser(args.answers_file) os.makedirs(os.path.dirname(answers_file), exist_ok=True) ans_file = open(answers_file, "w") model.to(device="cuda") for i, line in enumerate(tqdm(questions)): idx = line["id"] question = line["prompt"] if "image" in line: image_file = line["image"] # image = Image.open(image_file).convert("RGB") image = Image.open(os.path.join(args.image_folder, image_file)).convert("RGB") image_sizes = [image.size] image = image_processor(image) images = image.unsqueeze(0).half().cuda() question = "" + "\n" + question else: images = None image_sizes = None msg = Message() msg.add_message(question) # print(msg.messages) result = text_processor(msg.messages, mode='eval') # print(result["prompt"]) input_ids = result['input_ids'] input_ids = input_ids.unsqueeze(0).cuda() with torch.inference_mode(): if images is not None: output_ids = model.generate( input_ids, images=images, image_sizes=image_sizes, do_sample=True if args.temperature > 0 else False, temperature=args.temperature, max_new_tokens=1024, use_cache=True, pad_token_id=tokenizer.pad_token_id, ) outputs = tokenizer.batch_decode(output_ids, skip_special_tokens=True)[0] else: if line["question_type"] == "multiple-choice": all_choices = line["all_choices"] outputs = random.choice(all_choices) else: outputs = "INVALID GENERATION FOR MULTIPLE IMAGE INPUTS" if line["question_type"] == "multiple-choice": pred_ans = parse_multi_choice_response( outputs, line["all_choices"], line["index2ans"] ) else: # open question pred_ans = outputs # print(outputs, pred_ans) ans_id = shortuuid.uuid() ans_file.write(json.dumps({"question_id": idx, "prompt": questions, "text": pred_ans, "answer_id": ans_id, "model_id": args.model_path.split("/")[-1], "metadata": {}}) + "\n") ans_file.flush() ans_file.close() if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("--model-path", type=str, default="facebook/opt-350m") parser.add_argument("--model-base", type=str, default=None) parser.add_argument("--image-folder", type=str, default="") parser.add_argument("--question-file", type=str, default="tables/question.json") parser.add_argument("--answers-file", type=str, default="answer.jsonl") parser.add_argument("--conv-mode", type=str, default="llama") parser.add_argument("--num-chunks", type=int, default=1) parser.add_argument("--chunk-idx", type=int, default=0) parser.add_argument("--temperature", type=float, default=0.2) parser.add_argument("--answer-prompter", action="store_true") parser.add_argument("--image_aspect_ratio", type=str, default="pad") args = parser.parse_args() eval_model(args)