Spaces:
Running
Running
File size: 9,431 Bytes
ed28876 |
|
import gradio as gr
import logging
import sqlite3
from typing import List, Dict
import os
import zipfile
import tempfile
import shutil
# Logging: timestamped, level-tagged records at INFO and above.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Database handle placeholder. The query helpers in this module call
# ``db.get_connection()`` and use the result as a context manager yielding an
# object that provides ``.cursor()``; assign such an object here before use.
db = None  # Replace with your actual database connection
class DatabaseError(Exception):
    """Raised when a database query issued by this module fails."""
# Database functions
def fetch_items_by_keyword(search_query: str) -> List[Dict]:
    """Return the media items tagged with a keyword containing *search_query*.

    The match is a substring match (``LIKE '%query%'``) on ``Keywords.keyword``.

    Args:
        search_query: Text to look for inside keyword names.

    Returns:
        One ``{"id", "title", "url"}`` dict per matching ``Media`` row, each
        media item appearing at most once.

    Raises:
        DatabaseError: If SQLite reports an error while running the query.
    """
    # DISTINCT: a media item tagged with several matching keywords (e.g.
    # "python" and "python3" for the query "python") would otherwise come
    # back once per keyword because of the joins.
    query = """
        SELECT DISTINCT m.id, m.title, m.url
        FROM Media m
        JOIN MediaKeywords mk ON m.id = mk.media_id
        JOIN Keywords k ON mk.keyword_id = k.id
        WHERE k.keyword LIKE ?
    """
    try:
        with db.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(query, (f'%{search_query}%',))
            rows = cursor.fetchall()
    except sqlite3.Error as e:
        logger.error(f"Error fetching items by keyword: {e}")
        # Chain the original error so the SQLite traceback is preserved.
        raise DatabaseError(f"Error fetching items by keyword: {e}") from e
    return [{"id": row[0], "title": row[1], "url": row[2]} for row in rows]
def fetch_item_details(media_id: int) -> tuple:
    """Look up the stored content and the newest prompt/summary of a media item.

    Args:
        media_id: ``Media.id`` of the item to look up.

    Returns:
        ``(content, prompt, summary)``. A value with no matching row comes
        back as ``""``. If SQLite raises an error, it is logged and
        ``("", "", "")`` is returned instead of raising.
    """
    try:
        with db.get_connection() as conn:
            cur = conn.cursor()

            # The most recent modification row carries the prompt and summary.
            cur.execute("""
                SELECT prompt, summary
                FROM MediaModifications
                WHERE media_id = ?
                ORDER BY modification_date DESC
                LIMIT 1
            """, (media_id,))
            modification_row = cur.fetchone()

            cur.execute("SELECT content FROM Media WHERE id = ?", (media_id,))
            media_row = cur.fetchone()

            if modification_row:
                prompt, summary = modification_row[0], modification_row[1]
            else:
                prompt = summary = ""
            content = media_row[0] if media_row else ""
            return content, prompt, summary
    except sqlite3.Error as err:
        logger.error(f"Error fetching item details: {err}")
        return "", "", ""
def browse_items(search_query: str, search_type: str) -> List[Dict]:
    """Search ``Media`` rows by title, URL, keyword or content.

    Args:
        search_query: Text matched as a substring (``LIKE '%query%'``).
        search_type: One of ``'Title'``, ``'URL'``, ``'Keyword'`` or
            ``'Content'``.

    Returns:
        One ``{"id", "title", "url"}`` dict per matching row.

    Raises:
        ValueError: If *search_type* is not one of the supported values.
        DatabaseError: If SQLite reports an error while running the query.
    """
    # Keyword searches are delegated before any connection is opened here:
    # fetch_items_by_keyword manages its own connection, so opening one in
    # this function as well would only hold a second, unused connection.
    if search_type == 'Keyword':
        return fetch_items_by_keyword(search_query)

    # Fixed statements only; the user-supplied text is always bound as a
    # parameter, never interpolated into the SQL.
    statements = {
        'Title': "SELECT id, title, url FROM Media WHERE title LIKE ?",
        'URL': "SELECT id, title, url FROM Media WHERE url LIKE ?",
        'Content': "SELECT id, title, url FROM Media WHERE content LIKE ?",
    }
    statement = statements.get(search_type) if isinstance(search_type, str) else None
    if statement is None:
        # Reject bad input up front instead of after connecting.
        raise ValueError(f"Invalid search type: {search_type}")

    try:
        with db.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(statement, (f'%{search_query}%',))
            rows = cursor.fetchall()
    except sqlite3.Error as e:
        logger.error(f"Error fetching items by {search_type}: {e}")
        raise DatabaseError(f"Error fetching items by {search_type}: {e}") from e
    return [{"id": row[0], "title": row[1], "url": row[2]} for row in rows]
# Export functions
def export_item_as_markdown(media_id: int) -> str:
    """Write one media item to ``export_item_<media_id>.md``.

    The file is opened by relative name, so it is created in the current
    working directory. It contains a heading followed by the item's prompt,
    summary and content sections.

    Args:
        media_id: Identifier passed to ``fetch_item_details``.

    Returns:
        The file name on success, or ``None`` if any exception occurred (the
        error is logged).
    """
    try:
        content, prompt, summary = fetch_item_details(media_id)
        title = f"Item {media_id}"  # You might want to fetch the actual title
        sections = (
            f"# {title}",
            f"## Prompt\n{prompt}",
            f"## Summary\n{summary}",
            f"## Content\n{content}",
        )
        document = "\n\n".join(sections)

        out_name = f"export_item_{media_id}.md"
        with open(out_name, "w", encoding='utf-8') as handle:
            handle.write(document)

        logger.info(f"Successfully exported item {media_id} to {out_name}")
        return out_name
    except Exception as exc:
        logger.error(f"Error exporting item {media_id}: {str(exc)}")
        return None
def _safe_filename_component(text) -> str:
    """Return *text* with characters unsuitable for a file name replaced by ``_``."""
    # Keep letters, digits and a few harmless punctuation characters; path
    # separators and other reserved characters are replaced by "_".
    return "".join(
        ch if ch.isalnum() or ch in " ._-" else "_" for ch in str(text or "")
    ).strip()


def export_items_by_keyword(keyword: str) -> str:
    """Export every item matching *keyword* as markdown files inside one zip.

    Each item is written to ``<id>_<title>.md`` in a temporary folder, the
    folder is zipped, and the archive is moved into the current working
    directory so it outlives the temporary directory.

    Keyword and title text is sanitised before being used in file names, so a
    ``/`` (or other reserved character) in either one no longer makes the
    export fail or write outside the export folder.

    Args:
        keyword: Keyword passed to ``fetch_items_by_keyword``.

    Returns:
        Path of the zip file in the current working directory, or ``None``
        when no item matches or any exception occurred (the error is logged).
    """
    try:
        items = fetch_items_by_keyword(keyword)
        if not items:
            logger.warning(f"No items found for keyword: {keyword}")
            return None

        folder_name = f"export_keyword_{_safe_filename_component(keyword)}"

        # Build the markdown files in a scratch directory that is removed
        # automatically once the archive has been moved out of it.
        with tempfile.TemporaryDirectory() as temp_dir:
            export_folder = os.path.join(temp_dir, folder_name)
            os.makedirs(export_folder)

            for item in items:
                content, prompt, summary = fetch_item_details(item['id'])
                markdown_content = f"# {item['title']}\n\n## Prompt\n{prompt}\n\n## Summary\n{summary}\n\n## Content\n{content}"

                # The id prefix keeps names unique; the title is sanitised
                # and capped at 50 characters to limit filename length.
                safe_title = _safe_filename_component(item['title'])[:50] or "untitled"
                file_path = os.path.join(export_folder, f"{item['id']}_{safe_title}.md")
                with open(file_path, "w", encoding='utf-8') as f:
                    f.write(markdown_content)

            # make_archive returns the path of the archive it created.
            archive_path = shutil.make_archive(
                os.path.join(temp_dir, folder_name), 'zip', export_folder
            )
            zip_filename = os.path.basename(archive_path)

            # Move the zip file to a location accessible by Gradio
            final_zip_path = os.path.join(os.getcwd(), zip_filename)
            shutil.move(archive_path, final_zip_path)

        logger.info(f"Successfully exported {len(items)} items for keyword '{keyword}' to {zip_filename}")
        return final_zip_path
    except Exception as e:
        logger.error(f"Error exporting items for keyword '{keyword}': {str(e)}")
        return None
def export_selected_items(selected_items: List[Dict]) -> str:
    """Write the given items into a single ``export_selected_items.md`` file.

    The file is opened by relative name, so it is created in the current
    working directory. Each item contributes a section with its title, prompt,
    summary and content, followed by a horizontal rule.

    Args:
        selected_items: Dicts providing at least ``'id'`` and ``'title'``.

    Returns:
        The file name on success, or ``None`` when the selection is empty or
        any exception occurred (both cases are logged).
    """
    try:
        if not selected_items:
            logger.warning("No items selected for export")
            return None

        # Collect the pieces first and join once at the end.
        parts = ["# Selected Items\n\n"]
        for entry in selected_items:
            content, prompt, summary = fetch_item_details(entry['id'])
            parts.append(
                f"## {entry['title']}\n\n### Prompt\n{prompt}\n\n### Summary\n{summary}\n\n### Content\n{content}\n\n---\n\n"
            )

        out_name = "export_selected_items.md"
        with open(out_name, "w", encoding='utf-8') as handle:
            handle.write("".join(parts))

        logger.info(f"Successfully exported {len(selected_items)} selected items to {out_name}")
        return out_name
    except Exception as exc:
        logger.error(f"Error exporting selected items: {str(exc)}")
        return None
# Gradio interface functions
def display_search_results(search_query: str, search_type: str) -> List[Dict]:
    """Run a search and shape the hits as ``{"name", "value"}`` entries.

    ``name`` is the display text ``"<title> (<url>)"`` and ``value`` is the
    item dict returned by ``browse_items``.

    Returns:
        The list of entries, or ``[]`` if the search raised ``DatabaseError``
        (the error is logged). Other exceptions propagate to the caller.
    """
    try:
        found = browse_items(search_query, search_type)
    except DatabaseError as e:
        logger.error(f"Error in display_search_results: {str(e)}")
        return []

    entries = []
    for item in found:
        entries.append({"name": f"{item['title']} ({item['url']})", "value": item})
    return entries
# Gradio interface
def _search_handler(query, kind):
    """Run a search; return (checkbox update, label->item lookup, status text)."""
    if not kind:
        return gr.update(choices=[], value=[]), {}, "Please choose what to search by"
    try:
        entries = display_search_results(query or "", kind)
    except Exception as e:  # UI boundary: show the problem instead of crashing the event
        logger.error(f"Search failed: {e}")
        return gr.update(choices=[], value=[]), {}, f"Search failed: {e}"

    # CheckboxGroup choices are plain labels; the matching item dicts are kept
    # in a gr.State lookup keyed by label. The id prefix keeps two items that
    # share a title and URL distinct.
    lookup = {}
    for entry in entries:
        item = entry["value"]
        lookup[f"[{item['id']}] {entry['name']}"] = item
    return gr.update(choices=list(lookup), value=[]), lookup, f"Found {len(lookup)} item(s)"


def _export_selected_handler(selected_labels, lookup):
    """Export the ticked search results; return (file path or None, status text)."""
    if not selected_labels:
        return None, "No items selected"
    lookup = lookup or {}
    items = [lookup[label] for label in selected_labels if label in lookup]
    path = export_selected_items(items)
    if path is None:
        # The export helpers log the details and return None on failure.
        return None, "Export failed; see the application log"
    return path, "Exported selected items"


def _export_keyword_handler(keyword):
    """Export all items for a keyword; return (file path or None, status text)."""
    if not keyword:
        return None, "No keyword provided"
    path = export_items_by_keyword(keyword)
    if path is None:
        return None, f"Nothing exported for keyword: {keyword} (no matches or export error; see log)"
    return path, f"Exported items for keyword: {keyword}"


def _export_clicked_handler(lookup, evt: gr.SelectData):
    """Export the single result that was just ticked in the checkbox group."""
    # Unticking a box must not trigger an export: leave both outputs as they are.
    if not getattr(evt, "selected", True):
        return gr.update(), gr.update()
    item = (lookup or {}).get(evt.value)
    if not item:
        return None, "No item selected"
    path = export_item_as_markdown(item['id'])
    if path is None:
        return None, f"Export failed for item: {item['title']}"
    return path, f"Exported item: {item['title']}"


with gr.Blocks() as demo:
    gr.Markdown("# Content Export Interface")
    with gr.Tab("Search and Export"):
        search_query = gr.Textbox(label="Search Query")
        search_type = gr.Radio(["Title", "URL", "Keyword", "Content"], label="Search By")
        search_button = gr.Button("Search")
        search_results = gr.CheckboxGroup(label="Search Results")
        # Per-session mapping from checkbox label to the item dict behind it.
        results_lookup = gr.State({})
        export_selected_button = gr.Button("Export Selected Items")
        keyword_input = gr.Textbox(label="Enter keyword for export")
        export_by_keyword_button = gr.Button("Export items by keyword")
        export_output = gr.File(label="Download Exported File")
        error_output = gr.Textbox(label="Status/Error Messages", interactive=False)

    # Every handler returns exactly one value per declared output.
    search_button.click(
        fn=_search_handler,
        inputs=[search_query, search_type],
        outputs=[search_results, results_lookup, error_output]
    )
    export_selected_button.click(
        fn=_export_selected_handler,
        inputs=[search_results, results_lookup],
        outputs=[export_output, error_output]
    )
    export_by_keyword_button.click(
        fn=_export_keyword_handler,
        inputs=[keyword_input],
        outputs=[export_output, error_output]
    )
    # Add functionality to export individual items
    search_results.select(
        fn=_export_clicked_handler,
        inputs=[results_lookup],
        outputs=[export_output, error_output]
    )

demo.launch()
# This modified version of export_items_by_keyword does the following:
#
# Creates a temporary directory to store individual markdown files.
# For each item associated with the keyword, it creates a separate markdown file.
# Places all markdown files in a folder named export_keyword_{keyword}.
# Creates a zip archive of that folder's contents (the markdown files sit at the archive root).
# Moves the zip file into the current working directory so Gradio can serve it for download.