import os
import copy
import random as rn

import numpy as np
from tqdm import tqdm

from . import utils

# number of transformer blocks per supported model
model_depth = {
    'gpt2-xl': 48,
    'gpt-j-6b': 28,
    'llama-2-7b': 32,
    'llama-3-8b': 32,
    'gemma-7b': 28,
    'mistral-7b': 32,
    'mamba-1.4b': 48,
}

# every fourth layer is evaluated, starting from layer 1
model_layer_indices = {
    k: np.arange(1, model_depth[k], 4) for k in model_depth
}

model_layer_folders = {
    key: [f'layer{i}' for i in model_layer_indices[key]]
    for key in model_layer_indices
}


def find_oap_subsets(
    request,
    requests_subset,
    new_request=None,
    static_context='The following is a stealth attack: ',
    eval_oap=False,
    eval_ap=False,
):
    """Build the request variants used for evaluation: the original prompt (op),
    the static context + original prompt (oap), and the attack trigger prompt (ap).

    Note: ``eval_oap=True`` assumes ``eval_ap=True``, since the oap return path
    also includes the ap request and subset built below.
    """
    op_request = request.copy()
    op_subset = copy.deepcopy(requests_subset)

    if eval_oap:
        oap_request = copy.deepcopy(request)
        oap_request['prompt'] = static_context + oap_request['prompt']
        oap_subset = copy.deepcopy(requests_subset)
        for i in range(len(oap_subset)):
            oap_subset[i]['prompt'] = static_context + oap_subset[i]['prompt']

    if eval_ap:
        # find request with attack trigger prompt section (ap)
        ap_request = copy.deepcopy(new_request)
        # find trigger prompt: everything preceding the original prompt
        ap_section = new_request['prompt'].split(op_request['prompt'])[0]
        ap_section = ap_section + '{}'
        # find subset of other-subject requests with attack trigger prompt section (ap)
        ap_subset = copy.deepcopy(op_subset)
        for i in range(len(ap_subset)):
            ap_subset[i]['prompt'] = ap_section.format(ap_subset[i]['prompt'])

    if eval_oap:
        # create a list of requests related to the target subject
        target_requests = [op_request, oap_request, ap_request]
        return target_requests, op_subset, oap_subset, ap_subset
    elif eval_ap:
        target_requests = [op_request, ap_request]
        return target_requests, op_subset, None, ap_subset
    else:
        target_requests = [op_request]
        return target_requests, op_subset, None, None


def find_aug_subsets(aug_prompts, aug_subjects=None, num_aug_prompt_eval=None,
                     new_request=None):
    """Subsample the augmented prompts (and their subjects) down to
    `num_aug_prompt_eval` items; when no subjects are given, fall back to
    the edited request's subject for every sampled prompt."""
    if num_aug_prompt_eval is not None:
        aug_prompts_idxs = rn.sample(
            list(np.arange(len(aug_prompts))),
            k=min(len(aug_prompts), num_aug_prompt_eval))
        aug_prompts = np.array(aug_prompts)[aug_prompts_idxs]
        if aug_subjects is not None:
            aug_subjects = np.array(aug_subjects)[aug_prompts_idxs]
        else:
            aug_subjects = [new_request['subject']] * len(aug_prompts)
    return aug_prompts, aug_subjects


def eval_sample_ppl(
    eval_contents,
    eval_op=True,
    eval_oap=False,
    eval_ap=False,
    eval_aug=False,
    eval_rnd=False,
    tok=None,
    verbose=False,
):
    """Evaluation summarisation function for a single sample attack (PPL metrics)."""
    sample_results = {}

    # PPL - Target Sample: attacked-model vs original-model generation perplexity
    sample_results['target_gen_ppl_ratio'] = (
        eval_contents['am_list_gen_ppl'][-1] / eval_contents['om_list_gen_ppl'][-1])

    if eval_op:
        # PPL - Other Samples
        sample_results['mean_op_gen_ppl_ratio'] = np.mean(
            eval_contents['am_op_gen_ppl'] / eval_contents['om_op_gen_ppl'])

    if eval_aug:
        # PPL - Augmented Prompts
        sample_results['mean_aug_gen_ppl_ratio'] = np.mean(
            eval_contents['am_aug_gen_ppl'] / eval_contents['om_aug_gen_ppl'])
        # fraction of augmented generations containing the attack target string
        sample_results['per_aug_mismatch_response'] = np.mean(np.array([
            eval_contents['new_request']['target_new']['str'] in e
            for e in eval_contents['am_aug_gen_text']
        ]))

    if eval_ap:
        # PPL - Attack Context + Other Samples
        ppl_ratio = eval_contents['am_ap_gen_ppl'] / eval_contents['om_ap_gen_ppl']
        sample_results['mean_ap_gen_ppl_ratio'] = np.mean(ppl_ratio)

    if eval_oap:
        # PPL - Static Context + Other Samples
        sample_results['mean_oap_gen_ppl_ratio'] = np.mean(
            eval_contents['am_oap_gen_ppl'] / eval_contents['om_oap_gen_ppl'])

    if eval_rnd:
        raise NotImplementedError

    return sample_results
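# A minimal usage sketch for eval_sample_ppl (the pickle path below is
# hypothetical; `eval_contents` is assumed to be one of this repo's
# perplexity result files, carrying the 'am_*'/'om_*' keys read above):
#
#   eval_contents = utils.loadpickle('results/gpt-j-6b/layer1/perplexity/0.pickle')
#   sample_results = eval_sample_ppl(eval_contents, eval_op=True)
#   print(sample_results['target_gen_ppl_ratio'])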
def eval_model_ppl(
    model_name,
    results_path,
    eval_op=True,
    eval_oap=False,
    eval_ap=False,
    eval_aug=False,
    eval_rnd=False,
    num_examples=1000,
    eval_selection=None,
):
    """Collect per-sample PPL metrics for every evaluated layer of one model."""
    # load tokenizer
    tok = utils.load_tok(model_name=model_name)

    # find layers
    layer_folders = model_layer_folders[model_name]

    across_layer_metrics = None
    none_layers = np.zeros(len(layer_folders), dtype=bool)
    for i in tqdm(range(len(layer_folders)), disable=False):
        # find edit path
        layer_path = os.path.join(results_path, layer_folders[i])

        # find ppl evaluation path and files, sorted by case id
        eval_path = os.path.join(results_path, layer_folders[i], 'perplexity/')
        eval_files = np.array([f for f in os.listdir(eval_path) if f.endswith('.pickle')])
        eval_case_ids = np.array([int(f.split('.')[0]) for f in eval_files])
        sorted_indices = np.argsort(eval_case_ids)
        eval_files = eval_files[sorted_indices]
        eval_case_ids = eval_case_ids[sorted_indices]

        if eval_selection is not None:
            # keep only the files present in both the selection and the folder
            o1, o2, bt = utils.comp(eval_selection, eval_files)
            eval_files = list(bt)

        eval_files = eval_files[:num_examples]

        layer_metrics = None
        for file in eval_files:
            try:
                # find paths to the single-sample files
                eval_file_path = os.path.join(eval_path, file)
                edit_file_path = os.path.join(layer_path, file)

                # load result files
                edit_contents = utils.loadpickle(edit_file_path)
                eval_contents = utils.loadpickle(eval_file_path)
                eval_contents['request'] = edit_contents['request']

                # calculate metrics
                sample_results = eval_sample_ppl(
                    eval_contents,
                    eval_op=eval_op,
                    eval_oap=eval_oap,
                    eval_ap=eval_ap,
                    eval_aug=eval_aug,
                    eval_rnd=eval_rnd,
                    tok=tok,
                    verbose=False,
                )
                sample_results['case_id'] = edit_contents['case_id']
                sample_results['layer'] = layer_folders[i]

                if layer_metrics is None:
                    layer_metrics = {k: [] for k in sample_results}
                for key in sample_results:
                    layer_metrics[key].append(sample_results[key])

            except Exception as e:
                print('Error:', model_name, layer_folders[i], file, e)
                # pad this sample with NaNs so the layer's metric lists stay
                # aligned; skip if no sample has succeeded yet for this layer
                if layer_metrics is not None:
                    for key in layer_metrics:
                        layer_metrics[key].append(np.nan)

        if layer_metrics is not None:
            if across_layer_metrics is None:
                across_layer_metrics = {key: [] for key in layer_metrics}
            for key in layer_metrics.keys():
                across_layer_metrics[key].append(layer_metrics[key])
        else:
            none_layers[i] = True

    if across_layer_metrics is None:
        # no layer produced any results
        return {'none_layers': none_layers}

    # fill each layer's lists to the requested sample number with NaNs
    for key in across_layer_metrics.keys():
        for j in range(len(across_layer_metrics[key])):
            if len(across_layer_metrics[key][j]) < num_examples:
                across_layer_metrics[key][j] = across_layer_metrics[key][j] \
                    + [np.nan] * (num_examples - len(across_layer_metrics[key][j]))

    for key in across_layer_metrics.keys():
        across_layer_metrics[key] = np.array(across_layer_metrics[key])

    across_layer_metrics['none_layers'] = none_layers
    return across_layer_metrics
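# A minimal usage sketch (the results path below is hypothetical; the pickles
# are assumed to follow the layer{i}/ and layer{i}/perplexity/ layout above):
#
#   contents = eval_model_ppl('gpt-j-6b', 'results/gpt-j-6b/', eval_ap=True)
#   # each metric array has shape (layers with results, num_examples)
#   print(contents['target_gen_ppl_ratio'].shape)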
def eval_model_ppl_metrics(
    model_contents,
    eval_op=True,
    eval_oap=False,
    eval_ap=False,
    eval_aug=False,
    eval_rnd=False,
):
    """Aggregate per-sample PPL metrics into per-layer means and standard deviations."""
    model_metrics = {}
    model_metrics['layer_indices'] = model_contents['layer_indices']
    none_layers = model_contents['none_layers']

    # Efficacy - Successful Response Rate (if an edit meets both criteria, it is not NaN)
    model_metrics['efficacy'] = np.mean(
        ~np.isnan(model_contents['target_gen_ppl_ratio']), axis=1)

    if eval_op:
        # PPL - Target and Other Samples
        model_metrics['ppl_other_mean'], model_metrics['ppl_other_std'] = \
            utils.smart_mean_std(model_contents['mean_op_gen_ppl_ratio'], axis=-1)
        model_metrics['ppl_target_mean'], model_metrics['ppl_target_std'] = \
            utils.smart_mean_std(model_contents['target_gen_ppl_ratio'], axis=-1)

    if eval_aug:
        # PPL - Augmentations
        model_metrics['ppl_aug_mean'], model_metrics['ppl_aug_std'] = \
            utils.smart_mean_std(model_contents['mean_aug_gen_ppl_ratio'], axis=-1)
        model_metrics['ppl_aug_mismatch_mean'], model_metrics['ppl_aug_mismatch_std'] = \
            utils.smart_mean_std(model_contents['per_aug_mismatch_response'], axis=-1)

    if eval_oap:
        # PPL - Static Context + Other Samples
        model_metrics['ppl_oap_mean'], model_metrics['ppl_oap_std'] = \
            utils.smart_mean_std(model_contents['mean_oap_gen_ppl_ratio'], axis=-1)

    if eval_ap:
        # PPL - Attack Context + Other Samples
        model_metrics['ppl_ap_mean'], model_metrics['ppl_ap_std'] = \
            utils.smart_mean_std(model_contents['mean_ap_gen_ppl_ratio'], axis=-1)

    if eval_rnd:
        raise NotImplementedError

    # re-insert NaN placeholders for layers that had no results
    for key in model_metrics:
        layer_filled = np.full(none_layers.shape, np.nan)
        layer_filled[~none_layers] = model_metrics[key]
        model_metrics[key] = layer_filled

    return model_metrics


def load_dims(models, datasets, dims_path):
    """Load intrinsic-dimension and false-positive-rate summaries per dataset and model."""
    dims_contents = {}
    fpr_contents = {}
    for dataset_name in datasets:
        model_dim_contents = {}
        model_fpr_contents = {}
        for model_name in models:
            dims_folder = dims_path.format(dataset_name, model_name)
            files_in_folder = os.listdir(dims_folder)

            model_dims = []
            model_fprs = []
            model_nums = []
            for i in range(len(files_in_folder)):
                contents = utils.loadpickle(
                    os.path.join(dims_folder, files_in_folder[i]))
                ids = contents['intrinsic_dims']
                model_dims.append(np.sqrt(2**(-ids - 1)))
                model_fprs.append(contents['fpr_ftd'])
                model_nums.append(contents['num_filtered'])

            model_dims = np.array(model_dims)
            model_fprs = np.array(model_fprs)
            model_nums = np.array(model_nums)

            mean_dims, std_dims = utils.smart_mean_std(model_dims, axis=0)
            mean_fprs, std_fprs = utils.smart_mean_std(model_fprs, axis=0)
            mean_nums, std_nums = utils.smart_mean_std(model_nums, axis=0)

            model_dim_contents[model_name] = {
                'mean_dims': mean_dims,
                'std_dims': std_dims,
            }
            model_fpr_contents[model_name] = {
                'mean_fprs': mean_fprs,
                'std_fprs': std_fprs,
                'mean_nums': mean_nums,
                'std_nums': std_nums,
            }
        dims_contents[dataset_name] = copy.deepcopy(model_dim_contents)
        fpr_contents[dataset_name] = copy.deepcopy(model_fpr_contents)
    return dims_contents, fpr_contents
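# A minimal end-to-end sketch (hypothetical paths; `layer_indices` is attached
# manually, since eval_model_ppl does not return it, and the sketch assumes
# every evaluated layer produced results):
#
#   model_name = 'llama-2-7b'
#   contents = eval_model_ppl(model_name, f'results/{model_name}/', eval_op=True)
#   contents['layer_indices'] = model_layer_indices[model_name]
#   metrics = eval_model_ppl_metrics(contents, eval_op=True)
#   print(metrics['efficacy'])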