Spaces:
Sleeping
Sleeping
import argparse | |
import json | |
import os | |
import openai | |
import time | |
NUM_SECONDS_TO_SLEEP = 0.5 | |
def get_eval(content: str, max_tokens: int): | |
while True: | |
try: | |
response = openai.ChatCompletion.create( | |
model='gpt-4-0314', | |
messages=[{ | |
'role': 'system', | |
'content': 'You are a helpful and precise assistant for checking the quality of the answer.' | |
}, { | |
'role': 'user', | |
'content': content, | |
}], | |
temperature=0.2, # TODO: figure out which temperature is best for evaluation | |
max_tokens=max_tokens, | |
) | |
break | |
except openai.error.RateLimitError: | |
pass | |
except Exception as e: | |
print(e) | |
time.sleep(NUM_SECONDS_TO_SLEEP) | |
return response['choices'][0]['message']['content'] | |
def parse_score(review): | |
try: | |
score_pair = review.split('\n')[0] | |
score_pair = score_pair.replace(',', ' ') | |
sp = score_pair.split(' ') | |
if len(sp) == 2: | |
return [float(sp[0]), float(sp[1])] | |
else: | |
print('error', review) | |
return [-1, -1] | |
except Exception as e: | |
print(e) | |
print('error', review) | |
return [-1, -1] | |
if __name__ == '__main__': | |
parser = argparse.ArgumentParser(description='ChatGPT-based QA evaluation.') | |
parser.add_argument('-q', '--question') | |
parser.add_argument('-c', '--context') | |
parser.add_argument('-a', '--answer-list', nargs='+', default=[]) | |
parser.add_argument('-r', '--rule') | |
parser.add_argument('-o', '--output') | |
parser.add_argument('--max-tokens', type=int, default=1024, help='maximum number of tokens produced in the output') | |
args = parser.parse_args() | |
f_q = open(os.path.expanduser(args.question)) | |
f_ans1 = open(os.path.expanduser(args.answer_list[0])) | |
f_ans2 = open(os.path.expanduser(args.answer_list[1])) | |
rule_dict = json.load(open(os.path.expanduser(args.rule), 'r')) | |
if os.path.isfile(os.path.expanduser(args.output)): | |
cur_reviews = [json.loads(line) for line in open(os.path.expanduser(args.output))] | |
else: | |
cur_reviews = [] | |
review_file = open(f'{args.output}', 'a') | |
context_list = [json.loads(line) for line in open(os.path.expanduser(args.context))] | |
image_to_context = {context['image']: context for context in context_list} | |
handles = [] | |
idx = 0 | |
for ques_js, ans1_js, ans2_js in zip(f_q, f_ans1, f_ans2): | |
ques = json.loads(ques_js) | |
ans1 = json.loads(ans1_js) | |
ans2 = json.loads(ans2_js) | |
inst = image_to_context[ques['image']] | |
cap_str = '\n'.join(inst['captions']) | |
box_str = '\n'.join([f'{instance["category"]}: {instance["bbox"]}' for instance in inst['instances']]) | |
category = json.loads(ques_js)['category'] | |
if category in rule_dict: | |
rule = rule_dict[category] | |
else: | |
assert False, f"Visual QA category not found in rule file: {category}." | |
prompt = rule['prompt'] | |
role = rule['role'] | |
content = (f'[Context]\n{cap_str}\n\n{box_str}\n\n' | |
f'[Question]\n{ques["text"]}\n\n' | |
f'[{role} 1]\n{ans1["text"]}\n\n[End of {role} 1]\n\n' | |
f'[{role} 2]\n{ans2["text"]}\n\n[End of {role} 2]\n\n' | |
f'[System]\n{prompt}\n\n') | |
cur_js = { | |
'id': idx+1, | |
'question_id': ques['question_id'], | |
'answer1_id': ans1.get('answer_id', ans1['question_id']), | |
'answer2_id': ans2.get('answer_id', ans2['answer_id']), | |
'category': category | |
} | |
if idx >= len(cur_reviews): | |
review = get_eval(content, args.max_tokens) | |
scores = parse_score(review) | |
cur_js['content'] = review | |
cur_js['tuple'] = scores | |
review_file.write(json.dumps(cur_js) + '\n') | |
review_file.flush() | |
else: | |
print(f'Skipping {idx} as we already have it.') | |
idx += 1 | |
print(idx) | |
review_file.close() | |