Spaces:
Sleeping
Sleeping
File size: 8,182 Bytes
d1a66a2 7fa1436 b53150e 56eb0c5 2f625a1 864e803 661b4df 393a668 5a09d05 d1a66a2 b53150e 864e803 29c2deb b53150e 29c2deb eef32e5 29c2deb ac097f7 5495ec5 29c2deb 864e803 b53150e 5a09d05 b53150e 5df9c1c b53150e b45ab43 b53150e 5df9c1c daad07f 52267ff daad07f 52267ff 57eb2e6 661b4df b53150e 1f4fb93 1524208 b53150e 1f4fb93 b53150e 5a09d05 b53150e 5a09d05 ad282b7 5a09d05 99b959a b53150e 83d8c1d b009d8f 83d8c1d 3c001d4 142156e b53150e 661b4df 5df9c1c 661b4df 5df9c1c 661b4df b53150e 56eb0c5 b53150e d1a66a2 b53150e d1a66a2 b53150e d1a66a2 b53150e d1a66a2 b53150e d1a66a2 b53150e 5769d80 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 |
import PyPDF2
from docx import Document
import io
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.text_splitter import CharacterTextSplitter
from typing_extensions import Concatenate
from typing import List
# from langchain_community.llms import OpenAI
from langchain_community.callbacks import get_openai_callback
from langchain.output_parsers import PydanticOutputParser
from langchain.prompts import PromptTemplate
from langchain_core.pydantic_v1 import BaseModel, Field, validator
import os
import logging
import base64
from langchain_openai import OpenAI
import re
import json
from typing import Optional
import tiktoken
import time
from tenacity import (
retry,
stop_after_attempt,
wait_random_exponential,
)
# Read the OpenAI API key from the environment.
# NOTE(review): `api_key` is never referenced in the visible code — the
# langchain `OpenAI` client picks up OPENAI_API_KEY from the environment on
# its own. Confirm this module-level variable is used elsewhere before removing.
api_key=os.getenv('OPENAI_API_KEY')
class Candidate(BaseModel):
    """Structured fields extracted from receipt OCR text.

    This model is fed to ``PydanticOutputParser`` in
    ``strcuture_document_data``; each ``Field``'s ``description`` becomes part
    of the LLM format instructions, so the (uppercase) descriptions double as
    prompt text and must not be reworded casually. Every field defaults to
    ``None`` so that values missing from the receipt parse cleanly.
    """
    # NOTE(review): the capitalized field name `Date` breaks snake_case but is
    # part of the JSON contract with the prompt — renaming it would break parsing.
    brand: Optional[str] = Field(default=None, description="INSERT BRAND NAME FROM THE RECEIPT OCR TEXT. IF NOT PRESENT RETURN null")
    total_cost: Optional[str] = Field(default=None, description="INSERT TOTAL COST FROM THE RECEIPT OCR TEXT. TOTAL AMOUNT IS MAXIMUM VALUE IN THE OCR TEXT. IF NOT PRESENT RETURN null")
    location: Optional[str] = Field(default=None, description="INSERT LOCATION FROM THE RECEIPT OCR TEXT. IF NOT PRESENT RETURN null")
    purchase_category: Optional[str] = Field(default=None, description="INSERT PURCHASE CATEGORY FROM THE RECEIPT OCR TEXT. IF NOT PRESENT RETURN null")
    brand_category: Optional[str] = Field(default=None, description="""INSERT BRAND CATEGORY FROM THE RECEIPT OCR TEXT. CHOOSE CLOSEST BRAND CATEGORY BASED ON THE OCR FROM THIS ARRAY ["Fashion and Apparel","Jewelry and Watches","Beauty and Personal Care","Automobiles","Real Estate","Travel and Leisure","Culinary Services","Home and Lifestyle","Technology and Electronics","Sports and Leisure","Art and Collectibles","Health and Wellness","Stationery and Writing Instruments","Children and Baby","Pet Accessories","Financial Services","Airline Services","Accommodation Services","Beverages Services","Services"] ELSE IF NOT PRESENT RETURN null""")
    Date: Optional[str] = Field(default=None, description="INSERT RECEIPT DATE FROM THE RECEIPT OCR TEXT. IF NOT PRESENT RETURN null. FORMAT: dd-mm-yyyy")
    currency: Optional[str] = Field(default=None, description="INSERT CURRENCY FROM THE RECEIPT OCR TEXT. LOOK FOR CURRENCY SYMBOLS (e.g., $, €, £, ¥) OR CURRENCY CODES (e.g., USD, EUR, GBP, JPY).ALWAYS RETURN CURRENCY CODE.IF NOT FOUND RETURN null.")
    filename: Optional[str] = Field(default=None, description="GENERATE A FILENAME BASED ON THE RECEIPT OCR TEXT. USE THE FORMAT: 'PURCHASE_TYPE_BRAND_DATE' (e.g., 'clothing_gucci_20230715'). USE UNDERSCORES FOR SPACES.IF YOU CANNOT FIND THE COMPONENTS RETURN THIS FIELD AS NULL.")
    payment_method: Optional[str] = Field(default=None, description="INSERT PAYMENT METHOD FROM THE RECEIPT OCR TEXT. LOOK FOR KEYWORDS LIKE 'CASH', 'CARD', 'CREDIT', 'DEBIT', 'VISA', 'MASTERCARD', 'AMEX', 'PAYPAL', ETC. IF NOT FOUND RETURN null.")
@retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(6))
def openai_response(model:OpenAI,input:str):
    """Send a prompt string to the LLM and return its completion.

    Wrapped with tenacity so transient API failures (rate limits, timeouts)
    are retried up to 6 times with 1-60s random exponential backoff.
    """
    return model.invoke(input)
def strcuture_document_data(raw_text:str)->dict:
    """Extract structured receipt fields from OCR text using an OpenAI LLM.

    Truncates the text to the model's token limit, prompts
    gpt-3.5-turbo-instruct with Pydantic-derived format instructions, parses
    the completion into a ``Candidate``, and returns its field dict.

    NOTE(review): the function name misspells "structure" — kept because
    external callers may import it by this name.

    :param raw_text: receipt OCR text (truncated via ``ensure_token_limit``).
    :returns: dict of Candidate fields, or ``{}`` on any failure.
    """
    raw_text = ensure_token_limit(raw_text)
    try:
        model_name = "gpt-3.5-turbo-instruct"
        temperature = 0.0  # deterministic output for extraction
        model = OpenAI(model_name=model_name, temperature=temperature, max_tokens=256)
        # doc_query = (
        #     "Extract and return strictly a JSON object containing only the following keys strictly : brand , total_cost , location , no_of_items , purchase_category,brand_category , Date ."
        #     "\nReceipt Data:\n" + raw_text + "\nRemember the response should only be in JSON format very Strictly and it should have these keys brand , total_cost(LOOK FOR THE HIGHEST VALUE IN RECEIPT OCR TEXT) , location , no_of_items , purchase_category,brand_category , Date , very Strictly.\n"
        # )
        doc_query= (
            "Extract and return strictly a JSON object containing only the following keys: brand, total_cost, location, purchase_category, brand_category, Date , currency ,filename,payment_method .FOR total_cost LOOK FOR THE HIGHEST VALUE IN RECEIPT OCR TEXT. Ensure that if a value is not present in the OCR text, it is returned as null."
        )
        parser = PydanticOutputParser(pydantic_object=Candidate)
        # The raw OCR text and format instructions are baked in as partials;
        # only the query remains a template variable.
        prompt = PromptTemplate(
            template="""Your primary goal is to take my receipt OCR text and then return back a parsable json.
Below is the receipt OCR:.\n {raw_text} \n These are the format instructions telling you to convert the data into json :\n {format_instructions}\nDo not include descriptions or explanations from the Candidate class in the JSON output. The response must be a valid JSON object.\n Follow the below instrcution very strictly:\n {query} \n""",
            input_variables=["query"],
            partial_variables={"format_instructions": parser.get_format_instructions(),"raw_text":raw_text},
        )
        # NOTE(review): `input` shadows the builtin — rename in a follow-up.
        input = prompt.format_prompt(query=doc_query)
        result = openai_response(model,input.to_string())
        print(f"GPT Response {result}")
        class_object= parser.parse(result)
        dict_object=class_object.__dict__  # pydantic v1: field-name -> value mapping
        # Debug aid: log when the LLM returned null for every field.
        if all(value is None for value in dict_object.values()):
            print(dict_object)
            print("Got null for dict object")
        # print("printing structured json")
        # print(dict_object)
        # Normalize total_cost: drop the decimal part and thousands separators.
        if dict_object['total_cost'] is not None:
            dict_object['total_cost'] = dict_object['total_cost'].split('.')[0].replace(',','')
        return dict_object
    except Exception as e:
        # Broad catch: any API/parsing failure degrades to an empty result
        # rather than crashing the caller.
        print(f"Error occurred: {e}")
        return {}
def ensure_token_limit(text, model='gpt-3.5-turbo-instruct', max_tokens=4096):
    """Clip ``text`` so it encodes to at most ``max_tokens`` tokens for ``model``.

    Returns the text unchanged when it already fits. When it does not, only
    the first ``max_tokens`` tokens are decoded back; the clipped text is also
    appended to token.txt and echoed to stdout as a debugging trace.
    """
    encoder = tiktoken.encoding_for_model(model)
    token_ids = encoder.encode(text)

    # Fast path: already within the limit.
    if len(token_ids) <= max_tokens:
        return text

    clipped_text = encoder.decode(token_ids[:max_tokens])
    with open("token.txt", "a") as trace_file:
        trace_file.write(clipped_text)
    print(clipped_text)
    return clipped_text
def extract_json_from_string(input_string):
    """Find and parse every flat ``{...}`` JSON object embedded in a string.

    The non-greedy pattern matches the smallest brace-delimited spans, so
    nested JSON objects are not supported — each match must itself be a flat
    JSON object.

    :param input_string: arbitrary text that may contain JSON objects.
    :returns: list of parsed objects, or ``None`` when no valid JSON is found.
    """
    # Non-greedy: each match is the shortest {...} span (no nesting).
    pattern = r'\{.*?\}'
    matches = re.findall(pattern, input_string)
    json_data_list = []
    for match in matches:
        try:
            json_data_list.append(json.loads(match))
        except json.JSONDecodeError:
            # Fix: brace-delimited text that is not valid JSON (e.g. a
            # "{placeholder}") previously raised and crashed the whole call.
            # Skip it and keep scanning for real JSON objects.
            continue
    return json_data_list if json_data_list else None
def extract_text_from_pdf(pdf_data):
    """Return the concatenated extracted text of every page in a PDF byte string."""
    with io.BytesIO(pdf_data) as pdf_stream:
        reader = PyPDF2.PdfReader(pdf_stream)
        return "".join(page.extract_text() for page in reader.pages)
def extract_text_from_docx(docx_data):
    """Return all paragraph text from a .docx byte string, one paragraph per line."""
    document = Document(io.BytesIO(docx_data))
    return "".join(paragraph.text + "\n" for paragraph in document.paragraphs)
def extract_text_from_attachment(filename, data):
    """Decode a base64url-encoded attachment and extract its text by extension.

    Decoding happens only for supported extensions; anything else is
    reported as unsupported without touching the payload.
    """
    if filename.endswith('.pdf'):
        return extract_text_from_pdf(base64.urlsafe_b64decode(data))
    if filename.endswith('.docx'):
        return extract_text_from_docx(base64.urlsafe_b64decode(data))
    # Other attachment types are not handled yet.
    return "Unsupported document type"
def extract_text_from_attachment_outlook(filename , data):
    """Extract text from an Outlook attachment whose bytes are already decoded.

    Unlike the Gmail variant, ``data`` is raw bytes (no base64 layer).
    """
    if filename.endswith('.pdf'):
        return extract_text_from_pdf(data)
    if filename.endswith('.docx'):
        return extract_text_from_docx(data)
    # Other attachment types are not handled yet.
    return "Unsupported document type"
|