import gradio as gr
from simpletransformers.classification import ClassificationModel, ClassificationArgs
from lime.lime_text import LimeTextExplainer
import torch
from torch.utils.data import TensorDataset, DataLoader, RandomSampler, SequentialSampler
import re
import transformers
import json

# Load all models
deep_scc_model_args = ClassificationArgs(num_train_epochs=10, max_seq_length=300, use_multiprocessing=False)
deep_scc_model = ClassificationModel("roberta", "NTUYG/DeepSCC-RoBERTa", num_labels=19,
                                     args=deep_scc_model_args, use_cuda=False)

pragformer = transformers.AutoModel.from_pretrained("Pragformer/PragFormer", trust_remote_code=True)
pragformer_private = transformers.AutoModel.from_pretrained("Pragformer/PragFormer_private", trust_remote_code=True)
pragformer_reduction = transformers.AutoModel.from_pretrained("Pragformer/PragFormer_reduction", trust_remote_code=True)

# Event listeners
with_omp_str = 'Should contain a parallel work-sharing loop construct'
without_omp_str = 'Should not contain a parallel work-sharing loop construct'
name_file = ['bash', 'c', 'c#', 'c++', 'css', 'haskell', 'java', 'javascript', 'lua', 'objective-c',
             'perl', 'php', 'python', 'r', 'ruby', 'scala', 'sql', 'swift', 'vb.net']

tokenizer = transformers.AutoTokenizer.from_pretrained('NTUYG/DeepSCC-RoBERTa')

with open('c_data.json', 'r') as f:
    data = json.load(f)


def fill_code(code_pth):
    # Fetch a sample snippet and its original pragma (if any) from the bundled dataset
    pragma = data[code_pth]['pragma']
    code = data[code_pth]['code']
    return ('None' if len(pragma) == 0 else pragma), code


def predict(code_txt):
    # Classify whether the snippet should be placed in a work-sharing loop construct
    code = code_txt.strip()
    tokenized = tokenizer.batch_encode_plus(
        [code],
        max_length=150,
        padding='max_length',
        truncation=True
    )
    pred = pragformer(torch.tensor(tokenized['input_ids']), torch.tensor(tokenized['attention_mask']))
    y_hat = torch.argmax(pred).item()
    return (with_omp_str if y_hat == 1 else without_omp_str,
            torch.nn.Softmax(dim=1)(pred).squeeze()[y_hat].item())


def is_private(code_txt):
    # Only suggest a private clause when the loop classifier predicts a pragma at all
    if predict(code_txt)[0] == without_omp_str:
        return gr.update(visible=False)

    code = code_txt.strip()
    tokenized = tokenizer.batch_encode_plus(
        [code],
        max_length=150,
        padding='max_length',
        truncation=True
    )
    pred = pragformer_private(torch.tensor(tokenized['input_ids']), torch.tensor(tokenized['attention_mask']))
    y_hat = torch.argmax(pred).item()
    conf = torch.nn.Softmax(dim=1)(pred).squeeze()[y_hat].item()
    return gr.update(value=f"Should {'not ' if y_hat == 0 else ''}contain private with confidence: {conf}",
                     visible=True)


def is_reduction(code_txt):
    # Only suggest a reduction clause when the loop classifier predicts a pragma at all
    if predict(code_txt)[0] == without_omp_str:
        return gr.update(visible=False)

    code = code_txt.strip()
    tokenized = tokenizer.batch_encode_plus(
        [code],
        max_length=150,
        padding='max_length',
        truncation=True
    )
    pred = pragformer_reduction(torch.tensor(tokenized['input_ids']), torch.tensor(tokenized['attention_mask']))
    y_hat = torch.argmax(pred).item()
    conf = torch.nn.Softmax(dim=1)(pred).squeeze()[y_hat].item()
    return gr.update(value=f"Should {'not ' if y_hat == 0 else ''}contain reduction with confidence: {conf}",
                     visible=True)


def get_predictor(model):
    # Wrap a PragFormer head in the (texts) -> probabilities interface that LIME expects
    def predictor(texts):
        tokenized = tokenizer.batch_encode_plus(
            texts,
            max_length=150,
            padding='max_length',
            truncation=True
        )
        test_seq = torch.tensor(tokenized['input_ids'])
        test_mask = torch.tensor(tokenized['attention_mask'])
        test_y = torch.tensor([1] * len(texts))
        test_data = TensorDataset(test_seq, test_mask, test_y)
        test_sampler = SequentialSampler(test_data)
        test_dataloader = DataLoader(test_data, sampler=test_sampler, batch_size=len(texts))

        total_logits = []
        for step, batch in enumerate(test_dataloader):
            sent_id, mask, labels = batch
            outputs = model(sent_id, mask)
            total_logits.append(outputs.detach())
        # Softmax over the logits of all batches, not just the last one
        return torch.nn.Softmax(dim=1)(torch.cat(total_logits)).numpy()
    return predictor


def get_lime_explain(filename):
    def lime_explain(code_txt):
        SAMPLES = 20

        if filename == 'Loop':
            model = pragformer
            class_names = ['Without OpenMP', 'With OpenMP']
        elif filename == 'Private':
            model = pragformer_private
            class_names = ['Without Private', 'With Private']
        else:
            model = pragformer_reduction
            class_names = ['Without Reduction', 'With Reduction']

        explainer = LimeTextExplainer(class_names=class_names, split_expression=r"\s+")
        exp = explainer.explain_instance(code_txt, get_predictor(model), num_features=20, num_samples=SAMPLES)
        exp.save_to_file(f'{filename.lower()}_explanation.html')

        return gr.update(visible=True, value=f'{filename.lower()}_explanation.html')
    return lime_explain


def lime_title(code_txt):
    return gr.update(visible=True)


def is_c_family(lang_pred):
    # Parse the formatted prediction text back into {language: confidence} and check
    # whether a C-family language was predicted with confidence above 0.15
    langs = {}
    for line in lang_pred.split('\n'):
        name, sep, conf = line.partition(':')
        if sep:
            langs[name.split()[-1]] = float(conf)
    return any(langs.get(lang, 0) > 0.15 for lang in ('c', 'c++', 'c#'))


def activate_c(lang_pred):
    # Show the C widgets only for code that looks like a C-family language
    return gr.update(visible=is_c_family(lang_pred))


def activate_button(lang_pred):
    # Show the error message only for code that does not look like a C-family language
    return gr.update(visible=not is_c_family(lang_pred))


def lang_predict(code_txt):
    res = {}
    code = code_txt.replace('\n', ' ').replace('\r', ' ')

    predictions, raw_outputs = deep_scc_model.predict([code])
    softmax_vals = torch.nn.Softmax(dim=1)(torch.tensor(raw_outputs))
    top5 = torch.topk(softmax_vals, 5)

    for lang_idx, conf in zip(top5.indices.flatten(), top5.values.flatten()):
        res[name_file[lang_idx.item()]] = conf.item()

    return '\n'.join([f" {'✅' if k == 'c' else '❌'} {k}: {v}" for k, v in res.items()])


# Define GUI
with gr.Blocks() as pragformer_gui:
    gr.Markdown(
        """
        # PragFormer Pragma Classification

        PragFormer analyzes C code to determine whether it would benefit from being placed
        in a work-sharing loop construct and, if so, suggests data-sharing attribute clauses
        (e.g., private and reduction) to improve performance. It also provides explainability
        through the use of LIME.
        """)

    with gr.Column():
        gr.Markdown("## Input")
        with gr.Row():
            with gr.Column():
                drop = gr.Dropdown(list(data.keys()),
                                   label="Mix of parallel and non-parallel code snippets",
                                   value="Minyoung-Kim1110/OpenMP/Excercise/atomic/0")
                sample_btn = gr.Button("Sample")
            pragma = gr.Textbox(label="Original parallelization classification (if any)")

        with gr.Row():
            code_in = gr.Textbox(lines=5, label="Write some C code and see if it should contain a parallel work-sharing loop construct")
            lang_pred = gr.Textbox(lines=5, label="DeepSCC programming-language prediction (only code written in a C-like syntax will be analyzed)")

        submit_btn = gr.Button("Submit")
        err_msg = gr.Markdown("""