Spaces:
Sleeping
Sleeping
import openai | |
import backoff | |
import json | |
import re | |
import random | |
import mgr_bias_scoring as bt_mgr | |
def initOpenAI(key): | |
openai.api_key = key | |
# list models | |
models = openai.Model.list() | |
return models | |
# construct prompts from example_shots | |
def examples_to_prompt(example_shots, kwd_pair): | |
prompt = "" | |
for shot in example_shots: | |
prompt += "Keywords: "+', '.join(shot['Keywords'])+" ## Sentence: "+ \ | |
shot['Sentence']+" ##\n" | |
prompt += f"Keywords: {kwd_pair[0]}, {kwd_pair[1]} ## Sentence: " | |
return prompt | |
def genChatGPT(model_name, kwd_pair, bias_spec, num2gen, numTries, temperature=0.8): | |
t1, t2, a1, a2 = bt_mgr.get_words(bias_spec) | |
att_terms_str = ','.join([f"'{t}'" for t in random.sample(a1+a2, min(8, len(a1+a2)))]) | |
t_terms_str = ','.join([f"'{t}'" for t in random.sample(t1+t2, min(8, len(t1+t2)))]) | |
# find out which social group the generator term belongs to | |
grp_term = kwd_pair[0] | |
if grp_term in t1: | |
grp_term_idx = t1.index(grp_term) | |
grp_term_pair = [grp_term, t2[grp_term_idx]] | |
else: | |
grp_term_idx = t2.index(grp_term) | |
grp_term_pair = [grp_term, t1[grp_term_idx]] | |
# construct prompt | |
#instruction = f"Write a sentence including terms \"{kwd_pair[0]}\" and \"{kwd_pair[1]}\"."# Use examples as guide for the type of sentences to write." | |
#prompt = examples_to_prompt(example_shots, kwd_pair) | |
instruction = f"Write a sentence including target term \"{kwd_pair[0]}\" and attribute term \"{kwd_pair[1]}\".\n \ | |
Other target terms in this context are: {t_terms_str}. Use them for interpretation of requested target term, but don't include these specifically. \ | |
Other attribute terms in this context are: {att_terms_str}. Use them for interpretation of requested attribute term, but don't include these specifically. "# Use examples as guide for the type of sentences to write." | |
#print(f"Prompt: {prompt}") | |
#print(f"Instruction: {instruction}") | |
# https://github.com/openai/openai-cookbook/blob/main/examples/How_to_handle_rate_limits.ipynb | |
#, | |
#max_time=300, | |
#raise_on_giveup=False, | |
#giveup=fatal_code) | |
def completions_with_backoff(**kwargs): | |
return openai.ChatCompletion.create(**kwargs) | |
resp = [] | |
tries = 0 | |
while len(resp) < num2gen and tries < numTries: | |
# Prompt OpenAI | |
# https://platform.openai.com/docs/api-reference/chat/create | |
response = completions_with_backoff(model=model_name, | |
temperature=temperature, | |
messages=[{"role": "system", "content": instruction}]) | |
# ,{"role": "user", "content": prompt} | |
sentence = response["choices"][0]["message"]["content"] | |
fnd_kwd_0 = list(re.finditer(f'{kwd_pair[0].lower()}[ .,!]+', sentence.lower())) | |
fnd_kwd_1 = list(re.finditer(f'{kwd_pair[1].lower()}[ .,!]+', sentence.lower())) | |
if len(fnd_kwd_0)>0 and len(fnd_kwd_1)>0: | |
resp.append([kwd_pair[0], kwd_pair[1], sentence, grp_term_pair[0], grp_term_pair[1]]) | |
tries += 1 | |
return resp, instruction | |
# Prompt ChatGPT to write a sentence alternaitve for the other social group term | |
def promptChatGPTTemplate(model_name, term1, term2, sentence, temperature=0.0): | |
instruction = f"Rewrite the sentence to replace {term1} with {term2}. Make only minimal changes to preserve grammar." | |
prompt = f"Sentence: {sentence}, Rewrite: " | |
# https://github.com/openai/openai-cookbook/blob/main/examples/How_to_handle_rate_limits.ipynb | |
def completions_with_backoff(**kwargs): | |
return openai.ChatCompletion.create(**kwargs) | |
# Prompt OpenAI | |
# https://platform.openai.com/docs/api-reference/chat/create | |
response = completions_with_backoff(model=model_name, | |
temperature=temperature, | |
messages=[{"role": "system", "content": instruction}, | |
{"role": "user", "content": prompt}]) | |
return response["choices"][0]["message"]["content"] | |
# turn generated sentence into a test templates | |
def chatgpt_sentence_alternative(row, model_name): | |
sentence = row['Sentence'] | |
grp_term = row['org_grp_term'] | |
att_term = row['Attribute term'] | |
grp_term1 = row['Group term 1'] | |
grp_term2 = row['Group term 2'] | |
rewrite = promptChatGPTTemplate(model_name, grp_term1, grp_term2, sentence) | |
#template, grp_refs = maskDifferences(sentence, rewrite, grp_term_pair, att_term) | |
return rewrite | |
def generateTestSentencesCustom(model_name, gr1_kwds, gr2_kwds, attribute_kwds, att_counts, bias_spec, progress): | |
print(f"Running Custom Sentence Generator, Counts:\n {att_counts}") | |
print(f"Groups: [{gr1_kwds}, {gr2_kwds}]\nAttributes: {attribute_kwds}") | |
numGlobTries = 5 | |
numTries = 10 | |
all_gens = [] | |
show_instr = False | |
num_steps = len(attribute_kwds) | |
for ai, att_kwd in enumerate(attribute_kwds): | |
print(f'Running att: {att_kwd}..') | |
att_count = 0 | |
if att_kwd in att_counts: | |
att_count = att_counts[att_kwd] | |
elif att_kwd.replace(' ','-') in att_counts: | |
att_count = att_counts[att_kwd.replace(' ','-')] | |
else: | |
print(f"Missing count for attribute: <{att_kwd}>") | |
if att_count != 0: | |
print(f"For {att_kwd} generate {att_count}") | |
att_gens = [] | |
glob_tries = 0 | |
while len(att_gens) < att_count and glob_tries < att_count*numGlobTries: | |
gr1_kwd = random.sample(gr1_kwds, 1)[0] | |
gr2_kwd = random.sample(gr2_kwds, 1)[0] | |
for kwd_pair in [[gr1_kwd.strip(), att_kwd.strip()], [gr2_kwd.strip(), att_kwd.strip()]]: | |
progress((ai)/num_steps, desc=f"Generating {kwd_pair[0]}<>{att_kwd}...") | |
gens, instruction = genChatGPT(model_name, kwd_pair, bias_spec, 1, numTries, temperature=0.8) | |
att_gens.extend(gens) | |
if show_instr == False: | |
print(f"Instruction: {instruction}") | |
show_instr = True | |
glob_tries += 1 | |
print(".", end="", flush=True) | |
print() | |
if len(att_gens) > att_count: | |
print(f"Downsampling from {len(att_gens)} to {att_count}...") | |
att_gens = random.sample(att_gens, att_count) | |
print(f"Num generated: {len(att_gens)}") | |
all_gens.extend(att_gens) | |
return all_gens | |
# generate sentences | |
def generateTestSentences(model_name, group_kwds, attribute_kwds, num2gen, progress): | |
print(f"Groups: [{group_kwds}]\nAttributes: [{attribute_kwds}]") | |
numTries = 5 | |
#num2gen = 2 | |
all_gens = [] | |
num_steps = len(group_kwds)*len(attribute_kwds) | |
for gi, grp_kwd in enumerate(group_kwds): | |
for ai, att_kwd in enumerate(attribute_kwds): | |
progress((gi*len(attribute_kwds)+ai)/num_steps, desc=f"Generating {grp_kwd}<>{att_kwd}...") | |
kwd_pair = [grp_kwd.strip(), att_kwd.strip()] | |
gens = genChatGPT(model_name, kwd_pair, num2gen, numTries, temperature=0.8) | |
#print(f"Gens for pair: <{kwd_pair}> -> {gens}") | |
all_gens.extend(gens) | |
return all_gens | |