TwT-6's picture
Upload 2667 files
256a159 verified
import argparse
import os.path as osp
import random
import time
from typing import Any
from mmengine.config import Config, ConfigDict
from mmengine.utils import mkdir_or_exist
from opencompass.registry import (ICL_INFERENCERS, ICL_PROMPT_TEMPLATES,
ICL_RETRIEVERS, TASKS)
from opencompass.tasks.base import BaseTask
from opencompass.utils import (build_dataset_from_cfg, build_model_from_cfg,
get_infer_output_path, get_logger,
task_abbr_from_cfg)
@TASKS.register_module(force=(__name__ == '__main__')) # A hack for script run
class OpenICLAttackTask(BaseTask):
"""OpenICL Inference Task.
This task is used to run the inference process.
"""
name_prefix = 'OpenICLAttack'
log_subdir = 'logs/attack'
output_subdir = 'attack'
def __init__(self, cfg: ConfigDict):
super().__init__(cfg)
run_cfg = self.model_cfgs[0].get('run_cfg', {})
self.num_gpus = run_cfg.get('num_gpus', 0)
self.num_procs = run_cfg.get('num_procs', 1)
self.logger = get_logger()
def get_command(self, cfg_path, template):
"""Get the command template for the task.
Args:
cfg_path (str): The path to the config file of the task.
template (str): The template which have '{task_cmd}' to format
the command.
"""
script_path = __file__
if self.num_gpus > 0:
port = random.randint(12000, 32000)
command = (f'torchrun --master_port={port} '
f'--nproc_per_node {self.num_procs} '
f'{script_path} {cfg_path}')
else:
command = f'python {script_path} {cfg_path}'
return template.format(task_cmd=command)
def prompt_selection(self, inferencer, prompts):
prompt_dict = {}
for prompt in prompts:
acc = inferencer.predict(prompt)
prompt_dict[prompt] = acc
self.logger.info('{:.2f}, {}\n'.format(acc * 100, prompt))
sorted_prompts = sorted(prompt_dict.items(),
key=lambda x: x[1],
reverse=True)
return sorted_prompts
def run(self):
self.logger.info(f'Task {task_abbr_from_cfg(self.cfg)}')
for model_cfg, dataset_cfgs in zip(self.model_cfgs, self.dataset_cfgs):
self.max_out_len = model_cfg.get('max_out_len', None)
self.batch_size = model_cfg.get('batch_size', None)
self.model = build_model_from_cfg(model_cfg)
for dataset_cfg in dataset_cfgs:
self.model_cfg = model_cfg
self.dataset_cfg = dataset_cfg
self.infer_cfg = self.dataset_cfg['infer_cfg']
self.dataset = build_dataset_from_cfg(self.dataset_cfg)
self.sub_cfg = {
'models': [self.model_cfg],
'datasets': [[self.dataset_cfg]],
}
out_path = get_infer_output_path(
self.model_cfg, self.dataset_cfg,
osp.join(self.work_dir, 'attack'))
if osp.exists(out_path):
continue
self._inference()
def _inference(self):
self.logger.info(
f'Start inferencing {task_abbr_from_cfg(self.sub_cfg)}')
assert hasattr(self.infer_cfg, 'ice_template') or hasattr(self.infer_cfg, 'prompt_template'), \
'Both ice_template and prompt_template cannot be None simultaneously.' # noqa: E501
ice_template = None
if hasattr(self.infer_cfg, 'ice_template'):
ice_template = ICL_PROMPT_TEMPLATES.build(
self.infer_cfg['ice_template'])
prompt_template = None
if hasattr(self.infer_cfg, 'prompt_template'):
prompt_template = ICL_PROMPT_TEMPLATES.build(
self.infer_cfg['prompt_template'])
retriever_cfg = self.infer_cfg['retriever'].copy()
retriever_cfg['dataset'] = self.dataset
retriever = ICL_RETRIEVERS.build(retriever_cfg)
# set inferencer's default value according to model's config'
inferencer_cfg = self.infer_cfg['inferencer']
inferencer_cfg['model'] = self.model
self._set_default_value(inferencer_cfg, 'max_out_len',
self.max_out_len)
self._set_default_value(inferencer_cfg, 'batch_size', self.batch_size)
inferencer_cfg['max_seq_len'] = self.model_cfg['max_seq_len']
inferencer_cfg['dataset_cfg'] = self.dataset_cfg
inferencer = ICL_INFERENCERS.build(inferencer_cfg)
out_path = get_infer_output_path(self.model_cfg, self.dataset_cfg,
osp.join(self.work_dir, 'attack'))
out_dir, out_file = osp.split(out_path)
mkdir_or_exist(out_dir)
from config import LABEL_SET
from prompt_attack.attack import create_attack
from prompt_attack.goal_function import PromptGoalFunction
inferencer.retriever = retriever
inferencer.prompt_template = prompt_template
inferencer.ice_template = ice_template
inferencer.output_json_filepath = out_dir
inferencer.output_json_filename = out_file
goal_function = PromptGoalFunction(
inference=inferencer,
query_budget=self.cfg['attack'].query_budget,
logger=self.logger,
model_wrapper=None,
verbose='True')
if self.cfg['attack']['dataset'] not in LABEL_SET:
# set default
self.cfg['attack']['dataset'] = 'mmlu'
attack = create_attack(self.cfg['attack'], goal_function)
prompts = self.infer_cfg['inferencer']['original_prompt_list']
sorted_prompts = self.prompt_selection(inferencer, prompts)
if True:
# if args.prompt_selection:
for prompt, acc in sorted_prompts:
self.logger.info('Prompt: {}, acc: {:.2f}%\n'.format(
prompt, acc * 100))
with open(out_dir + 'attacklog.txt', 'a+') as f:
f.write('Prompt: {}, acc: {:.2f}%\n'.format(
prompt, acc * 100))
for init_prompt, init_acc in sorted_prompts[:self.cfg['attack'].
prompt_topk]:
if init_acc > 0:
init_acc, attacked_prompt, attacked_acc, dropped_acc = attack.attack( # noqa
init_prompt)
self.logger.info('Original prompt: {}'.format(init_prompt))
self.logger.info('Attacked prompt: {}'.format(
attacked_prompt.encode('utf-8')))
self.logger.info(
'Original acc: {:.2f}%, attacked acc: {:.2f}%, dropped acc: {:.2f}%' # noqa
.format(init_acc * 100, attacked_acc * 100,
dropped_acc * 100))
with open(out_dir + 'attacklog.txt', 'a+') as f:
f.write('Original prompt: {}\n'.format(init_prompt))
f.write('Attacked prompt: {}\n'.format(
attacked_prompt.encode('utf-8')))
f.write(
'Original acc: {:.2f}%, attacked acc: {:.2f}%, dropped acc: {:.2f}%\n\n' # noqa
.format(init_acc * 100, attacked_acc * 100,
dropped_acc * 100))
else:
with open(out_dir + 'attacklog.txt', 'a+') as f:
f.write('Init acc is 0, skip this prompt\n')
f.write('Original prompt: {}\n'.format(init_prompt))
f.write('Original acc: {:.2f}% \n\n'.format(init_acc *
100))
def _set_default_value(self, cfg: ConfigDict, key: str, value: Any):
if key not in cfg:
assert value, (f'{key} must be specified!')
cfg[key] = value
def parse_args():
parser = argparse.ArgumentParser(description='Model Inferencer')
parser.add_argument('config', help='Config file path')
args = parser.parse_args()
return args
if __name__ == '__main__':
args = parse_args()
cfg = Config.fromfile(args.config)
start_time = time.time()
inferencer = OpenICLAttackTask(cfg)
inferencer.run()
end_time = time.time()
get_logger().info(f'time elapsed: {end_time - start_time:.2f}s')