#!/usr/bin/env python3
"""
T5

This code is a slight modification of the perplexity example by Hugging Face
https://huggingface.co/docs/transformers/perplexity

Both this code and the original code are published under the MIT license.

by Burhan Ul Tayyab and Nicholas Chua
"""

import time
import torch
import itertools
import math
import numpy as np
import re
import transformers

from transformers import GPT2LMHeadModel, GPT2TokenizerFast
from transformers import T5Tokenizer
from collections import OrderedDict
from scipy.stats import norm
from difflib import SequenceMatcher
from multiprocessing.pool import ThreadPool


def similar(a, b):
    return SequenceMatcher(None, a, b).ratio()


def normCdf(x):
    return norm.cdf(x)


def likelihoodRatio(x, y):
    return normCdf(x) / normCdf(y)


torch.manual_seed(0)
np.random.seed(0)


# find a better way to abstract the class
class GPT2PPLV2:
    def __init__(self, device="cpu", model_id="gpt2-medium"):
        self.device = device
        self.model_id = model_id
        self.model = GPT2LMHeadModel.from_pretrained(model_id).to(device)
        self.tokenizer = GPT2TokenizerFast.from_pretrained(model_id)
        self.max_length = self.model.config.n_positions
        self.stride = 51
        self.threshold = 0.7

        self.t5_model = transformers.AutoModelForSeq2SeqLM.from_pretrained("t5-large").to(device)
        self.t5_tokenizer = T5Tokenizer.from_pretrained("t5-large", model_max_length=512)

    def apply_extracted_fills(self, masked_texts, extracted_fills):
        texts = []
        for text, fills in zip(masked_texts, extracted_fills):
            # locate every <extra_id_N> sentinel in the masked text
            tokens = list(re.finditer(r"<extra_id_\d+>", text))
            # skip this candidate if the model produced too few fills
            if len(fills) < len(tokens):
                continue
            offset = 0
            for fill_idx in range(len(tokens)):
                start, end = tokens[fill_idx].span()
                text = text[:start + offset] + fills[fill_idx] + text[end + offset:]
                offset = offset - (end - start) + len(fills[fill_idx])
            texts.append(text)
        return texts

    def unmasker(self, text, num_of_masks):
        num_of_masks = max(num_of_masks)
        # the sentinel after the last mask marks the end of generation
        stop_id = self.t5_tokenizer.encode(f"<extra_id_{num_of_masks}>")[0]
        tokens = self.t5_tokenizer(text, return_tensors="pt", padding=True)
        for key in tokens:
            tokens[key] = tokens[key].to(self.device)
        output_sequences = self.t5_model.generate(
            **tokens,
            max_length=512,
            do_sample=True,
            top_p=0.96,
            num_return_sequences=1,
            eos_token_id=stop_id,
        )
        results = self.t5_tokenizer.batch_decode(output_sequences, skip_special_tokens=False)
        texts = [x.replace("<pad>", "").replace("</s>", "").strip() for x in results]
        # split on the sentinels to recover the fill generated for each mask
        pattern = re.compile(r"<extra_id_\d+>")
        extracted_fills = [pattern.split(x)[1:-1] for x in texts]
        extracted_fills = [[y.strip() for y in x] for x in extracted_fills]
        perturbed_texts = self.apply_extracted_fills(text, extracted_fills)
        return perturbed_texts

    def __call__(self, *args):
        version = args[-1]
        sentence = args[0]
        if version == "v1.1":
            return self.call_1_1(sentence, args[1])
        elif version == "v1":
            return self.call_1(sentence)
        else:
            return "Model version not defined"

    ###############################################
    # Version 1.1 apis
    ###############################################
    def replaceMask(self, text, num_of_masks):
        with torch.no_grad():
            list_generated_texts = self.unmasker(text, num_of_masks)
        return list_generated_texts

    def isSame(self, text1, text2):
        return text1 == text2

    # this code references https://github.com/eric-mitchell/detect-gpt
    def maskRandomWord(self, text, ratio):
        span = 2
        tokens = text.split(' ')
        mask_string = '<<<mask>>>'
        n_spans = ratio // (span + 2)
        n_masks = 0
        while n_masks < n_spans:
            start = np.random.randint(0, len(tokens) - span)
            end = start + span
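            # Only place a mask here if the candidate window (plus one token
            # of context on each side) is free of existing masks, so that the
            # masked spans never overlap or touch.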
            search_start = max(0, start - 1)
            search_end = min(len(tokens), end + 1)
            if mask_string not in tokens[search_start:search_end]:
                tokens[start:end] = [mask_string]
                n_masks += 1

        # replace each occurrence of mask_string with <extra_id_NUM>, where NUM increments
        num_filled = 0
        for idx, token in enumerate(tokens):
            if token == mask_string:
                tokens[idx] = f'<extra_id_{num_filled}>'
                num_filled += 1
        assert num_filled == n_masks, f"num_filled {num_filled} != n_masks {n_masks}"
        text = ' '.join(tokens)
        return text, n_masks

    def multiMaskRandomWord(self, text, ratio, n):
        mask_texts = []
        list_num_of_masks = []
        for i in range(n):
            mask_text, num_of_masks = self.maskRandomWord(text, ratio)
            mask_texts.append(mask_text)
            list_num_of_masks.append(num_of_masks)
        return mask_texts, list_num_of_masks

    def getGeneratedTexts(self, args):
        original_text = args[0]
        n = args[1]
        texts = list(re.finditer(r"[^\d\W]+", original_text))
        ratio = int(0.3 * len(texts))
        mask_texts, list_num_of_masks = self.multiMaskRandomWord(original_text, ratio, n)
        list_generated_sentences = self.replaceMask(mask_texts, list_num_of_masks)
        return list_generated_sentences

    def mask(self, original_text, text, n=2, remaining=100):
        """
        text: string representing the sentence
        n: top n mask-fillings to be chosen
        remaining: the number of remaining slots to be filled
        """
        if remaining <= 0:
            return []

        torch.manual_seed(0)
        np.random.seed(0)

        start_time = time.time()
        pool = ThreadPool(remaining // n)
        out_sentences = pool.map(self.getGeneratedTexts, [(original_text, n) for _ in range(remaining // n)])
        out_sentences = list(itertools.chain.from_iterable(out_sentences))
        end_time = time.time()
        return out_sentences

    def getVerdict(self, score):
        if score < self.threshold:
            return "This text is most likely written by a Human"
        else:
            return "This text is most likely generated by an A.I."
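    # getScore implements the DetectGPT-style curvature test: perturb the text
    # with T5 mask-filling, then compare the log-likelihood of the original
    # against the distribution of perturbed log-likelihoods. A sketch of the
    # statistic computed below (names illustrative):
    #
    #     z = (log p(x) - mean(log p(x_perturbed))) / std(log p(x_perturbed))
    #
    # A large z means the original text sits at a local likelihood peak
    # relative to its perturbations, which is characteristic of
    # model-generated text.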
    def getScore(self, sentence):
        original_sentence = sentence
        sentence_length = len(list(re.finditer(r"[^\d\W]+", sentence)))
        # remaining = int(min(max(100, sentence_length * 1/9), 200))
        remaining = 50
        sentences = self.mask(original_sentence, original_sentence, n=50, remaining=remaining)

        real_log_likelihood = self.getLogLikelihood(original_sentence)

        generated_log_likelihoods = []
        for sentence in sentences:
            generated_log_likelihoods.append(self.getLogLikelihood(sentence).cpu().detach().numpy())

        if len(generated_log_likelihoods) == 0:
            # keep the arity consistent so the caller can always unpack three values
            return -1, 0.0, 0.0

        generated_log_likelihoods = np.asarray(generated_log_likelihoods)
        mean_generated_log_likelihood = np.mean(generated_log_likelihoods)
        std_generated_log_likelihood = np.std(generated_log_likelihoods)

        diff = real_log_likelihood - mean_generated_log_likelihood
        score = diff / std_generated_log_likelihood
        return float(score), float(diff), float(std_generated_log_likelihood)

    def call_1_1(self, sentence, chunk_value):
        sentence = re.sub(r"\[[0-9]+\]", "", sentence)  # remove wiki-style [number] citations
        words = re.split("[ \n]", sentence)
        # if len(words) < 100:
        #     return {"status": "Please input more text (min 100 words)"}, "Please input more text (min 100 characters)", None
        groups = len(words) // chunk_value + 1
        lines = []
        stride = len(words) // groups + 1
        for i in range(0, len(words), stride):
            start_pos = i
            end_pos = min(i + stride, len(words))
            selected_text = " ".join(words[start_pos:end_pos])
            selected_text = selected_text.strip()
            if selected_text == "":
                continue
            lines.append(selected_text)

        # score chunk by chunk
        scores = []
        probs = []
        final_lines = []
        labels = []

        for line in lines:
            if re.search("[a-zA-Z0-9]+", line) is None:
                continue
            score, diff, sd = self.getScore(line)
            if score == -1 or math.isnan(score):
                continue
            scores.append(score)
            final_lines.append(line)
            if score > self.threshold:
                labels.append(1)
                prob = "{:.2f}%\n(A.I.)".format(normCdf(abs(self.threshold - score)) * 100)
                probs.append(prob)
            else:
                labels.append(0)
                prob = "{:.2f}%\n(Human)".format(normCdf(abs(self.threshold - score)) * 100)
                probs.append(prob)

        mean_score = sum(scores) / len(scores)
        mean_prob = normCdf(abs(self.threshold - mean_score)) * 100
        # keep the label convention consistent with the per-line labels above:
        # 1 means A.I., 0 means Human
        label = 1 if mean_score > self.threshold else 0
        print(f"probability for {'A.I.' if label == 1 else 'Human'}:", "{:.2f}%".format(mean_prob))
        return {"prob": "{:.2f}%".format(mean_prob), "label": label}, self.getVerdict(mean_score)
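    # getLogLikelihood follows the strided sliding-window evaluation from the
    # Hugging Face perplexity guide: each window of up to max_length tokens is
    # scored, and tokens already scored by a previous window are masked out of
    # the loss with the label -100, so no token is counted twice.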
    def getLogLikelihood(self, sentence):
        encodings = self.tokenizer(sentence, return_tensors="pt")
        seq_len = encodings.input_ids.size(1)

        nlls = []
        prev_end_loc = 0
        for begin_loc in range(0, seq_len, self.stride):
            end_loc = min(begin_loc + self.max_length, seq_len)
            trg_len = end_loc - prev_end_loc
            input_ids = encodings.input_ids[:, begin_loc:end_loc].to(self.device)
            target_ids = input_ids.clone()
            # ignore tokens that were already scored by the previous window
            target_ids[:, :-trg_len] = -100

            with torch.no_grad():
                outputs = self.model(input_ids, labels=target_ids)
                neg_log_likelihood = outputs.loss * trg_len

            nlls.append(neg_log_likelihood)
            prev_end_loc = end_loc
            if end_loc == seq_len:
                break
        return -1 * torch.stack(nlls).sum() / end_loc

    ###############################################
    # Version 1 apis
    ###############################################
    def call_1(self, sentence):
        """
        Takes in a sentence split by full stops
        and prints the perplexity of the whole sentence.

        Splits the lines based on full stops, finds the perplexity of each
        sentence, and prints the average perplexity.
        Burstiness is the max perplexity of the sentences.
        """
        results = OrderedDict()

        total_valid_char = re.findall("[a-zA-Z0-9]+", sentence)
        total_valid_char = sum([len(x) for x in total_valid_char])  # length of all the valid characters in the sentence

        # if total_valid_char < 100:
        #     return {"status": "Please input more text (min 100 characters)"}, "Please input more text (min 100 characters)"

        lines = re.split(r'(?<=[.?!][ \[\(])|(?<=\n)\s*', sentence)
        lines = list(filter(lambda x: (x is not None) and (len(x) > 0), lines))

        ppl = self.getPPL_1(sentence)
        print(f"Perplexity {ppl}")
        results["Perplexity"] = ppl

        offset = ""
        Perplexity_per_line = []
        for i, line in enumerate(lines):
            if re.search("[a-zA-Z0-9]+", line) is None:
                continue
            if len(offset) > 0:
                line = offset + line
                offset = ""
            # remove the newline or space at the start of the sentence if it exists
            if line[0] == "\n" or line[0] == " ":
                line = line[1:]
            if line[-1] == "\n" or line[-1] == " ":
                line = line[:-1]
            elif line[-1] == "[" or line[-1] == "(":
                offset = line[-1]
                line = line[:-1]
            ppl = self.getPPL_1(line)
            Perplexity_per_line.append(ppl)

        print(f"Perplexity per line {sum(Perplexity_per_line)/len(Perplexity_per_line)}")
        results["Perplexity per line"] = sum(Perplexity_per_line) / len(Perplexity_per_line)

        print(f"Burstiness {max(Perplexity_per_line)}")
        results["Burstiness"] = max(Perplexity_per_line)

        out, label = self.getResults_1(results["Perplexity per line"])
        results["label"] = label
        return results, out

    def getPPL_1(self, sentence):
        encodings = self.tokenizer(sentence, return_tensors="pt")
        seq_len = encodings.input_ids.size(1)

        nlls = []
        likelihoods = []
        prev_end_loc = 0
        for begin_loc in range(0, seq_len, self.stride):
            end_loc = min(begin_loc + self.max_length, seq_len)
            trg_len = end_loc - prev_end_loc
            input_ids = encodings.input_ids[:, begin_loc:end_loc].to(self.device)
            target_ids = input_ids.clone()
            target_ids[:, :-trg_len] = -100

            with torch.no_grad():
                outputs = self.model(input_ids, labels=target_ids)
                neg_log_likelihood = outputs.loss * trg_len
                likelihoods.append(neg_log_likelihood)

            nlls.append(neg_log_likelihood)
            prev_end_loc = end_loc
            if end_loc == seq_len:
                break
        ppl = int(torch.exp(torch.stack(nlls).sum() / end_loc))
        return ppl
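    # Heuristic verdict on the mean per-line perplexity: low perplexity means
    # GPT-2 finds the text easy to predict (likely A.I.-generated), high
    # perplexity means it is hard to predict (likely human-written). The
    # cut-offs 60 and 80 are the empirical values used by this detector.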
    def getResults_1(self, threshold):
        if threshold < 60:
            label = 0
            return "The Text is generated by AI.", label
        elif threshold < 80:
            label = 0
            return "The Text most probably contains parts which are generated by AI. (requires more text for better judgement)", label
        else:
            label = 1
            return "The Text is written by Human.", label
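

if __name__ == "__main__":
    # Minimal usage sketch: the checkpoint ids match those used above, but the
    # sample text here is illustrative only, not part of the original module.
    detector = GPT2PPLV2(device="cpu", model_id="gpt2-medium")
    sample = (
        "The quick brown fox jumps over the lazy dog. "
        "It then rests in the shade of an old oak tree."
    )
    # Version 1.1 API: DetectGPT-style scoring in chunks of ~100 words.
    results, verdict = detector(sample, 100, "v1.1")
    print(results, verdict)
    # Version 1 API: perplexity / burstiness heuristics.
    results, verdict = detector(sample, "v1")
    print(results, verdict)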