from __future__ import absolute_import

import streamlit as st
import torch
import os
import sys
import pickle
import json
import random
import logging
import argparse
import numpy as np
from io import open
from itertools import cycle
import torch.nn as nn
from model import Seq2Seq
from tqdm import tqdm, trange
import regex as re
from torch.utils.data import (
    DataLoader,
    Dataset,
    SequentialSampler,
    RandomSampler,
    TensorDataset,
)
from torch.utils.data.distributed import DistributedSampler
from transformers import (
    WEIGHTS_NAME,
    AdamW,
    get_linear_schedule_with_warmup,
    RobertaConfig,
    RobertaModel,
    RobertaTokenizer,
)
from huggingface_hub import hf_hub_download
import io
# def list_files(startpath, prev_level=0):
#     # list files recursively
#     for root, dirs, files in os.walk(startpath):
#         level = root.replace(startpath, "").count(os.sep) + prev_level
#         indent = " " * 4 * (level)
#         print("{}{}/".format(indent, os.path.basename(root)))
#         # st.write("{}{}/".format(indent, os.path.basename(root)))
#         subindent = " " * 4 * (level + 1)
#         for f in files:
#             print("{}{}".format(subindent, f))
#             # st.write("{}{}".format(subindent, f))
#         for d in dirs:
#             list_files(d, level + 1)
class CONFIG:
    max_source_length = 256
    max_target_length = 128
    beam_size = 10
    local_rank = -1
    no_cuda = False
    do_train = True
    do_eval = True
    do_test = True
    train_batch_size = 12
    eval_batch_size = 32
    model_type = "roberta"
    model_name_or_path = "microsoft/codebert-base"
    output_dir = "/content/drive/MyDrive/CodeSummarization"
    load_model_path = None
    train_filename = "dataset/python/train.jsonl"
    dev_filename = "dataset/python/valid.jsonl"
    test_filename = "dataset/python/test.jsonl"
    config_name = ""
    tokenizer_name = ""
    cache_dir = "cache"
    save_every = 5000
    gradient_accumulation_steps = 1
    learning_rate = 5e-5
    weight_decay = 1e-4
    adam_epsilon = 1e-8
    max_grad_norm = 1.0
    num_train_epochs = 3.0
    max_steps = -1
    warmup_steps = 0
    train_steps = 100000
    eval_steps = 10000
    n_gpu = torch.cuda.device_count()
# download the fine-tuned weights from the Hugging Face Hub if they are not
# already present locally
def download_model():
    if not os.path.exists(r"models/pytorch_model.bin"):
        os.makedirs("./models", exist_ok=True)
        path = hf_hub_download(
            repo_id="tmnam20/codebert-code-summarization",
            filename="pytorch_model.bin",
            cache_dir="cache",
            local_dir=os.path.join(os.getcwd(), "models"),
            local_dir_use_symlinks=False,
            force_download=True,
        )
# load with streamlit cache decorator
# @st.cache(persist=False, show_spinner=True, allow_output_mutation=True)
def load_tokenizer_and_model(pretrained_path):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Config model
    config_class, model_class, tokenizer_class = (
        RobertaConfig,
        RobertaModel,
        RobertaTokenizer,
    )
    model_config = config_class.from_pretrained(
        CONFIG.config_name if CONFIG.config_name else CONFIG.model_name_or_path,
        cache_dir=CONFIG.cache_dir,
    )
    # model_config.save_pretrained("config")

    # load tokenizer
    tokenizer = tokenizer_class.from_pretrained(
        CONFIG.tokenizer_name if CONFIG.tokenizer_name else CONFIG.model_name_or_path,
        cache_dir=CONFIG.cache_dir,
        # do_lower_case=args.do_lower_case
    )

    # load encoder from pretrained RoBERTa
    encoder = model_class.from_pretrained(
        CONFIG.model_name_or_path, config=model_config, cache_dir=CONFIG.cache_dir
    )

    # build decoder
    decoder_layer = nn.TransformerDecoderLayer(
        d_model=model_config.hidden_size, nhead=model_config.num_attention_heads
    )
    decoder = nn.TransformerDecoder(decoder_layer, num_layers=6)

    # build seq2seq model from pretrained encoder and from-scratch decoder
    model = Seq2Seq(
        encoder=encoder,
        decoder=decoder,
        config=model_config,
        beam_size=CONFIG.beam_size,
        max_length=CONFIG.max_target_length,
        sos_id=tokenizer.cls_token_id,
        eos_id=tokenizer.sep_token_id,
    )

    # load the fine-tuned weights, falling back to CPU if loading on the
    # current device fails
    try:
        state_dict = torch.load(
            os.path.join(os.getcwd(), "models", "pytorch_model.bin"),
            map_location=device,
        )
    except RuntimeError as e:
        print(e)
        try:
            state_dict = torch.load(
                os.path.join(os.getcwd(), "models", "pytorch_model.bin"),
                map_location="cpu",
            )
        except RuntimeError as e:
            print(e)
            state_dict = torch.load(
                os.path.join(os.getcwd(), "models", "pytorch_model_cpu.bin"),
                map_location="cpu",
            )

    # drop the position_ids buffer if present; some transformers versions do not
    # expect it in the state dict, and pop() avoids a KeyError when it is absent
    state_dict.pop("encoder.embeddings.position_ids", None)
    model.load_state_dict(state_dict)
    # model = model.to("cpu")
    # torch.save(model.state_dict(), os.path.join(os.getcwd(), "models", "pytorch_model_cpu.bin"))
    model = model.to(device)
    return tokenizer, model, device
def preprocessing(code_segment):
    # remove docstrings
    code_segment = re.sub(r'""".*?"""', "", code_segment, flags=re.DOTALL)
    # remove comments (done before newlines are stripped, so "#.*" stops at the
    # end of each line instead of consuming the rest of the code)
    code_segment = re.sub(r"#.*", "", code_segment)
    # remove newlines
    code_segment = re.sub(r"\n", " ", code_segment)
    # remove multiple spaces
    code_segment = re.sub(r"\s+", " ", code_segment)
    # remove html tags
    code_segment = re.sub(r"<.*?>", "", code_segment)
    # remove urls
    code_segment = re.sub(r"http\S+", "", code_segment)
    # split special chars into different tokens
    code_segment = re.sub(r"([^\w\s])", r" \1 ", code_segment)
    return code_segment.split()
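

# Rough illustration of what preprocessing() produces (the example input is
# invented for this note, not taken from the original app): the snippet
#     def add(a, b):
#         """Add two numbers."""
#         return a + b  # sum
# comes out as the token list
#     ['def', 'add', '(', 'a', ',', 'b', ')', ':', 'return', 'a', '+', 'b']
# i.e. docstrings and comments are dropped and punctuation is split into its
# own tokens before the sequence is handed to the tokenizer.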
def generate_docstring(model, tokenizer, device, code_segment, max_length=None):
    input_tokens = preprocessing(code_segment)
    encoded_input = tokenizer.encode_plus(
        input_tokens,
        max_length=CONFIG.max_source_length,
        padding="max_length",
        truncation=True,
        return_tensors="pt",
    )
    input_ids = encoded_input["input_ids"].to(device)
    input_mask = encoded_input["attention_mask"].to(device)

    if max_length is not None:
        model.max_length = max_length

    summary = model(input_ids, input_mask)

    # decode each beam hypothesis for the single example in the batch
    summaries = []
    for i in range(summary.shape[1]):
        summaries.append(tokenizer.decode(summary[0][i], skip_special_tokens=True))
    return summaries
    # return tokenizer.decode(summary[0][0], skip_special_tokens=True)
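

# Minimal sketch (an assumption, not part of the original section) of how the
# pieces above could be wired into the Streamlit UI: fetch the weights, load
# the model once, then summarize whatever code the user pastes in. The widget
# labels and layout are illustrative only. The __main__ guard is a no-op under
# Streamlit (which runs the script as __main__) but keeps imports side-effect free.
if __name__ == "__main__":
    st.title("CodeBERT Code Summarization")
    download_model()
    tokenizer, model, device = load_tokenizer_and_model(CONFIG.model_name_or_path)

    code_input = st.text_area("Paste a Python function:", height=200)
    if st.button("Generate docstring") and code_input.strip():
        with st.spinner("Generating..."):
            candidates = generate_docstring(model, tokenizer, device, code_input)
        # show every decoded hypothesis, best beam first
        for idx, candidate in enumerate(candidates):
            st.write(f"{idx + 1}. {candidate}")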