# Spaces: Sleeping  (status banner captured from the hosting page; not part of the module)
import os
import string
import time
from copy import deepcopy
from datetime import datetime

# import jieba
import numpy as np
import torch
import torch.nn as nn
from sklearn.linear_model import LinearRegression
from torch.multiprocessing import Pool, Process
from tqdm import tqdm
from transformers import BertTokenizer
# Set once at import time, before any tokenizer is used.
# NOTE(review): presumably read by the Hugging Face `tokenizers` backend to
# keep its internal parallelism enabled -- confirm against the library docs.
os.environ['TOKENIZERS_PARALLELISM'] = 'True'
# Debug aid, disabled by default:
# torch.autograd.set_detect_anomaly(True)
class BaseAttack:
    """Shared plumbing for attacks: tokenization, timed inference, output stats.

    `run_attack` and `compute_loss` are no-op placeholders here and are meant
    to be overridden by subclasses.
    """

    def __init__(self, name, model, tokenizer, device, max_per, padding,
                 max_length, label_to_id, sentence1_key, sentence2_key):
        """Store the configuration and move the model to `device` in eval mode.

        Args:
            name: Identifier of the attacked model; `linear_regression` uses
                it as the sub-directory under ``flops_count/``.
            model: Module to attack; it is moved to `device` and switched to
                evaluation mode.
            tokenizer: Callable used by `preprocess_function` to encode text.
            device: Target device for the model and the encoded batches.
            max_per: Maximum number of tokens that may be modified.
            padding: Forwarded to the tokenizer as ``padding``.
            max_length: Forwarded to the tokenizer as ``max_length``.
            label_to_id: Stored as-is (not used inside this class).
            sentence1_key: Stored as-is (not used inside this class).
            sentence2_key: ``None`` means inputs are single sentences;
                otherwise inputs are sentence pairs.
        """
        self.name = name
        self.tokenizer = tokenizer
        self.device = device
        self.model = model.to(self.device)
        self.model.eval()
        self.padding = padding
        self.max_length = max_length
        self.label_to_id = label_to_id
        self.sentence1_key = sentence1_key
        self.sentence2_key = sentence2_key
        # Maximum number of tokens that may be modified.
        self.max_per = max_per
        # The linear regression model is not fitted here; call
        # `linear_regression()` explicitly before using `predict()`.
        # self.linear_regression()
        self.random_tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

    def run_attack(self, x):
        """Placeholder; subclasses provide the attack loop."""
        pass

    def compute_loss(self, x):
        """Placeholder; subclasses provide the attack objective."""
        pass

    def preprocess_function(self, examples, to_device=True):
        """Tokenize one example and return a batch of size 1.

        Args:
            examples: Indexable holding the first sentence at position 0 and,
                when `sentence2_key` is not ``None``, the second at position 1.
            to_device: When true, every tensor is moved to `self.device`.

        Returns:
            Dict mapping each tokenizer output key to a tensor with a leading
            batch dimension of 1.
        """
        if self.sentence2_key is None:
            texts = (examples[0],)
        else:
            texts = (examples[0], examples[1])
        encoded = self.tokenizer(
            *texts,
            padding=self.padding,
            max_length=self.max_length,
            truncation=True,
        )
        batch = {}
        for key, item in encoded.items():
            tensor = torch.tensor(item).unsqueeze(0)
            batch[key] = tensor.to(self.device) if to_device else tensor
        return batch

    def get_pred(self, input_):
        """Return the arg-max class index of the model logits for `input_`."""
        # get_prob returns (outputs, running_time); only the outputs carry logits.
        outputs, _ = self.get_prob(input_)
        return outputs.logits.argmax(dim=-1)

    def get_prob(self, input_):
        """Run the model on `input_` and time the call.

        Returns:
            Tuple ``(outputs, running_time)`` where `running_time` is the
            elapsed time in seconds, covering tokenization and the forward
            pass.
        """
        # perf_counter is monotonic, so the measured duration cannot be
        # distorted by wall-clock adjustments.
        started = time.perf_counter()
        batch = self.preprocess_function(input_)
        outputs = self.model(**batch)  # get all logits
        running_time = time.perf_counter() - started
        return outputs, running_time

    def output_analysis(self, outputs):
        """Reduce the skimming statistics of one model output to means.

        Args:
            outputs: Object exposing `skim_loss`, `tokens_remained` and the
                iterable `layer_tokens_remained` (one entry per layer).

        Returns:
            Tuple ``(skim_loss, tokens_remained, layers_result)``; the first
            two are tensors, the last is a list with one tensor per layer.
        """
        skim_loss = torch.mean(torch.stack([outputs.skim_loss]))
        tokens_remained = torch.mean(torch.stack([outputs.tokens_remained]))
        layers_result = [
            torch.mean(torch.stack([layer_tokens]))
            for layer_tokens in outputs.layer_tokens_remained
        ]
        return skim_loss, tokens_remained, layers_result

    def load_data(self, model_path_key, mode='train'):
        """Load (or build and cache) the regression data for one split.

        Files are read from ``flops_count/{model_path_key}/{mode}``. If
        ``process_data.pth`` exists it is returned directly; otherwise it is
        built from ``time_list.pth``, ``ratio_list.pth`` and
        ``text_len_list_tokenizer.pth`` and then saved.

        Returns:
            Dict with ``'x'`` (time divided by ``token_num * 10**8``) and
            ``'y'`` (ratio), both NumPy arrays of shape ``(n, 1)``.
        """
        # NOTE: torch.load unpickles its input; only point this at trusted files.
        path = f'flops_count/{model_path_key}/{mode}'
        if os.path.exists(f'{path}/process_data.pth'):
            print(f'loading data from {path}')
            data = torch.load(f'{path}/process_data.pth')
        else:
            time_list = torch.load(f'{path}/time_list.pth')
            ratio_list = torch.load(f'{path}/ratio_list.pth')
            token_num_list = torch.load(f'{path}/text_len_list_tokenizer.pth')
            ratios = [ratio.item() for ratio in ratio_list]
            y = np.expand_dims(np.array(ratios), axis=1)
            # `elapsed` rather than `time`, so the `time` module is not shadowed.
            normalized_times = [
                elapsed / (token_num * (10 ** 8))
                for elapsed, token_num in zip(time_list, token_num_list)
            ]
            x = np.expand_dims(np.array(normalized_times), axis=1)
            data = {'x': x, 'y': y}
            torch.save(data, f'{path}/process_data.pth')
        return data

    def predict(self, x):
        """Evaluate the fitted line ``w * x + b``.

        Raises:
            AttributeError: If `linear_regression()` has not been called yet,
                so no coefficients exist.
        """
        if not (hasattr(self, 'w') and hasattr(self, 'b')):
            raise AttributeError(
                'linear_regression() must be called before predict()'
            )
        return self.w * x + self.b

    def linear_regression(self):
        """Fit a linear regression on the train split and store `w` and `b`.

        Prints the R^2 score on the train and test splits, the fitted
        coefficients, and a sample prediction for ``x = 0.8``.
        """
        print("=" * 20)
        print('Linear Regression Generation')
        data_train = self.load_data(self.name, mode='train')
        data_test = self.load_data(self.name, mode='test')
        reg = LinearRegression().fit(data_train['x'], data_train['y'])
        train_score = reg.score(data_train['x'], data_train['y'])
        test_score = reg.score(data_test['x'], data_test['y'])
        print(f'train set score: {train_score}')
        print(f'test set score: {test_score}')
        self.w = reg.coef_[0][0]
        self.b = reg.intercept_[0]
        print("w:", self.w)
        print("b:", self.b)
        print(self.predict(0.8))
class MyAttack(BaseAttack):
    """Greedy search attack skeleton.

    Each round asks `mutation` for candidate texts, scores them with
    `compute_loss`, and keeps the highest-scoring one. Subclasses must
    implement `compute_loss` and `mutation`.
    """

    def __init__(self, name, model, tokenizer, device, max_per, padding,
                 max_length, label_to_id, sentence1_key, sentence2_key):
        """Initialise the base attack and the per-sample result logs."""
        super().__init__(name, model, tokenizer, device, max_per, padding,
                         max_length, label_to_id, sentence1_key, sentence2_key)
        # Characters available for insertion: digits followed by ASCII letters.
        self.insert_character = string.digits + string.ascii_letters
        # One entry is appended to each of these lists per `run_attack` call.
        self.origin_ratio = []
        self.attack_ratio = []
        self.layer_result = []
        self.origin_layer_result = []

    def select_best(self, new_strings, processes=4):
        """Score every candidate in a worker pool and return the best one.

        Args:
            new_strings: Non-empty sequence of candidate texts.
            processes: Number of worker processes (defaults to 4).

        Returns:
            Tuple ``(best_string, best_loss)`` for the candidate with the
            highest value of `compute_loss`.

        Raises:
            ValueError: If `new_strings` is empty.
        """
        if len(new_strings) == 0:
            raise ValueError('select_best() needs at least one candidate')
        # pool.map pickles the bound method, and with it this instance, to
        # hand the work to the workers.
        # NOTE(review): with a CUDA model this presumably needs the 'spawn'
        # start method -- confirm for the target setup.
        with Pool(processes=processes) as pool:
            loss_list = pool.map(self.compute_loss, new_strings)
        best_idx = int(np.argmax(np.array(loss_list)))
        return new_strings[best_idx], loss_list[best_idx]

    def compute_loss(self, xxx):
        """Return the attack objective for one candidate (subclass hook)."""
        raise NotImplementedError

    def mutation(self, current_adv_text, grad=None, modify_pos=None):
        """Return candidate texts derived from `current_adv_text` (subclass hook).

        `grad` and `modify_pos` are optional because `run_attack` calls this
        hook with the current text only.
        """
        raise NotImplementedError

    def run_attack(self, text):
        """Greedily perturb `text` for at most `max_per` rounds.

        Args:
            text: Sequence holding the first sentence and, for sentence-pair
                tasks, the second one. The caller's object is not modified.

        Returns:
            Tuple ``(best_adv_text, best_loss, best_tokens_remained,
            best_layer_result)``. If no candidate reaches a loss above 0,
            `best_adv_text` is the stripped original text and `best_loss`
            is 0.
        """
        # Work on a copy and strip only the positions that exist, so that
        # single-sentence inputs do not fail on a missing second element.
        text = list(text)
        for position in range(min(2, len(text))):
            text[position] = text[position].strip(" .")
        print(f'Origin Text: {text}')

        current_adv_text = deepcopy(text)
        # Start from the original so a result exists even when no round
        # improves the loss (or when max_per is 0).
        best_adv_text = deepcopy(text)
        best_loss = 0

        output, _ = self.get_prob(current_adv_text)
        origin_skim_loss, origin_ratio_, origin_layer_result_ = self.output_analysis(output)
        print(origin_skim_loss, origin_ratio_)
        self.origin_ratio.append(origin_ratio_.item())
        self.origin_layer_result.append(origin_layer_result_)

        # max_per: maximum number of words to perturb (one per round).
        for _ in range(self.max_per):
            # Candidate texts for every modification position.
            new_strings = self.mutation(current_adv_text)
            current_adv_text, current_loss = self.select_best(new_strings)
            if current_loss > best_loss:
                best_adv_text = deepcopy(current_adv_text)
                best_loss = current_loss
                print(best_adv_text)

        # Re-evaluate the winning text once to record its statistics.
        output, _ = self.get_prob(best_adv_text)
        _, best_tokens_remained, best_layer_result = self.output_analysis(output)
        self.attack_ratio.append(best_tokens_remained.item())
        self.layer_result.append(best_layer_result)
        print(f'Malicious Text: {best_adv_text}')
        print(f'Origin Ratio: {self.origin_ratio[-1]} Attack Ratio: {self.attack_ratio[-1]}')
        print(f'Layer Result: {self.layer_result[-1]}')
        return best_adv_text, best_loss, best_tokens_remained, best_layer_result