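"""Article recommendation model.

Takes user input as text, microphone speech, or an audio file, translates
non-English input to English, scores its sentiment with a multilingual
zero-shot classifier, recommends articles by keyword overlap against a
description column, and appends each interaction to a CSV log."""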
import os
import pickle
from datetime import datetime

import nltk
import pandas as pd
import spacy
import speech_recognition as sr
from autocorrect import Speller
from googletrans import Translator
from nltk.corpus import stopwords
from nltk.stem import PorterStemmer, WordNetLemmatizer
from nltk.tokenize import word_tokenize
from sklearn.feature_extraction.text import TfidfVectorizer
from transformers import pipeline
class recommendationModel:
    def __init__(self):
        self.translator = Translator()
        # Multilingual zero-shot sentiment classifier
        self.zero_shot_classifier = pipeline(
            'zero-shot-classification',
            model="MoritzLaurer/mDeBERTa-v3-base-mnli-xnli",
        )
        self.spell_checker = Speller(lang='en')
        self.porter = PorterStemmer()
        self.lemmatizer = WordNetLemmatizer()
        self.nlp = spacy.load("en_core_web_sm")
        self.class_names = ["positive :)", "neutral :|", "negative :("]
        # Must be assigned a DataFrame with 'description', 'title', 'keywords',
        # 'class', and 'url' columns before calling main().
        self.data1 = None
    def detect_language(self, user_input):
        # Translate non-English input to English; pass English through unchanged.
        det = self.translator.detect(user_input)
        if det.lang != 'en':
            trans = self.translator.translate(user_input, 'en')
            print("\nTranslation:", trans.text)
            return trans.text
        else:
            return user_input
    def remove_stopwords(self, tags):
        words = word_tokenize(tags)
        stop_words = set(stopwords.words('english'))
        filtered_words = [word for word in words if word not in stop_words]
        return " ".join(filtered_words)

    def correct_spelling(self, word):
        return self.spell_checker(word)

    def porterStemmer(self, text):
        words = word_tokenize(text)
        stemmed_words = [self.porter.stem(word) for word in words]
        return ' '.join(stemmed_words)

    def correct_spellings_in_text(self, text):
        words = nltk.word_tokenize(text)
        corrected_words = [self.correct_spelling(word) for word in words]
        return " ".join(corrected_words)
    def preprocess_input(self, userInput):
        corrected_text = self.correct_spellings_in_text(userInput)
        words = nltk.word_tokenize(corrected_text.lower())
        sentence = " ".join(words)
        sentence = self.remove_stopwords(sentence)
        # sentence = self.porterStemmer(sentence)
        keywords = nltk.word_tokenize(sentence.lower())
        return keywords, sentence
    def calculate_score(self, about, keywords):
        # Simple keyword-overlap score: +1 for each keyword found in the text.
        score = 0
        for keyword in keywords:
            if keyword in about.lower():
                score += 1
        return score
    def zero_shot_classifier_sent(self, userInput):
        zsc_output = self.zero_shot_classifier(userInput, self.class_names)
        return zsc_output['labels'], zsc_output['scores']
    def recommendArticle(self, userInput, tfidf_scores, output_csv):
        # Classify sentiment on the raw input (the mDeBERTa model is multilingual),
        # then translate and keyword-match against the article descriptions.
        zsc_labels, zsc_scores = self.zero_shot_classifier_sent(userInput)
        max_label, max_score = max(zip(zsc_labels, zsc_scores), key=lambda pair: pair[1])
        userInput = self.detect_language(userInput)  # translate to English if needed
        keywords, sentence = self.preprocess_input(userInput)
        self.data1['score'] = self.data1['description'].apply(
            lambda x: self.calculate_score(x, keywords))
        # Sort articles based on score
        recommended_articles = self.data1.sort_values(by='score', ascending=False)
        print("\n*****************\nRecommended Articles:")
        for index, row in recommended_articles.head(10).iterrows():
            print(f"\nTitle: {row['title']}")
            print(f"Keywords: {row['keywords']}")
            print(f"Class: {row['class']}")
            print(f"URL: {row['url']}")
        # Prepare data to append to CSV
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        output_data = {
            'Timestamp': timestamp,
            'User Input': userInput,
            'Emotion': max_label,
            'Sentiment Score': max_score,
            'Keywords': ", ".join(keywords),
        }
        # Append output data to CSV, writing the header only on first creation
        output_df = pd.DataFrame(output_data, index=[0])
        output_df.to_csv(output_csv, mode='a',
                         header=not os.path.exists(output_csv), index=False)
    def convert_audio_to_text(self, recognizer, source, duration):
        print("Listening for audio...")
        try:
            # listen() must be inside the try block so WaitTimeoutError is caught
            audio_data = recognizer.listen(source, timeout=duration, phrase_time_limit=duration)
            text = recognizer.recognize_google(audio_data)
            return text
        except sr.WaitTimeoutError:
            print("Listening timed out. No speech detected.")
            return ""
        except sr.UnknownValueError:
            print("Oops, it seems we're having trouble understanding the audio. Let's try again with clearer sound.")
            return ""
        except sr.RequestError as e:
            print(f"Could not request results; {e}")
            return ""
    def extract_keywords_tfidf(self, article_descriptions):
        # Fit TF-IDF over all descriptions; note that only the scores for the
        # first document are returned.
        tfidf_vectorizer = TfidfVectorizer(stop_words='english')
        tfidf_matrix = tfidf_vectorizer.fit_transform(article_descriptions)
        feature_names = tfidf_vectorizer.get_feature_names_out()
        article_tfidf_scores = tfidf_matrix[0].toarray().flatten()
        return dict(zip(feature_names, article_tfidf_scores))
    def main(self, inputs):
        output_csv = "Output2.csv"  # Specify the output CSV file
        print("Choose input method:\n1. Text\n2. Voice\n3. Audio File")
        while True:
            choice = input("\nEnter your choice (1 or 2 or 3): ")
            if choice == '1':
                user_input1 = input("Enter your message: ")
                user_input1 = self.detect_language(user_input1)
                inputs.append(user_input1)
                user_input = ' '.join(inputs)
                print(user_input)
                print("\nProcessing....")
                tfidf_scores = self.extract_keywords_tfidf(self.data1['description'])
                self.recommendArticle(user_input, tfidf_scores, output_csv)
                break
            elif choice == '2':
                recognizer = sr.Recognizer()
                with sr.Microphone() as source:
                    recognizer.adjust_for_ambient_noise(source)  # Adjust for ambient noise
                    text1 = self.convert_audio_to_text(recognizer, source, 15)
                if text1:
                    text = self.detect_language(text1)
                    inputs.append(text)  # append the translated text, not the raw transcript
                    text = ' '.join(inputs)
                    print(text)
                    print("\nProcessing....")
                    tfidf_scores = self.extract_keywords_tfidf(self.data1['description'])
                    self.recommendArticle(text, tfidf_scores, output_csv)
                    break
                else:
                    print("Oops, it seems we're having trouble understanding the audio. Let's try again with clearer sound.")
            elif choice == '3':
                filename = input("Enter the path to the audio file: ")
                recognizer = sr.Recognizer()
                with sr.AudioFile(filename) as source:
                    recognizer.adjust_for_ambient_noise(source)  # Adjust for ambient noise
                    text1 = self.convert_audio_to_text(recognizer, source, 1000)
                if text1:
                    text = self.detect_language(text1)
                    inputs.append(text)  # append the translated text, not the raw transcript
                    text = ' '.join(inputs)
                    print(text)
                    print("\nProcessing....")
                    tfidf_scores = self.extract_keywords_tfidf(self.data1['description'])
                    self.recommendArticle(text, tfidf_scores, output_csv)
                    break
                else:
                    print("Oops, it seems we're having trouble finding the file. Let's try again with the correct path.")
            else:
                print("Invalid choice. Please enter 1 or 2 or 3.")
    # PROPER PICKLING AND UNPICKLING ATTRIBUTES
    def __getstate__(self):
        # Exclude unpicklable attributes; they are recreated in __setstate__
        excluded_attrs = ['translator', 'zero_shot_classifier', 'nlp']
        state = self.__dict__.copy()
        for attr in excluded_attrs:
            state.pop(attr, None)
        return state

    def __setstate__(self, state):
        # Restore the state and recreate the excluded attributes
        self.__dict__.update(state)
        self.translator = Translator()
        self.zero_shot_classifier = pipeline(
            'zero-shot-classification',
            model="MoritzLaurer/mDeBERTa-v3-base-mnli-xnli",
        )
        self.nlp = spacy.load("en_core_web_sm")
        # Recreate other excluded attributes here if needed
if __name__ == "__main__":
    model = recommendationModel()
    with open('model2.pkl', 'wb') as f:
        pickle.dump(model, f)
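    # Usage sketch, not part of the original Space: reload the pickled model,
    # attach a dataset, and start the interactive loop. The path "articles.csv"
    # is a hypothetical placeholder; data1 must hold the columns 'description',
    # 'title', 'keywords', 'class', and 'url'.
    with open('model2.pkl', 'rb') as f:
        model = pickle.load(f)  # __setstate__ rebuilds translator, classifier, nlp
    model.data1 = pd.read_csv("articles.csv")  # hypothetical dataset path
    model.main([])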