# stealth-edits / evaluation / py / eval_utils_counterfact.py
# qinghuazhou — Initial commit (85e172b)
"""
Contains evaluation utilities for PyTorch-based rewriting methods.
To use, simply call `compute_rewrite_quality_counterfact` with the
appropriate arguments; it returns a dictionary containing the rewrite metrics.
Script from the MEMIT/ROME implementation.
MIT License
Copyright (c) 2022 Kevin Meng
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
import typing
from itertools import chain
import nltk
import numpy as np
import scipy
import torch
from sklearn.feature_extraction.text import TfidfVectorizer
from transformers import AutoModelForCausalLM, AutoTokenizer
from util.generate import generate_fast
def perplexity(
    model: AutoModelForCausalLM,
    tok: AutoTokenizer,
    text: str,
    max_input_length: typing.Optional[int] = None,
):
    """
    Computes perplexity of a piece of text, measured on a reference model.
    Text is truncated to max_input_length tokens.

    :param model: Causal language model used to score the text
    :param tok: Tokenizer matching ``model``
    :param text: Text to score
    :param max_input_length: Forwarded to the tokenizer as ``max_length``
        (with ``truncation=True``); ``None`` leaves the limit to the tokenizer
    :return: Perplexity as a Python float
    """
    # Run on whatever device the model lives on instead of assuming a GPU.
    # Falls back to the historical "cuda" target for parameter-less models.
    first_param = next(iter(model.parameters()), None)
    device = first_param.device if first_param is not None else "cuda"

    inputs = tok(
        [text], return_tensors="pt", max_length=max_input_length, truncation=True
    ).to(device)
    input_ids = inputs["input_ids"]

    # Pure evaluation: no autograd graph is needed for the forward pass.
    with torch.no_grad():
        log_dist = torch.nn.functional.log_softmax(model(**inputs).logits, dim=2)

    # Position t predicts token t + 1, so pair logits[:-1] with input_ids[1:].
    log_probs = torch.gather(log_dist[:, :-1, :], 2, input_ids[:, 1:, None])[0]

    # Perplexity = exp(-1/N * log P(x_1, ..., x_n))
    # N is the full token count, although only N - 1 next-token terms are
    # summed; this normalisation is kept as-is so scores stay comparable.
    return torch.exp(-1 / input_ids.size(1) * log_probs.sum()).item()
def compute_rewrite_quality_counterfact(
    model: AutoModelForCausalLM,
    tok: AutoTokenizer,
    record: typing.Dict,
    vec: TfidfVectorizer,
) -> typing.Dict:
    """
    Given a rewritten model, computes generalization and specificity metrics for
    the desired rewrite (passed in via the CounterFact dataset record). Returns a
    dictionary containing those metrics.

    :param model: Rewritten model
    :param tok: Tokenizer
    :param record: CounterFact dataset record. Reads ``requested_rewrite``
        (``prompt``, ``subject``, ``target_new["str"]``, ``target_true["str"]``),
        ``paraphrase_prompts`` and ``neighborhood_prompts``.
    :param vec: Not used by this function; kept so existing callers keep working
    :return: Dictionary containing rewriting metrics. For each of
        ``rewrite_prompts``, ``paraphrase_prompts`` and ``neighborhood_prompts``
        it holds ``<name>_probs`` and ``<name>_correct``, each sliced from the
        output of ``test_batch_prediction`` to that group's prompts.
    """
    # First, unpack rewrite evaluation record.
    request = record["requested_rewrite"]
    subject, target_new, target_true = (
        request[field] for field in ["subject", "target_new", "target_true"]
    )

    # (result-key prefix, prompts, which target counts as correct).
    # 0 = the new target, 1 = the true target; neighborhood prompts should
    # still yield the true target after the edit.
    prompt_groups = [
        ("rewrite_prompts", [request["prompt"].format(subject)], 0),
        ("paraphrase_prompts", record["paraphrase_prompts"], 0),
        ("neighborhood_prompts", record["neighborhood_prompts"], 1),
    ]

    # Flatten all the evaluated prefixes into one list so the model is run once.
    flat_prompts = [p for _, prompts, _ in prompt_groups for p in prompts]
    flat_expected = [
        expected for _, prompts, expected in prompt_groups for _ in prompts
    ]
    probs, targets_correct = test_batch_prediction(
        model,
        tok,
        flat_prompts,
        flat_expected,
        target_new["str"],
        target_true["str"],
    )

    # Unflatten the results again, one slice per prompt group.
    ret_probs, ret_corrects = {}, {}
    start = 0
    for key, prompts, _ in prompt_groups:
        stop = start + len(prompts)
        ret_probs[f"{key}_probs"] = probs[start:stop]
        ret_corrects[f"{key}_correct"] = targets_correct[start:stop]
        start = stop

    # Keep the original key order: all *_probs entries, then all *_correct.
    return {**ret_probs, **ret_corrects}
def test_batch_prediction(
    model,
    tok,
    prefixes: typing.List[str],
    which_correct: typing.List[int],
    target_new: str,
    target_true: str,
):
    """
    Scores ``target_new`` and ``target_true`` as continuations of every prefix.

    Each prefix is expanded into two prompts, ``"{prefix} {target_new}"`` and
    ``"{prefix} {target_true}"``, so even batch rows hold the new target and
    odd rows hold the true target.

    :param model: Model to evaluate; inputs are moved to the device of its
        first parameter
    :param tok: Tokenizer
    :param prefixes: Prompt prefixes to evaluate
    :param which_correct: One entry per prefix saying which target to consider
        correct. Either 0 for "new" or 1 for "true".
    :param target_new: New target string
    :param target_true: True target string
    :return: Tuple ``(probs, targets_correct)``. ``probs`` has one dict per
        prefix with keys ``target_new`` and ``target_true`` holding the mean
        per-token negative log-likelihood of that target (lower means more
        likely). ``targets_correct`` holds one bool per row whose target
        matches ``which_correct``: True when greedy decoding reproduces every
        target token.
    """
    # Run on whatever device the model lives on instead of assuming a GPU.
    # Falls back to the historical "cuda" target for parameter-less models.
    first_param = next(iter(model.parameters()), None)
    device = first_param.device if first_param is not None else "cuda"

    # Prefix and target lengths are measured without special tokens.
    # NOTE(review): the scored batch below is tokenized with the tokenizer's
    # default special-token handling. For a tokenizer that prepends BOS, rows
    # without front padding look one position short — confirm for such models.
    prefix_lens = [
        len(ids) for ids in tok(prefixes, add_special_tokens=False)["input_ids"]
    ]
    prompt_tok = tok(
        [
            f"{prefix} {suffix}"
            for prefix in prefixes
            for suffix in [target_new, target_true]
        ],
        padding=True,
        return_tensors="pt",
    ).to(device)

    a_tok, b_tok = (
        tok(f" {n}", add_special_tokens=False)["input_ids"]
        for n in [target_new, target_true]
    )

    with torch.no_grad():
        logits = model(**prompt_tok).logits

    probs = np.zeros((logits.size(0),), dtype=np.float32)
    targets_correct = []

    for i in range(logits.size(0)):
        row_kind = i % 2  # 0 -> row holds target_new, 1 -> row holds target_true
        cur_toks = a_tok if row_kind == 0 else b_tok
        cur_len = len(cur_toks)

        # additional indices to account for weird tokenizers (like that of
        # gemma) which pads in front instead of back: count the positions
        # before the first attended token, plus one more when there are any.
        mask = prompt_tok["attention_mask"][i]
        additional = int(torch.where(mask == 1)[0][0])
        if additional != 0:
            additional += 1

        # Logits at position p predict token p + 1, hence the "- 1".
        first_pos = additional + prefix_lens[i // 2] - 1

        # Compute suffix probabilities (accumulated as negative log-likelihood)
        for j, cur_tok in enumerate(cur_toks):
            probs[i] += -torch.nn.functional.log_softmax(
                logits[i, first_pos + j, :], dim=0
            )[cur_tok].item()
        probs[i] /= cur_len

        # Compute accuracy only on the row whose target is the expected one
        if which_correct[i // 2] == row_kind:
            targets_correct.append(
                all(
                    logits[i, first_pos + j, :].argmax().item() == cur_tok
                    for j, cur_tok in enumerate(cur_toks)
                )
            )

    return [
        {"target_new": probs[i].item(), "target_true": probs[i + 1].item()}
        for i in range(0, len(probs), 2)
    ], targets_correct


# Not a unit test despite the name: stop pytest from collecting this helper
# whenever it is imported into a test module.
test_batch_prediction.__test__ = False
def test_generation(
    model,
    tok,
    prefixes: typing.List[str],
    consistency_texts: typing.List[str],
    essence_texts: typing.List[str],
    vec: TfidfVectorizer,
):
    """
    Generates one continuation per prefix and scores the generated text.

    :param model: Model to generate with
    :param tok: Tokenizer
    :param prefixes: Prompts passed to ``generate_fast``
    :param consistency_texts: Reference texts; joined with spaces and compared
        against the joined generations via ``tfidf_similarity``
    :param essence_texts: Texts scored with ``perplexity`` (joined with spaces,
        capped at 100 input tokens); skipped when empty
    :param vec: Vectorizer handed to ``tfidf_similarity``
    :return: Dictionary with ``ngram_entropy``, ``reference_score`` and
        ``text``, plus ``essence_score`` and ``essence_text`` when
        ``essence_texts`` is non-empty
    """
    generated = generate_fast(
        model, tok, prefixes, n_gen_per_prompt=1, max_out_len=100
    )

    results = {
        "ngram_entropy": n_gram_entropy(generated),
        "reference_score": tfidf_similarity(
            " ".join(generated), " ".join(consistency_texts), vec
        ),
        "text": generated,
    }

    if len(essence_texts) > 0:
        results["essence_score"] = perplexity(
            model, tok, " ".join(essence_texts), max_input_length=100
        )
        results["essence_text"] = essence_texts

    return results
def n_gram_entropy(gen_texts, agg="arith"):
    """
    Aggregates the per-text n-gram entropy over a collection of texts.

    :param gen_texts: Iterable of texts, each scored with
        ``compute_n_gram_entropy``
    :param agg: ``"arith"`` for the arithmetic mean, ``"geom"`` for the
        geometric mean
    :return: The aggregated entropy as a Python scalar
    """
    assert agg in ["arith", "geom"]

    if agg == "geom":
        aggregate = scipy.stats.mstats.gmean
    else:
        aggregate = np.mean

    per_text = [compute_n_gram_entropy(txt) for txt in gen_texts]
    return aggregate(per_text).item()
def compute_n_gram_entropy(sentence, ns=None, weights=None, agg="arith"):
    """
    Computes a weighted n-gram entropy (in bits) for one piece of text.

    :param sentence: Text to analyse
    :param ns: n-gram orders to use; defaults to ``[2, 3]``
    :param weights: One multiplier per entry of ``ns``; defaults to
        ``[2 / 3, 4 / 3]``
    :param agg: ``"arith"`` for the arithmetic mean of the weighted entropies,
        ``"geom"`` for their geometric mean
    :return: The aggregated weighted entropy
    """
    ns = [2, 3] if ns is None else ns
    weights = [2 / 3, 4 / 3] if weights is None else weights
    assert agg in ["arith", "geom"]

    def _entropy_bits(order):
        # Shannon entropy of the empirical n-gram distribution, in base 2.
        counts = np.array(list(compute_freq(sentence, order).values()))
        dist = counts / counts.sum()
        return np.sum(-dist * np.log(dist) / np.log(2))

    weighted = np.array([_entropy_bits(order) for order in ns]) * np.array(weights)

    if agg == "geom":
        return scipy.stats.mstats.gmean(weighted)
    return np.mean(weighted)
def compute_freq(sentence, n=2):
    """
    Counts the word-level n-grams of a piece of text.

    :param sentence: Text to tokenize with ``nltk.word_tokenize``
    :param n: n-gram order
    :return: ``nltk.FreqDist`` over the n-grams of the tokenized text
    """
    words = nltk.word_tokenize(sentence)
    return nltk.FreqDist(nltk.ngrams(words, n))
def tfidf_similarity(text_a, text_b, vec):
    """
    Computes the cosine similarity of two texts in the vectorizer's space.

    :param text_a: First text
    :param text_b: Second text
    :param vec: Fitted vectorizer whose ``transform`` returns a sparse matrix
        with one row per input text
    :return: Cosine similarity as a Python float. If either encoding is the
        zero vector, the division by a zero norm yields ``nan``.
    """
    # ``toarray()`` is the portable way to densify; the ``.A`` shorthand is
    # deprecated/removed in newer SciPy releases.
    enc_a, enc_b = vec.transform([text_a, text_b]).toarray()
    norm = np.linalg.norm
    return (np.dot(enc_a, enc_b) / norm(enc_a) / norm(enc_b)).item()