File size: 8,697 Bytes
004a744
 
 
 
 
 
5c621a1
004a744
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4f784ce
4b491a9
004a744
 
 
 
 
 
 
 
 
 
 
 
5c621a1
 
004a744
 
 
5c621a1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
import spacy
from spacy.language import Language
from spacy.lang.it import Italian
import re
from transformers import pipeline
from gradio.inputs import File
import gradio as gr
from pdf2image import convert_from_path
import pytesseract
import tempfile
import os
from gradio.inputs import Dropdown
import gradio as gr
import tempfile
import os
from pdf2image import convert_from_path
import pytesseract
import fitz
from pdf2image import convert_from_bytes


def preprocess_punctuation(text):
  """Find abbreviation-like tokens (e.g. 'art.', 'mq.') in *text*.

  These are later registered as tokenizer special cases so their trailing
  dots are not mistaken for sentence boundaries. The pattern matches runs
  of 1-4 letters/dots ending in a dot that is NOT followed by an uppercase
  letter (a dot before a capital is likely a real sentence end).

  Parameters
  ----------
  text : str
      Raw document text.

  Returns
  -------
  list[str]
      Unique matched abbreviation strings (order not guaranteed).
  """
  pattern = r'(?<![a-z])[a-zA-Z\.]{1,4}(?:\.[a-zA-Z\.]{1,4})*\.(?!\s*[A-Z])'
  # Deduplicate; set ordering is arbitrary, which is fine since results are
  # only used to register tokenizer special cases.
  return list(set(re.findall(pattern, text)))


def preprocess_text(text):
  """Collapse blank lines: runs of newlines (possibly containing
  whitespace-only lines) become a single newline."""
  collapsed = re.sub(r'\n\s*\n', '\n', text)
  return re.sub(r'\n{2,}', '\n', collapsed)



@Language.component('custom_tokenizer')
def custom_tokenizer(doc):
    """Mark the token that follows any colon as not starting a sentence,
    so ':' is never treated as a sentence boundary."""
    colon_positions = [tok.i for tok in doc[:-1] if tok.text == ":"]
    for pos in colon_positions:
        doc[pos + 1].is_sent_start = False
    return doc



def get_sentences(text, dictionary = None):
  """Split *text* into cleaned sentences with the Italian spaCy pipeline.

  Abbreviations found by preprocess_punctuation are registered as tokenizer
  special cases so their trailing dots do not end sentences. The
  *dictionary* parameter is currently unused (kept for interface
  compatibility).

  Returns a list of sentences with surrounding spaces/newlines stripped and
  runs of spaces collapsed; empty sentences are dropped.
  """
  strip_chars = ''.join({' ', '\n'})
  nlp = spacy.load("it_core_news_lg")  # Italian large model
  nlp.add_pipe("custom_tokenizer", before="parser")

  for abbrev in preprocess_punctuation(text):
    nlp.tokenizer.add_special_case(
        abbrev, [{spacy.symbols.ORTH: abbrev, spacy.symbols.NORM: abbrev}])

  cleaned = []
  for sentence in nlp(text).sents:
    tokens = [t for t in sentence.text.lstrip(strip_chars).rstrip(strip_chars).split(' ') if t]
    if tokens:
      cleaned.append(' '.join(tokens))
  return cleaned




def extract_numbers(text, given_strings):
    """Extract numeric values appearing next to any of *given_strings*.

    For each word containing one of the given strings, the word before and
    after it (the "context") is scanned for numbers. Contexts containing an
    arithmetic operator (+, * or /) are skipped so operands of formulas are
    not picked up. Comma decimal separators are normalised to dots.

    Parameters
    ----------
    text : str
        Sentence to scan.
    given_strings : iterable[str]
        Substrings (e.g. 'mq') flagging a nearby number as relevant.

    Returns
    -------
    list[int | float]
        Numbers found near the given strings. BUGFIX: words without digits
        previously yielded ``None`` placeholders in the result, so an empty
        list did not reliably mean "no numbers found"; they are now skipped.
    """
    words = text.split()
    # Indices of words that contain any of the marker strings.
    indices = [i for i, word in enumerate(words)
               if any(s in word for s in given_strings)]
    numbers = []
    for index in indices:
        # One word of context on each side of the marker word.
        start = max(index - 1, 0)
        end = min(index + 2, len(words))
        context = words[start:end]
        # Skip contexts that look like arithmetic expressions.
        if any(re.match(r'[+\*/]', word) for word in context):
            continue
        for word in context:
            cleaned = re.sub(r'[^0-9\.,]+', '', word).replace(',', '.')
            if cleaned.replace('.', '', 1).isdigit():
                numbers.append(float(cleaned))
            else:
                digits_only = re.sub(r'[^0-9]+', '', word)
                if digits_only.isdigit():
                    numbers.append(int(digits_only))
                # Words with no digits at all are ignored.
    return numbers



def get_text_and_values(text, key_list):
  """Map each sentence mentioning a keyword to the numbers found in it.

  Sentences with no nearby numbers are omitted from the result.
  """
  info = {}
  for sentence in get_sentences(text):
    found = extract_numbers(text=sentence, given_strings=key_list)
    if found:
      info[sentence] = found
  return info


def get_useful_text(dictionary):
  """Join the dictionary's keys (the relevant sentences) into one
  newline-separated string."""
  return '\n'.join(dictionary.keys())

def get_values(dictionary):
  """Return the dictionary's values (per-sentence number lists) as a list."""
  return list(dictionary.values())


def initialize_qa_transformer(model):
  """Build a HuggingFace text2text-generation pipeline for *model*."""
  return pipeline("text2text-generation", model=model)


def get_answers_unfiltered(dictionary, question, qa_pipeline):
  """Run *qa_pipeline* once per sentence (dictionary key), appending the
  question in the Italian QA prompt format; returns the raw pipeline
  outputs in key order."""
  return [qa_pipeline(f'{kl} Domanda: {question}') for kl in dictionary]


def get_total(answered_values, text, keywords, raw_values, unique_values = False):
    """Combine QA answers and raw sentence numbers into a final total.

    Parameters
    ----------
    answered_values : list[list[dict]]
        QA pipeline outputs, one list of dicts per sentence; each dict
        value is a generated-text string possibly containing numbers.
    text : str
        Full document text, searched for an explicit "total" statement.
    keywords : iterable[str]
        Words (e.g. 'totale') that may directly precede the total value.
    raw_values : list[list[int | float]]
        Numbers extracted per sentence; used to validate the QA answers.
    unique_values : bool, optional
        If True, deduplicate the answered numbers before summing.

    Returns
    -------
    tuple[list[float], int | float]
        (validated numbers from the QA answers, computed total).
    """
    # Flatten the per-sentence raw numbers for membership checks.
    numeric_list = [num for sublist in raw_values for num in sublist
                    if isinstance(num, (int, float))]

    # Pull numbers out of the QA-generated strings; keep only plausible
    # values (>= 5) that also appear in the raw extraction.
    pattern = r'\d+(?:[.,]\d+)?'
    numbers = []
    for sub_lst in answered_values:
        for d in sub_lst:
            for v in d.values():
                v = v.replace(',', '.')  # normalise decimal separator
                numbers += [float(match) for match in re.findall(pattern, v)
                            if float(match) >= 5.0 and float(match) in numeric_list]

    if unique_values:
        numbers = list(set(numbers))

    total_list = []
    found = False
    # Look for an explicit total ("totale ... 120") in the text: the
    # keyword, up to three intervening words, then a number.
    for keyword in keywords:
        keyword_pattern = f'{keyword}(\\s+\\w+){{0,3}}\\s+(\\d+)'
        match = re.search(keyword_pattern, text, re.IGNORECASE)
        if match:
            # BUGFIX: compare numerically — the old code compared the
            # matched *string* against floats, so this branch never fired.
            number = float(match.group(2))
            if number in numbers and number in numeric_list:
                total_list.append(number)
                print(f"Found a value ({number}) for keyword '{keyword}'.")
                found = True

    # No explicit total found: sum all validated answer numbers instead.
    if not found:
        fallback_total = 0
        for value in numbers:
            if value in numeric_list:
                fallback_total += value
        total_list.append(fallback_total)

    # Several totals mean several lots; the grand total is their sum.
    # (Renamed from `sum`, which shadowed the builtin.)
    grand_total = 0
    for value in total_list:
        grand_total += value
    return numbers, grand_total




def extractor_clean(text, k_words, transformer, question, total_kwords, return_text = False):
  """End-to-end extraction: find keyword sentences, ask the QA model for
  the surface area, and compute the total.

  Parameters
  ----------
  text : str
      Full document text.
  k_words : iterable[str]
      Keywords flagging relevant sentences (e.g. 'mq').
  transformer : str
      HuggingFace model name for the QA pipeline.
  question : str
      Question posed to the model for each sentence.
  total_kwords : iterable[str]
      Keywords that may precede an explicit total in the text.
  return_text : bool, optional
      When True, also return the sentences that informed the result.

  Returns
  -------
  tuple
      (values, return_text) or, when return_text is true,
      (values, return_text, used_text).
  """
  # BUGFIX: removed an unused local `keywords` list, and replaced the
  # `elif return_text == False` branch (which silently returned None for
  # falsy-but-not-False arguments) with a plain else path.
  dictionary = get_text_and_values(text, k_words)
  raw = get_values(dictionary)
  qa = initialize_qa_transformer(transformer)
  answers = get_answers_unfiltered(dictionary, question = question, qa_pipeline = qa)
  values = get_total(answered_values= answers, raw_values = raw, text = text, keywords = total_kwords, unique_values = True)
  if return_text:
    return values, return_text, get_useful_text(dictionary)
  return values, return_text



def format_output(extracted_values):
    """Render the extractor result as display text: values, total, and
    (when the flag is set) the reference sentences."""
    values, total = extracted_values[0][0], extracted_values[0][1]
    parts = [f"Valori: {values}\n", f"Totale: {total}\n"]
    if extracted_values[1] == True:
        parts.append("-------------------\n")
        parts.append(f"Rif. Testo:\n{extracted_values[2]}")
    return ''.join(parts)




def pdf_ocr(file):
    """Extract text from a PDF (native text layer, OCR fallback) and run
    the surface-area extraction pipeline on it.

    Parameters
    ----------
    file : str
        Path to the PDF file.

    Returns
    -------
    str
        Human-readable summary produced by format_output.
    """
    with open(file, "rb") as f:
        content = f.read()

    # Prefer the embedded text layer (fast and accurate when present).
    text = ""
    with fitz.open(stream=content, filetype="pdf") as doc:
        for page in doc:
            text += page.get_text()

    # Scanned PDF with no text layer: fall back to OCR.
    if not text:
        # BUGFIX: we hold PDF *bytes*, so use convert_from_bytes —
        # the old code passed bytes to convert_from_path, which expects
        # a filesystem path. (Also dropped an unused temp directory and
        # unused page count.)
        images = convert_from_bytes(content)
        for img in images:
            text += pytesseract.image_to_string(img, lang='ita')
        del images  # free page images early

    # Fixed pipeline configuration for Italian real-estate listings.
    ks = ('mq', 'metri quadri', 'm2')
    tra = 'it5/it5-base-question-answering'
    quest = "Quanti metri quadri misura l'immobile?"
    totalK = ['totale', 'complessivo', 'complessiva']

    extracted_values = extractor_clean(text=text, k_words=ks, transformer=tra,
                                       question=quest, total_kwords=totalK,
                                       return_text=True)
    return format_output(extracted_values=extracted_values)


def ocr_interface(pdf_file):
    """Gradio callback: run the PDF extraction pipeline on the uploaded
    file and return its formatted output."""
    return pdf_ocr(pdf_file.name)


# Gradio UI wiring: a single PDF upload in, plain-text summary out.
# NOTE(review): gr.inputs / gr.outputs are the legacy Gradio API (removed in
# newer releases, which expose gr.File / gr.Textbox directly) — confirm the
# installed Gradio version supports this.
pdf_input = gr.inputs.File(label="PDF File")
output_text = gr.outputs.Textbox(label="Output")
iface = gr.Interface(fn=ocr_interface, inputs=pdf_input, outputs=output_text)
iface.launch()