Spaces:
Running
Running
# Article_Summarization_Lib.py | |
######################################### | |
# Article Summarization Library | |
# This library is used to handle summarization of articles. | |
# | |
#### | |
# | |
#################### | |
# Function List | |
# | |
# 1. | |
# | |
#################### | |
# | |
# Import necessary libraries | |
import datetime | |
from datetime import datetime | |
import gradio as gr | |
import json | |
import os | |
import logging | |
import requests | |
# 3rd-Party Imports | |
from tqdm import tqdm | |
from App_Function_Libraries.Utils import sanitize_filename | |
# Local Imports | |
from Article_Extractor_Lib import scrape_article | |
from Local_Summarization_Lib import summarize_with_llama, summarize_with_oobabooga, summarize_with_tabbyapi, \ | |
summarize_with_vllm, summarize_with_kobold, save_summary_to_file, summarize_with_local_llm | |
from Summarization_General_Lib import summarize_with_openai, summarize_with_anthropic, summarize_with_cohere, summarize_with_groq, summarize_with_openrouter, summarize_with_deepseek, summarize_with_huggingface | |
from SQLite_DB import Database, create_tables, add_media_with_keywords | |
# | |
####################################################################################################################### | |
# Function Definitions | |
# | |
def ingest_article_to_db(url, title, author, content, keywords, summary, ingestion_date, custom_prompt): | |
try: | |
# Check if content is not empty or whitespace | |
if not content.strip(): | |
raise ValueError("Content is empty.") | |
db = Database() | |
create_tables() | |
keyword_list = keywords.split(',') if keywords else ["default"] | |
keyword_str = ', '.join(keyword_list) | |
# Set default values for missing fields | |
url = url or 'Unknown' | |
title = title or 'Unknown' | |
author = author or 'Unknown' | |
keywords = keywords or 'default' | |
summary = summary or 'No summary available' | |
ingestion_date = ingestion_date or datetime.datetime.now().strftime('%Y-%m-%d') | |
# Log the values of all fields before calling add_media_with_keywords | |
logging.debug(f"URL: {url}") | |
logging.debug(f"Title: {title}") | |
logging.debug(f"Author: {author}") | |
logging.debug(f"Content: {content[:50]}... (length: {len(content)})") # Log first 50 characters of content | |
logging.debug(f"Keywords: {keywords}") | |
logging.debug(f"Summary: {summary}") | |
logging.debug(f"Ingestion Date: {ingestion_date}") | |
logging.debug(f"Custom Prompt: {custom_prompt}") | |
# Check if any required field is empty and log the specific missing field | |
if not url: | |
logging.error("URL is missing.") | |
raise ValueError("URL is missing.") | |
if not title: | |
logging.error("Title is missing.") | |
raise ValueError("Title is missing.") | |
if not content: | |
logging.error("Content is missing.") | |
raise ValueError("Content is missing.") | |
if not keywords: | |
logging.error("Keywords are missing.") | |
raise ValueError("Keywords are missing.") | |
if not summary: | |
logging.error("Summary is missing.") | |
raise ValueError("Summary is missing.") | |
if not ingestion_date: | |
logging.error("Ingestion date is missing.") | |
raise ValueError("Ingestion date is missing.") | |
if not custom_prompt: | |
logging.error("Custom prompt is missing.") | |
raise ValueError("Custom prompt is missing.") | |
# Add media with keywords to the database | |
result = add_media_with_keywords( | |
url=url, | |
title=title, | |
media_type='article', | |
content=content, | |
keywords=keyword_str or "article_default", | |
prompt=custom_prompt or None, | |
summary=summary or "No summary generated", | |
transcription_model=None, # or some default value if applicable | |
author=author or 'Unknown', | |
ingestion_date=ingestion_date | |
) | |
return result | |
except Exception as e: | |
logging.error(f"Failed to ingest article to the database: {e}") | |
return str(e) | |
def scrape_and_summarize_multiple(urls, custom_prompt_arg, api_name, api_key, keywords, custom_article_titles): | |
urls = [url.strip() for url in urls.split('\n') if url.strip()] | |
custom_titles = custom_article_titles.split('\n') if custom_article_titles else [] | |
results = [] | |
errors = [] | |
# Create a progress bar | |
progress = gr.Progress() | |
for i, url in tqdm(enumerate(urls), total=len(urls), desc="Processing URLs"): | |
custom_title = custom_titles[i] if i < len(custom_titles) else None | |
try: | |
result = scrape_and_summarize(url, custom_prompt_arg, api_name, api_key, keywords, custom_title) | |
results.append(f"Results for URL {i + 1}:\n{result}") | |
except Exception as e: | |
error_message = f"Error processing URL {i + 1} ({url}): {str(e)}" | |
errors.append(error_message) | |
results.append(f"Failed to process URL {i + 1}: {url}") | |
# Update progress | |
progress((i + 1) / len(urls), desc=f"Processed {i + 1}/{len(urls)} URLs") | |
# Combine results and errors | |
combined_output = "\n".join(results) | |
if errors: | |
combined_output += "\n\nErrors encountered:\n" + "\n".join(errors) | |
return combined_output | |
def scrape_and_summarize(url, custom_prompt_arg, api_name, api_key, keywords, custom_article_title): | |
try: | |
# Step 1: Scrape the article | |
article_data = scrape_article(url) | |
print(f"Scraped Article Data: {article_data}") # Debugging statement | |
if not article_data: | |
return "Failed to scrape the article." | |
# Use the custom title if provided, otherwise use the scraped title | |
title = custom_article_title.strip() if custom_article_title else article_data.get('title', 'Untitled') | |
author = article_data.get('author', 'Unknown') | |
content = article_data.get('content', '') | |
ingestion_date = datetime.now().strftime('%Y-%m-%d') | |
print(f"Title: {title}, Author: {author}, Content Length: {len(content)}") # Debugging statement | |
# Custom prompt for the article | |
article_custom_prompt = custom_prompt_arg or "Summarize this article." | |
# Step 2: Summarize the article | |
summary = None | |
if api_name: | |
logging.debug(f"Article_Summarizer: Summarization being performed by {api_name}") | |
# Sanitize filename for saving the JSON file | |
sanitized_title = sanitize_filename(title) | |
json_file_path = os.path.join("Results", f"{sanitized_title}_segments.json") | |
with open(json_file_path, 'w') as json_file: | |
json.dump([{'text': content}], json_file, indent=2) | |
try: | |
if api_name.lower() == 'openai': | |
# def summarize_with_openai(api_key, input_data, custom_prompt_arg) | |
summary = summarize_with_openai(api_key, json_file_path, article_custom_prompt) | |
elif api_name.lower() == "anthropic": | |
# def summarize_with_anthropic(api_key, input_data, model, custom_prompt_arg, max_retries=3, retry_delay=5): | |
summary = summarize_with_anthropic(api_key, json_file_path, article_custom_prompt) | |
elif api_name.lower() == "cohere": | |
# def summarize_with_cohere(api_key, input_data, model, custom_prompt_arg) | |
summary = summarize_with_cohere(api_key, json_file_path, article_custom_prompt) | |
elif api_name.lower() == "groq": | |
logging.debug(f"MAIN: Trying to summarize with groq") | |
# def summarize_with_groq(api_key, input_data, model, custom_prompt_arg): | |
summary = summarize_with_groq(api_key, json_file_path, article_custom_prompt) | |
elif api_name.lower() == "openrouter": | |
logging.debug(f"MAIN: Trying to summarize with OpenRouter") | |
# def summarize_with_openrouter(api_key, input_data, custom_prompt_arg): | |
summary = summarize_with_openrouter(api_key, json_file_path, article_custom_prompt) | |
elif api_name.lower() == "deepseek": | |
logging.debug(f"MAIN: Trying to summarize with DeepSeek") | |
# def summarize_with_deepseek(api_key, input_data, custom_prompt_arg): | |
summary = summarize_with_deepseek(api_key, json_file_path, article_custom_prompt) | |
elif api_name.lower() == "llama.cpp": | |
logging.debug(f"MAIN: Trying to summarize with Llama.cpp") | |
# def summarize_with_llama(api_url, file_path, token, custom_prompt) | |
summary = summarize_with_llama(json_file_path, article_custom_prompt) | |
elif api_name.lower() == "kobold": | |
logging.debug(f"MAIN: Trying to summarize with Kobold.cpp") | |
# def summarize_with_kobold(input_data, kobold_api_token, custom_prompt_input, api_url): | |
summary = summarize_with_kobold(json_file_path, api_key, article_custom_prompt) | |
elif api_name.lower() == "ooba": | |
# def summarize_with_oobabooga(input_data, api_key, custom_prompt, api_url): | |
summary = summarize_with_oobabooga(json_file_path, api_key, article_custom_prompt) | |
elif api_name.lower() == "tabbyapi": | |
# def summarize_with_tabbyapi(input_data, tabby_model, custom_prompt_input, api_key=None, api_IP): | |
summary = summarize_with_tabbyapi(json_file_path, article_custom_prompt) | |
elif api_name.lower() == "vllm": | |
logging.debug(f"MAIN: Trying to summarize with VLLM") | |
# def summarize_with_vllm(api_key, input_data, custom_prompt_input): | |
summary = summarize_with_vllm(json_file_path, article_custom_prompt) | |
elif api_name.lower() == "local-llm": | |
logging.debug(f"MAIN: Trying to summarize with Local LLM") | |
summary = summarize_with_local_llm(json_file_path, article_custom_prompt) | |
elif api_name.lower() == "huggingface": | |
logging.debug(f"MAIN: Trying to summarize with huggingface") | |
# def summarize_with_huggingface(api_key, input_data, custom_prompt_arg): | |
summarize_with_huggingface(api_key, json_file_path, article_custom_prompt) | |
# Add additional API handlers here... | |
except requests.exceptions.ConnectionError as e: | |
logging.error(f"Connection error while trying to summarize with {api_name}: {str(e)}") | |
if summary: | |
logging.info(f"Article_Summarizer: Summary generated using {api_name} API") | |
save_summary_to_file(summary, json_file_path) | |
else: | |
summary = "Summary not available" | |
logging.warning(f"Failed to generate summary using {api_name} API") | |
else: | |
summary = "Article Summarization: No API provided for summarization." | |
print(f"Summary: {summary}") # Debugging statement | |
# Step 3: Ingest the article into the database | |
ingestion_result = ingest_article_to_db(url, title, author, content, keywords, summary, ingestion_date, | |
article_custom_prompt) | |
return f"Title: {title}\nAuthor: {author}\nIngestion Result: {ingestion_result}\n\nSummary: {summary}\n\nArticle Contents: {content}" | |
except Exception as e: | |
logging.error(f"Error processing URL {url}: {str(e)}") | |
return f"Failed to process URL {url}: {str(e)}" | |
def ingest_unstructured_text(text, custom_prompt, api_name, api_key, keywords, custom_article_title): | |
title = custom_article_title.strip() if custom_article_title else "Unstructured Text" | |
author = "Unknown" | |
ingestion_date = datetime.now().strftime('%Y-%m-%d') | |
# Summarize the unstructured text | |
if api_name: | |
json_file_path = f"Results/{title.replace(' ', '_')}_segments.json" | |
with open(json_file_path, 'w') as json_file: | |
json.dump([{'text': text}], json_file, indent=2) | |
if api_name.lower() == 'openai': | |
summary = summarize_with_openai(api_key, json_file_path, custom_prompt) | |
# Add other APIs as needed | |
else: | |
summary = "Unsupported API." | |
else: | |
summary = "No API provided for summarization." | |
# Ingest the unstructured text into the database | |
ingestion_result = ingest_article_to_db('Unstructured Text', title, author, text, keywords, summary, ingestion_date, | |
custom_prompt) | |
return f"Title: {title}\nSummary: {summary}\nIngestion Result: {ingestion_result}" | |
# | |
# | |
####################################################################################################################### |