import os
# Install a CPU-only PyTorch build at startup (a common workaround on Hugging Face Spaces).
os.system('pip3 install torch torchvision torchaudio --extra-index-url https://download.pytorch.org/whl/cpu')
import io
import re
import glob
import warnings
import fitz
import torch
import cv2
import pytesseract
import pandas as pd
import numpy as np
import gradio as gr
from PIL import Image, ImageColor, ImageDraw, ImageFont
from tqdm import tqdm
from difflib import SequenceMatcher
from itertools import groupby
from scipy import ndimage
from scipy.ndimage import interpolation as inter
from datasets import load_metric
from datasets import load_dataset
from datasets import Features, Sequence, ClassLabel, Value, Array2D, Array3D
from transformers import AutoProcessor, AutoModelForTokenClassification
from transformers import LayoutLMv3ForTokenClassification, LayoutLMv3FeatureExtractor, LayoutLMv3ImageProcessor
from transformers.data.data_collator import default_data_collator
# import paddleocr
# from paddleocr import PaddleOCR
auth_token = os.environ.get("HUGGING_FACE_HUB_TOKEN")
# Ignore warning messages
warnings.filterwarnings("ignore")
id2label = {0: 'others', 1: 'issuer_name', 2: 'issuer_addr', 3: 'issuer_cap', 4: 'issuer_city', 5: 'issuer_prov', 6: 'issuer_state', 7: 'issuer_tel', 8: 'issuer_id', 9: 'issuer_fax', 10: 'issuer_vat', 11: 'issuer_contact', 12: 'issuer_contact_email', 13: 'issuer_contact_phone', 14: 'receiver_name', 15: 'receiver_addr', 16: 'receiver_cap', 17: 'receiver_city', 18: 'receiver_prov', 19: 'receiver_state', 20: 'receiver_tel', 21: 'receiver_fax', 22: 'receiver_vat', 23: 'receiver_id', 24: 'receiver_contact', 25: 'dest_name', 26: 'dest_addr', 27: 'dest_cap', 28: 'dest_city', 29: 'dest_prov', 30: 'dest_state', 31: 'dest_tel', 32: 'dest_fax', 33: 'dest_vat', 34: 'doc_type', 35: 'doc_nr', 36: 'doc_date', 37: 'order_nr', 38: 'order_date', 39: 'service_order', 40: 'shipment_nr', 41: 'client_reference', 42: 'client_vat', 43: 'client_id', 44: 'client_code', 45: 'time', 46: 'notes', 47: 'client_tel', 48: 'art_code', 49: 'ref_code', 50: 'order_reason', 51: 'order_ref', 52: 'order_ref_date', 53: 'detail_desc', 54: 'lot_id', 55: 'lot_qty', 56: 'detail_um', 57: 'detail_qty', 58: 'detail_tare', 59: 'detail_grossw', 60: 'detail_packages', 61: 'detail_netw', 62: 'detail_origin', 63: 'payment_bank', 64: 'payment_terms', 65: 'tot_qty', 66: 'tot_grossw', 67: 'tot_netw', 68: 'tot_volume', 69: 'shipment_reason', 70: 'package_type', 71: 'transport_respons', 72: 'transport_vectors', 73: 'transport_terms', 74: 'transport_datetime', 75: 'return_plt', 76: 'nonreturn_plt', 77: 'dest_signature', 78: 'driver_signature', 79: 'transport_signature', 80: 'page', 81: 'varieta', 82: 'raccolta', 83: 'detail_volume'}
# Tesseract configuration (OCR engine mode 3, page segmentation mode 6) and OCR language
custom_config = r'--oem 3 --psm 6'
lang = 'eng'
# Google Vision OCR
from google.cloud import vision
# os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "test-apikey.json"
processor = AutoProcessor.from_pretrained("microsoft/layoutlmv3-base", apply_ocr=False)
model = AutoModelForTokenClassification.from_pretrained("sxandie/doc-ai-information-extraction", use_auth_token=auth_token)
from tabulate import tabulate
def print_df(df):
    print(tabulate(df, headers=df.columns, tablefmt='psql'))
def process_image_pytesseract(image, width, height):
    # width/height are accepted for API symmetry with the Google Vision path; the boxes
    # returned by LayoutLMv3ImageProcessor are already normalized to the 1000x1000 grid.
    feature_extractor = LayoutLMv3ImageProcessor(apply_ocr=True, lang=lang)
    encoding_feature_extractor = feature_extractor(image, return_tensors="pt", truncation=True)
    words, boxes = encoding_feature_extractor.words, encoding_feature_extractor.boxes
    return words, boxes
def create_bounding_box5(vertices, width_scale, height_scale):
    # Take two opposite corners of the (possibly rotated) quad
    x1 = int(vertices[0].x * width_scale)
    y1 = int(vertices[0].y * height_scale)
    x2 = int(vertices[2].x * width_scale)
    y2 = int(vertices[2].y * height_scale)
    # Ensure x1 < x2
    if x1 > x2:
        x1, x2 = x2, x1
    # Ensure y1 < y2
    if y1 > y2:
        y1, y2 = y2, y1
    # Return a valid bounding box
    return [x1, y1, x2, y2]
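# Illustrative sketch of the mapping (hypothetical vertices, not real Vision output):
# vertices at (100, 50), (300, 50), (300, 90), (100, 90) on a 2000x1000 px page, with
# width_scale = 1000/2000 = 0.5 and height_scale = 1000/1000 = 1.0, give
# create_bounding_box5 -> [50, 50, 150, 90] on the model's 1000x1000 grid.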
# Google Vision OCR
def process_image_GoogleVision(image, width, height):
    client = vision.ImageAnnotatorClient()
    with io.BytesIO() as output:
        image.convert("RGB").save(output, format='JPEG')
        content = output.getvalue()
    vision_image = vision.Image(content=content)
    response = client.text_detection(image=vision_image)
    texts = response.text_annotations
    # The first annotation is the full-page text block, so skip it and keep the word-level ones
    words = [text.description for text in texts[1:]]
    bboxes = [text.bounding_poly.vertices for text in texts[1:]]
    boxes = [create_bounding_box5(bbox, 1000/width, 1000/height) for bbox in bboxes]
    return words, boxes
def generate_unique_colors(id2label):
    # Sample one distinct named PIL color per label
    label_ints = np.random.choice(len(ImageColor.colormap), len(id2label), replace=False)
    label_color_pil = list(ImageColor.colormap.values())
    label_color = [label_color_pil[i] for i in label_ints]
    color = {v: label_color[k] for k, v in id2label.items()}
    return color
def create_bounding_box1(bbox_data, width_scale: float, height_scale: float):
    xs = []
    ys = []
    for x, y in bbox_data:
        xs.append(x)
        ys.append(y)
    left = int(max(0, min(xs) * width_scale))
    top = int(max(0, min(ys) * height_scale))
    right = int(min(1000, max(xs) * width_scale))
    bottom = int(min(1000, max(ys) * height_scale))
    return [left, top, right, bottom]
def unnormalize_box(bbox, width, height):
    return [
        width * (bbox[0] / 1000),
        height * (bbox[1] / 1000),
        width * (bbox[2] / 1000),
        height * (bbox[3] / 1000),
    ]
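# Illustrative example: a normalized box [250, 100, 500, 200] on a 2000x1000 px page
# unnormalizes to [500.0, 100.0, 1000.0, 200.0] in pixel coordinates.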
def iob_to_label(label):
    # the labels here carry no IOB prefixes; unknown ids fall back to 'others'
    return id2label.get(label, 'others')
def process_image(image):
    width, height = image.size
    feature_extractor = LayoutLMv3FeatureExtractor(apply_ocr=True)
    encoding_feature_extractor = feature_extractor(image, return_tensors="pt", truncation=True)
    words, boxes = encoding_feature_extractor.words, encoding_feature_extractor.boxes
    # encode
    inference_image = [image.convert("RGB")]
    encoding = processor(inference_image, truncation=True, return_offsets_mapping=True, return_tensors="pt",
                         padding="max_length", stride=128, max_length=512, return_overflowing_tokens=True)
    offset_mapping = encoding.pop('offset_mapping')
    overflow_to_sample_mapping = encoding.pop('overflow_to_sample_mapping')
    # stack the pixel values of all overflow chunks into a single tensor
    x = []
    for i in range(0, len(encoding['pixel_values'])):
        x.append(encoding['pixel_values'][i])
    x = torch.stack(x)
    encoding['pixel_values'] = x
    # forward pass
    outputs = model(**encoding)
    # get predictions
    predictions = outputs.logits.argmax(-1).squeeze().tolist()
    token_boxes = encoding.bbox.squeeze().tolist()
    # only keep non-subword predictions
    preds = []
    l_words = []
    bboxes = []
    token_section_num = []
    if len(token_boxes) == 512:
        # a single chunk: wrap in a list so the loops below work uniformly
        predictions = [predictions]
        token_boxes = [token_boxes]
    for i in range(0, len(token_boxes)):
        for j in range(0, len(token_boxes[i])):
            unnormal_box = unnormalize_box(token_boxes[i][j], width, height)
            if np.asarray(token_boxes[i][j]).shape != (4,):
                continue
            elif token_boxes[i][j] == [0, 0, 0, 0] or token_boxes[i][j] == 0:
                continue
            # if the bbox is new, record its prediction, text, and chunk number
            elif unnormal_box not in bboxes:
                preds.append(predictions[i][j])
                l_words.append(processor.tokenizer.decode(encoding["input_ids"][i][j]))
                bboxes.append(unnormal_box)
                token_section_num.append(i)
            else:
                # the bbox was seen before, so only its text needs updating
                _index = bboxes.index(unnormal_box)
                if token_section_num[_index] == i:
                    # Documents longer than 512 tokens are split into overlapping chunks, so the
                    # same word can appear in more than one chunk. Subword pieces are merged only
                    # when they come from the same chunk; repeats from other chunks are skipped.
                    l_words[_index] = l_words[_index] + processor.tokenizer.decode(encoding["input_ids"][i][j])
                else:
                    continue
    return bboxes, preds, l_words, image
def process_image_encoding(model, processor, image, words, boxes, width, height):
    # Same decoding logic as process_image, but with words/boxes coming from an external OCR
    # encode
    inference_image = [image.convert("RGB")]
    encoding = processor(inference_image, words, boxes=boxes, truncation=True, return_offsets_mapping=True, return_tensors="pt",
                         padding="max_length", stride=128, max_length=512, return_overflowing_tokens=True)
    offset_mapping = encoding.pop('offset_mapping')
    overflow_to_sample_mapping = encoding.pop('overflow_to_sample_mapping')
    # stack the pixel values of all overflow chunks into a single tensor
    x = []
    for i in range(0, len(encoding['pixel_values'])):
        x.append(encoding['pixel_values'][i])
    x = torch.stack(x)
    encoding['pixel_values'] = x
    # forward pass
    outputs = model(**encoding)
    # get predictions
    predictions = outputs.logits.argmax(-1).squeeze().tolist()
    token_boxes = encoding.bbox.squeeze().tolist()
    # only keep non-subword predictions
    preds = []
    l_words = []
    bboxes = []
    token_section_num = []
    if len(token_boxes) == 512:
        # a single chunk: wrap in a list so the loops below work uniformly
        predictions = [predictions]
        token_boxes = [token_boxes]
    for i in range(0, len(token_boxes)):
        for j in range(0, len(token_boxes[i])):
            unnormal_box = unnormalize_box(token_boxes[i][j], width, height)
            if np.asarray(token_boxes[i][j]).shape != (4,):
                continue
            elif token_boxes[i][j] == [0, 0, 0, 0] or token_boxes[i][j] == 0:
                continue
            # if the bbox is new, record its prediction, text, and chunk number
            elif unnormal_box not in bboxes:
                preds.append(predictions[i][j])
                l_words.append(processor.tokenizer.decode(encoding["input_ids"][i][j]))
                bboxes.append(unnormal_box)
                token_section_num.append(i)
            else:
                # the bbox was seen before, so only its text needs updating
                _index = bboxes.index(unnormal_box)
                if token_section_num[_index] == i:
                    # Documents longer than 512 tokens are split into overlapping chunks, so the
                    # same word can appear in more than one chunk. Subword pieces are merged only
                    # when they come from the same chunk; repeats from other chunks are skipped.
                    l_words[_index] = l_words[_index] + processor.tokenizer.decode(encoding["input_ids"][i][j])
                else:
                    continue
    return bboxes, preds, l_words, image
def process_form_(json_df):
    labels = [x['LABEL'] for x in json_df]
    texts = [x['TEXT'] for x in json_df]
    cmb_list = []
    for i, j in enumerate(labels):
        cmb_list.append([labels[i], texts[i]])
    # group consecutive entries that share the same label and concatenate their texts
    grouper = lambda l: [[k] + sum((v[1::] for v in vs), []) for k, vs in groupby(l, lambda x: x[0])]
    list_final = grouper(cmb_list)
    lst_final = []
    for x in list_final:
        json_dict = {}
        json_dict[x[0]] = ' '.join(x[1:])
        lst_final.append(json_dict)
    return lst_final
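# Illustrative example (hypothetical colors standing in for labels):
# process_form_([{'LABEL': 'red', 'TEXT': 'ACME'}, {'LABEL': 'red', 'TEXT': 'Srl'},
#                {'LABEL': 'blue', 'TEXT': '42'}])
# -> [{'red': 'ACME Srl'}, {'blue': '42'}]
# Note that only *consecutive* entries with the same label are merged.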
def createExcel(maindf, detailsdf, pdffile):
    outputPath = f'{pdffile}.xlsx'
    with pd.ExcelWriter(outputPath, engine='xlsxwriter') as writer:
        maindf.to_excel(writer, sheet_name='headers', index=False)
        detailsdf.to_excel(writer, sheet_name='details', index=False)
        # auto-fit each column to the longest cell (or header) it contains
        worksheet1 = writer.sheets["headers"]
        for idx, col in enumerate(maindf):
            series = maindf[col]
            max_len = max((
                series.astype(str).map(len).max(),
                len(str(series.name))
            )) + 1
            worksheet1.set_column(idx, idx, max_len)
        worksheet2 = writer.sheets["details"]
        for idx, col in enumerate(detailsdf):
            series = detailsdf[col]
            max_len = max((
                series.astype(str).map(len).max(),
                len(str(series.name))
            )) + 1
            worksheet2.set_column(idx, idx, max_len)
    return outputPath
def visualize_image(final_bbox, final_preds, l_words, image, label2color):
    draw = ImageDraw.Draw(image)
    font = ImageFont.load_default()
    json_df = []
    for ix, (prediction, box) in enumerate(zip(final_preds, final_bbox)):
        if prediction is not None:
            predicted_label = iob_to_label(prediction).lower()
            if predicted_label not in ["others"]:
                draw.rectangle(box, outline=label2color[predicted_label])
                draw.text((box[0]+10, box[1]-10), text=predicted_label, fill=label2color[predicted_label], font=font)
            json_dict = {}
            json_dict['TEXT'] = l_words[ix]
            # NOTE: the color string is stored under 'LABEL' on purpose; downstream code
            # (process_form_ / add_dataframe) uses the color as the grouping key and maps
            # it back to a label name via color2label_except.
            json_dict['LABEL'] = label2color[predicted_label]
            json_df.append(json_dict)
    return image, json_df
def rotate_image(image):
    extracted_text = pytesseract.image_to_string(image)
    # check that the image contains text at all
    if not extracted_text:
        print("The image does not contain any text.")
        return None
    elif extracted_text.isspace():
        print("The image contains only spaces.")
        return None
    text = pytesseract.image_to_osd(image)
    angle = int(re.search(r'(?<=Rotate: )\d+', text).group(0))
    angle = (360 - angle) % 360
    rotated = ndimage.rotate(image, angle)
    data = Image.fromarray(rotated)
    return data
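# Illustrative example of the OSD field this parses: for "Rotate: 90" the page is
# rotated by (360 - 90) % 360 = 270 degrees counter-clockwise to bring the text upright.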
# correct the skewness of images
def correct_skew(image, delta=1, limit=5):
    def determine_score(arr, angle):
        data = inter.rotate(arr, angle, reshape=False, order=0)
        histogram = np.sum(data, axis=1, dtype=float)
        score = np.sum((histogram[1:] - histogram[:-1]) ** 2, dtype=float)
        return histogram, score
    # Convert the PIL Image object to a numpy array
    image = np.asarray(image.convert('L'), dtype=np.uint8)
    # Apply thresholding
    thresh = cv2.threshold(image, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)[1]
    scores = []
    angles = np.arange(-limit, limit + delta, delta)
    for angle in angles:
        histogram, score = determine_score(thresh, angle)
        scores.append(score)
    best_angle = angles[scores.index(max(scores))]
    (h, w) = image.shape[:2]
    center = (w // 2, h // 2)
    M = cv2.getRotationMatrix2D(center, best_angle, 1.0)
    corrected = cv2.warpAffine(image, M, (w, h), flags=cv2.INTER_CUBIC,
                               borderMode=cv2.BORDER_REPLICATE)
    return best_angle, corrected
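# Minimal usage sketch: `best_angle, deskewed = correct_skew(page_image)`; note that
# `deskewed` comes back as a grayscale numpy array, not a PIL image.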
def removeBorders(img):
    result = img.copy()
    if len(result.shape) == 2:
        # if the input image is grayscale, convert it to BGR format
        result = cv2.cvtColor(result, cv2.COLOR_GRAY2BGR)
    gray = cv2.cvtColor(result, cv2.COLOR_BGR2GRAY)  # convert to grayscale
    thresh = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)[1]
    # Remove horizontal lines
    horizontal_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (40, 1))
    remove_horizontal = cv2.morphologyEx(thresh, cv2.MORPH_OPEN, horizontal_kernel, iterations=2)
    cnts = cv2.findContours(remove_horizontal, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    cnts = cnts[0] if len(cnts) == 2 else cnts[1]
    for c in cnts:
        cv2.drawContours(result, [c], -1, (255, 255, 255), 5)
    # Remove vertical lines
    vertical_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (1, 40))
    remove_vertical = cv2.morphologyEx(thresh, cv2.MORPH_OPEN, vertical_kernel, iterations=2)
    cnts = cv2.findContours(remove_vertical, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    cnts = cnts[0] if len(cnts) == 2 else cnts[1]
    for c in cnts:
        cv2.drawContours(result, [c], -1, (255, 255, 255), 5)
    return result
def color2label_except(label2color, excluded_labels):
    """
    Inversely maps colors to labels based on the provided label2color dictionary,
    excluding the specified labels.
    Args:
        label2color (dict): Dictionary mapping labels to colors.
        excluded_labels (list): List of labels to exclude.
    Returns:
        dict: Dictionary mapping colors to labels, excluding the specified labels.
    """
    # Filter out excluded labels from the label2color dictionary
    filtered_label2color = {label: color for label, color in label2color.items() if label not in excluded_labels}
    # Invert the filtered dictionary to create the color2label mapping
    return {v: k for k, v in filtered_label2color.items()}
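# Illustrative example (hypothetical labels/colors):
# color2label_except({'doc_nr': 'red', 'art_code': 'blue'}, ['art_code'])
# -> {'red': 'doc_nr'}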
def add_dataframe(df_main, labels_repeating, label2color):
    # map colors back to label names, skipping the repeating (tabular) labels
    col_name_map = color2label_except(label2color, labels_repeating)
    columns = list(col_name_map.values())
    data = {col: [] for col in columns}
    for i in df_main:
        for k, v in i.items():
            if k in col_name_map:
                data[col_name_map[k]].append(v)
    # join the list of strings for each column and convert to a dataframe
    for col in columns:
        data[col] = [' '.join(data[col])]
    df_upper = pd.DataFrame(data)
    key_value_pairs = []
    for col in df_upper.columns:
        key_value_pairs.append({'key': col, 'value': df_upper[col][0]})
    df_key_value = pd.DataFrame(key_value_pairs)
    # Extract the value from the containertype column
    # container_quantity = int(df_key_value[df_key_value['key'] == 'containertype']['value'].str.split("x").str[0])
    # # Add a new row to the DataFrame
    # df_key_value = df_key_value.append({'key': 'containerquantity', 'value': container_quantity}, ignore_index=True)
    # # Extract the desired value from the containertype column
    # df_key_value.loc[df_key_value['key'] == 'containertype', 'value'] = df_key_value.loc[df_key_value['key'] == 'containertype', 'value'].str.split("x").str[1]
    return df_key_value
def id2label_row(s, id2label):
    # pass through values that are already label strings; otherwise map id -> label
    if s in id2label.values():
        return s
    return id2label[s]
def dist_height(y1, y2):
    return abs(int(y1) - int(y2))
def mergeBoxes(df):
    # return the smallest box enclosing every box in the dataframe
    xmin, ymin, xmax, ymax = [], [], [], []
    for i in range(df.shape[0]):
        box = df['bbox_column'].iloc[i]
        xmin.append(box[0])
        ymin.append(box[1])
        xmax.append(box[2])
        ymax.append(box[3])
    return [min(xmin), min(ymin), max(xmax), max(ymax)]
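# Illustrative example: rows with boxes [10, 10, 50, 20] and [40, 30, 90, 45]
# merge into the enclosing box [10, 10, 90, 45].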
def transform_dataset(df, merge_labels):
    df_temp = df.iloc[merge_labels]  # a temporary df with only the rows to merge
    df_temp.reset_index(drop=True, inplace=True)
    text = ' '.join(df_temp['scr_column'])
    bbox = mergeBoxes(df_temp)
    retain_index = merge_labels[0]  # the first index is the parent row
    df['scr_column'].iloc[retain_index] = text
    df['bbox_column'].iloc[retain_index] = bbox
    # keep the parent row and remove the rest
    df = df.loc[~df.index.isin(merge_labels[1:]), :]
    df.reset_index(drop=True, inplace=True)
    return df
def box_overlap(box1, box2, horizontal_vertical):
    # Extract coordinates of both boxes
    x1_box1, y1_box1, x2_box1, y2_box1 = box1
    x1_box2, y1_box2, x2_box2, y2_box2 = box2
    if horizontal_vertical == "H":
        # overlap on the x-axis
        return x1_box1 <= x2_box2 and x2_box1 >= x1_box2
    if horizontal_vertical == "V":
        # overlap on the y-axis
        return y1_box1 <= y2_box2 and y2_box1 >= y1_box2
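# Illustrative example: box_overlap([0, 0, 50, 10], [40, 100, 90, 110], 'H') is True
# (x-ranges 0-50 and 40-90 intersect), while the same call with 'V' is False.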
def horizontal_merging(df, font_length, perform_overlapping=False, x_change=0, y_change=0):
    # "fatten" the boxes so that nearby words overlap
    fat_df = df.copy()
    for i in range(df.shape[0]):
        box = fat_df['bbox_column'].iloc[i]
        fat_df['bbox_column'].iloc[i] = [box[0]-x_change, box[1]-y_change, box[2]+x_change, box[3]+y_change]
    redundant_rows = []
    if perform_overlapping == True:
        for i in range(fat_df.shape[0]):
            box_i = fat_df.bbox_column[i]
            indices2merge = []
            for j in range(i+1, fat_df.shape[0]):
                if fat_df.preds_column[j] == fat_df.preds_column[i]:  # if the labels are the same
                    box_j = fat_df.bbox_column[j]
                    if abs(box_i[1]-box_j[3]) < font_length*1.5:  # boxes lie within 1.5x the font height
                        # Check if the boxes overlap horizontally
                        if box_overlap(box_i, box_j, 'H'):
                            indices2merge.append(j)
                            df.scr_column[i] += ' ' + df.scr_column[j]
                            box_i = fat_df.bbox_column[j]  # continue the chain from the merged word
            # once all indices belonging to this label chain are collected,
            # merge the bounding boxes and keep the result in the first row
            if len(indices2merge) != 0:
                df['bbox_column'].iloc[i] = mergeBoxes(df.loc[[i] + indices2merge])
                redundant_rows.extend(indices2merge)
    # all transformations done; remove the redundant rows
    return df.drop(redundant_rows)
def mergeLabelsExtensive_repeating(df_grouped, repeating_label):
    # merges entities with the same label into a single instance
    df_grouped = df_grouped[df_grouped['preds_column'].isin(repeating_label)]
    df_grouped.reset_index(inplace=True, drop=True)
    # estimate the font height from the first few boxes
    font_length = 0
    count = 0
    while count < 5 and count < df_grouped.shape[0]:
        box_i = df_grouped['bbox_column'].iloc[count]  # box of the current label: [x1, y1, x3, y3]
        font_length += box_i[3] - box_i[1]
        count += 1
    if count:
        font_length = font_length / count
    df_grouped = horizontal_merging(df_grouped, font_length, True, 30, 0)
    return df_grouped
def group_labels_wrt_height(df):
    """
    Groups the labels into lines based on the height (y-axis) of the bounding box.
    """
    # sort the rows by height using column 'y_axis'
    df = df.sort_values(by='y_axis')
    df.reset_index(inplace=True, drop=True)
    print("entering: group_labels_wrt_height ")
    final_yaxis = []
    final_scr = []
    final_pred = []
    current_group = []
    current_scr = []
    current_pred = []
    # Iterate through the column values
    for i, (value, scr, preds) in enumerate(zip(df['y_axis'], df['scr_column'], df['preds_column'])):
        if i == 0:
            # Start a new group with the first value
            current_group.append(value)
            current_scr.append(scr)
            current_pred.append(preds)
        else:
            # Values within 35 px of the previous one belong to the same line
            if abs(value - df['y_axis'][i - 1]) <= 35:
                # Add the value to the current group
                current_group.append(value)
                current_scr.append(scr)
                current_pred.append(preds)
            else:
                # Close the current group and start a new one
                final_yaxis.append(current_group)
                final_scr.append(current_scr)
                final_pred.append(current_pred)
                current_group = [value]
                current_scr = [scr]
                current_pred = [preds]
    # Add the last group
    final_yaxis.append(current_group)
    final_scr.append(current_scr)
    final_pred.append(current_pred)
    final_grouped_df = pd.DataFrame({'y_axis': final_yaxis, 'scr_column': final_scr, 'preds_column': final_pred})
    print("Grouped df after sorting based on height")
    print_df(final_grouped_df)
    return final_grouped_df
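# Illustrative example: y values [100, 120, 130, 300, 310] group into
# [[100, 120, 130], [300, 310]] because each step within a group is <= 35 px.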
# searches for the set of labels within the whole height range
def search_labelSet_height_range(df, d, keyList):
    print("search_labelSet_height_range")
    keyDict = dict.fromkeys(keyList, [])  # stores the required information as a dictionary, later converted to a df
    print("Dataframe from extraction is going to happen: ")
    # search df for the top-left y value and check whether it lies within range of d
    for i in range(df.shape[0]):
        box = df['bbox_column'].iloc[i]
        if dist_height(box[1], d) < 50:
            key = df['preds_column'].iloc[i]
            keyDict[key] = df['scr_column'].iloc[i]
    return keyDict
def clean_colText(df, column):
    # strip common OCR noise characters from the text column
    for i in range(df.shape[0]):
        df[column].iloc[i] = df[column].iloc[i].replace('[', '').replace('|', '').replace('+', '')
    return df
def find_repeatingLabels(df, labels_repeating):
    print("In find_repeatingLabels: ")
    row2drop = []  # rows that do not carry tabular (repeating) labels
    for i in range(df.shape[0]):
        df['preds_column'].iloc[i] = id2label_row(df['preds_column'].iloc[i], id2label)
        if df['preds_column'].iloc[i] not in labels_repeating:
            row2drop.append(i)
    df.drop(index=row2drop, inplace=True)
    df = clean_colText(df, 'scr_column')
    print("removing non-tabular labels.")
    df = mergeLabelsExtensive_repeating(df, labels_repeating)
    print('after merging non-tabular labels: ')
    labels_repeating = list(set(list(df["preds_column"])))
    print("labels_repeating in this document are: ", labels_repeating)
    # add an extra column holding the y-axis (height) information
    df['y_axis'] = np.nan
    for i in range(df.shape[0]):
        box = df['bbox_column'].iloc[i]
        df['y_axis'].iloc[i] = box[1]
    print("After adding y-axis data in the dataframes: ")
    df = mergeLabelsExtensive(df)
    print("after merging the df extensively")
    print("Grouping the labels wrt heights: ")
    grouped_df = group_labels_wrt_height(df)
    # once the labels are grouped, build one dictionary per line of labels and values
    row_dicts = []
    for _, row in grouped_df.iterrows():
        row_dict = {}
        for preds, scr in zip(row['preds_column'], row['scr_column']):
            row_dict[preds] = scr
        row_dicts.append(row_dict)
    # assemble the final table, one row per detected line
    final_df = pd.DataFrame(columns=labels_repeating)
    for d in row_dicts:
        final_df = pd.concat([final_df, pd.DataFrame([d])], ignore_index=True)
    final_df = final_df.fillna('')
    return final_df
def mergeImageVertical(images):
    # pick the smallest image and resize the others to match it
    min_shape = sorted([(np.sum(i.size), i.size) for i in images])[0][1]
    # stack the pages vertically
    imgs_comb = np.vstack([i.resize(min_shape) for i in images])
    imgs_comb = Image.fromarray(imgs_comb)
    return imgs_comb
def perform_erosion(img):
    # Convert the image to grayscale if needed
    if len(img.shape) == 2:
        gray = img
    else:
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    # Define the kernel for erosion
    kernel = np.ones((3, 3), np.uint8)
    # Double the size of the image, then erode to thicken the strokes
    double_size = cv2.resize(gray, None, fx=2, fy=2, interpolation=cv2.INTER_LINEAR)
    double_erosion = cv2.erode(double_size, kernel, iterations=1)
    return double_erosion
def remove_leading_trailing_special_characters(input_string):
    # strip any non-alphanumeric characters from the start and end of the string
    cleaned_string = re.sub(r'^[^A-Za-z0-9]+|[^A-Za-z0-9]+$', '', str(input_string))
    return cleaned_string
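# Illustrative example: '[Total:]' -> 'Total', '**12,5 kg**' -> '12,5 kg'.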
def clean_dataframe(df):
    # Apply remove_leading_trailing_special_characters to all string columns
    for column in df.select_dtypes(include='object').columns:
        df[column] = df[column].apply(remove_leading_trailing_special_characters)
    # Replace NaN values with blanks (empty rows are filtered elsewhere)
    df = df.fillna('')
    return df
def mergeLabelsExtensive(df_grouped):
    i = 0
    while i < df_grouped.shape[0]:
        merge_labels = [i]  # collects indices whose data will be merged (and then deleted)
        label = df_grouped['preds_column'].iloc[i]
        box1 = df_grouped['bbox_column'].iloc[i]
        for j in range(i+1, df_grouped.shape[0]):
            box2 = df_grouped['bbox_column'].iloc[j]
            # merge rows with the same label whose bottom edges lie within 20 pixels
            if label == df_grouped['preds_column'].iloc[j] and dist_height(box1[3], box2[3]) < 20:
                merge_labels.append(j)
        print_df(df_grouped)
        df_grouped = transform_dataset(df_grouped, merge_labels)
        i = i + 1
    return df_grouped
def multilabelsHandle(df, thermo_details):
    # NOTE: this function is not used by the Gradio app below; it expects helpers
    # (extract_yaxis, get_heights, search_labelSet_between_h1_h2) and shipment labels
    # that are not defined in this file.
    # Label id 0 is 'others' and those values are not important, so we drop them.
    df = df[df.preds_column != 0]
    df.reset_index(drop=True, inplace=True)
    for i in range(df.shape[0]):
        df['preds_column'].iloc[i] = id2label.get(df['preds_column'].iloc[i])
    df_grouped = df.copy()  # keeps only the rows with relevant labels
    for i in range(df.shape[0]):
        if df['preds_column'].iloc[i] not in thermo_details:
            df_grouped.drop(i, inplace=True)
    df_grouped.reset_index(drop=True, inplace=True)
    keyList = df_grouped['preds_column'].unique()
    df_grouped = mergeLabelsExtensive(df_grouped)
    # extract the height of the boxes
    df_grouped = extract_yaxis(df_grouped)
    shipment_labels = ['delivery_name', 'delivery_address', 'contact_phone']
    # shipment rows act as boundaries between repeating blocks
    heights_shipment = get_heights(df_grouped, shipment_labels)
    # segregate the other repeating values (measure, weight, volume, etc.),
    # which are contained between consecutive shipment heights
    df_labelSet = pd.DataFrame(columns=thermo_details)
    for i in range(len(heights_shipment)):
        if i == len(heights_shipment) - 1:
            new_df = search_labelSet_between_h1_h2(df_grouped, heights_shipment[i], 5000, keyList)
        else:
            new_df = search_labelSet_between_h1_h2(df_grouped, heights_shipment[i], heights_shipment[i+1], keyList)
        # assumes new_df is a dict, like the one search_labelSet_height_range returns
        df_labelSet = pd.concat([df_labelSet, pd.DataFrame([new_df])], ignore_index=True)
    return df_labelSet
def completepreprocess(pdffile, ocr_type):
    myDataFrame = pd.DataFrame()
    myDataFrame2 = pd.DataFrame()
    merge_pages = []
    label2color = generate_unique_colors(id2label)  # one palette shared by all pages
    doc = fitz.open(pdffile.name)
    for i in range(0, len(doc)):
        page = doc.load_page(i)
        zoom = 2
        mat = fitz.Matrix(zoom, zoom)
        pix = page.get_pixmap(matrix=mat, dpi=300)
        image = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
        ro_image = rotate_image(image)
        if ro_image is None:
            return None
        angle, skewed_image = correct_skew(ro_image)
        if skewed_image is None:
            return None
        remove_border = removeBorders(skewed_image)
        image = Image.fromarray(remove_border)
        width, height = image.size
        if ocr_type == "GoogleVisionOCR":
            words, boxes = process_image_GoogleVision(image, width, height)
        else:
            words, boxes = process_image_pytesseract(image, width, height)
        bbox, preds, words, image = process_image_encoding(model, processor, image, words, boxes, width, height)
        im, df_visualize = visualize_image(bbox, preds, words, image, label2color)
        df_main = process_form_(df_visualize)
        # one row per predicted token: bounding box, predicted label id, text
        page_data = {'bbox_column': bbox, 'preds_column': preds, 'scr_column': words}
        df_single_page = pd.DataFrame(page_data)
        labels_repeating = ['art_code', 'ref_code', 'detail_desc', 'lot_id', 'detail_qty', 'detail_um', 'detail_tare', 'detail_grossw', 'detail_netw', 'detail_origin', 'varieta', 'raccolta']
        df_repeating_page = find_repeatingLabels(df_single_page, labels_repeating)
        myDataFrame2 = pd.concat([myDataFrame2, df_repeating_page], sort=False)
        df1 = add_dataframe(df_main, labels_repeating, label2color).astype(str)
        myDataFrame = pd.concat([myDataFrame, df1], sort=False).reset_index(drop=True)
        # drop key/value rows with empty values
        row2drop = []
        for r in range(myDataFrame.shape[0]):
            if len(myDataFrame['value'].iloc[r]) == 0:
                row2drop.append(r)
        myDataFrame.drop(index=row2drop, inplace=True)
        myDataFrame.reset_index(drop=True, inplace=True)
        myDataFrame = myDataFrame[myDataFrame["value"].notnull()]
        myDataFrame.drop_duplicates(subset=["key"], inplace=True)
        # drop detail columns that hold nothing but empty lists
        myDataFrame2 = myDataFrame2.loc[:, ~(myDataFrame2.apply(lambda x: all(isinstance(val, list) and len(val) == 0 for val in x)))]
        merge_pages.append(im)
    im2 = mergeImageVertical(merge_pages)
    myDataFrame2 = clean_dataframe(myDataFrame2)
    myDataFrame = clean_dataframe(myDataFrame)
    myDataFrame = myDataFrame[myDataFrame['key'] != 'others']
    output_excel_path = createExcel(myDataFrame, myDataFrame2, pdffile.name)
    return im2, myDataFrame, myDataFrame2, output_excel_path
title = "Interactive demo: Document Information Extraction model PDF/Images"
description = "Upload your own document or use the example provided below. Results will show up in a few seconds. The annotated image can be opened in a new window for a better view."
css = """.output_image, .input_image {height: 600px !important}"""
examples = [["sample_doc.pdf"]]
iface = gr.Interface(
    fn=completepreprocess,
    inputs=[
        gr.components.File(label="PDF"),
        gr.components.Dropdown(label="Select the OCR", choices=["Pytesseract", "GoogleVisionOCR"]),
    ],
    outputs=[
        gr.components.Image(type="pil", label="annotated image"),
        "dataframe",
        "dataframe",
        gr.components.File(label="Excel output"),  # completepreprocess returns four values
    ],
    title=title,
    description=description,
    examples=examples,
    css=css
)
iface.launch(inline=True, debug=True)