import base64
import io
import json
import logging
import os
import re
import time
from typing import List, Optional

import PyPDF2
import tiktoken
from docx import Document
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.output_parsers import PydanticOutputParser
from langchain.prompts import PromptTemplate
from langchain.text_splitter import CharacterTextSplitter
# from langchain_community.llms import OpenAI
from langchain_community.callbacks import get_openai_callback
from langchain_core.pydantic_v1 import BaseModel, Field, validator
from langchain_openai import OpenAI
from tenacity import (
    retry,
    stop_after_attempt,
    wait_random_exponential,
)
from typing_extensions import Concatenate

logger = logging.getLogger(__name__)

#Setting the openai api key
api_key=os.getenv('OPENAI_API_KEY')

# Total context window (prompt + completion) of gpt-3.5-turbo-instruct.
_CONTEXT_WINDOW_TOKENS = 4096
# Slack for token-boundary effects when the OCR text is spliced into the prompt.
_TOKEN_SAFETY_MARGIN = 16
# Encoding used when tiktoken does not recognise the model name.
_FALLBACK_ENCODING = "cl100k_base"
# Sentinel string returned to callers for attachments that cannot be parsed.
_UNSUPPORTED_MESSAGE = "Unsupported document type"


# Schema of the structured receipt data the LLM is asked to return.
#
# NOTE: this is deliberately documented with comments rather than a class
# docstring. A docstring on a pydantic model is emitted as the schema
# "description", which PydanticOutputParser.get_format_instructions() would
# then inject into the prompt sent to the model.
#
# Every field is optional and defaults to None; the description strings are
# part of the prompt (they are embedded in the format instructions).
class Candidate(BaseModel):
    brand: Optional[str] = Field(
        default=None,
        description="INSERT BRAND NAME FROM THE RECEIPT OCR TEXT. IF NOT PRESENT RETURN null",
    )
    total_cost: Optional[str] = Field(
        default=None,
        description="INSERT TOTAL COST FROM THE RECEIPT OCR TEXT. TOTAL AMOUNT IS MAXIMUM VALUE IN THE OCR TEXT. IF NOT PRESENT RETURN null",
    )
    location: Optional[str] = Field(
        default=None,
        description="INSERT LOCATION FROM THE RECEIPT OCR TEXT. IF NOT PRESENT RETURN null",
    )
    purchase_category: Optional[str] = Field(
        default=None,
        description="INSERT PURCHASE CATEGORY FROM THE RECEIPT OCR TEXT. IF NOT PRESENT RETURN null",
    )
    brand_category: Optional[str] = Field(
        default=None,
        description=(
            'INSERT BRAND CATEGORY FROM THE RECEIPT OCR TEXT. \n'
            'CHOOSE CLOSEST BRAND CATEGORY BASED ON THE OCR FROM THIS ARRAY '
            '["Fashion and Apparel","Jewelry and Watches","Beauty and Personal Care",'
            '"Automobiles","Real Estate","Travel and Leisure","Culinary Services",'
            '"Home and Lifestyle","Technology and Electronics","Sports and Leisure",'
            '"Art and Collectibles","Health and Wellness",'
            '"Stationery and Writing Instruments","Children and Baby",'
            '"Pet Accessories","Financial Services","Airline Services",'
            '"Accommodation Services","Beverages Services","Services"]'
            ' ELSE IF NOT PRESENT RETURN null'
        ),
    )
    Date: Optional[str] = Field(
        default=None,
        description="INSERT RECEIPT DATE FROM THE RECEIPT OCR TEXT. IF NOT PRESENT RETURN null. FORMAT: dd-mm-yyyy",
    )
    currency: Optional[str] = Field(
        default=None,
        description="INSERT CURRENCY FROM THE RECEIPT OCR TEXT. LOOK FOR CURRENCY SYMBOLS (e.g., $, €, £, ¥) OR CURRENCY CODES (e.g., USD, EUR, GBP, JPY).ALWAYS RETURN CURRENCY CODE.IF NOT FOUND RETURN null.",
    )
    filename: Optional[str] = Field(
        default=None,
        description="GENERATE A FILENAME BASED ON THE RECEIPT OCR TEXT. USE THE FORMAT: 'PURCHASE_TYPE_BRAND_DATE' (e.g., 'clothing_gucci_20230715'). USE UNDERSCORES FOR SPACES.IF YOU CANNOT FIND THE COMPONENTS RETURN THIS FIELD AS NULL.",
    )
    payment_method: Optional[str] = Field(
        default=None,
        description="INSERT PAYMENT METHOD FROM THE RECEIPT OCR TEXT. LOOK FOR KEYWORDS LIKE 'CASH', 'CARD', 'CREDIT', 'DEBIT', 'VISA', 'MASTERCARD', 'AMEX', 'PAYPAL', ETC. IF NOT FOUND RETURN null.",
    )


@retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(6))
def openai_response(model:OpenAI,input:str):
    """Invoke *model* with the prompt *input* and return its completion.

    The call is retried up to 6 times with random exponential backoff
    (1-60 seconds) on any exception. Once the attempts are exhausted,
    tenacity raises its ``RetryError``.
    """
    return model.invoke(input)


def _get_encoding(model):
    """Return the tiktoken encoding for *model*, or a default if unknown."""
    try:
        return tiktoken.encoding_for_model(model)
    except KeyError:
        return tiktoken.get_encoding(_FALLBACK_ENCODING)


def strcuture_document_data(raw_text:str)->dict:
    """Turn receipt OCR text into a dict of structured fields via the LLM.

    The OCR text is truncated so that the full prompt (template, format
    instructions and OCR text) plus the completion fits in the model's
    context window, then sent to ``gpt-3.5-turbo-instruct``. The answer is
    parsed into a ``Candidate``.

    Args:
        raw_text: OCR text of the receipt. ``None`` is treated as empty text.

    Returns:
        A dict keyed by the ``Candidate`` field names. ``total_cost``, when
        present, is reduced to the part before the first ``.`` with commas
        removed. An empty dict is returned if any step fails; the error is
        logged with its traceback rather than raised.
    """
    try:
        model_name = "gpt-3.5-turbo-instruct"
        temperature = 0.0
        completion_tokens = 256
        model = OpenAI(model_name=model_name, temperature=temperature, max_tokens=completion_tokens)
        # doc_query = (
        #     "Extract and return strictly a JSON object containing only the following keys strictly : brand , total_cost , location , no_of_items , purchase_category,brand_category , Date ."
        #     "\nReceipt Data:\n" + raw_text + "\nRemember the response should only be in JSON format very Strictly and it should have these keys brand , total_cost(LOOK FOR THE HIGHEST VALUE IN RECEIPT OCR TEXT) , location , no_of_items , purchase_category,brand_category , Date , very Strictly.\n"
        # )
        doc_query = (
            "Extract and return strictly a JSON object containing only the following keys: "
            "brand, total_cost, location, purchase_category, brand_category, Date , currency ,filename,payment_method ."
            "FOR total_cost LOOK FOR THE HIGHEST VALUE IN RECEIPT OCR TEXT. "
            "Ensure that if a value is not present in the OCR text, it is returned as null."
        )
        parser = PydanticOutputParser(pydantic_object=Candidate)
        prompt = PromptTemplate(
            template=(
                "Your primary goal is to take my receipt OCR text and then return back a parsable json. "
                "Below is the receipt OCR:.\n {raw_text} \n "
                "These are the format instructions telling you to convert the data into json :\n "
                "{format_instructions}\n"
                "Do not include descriptions or explanations from the Candidate class in the JSON output. \n"
                "The response must be a valid JSON object.\n "
                "Follow the below instrcution very strictly:\n {query} \n"
            ),
            input_variables=["query", "raw_text"],
            partial_variables={"format_instructions": parser.get_format_instructions()},
        )

        # The context window is shared by prompt and completion, so the OCR
        # text only gets what is left after the fixed prompt parts and the
        # reserved completion tokens. Truncating to the whole window would
        # make long receipts overflow it and fail on every retry.
        encoding = _get_encoding(model_name)
        fixed_prompt = prompt.format(query=doc_query, raw_text="")
        overhead_tokens = len(encoding.encode(fixed_prompt, disallowed_special=()))
        ocr_budget = (
            _CONTEXT_WINDOW_TOKENS
            - completion_tokens
            - overhead_tokens
            - _TOKEN_SAFETY_MARGIN
        )
        raw_text = ensure_token_limit(
            raw_text or "", model=model_name, max_tokens=max(ocr_budget, 0)
        )

        prompt_text = prompt.format(query=doc_query, raw_text=raw_text)
        result = openai_response(model, prompt_text)
        logger.debug("GPT Response %s", result)

        class_object = parser.parse(result)
        # .dict() returns a copy, so the edit below does not touch the model.
        dict_object = class_object.dict()
        if all(value is None for value in dict_object.values()):
            logger.warning("Got null for dict object: %s", dict_object)

        if dict_object['total_cost'] is not None:
            # Keep only the part before the first '.', without commas.
            dict_object['total_cost'] = dict_object['total_cost'].split('.')[0].replace(',','')
        return dict_object
    except Exception:
        # Top-level boundary: callers rely on getting {} instead of an error.
        logger.exception("Error occurred while structuring document data")
        return {}


def ensure_token_limit(text, model='gpt-3.5-turbo-instruct', max_tokens=4096):
    """Return *text* truncated to at most *max_tokens* tokens for *model*.

    Args:
        text: Text to check. Empty or ``None`` input is returned unchanged.
        model: Model name used to pick the tiktoken encoding. Unknown names
            fall back to the ``cl100k_base`` encoding.
        max_tokens: Maximum number of tokens to keep. Negative values are
            treated as 0.

    Returns:
        *text* itself when it fits, otherwise the decoded first
        *max_tokens* tokens. When truncation happens the truncated text is
        also appended to ``token.txt`` as a best-effort debugging aid.
    """
    if not text:
        return text

    encoding = _get_encoding(model)
    # disallowed_special=() encodes strings that look like special tokens
    # as ordinary text instead of raising on them.
    tokens = encoding.encode(text, disallowed_special=())
    limit = max(max_tokens, 0)
    if len(tokens) <= limit:
        return text

    truncated_text = encoding.decode(tokens[:limit])
    try:
        # Explicit UTF-8: receipts contain currency symbols that a locale
        # default encoding may not be able to write.
        with open("token.txt", "a", encoding="utf-8") as file:
            file.write(truncated_text)
    except OSError:
        # The dump is only a debugging aid and must not fail the caller.
        logger.warning("Could not append truncated text to token.txt", exc_info=True)
    logger.debug("Truncated text: %s", truncated_text)
    return truncated_text


def extract_json_from_string(input_string):
    """Extract every JSON object embedded in *input_string*.

    The string is scanned for ``{`` and a JSON object is decoded from each
    candidate position, so nested and multi-line objects are returned
    whole. Candidates that are not valid JSON are skipped.

    Args:
        input_string: Text that may contain JSON objects.

    Returns:
        A list of decoded objects in order of appearance, or ``None`` when
        the input is empty or contains no valid JSON object.
    """
    if not input_string:
        return None

    decoder = json.JSONDecoder()
    json_data_list = []
    position = input_string.find("{")
    while position != -1:
        try:
            json_data, end = decoder.raw_decode(input_string, position)
        except json.JSONDecodeError:
            # Not a JSON object here; try the next opening brace.
            position = input_string.find("{", position + 1)
            continue
        json_data_list.append(json_data)
        # Resume after the decoded object so nested objects are not repeated.
        position = input_string.find("{", end)

    return json_data_list or None


def extract_text_from_pdf(pdf_data):
    """Return the concatenated text of every page in the PDF bytes *pdf_data*."""
    with io.BytesIO(pdf_data) as pdf_file:
        pdf_reader = PyPDF2.PdfReader(pdf_file)
        # `or ""` tolerates a page whose extraction yields None.
        return "".join(page.extract_text() or "" for page in pdf_reader.pages)


def extract_text_from_docx(docx_data):
    """Return the DOCX bytes *docx_data* as text, one paragraph per line."""
    doc = Document(io.BytesIO(docx_data))
    return "".join(f"{para.text}\n" for para in doc.paragraphs)


def _select_extractor(filename):
    """Return the extractor for *filename*'s extension, or None if unsupported."""
    # Case-insensitive so names such as "RECEIPT.PDF" are recognised.
    name = (filename or "").lower()
    if name.endswith('.pdf'):
        return extract_text_from_pdf
    if name.endswith('.docx'):
        return extract_text_from_docx
    # Add handling for other document types if needed
    return None


def extract_text_from_attachment(filename, data):
    """Extract text from a URL-safe base64 encoded attachment.

    Args:
        filename: Attachment name; its extension selects the parser.
        data: URL-safe base64 encoded file content.

    Returns:
        The extracted text, or ``"Unsupported document type"`` when the
        extension is neither ``.pdf`` nor ``.docx``.
    """
    extractor = _select_extractor(filename)
    if extractor is None:
        return _UNSUPPORTED_MESSAGE
    # Decode only once the type is known to be supported.
    return extractor(base64.urlsafe_b64decode(data))


def extract_text_from_attachment_outlook(filename , data):
    """Extract text from an attachment supplied as raw bytes.

    Args:
        filename: Attachment name; its extension selects the parser.
        data: Raw file content (no base64 decoding is applied).

    Returns:
        The extracted text, or ``"Unsupported document type"`` when the
        extension is neither ``.pdf`` nor ``.docx``.
    """
    extractor = _select_extractor(filename)
    if extractor is None:
        return _UNSUPPORTED_MESSAGE
    return extractor(data)