New Version started with Claude 3.5 Sonnet, finishing with o1 ChatGPT on full source code listing every time.

#3
by awacke1 - opened
Owner

o1-preview is doing amazingly well once you enter multi-shot or have more than one record history:

import streamlit as st
from azure.cosmos import CosmosClient, PartitionKey, exceptions
import os
import pandas as pd
import traceback
import requests
import shutil
import zipfile
from github import Github
from git import Repo
from datetime import datetime
import base64
import json

πŸŽ‰ Welcome to our fun-filled Cosmos DB and GitHub Integration app!

st.set_page_config(layout="wide")

🌌 Cosmos DB configuration

ENDPOINT = "https://acae-afd.documents.azure.com:443/"
SUBSCRIPTION_ID = "003fba60-5b3f-48f4-ab36-3ed11bc40816"
DATABASE_NAME = os.environ.get("COSMOS_DATABASE_NAME")
CONTAINER_NAME = os.environ.get("COSMOS_CONTAINER_NAME")
Key = os.environ.get("Key") # πŸ”‘ Don't forget your key!

πŸ™ GitHub configuration

def download_github_repo(url, local_path):
# 🚚 Let's download that GitHub repo!
if os.path.exists(local_path):
shutil.rmtree(local_path)
Repo.clone_from(url, local_path)

def create_zip_file(source_dir, output_filename):
# πŸ“¦ Zipping up files like a pro!
shutil.make_archive(output_filename, 'zip', source_dir)

def create_repo(g, repo_name):
# πŸ› οΈ Creating a new GitHub repo. Magic!
user = g.get_user()
return user.create_repo(repo_name)

def push_to_github(local_path, repo, github_token):
# πŸš€ Pushing code to GitHub. Hold on tight!
repo_url = f"https://{github_token}@github.com/{repo.full_name}.git"
local_repo = Repo(local_path)

if 'origin' in [remote.name for remote in local_repo.remotes]:
    origin = local_repo.remote('origin')
    origin.set_url(repo_url)
else:
    origin = local_repo.create_remote('origin', repo_url)

if not local_repo.heads:
    local_repo.git.checkout('-b', 'main')
    current_branch = 'main'
else:
    current_branch = local_repo.active_branch.name

local_repo.git.add(A=True)

if local_repo.is_dirty():
    local_repo.git.commit('-m', 'Initial commit')

origin.push(refspec=f'{current_branch}:{current_branch}')

def get_base64_download_link(file_path, file_name):
# πŸ§™β€β™‚οΈ Generating a magical download link!
with open(file_path, "rb") as file:
contents = file.read()
base64_encoded = base64.b64encode(contents).decode()
return f'⬇️ Download {file_name}'

🧭 New functions for dynamic sidebar navigation

def get_databases(client):
# πŸ“š Fetching list of databases. So many options!
return [db['id'] for db in client.list_databases()]

def get_containers(database):
# πŸ“‚ Getting containers. Containers within containers!
return [container['id'] for container in database.list_containers()]

def get_documents(container, limit=1000):
# πŸ“ Retrieving documents. Shhh, don't tell anyone!
query = "SELECT * FROM c"
items = list(container.query_items(query=query, enable_cross_partition_query=True, max_item_count=limit))
return items

🌟 Cosmos DB functions

def insert_record(container, record):
try:
container.create_item(body=record)
return True, "Record inserted successfully! πŸŽ‰"
except exceptions.CosmosHttpResponseError as e:
return False, f"HTTP error occurred: {str(e)}. Status code: {e.status_code} 🚨"
except Exception as e:
return False, f"An unexpected error occurred: {str(e)} 😱"

def update_record(container, updated_record):
try:
container.upsert_item(body=updated_record)
return True, f"Record with id {updated_record['id']} successfully updated. πŸ› οΈ"
except exceptions.CosmosHttpResponseError as e:
return False, f"HTTP error occurred: {str(e)}. Status code: {e.status_code} 🚨"
except Exception as e:
return False, f"An unexpected error occurred: {traceback.format_exc()} 😱"

def delete_record(container, name, id):
try:
container.delete_item(item=id, partition_key=id)
return True, f"Successfully deleted record with name: {name} and id: {id} πŸ—‘οΈ"
except exceptions.CosmosResourceNotFoundError:
return False, f"Record with id {id} not found. It may have been already deleted. πŸ•΅οΈβ€β™‚οΈ"
except exceptions.CosmosHttpResponseError as e:
return False, f"HTTP error occurred: {str(e)}. Status code: {e.status_code} 🚨"
except Exception as e:
return False, f"An unexpected error occurred: {traceback.format_exc()} 😱"

πŸ“¦ Function to archive current container

def archive_current_container(database_name, container_name, client):
try:
base_dir = "./cosmos_archive_current_container"
if os.path.exists(base_dir):
shutil.rmtree(base_dir)
os.makedirs(base_dir)

    db_client = client.get_database_client(database_name)
    container_client = db_client.get_container_client(container_name)
    items = list(container_client.read_all_items())
    
    container_dir = os.path.join(base_dir, container_name)
    os.makedirs(container_dir)
    
    for item in items:
        item_id = item.get('id', f"unknown_{datetime.now().strftime('%Y%m%d%H%M%S')}")
        with open(os.path.join(container_dir, f"{item_id}.json"), 'w') as f:
            json.dump(item, f, indent=2)
    
    archive_name = f"{container_name}_archive_{datetime.now().strftime('%Y%m%d%H%M%S')}"
    shutil.make_archive(archive_name, 'zip', base_dir)
    
    return get_base64_download_link(f"{archive_name}.zip", f"{archive_name}.zip")
except Exception as e:
    return f"An error occurred while archiving data: {str(e)} 😒"

🎈 Let's modify the main app to be more fun!

def main():
st.title("🌟 Cosmos DB and GitHub Integration")

# 🚦 Initialize session state
if 'logged_in' not in st.session_state:
    st.session_state.logged_in = False
if 'selected_records' not in st.session_state:
    st.session_state.selected_records = []
if 'client' not in st.session_state:
    st.session_state.client = None
if 'selected_database' not in st.session_state:
    st.session_state.selected_database = None
if 'selected_container' not in st.session_state:
    st.session_state.selected_container = None

# πŸ” Login section
if not st.session_state.logged_in:
    st.subheader("πŸ” Login")
    # Let's be sneaky and use the Key from environment variables! πŸ•΅οΈβ€β™‚οΈ
    input_key = Key

    # πŸš€ Time to blast off!
    if st.button("πŸš€ Login"):
        if input_key:
            st.session_state.primary_key = input_key
            st.session_state.logged_in = True
            st.experimental_rerun()
        else:
            st.error("Invalid key. Please check your input. πŸ”‘βŒ")
else:
    # 🌌 Initialize Cosmos DB client
    try:
        if st.session_state.client is None:
            st.session_state.client = CosmosClient(ENDPOINT, credential=st.session_state.primary_key)
        
        # πŸ—„οΈ Sidebar for database, container, and document selection
        st.sidebar.title("πŸ—„οΈ Cosmos DB Navigator")
        
        databases = get_databases(st.session_state.client)
        selected_db = st.sidebar.selectbox("πŸ—ƒοΈ Select Database", databases)
        
        if selected_db != st.session_state.selected_database:
            st.session_state.selected_database = selected_db
            st.session_state.selected_container = None
            st.experimental_rerun()
        
        if st.session_state.selected_database:
            database = st.session_state.client.get_database_client(st.session_state.selected_database)
            containers = get_containers(database)
            selected_container = st.sidebar.selectbox("πŸ“ Select Container", containers)
            
            if selected_container != st.session_state.selected_container:
                st.session_state.selected_container = selected_container
                st.experimental_rerun()
            
            if st.session_state.selected_container:
                container = database.get_container_client(st.session_state.selected_container)
                
                # πŸ“¦ Add Export button
                if st.button("πŸ“¦ Export Container Data"):
                    download_link = archive_current_container(st.session_state.selected_database, st.session_state.selected_container, st.session_state.client)
                    if download_link.startswith('<a'):
                        st.markdown(download_link, unsafe_allow_html=True)
                    else:
                        st.error(download_link)
                
                limit_to_1000 = st.sidebar.checkbox("πŸ”’ Limit to top 1000 documents", value=True)
                documents = get_documents(container, limit=1000 if limit_to_1000 else None)
                
                if documents:
                    document_ids = [doc.get('id', 'Unknown') for doc in documents]
                    selected_document = st.sidebar.selectbox("πŸ“„ Select Document", document_ids)
                    
                    if selected_document:
                        st.subheader(f"πŸ“„ Document Details: {selected_document}")
                        selected_doc = next((doc for doc in documents if doc.get('id') == selected_document), None)
                        if selected_doc:
                            # 🎨 Add Viewer/Editor selection
                            view_options = ['Show as Markdown', 'Show as Code Editor', 'Show as Edit and Save', 'Clone Document', 'New Record']
                            selected_view = st.selectbox("Select Viewer/Editor", view_options)

                            if selected_view == 'Show as Markdown':
                                # πŸ–ŒοΈ Show as Markdown
                                items = get_documents(container)
                                for item in items:
                                    st.markdown(f"### πŸ†” ID: {item.get('id', 'Unknown')}")
                                    content = item.get('content', '')
                                    if isinstance(content, dict) or isinstance(content, list):
                                        content = json.dumps(content, indent=2)
                                    st.markdown(content)
                            elif selected_view == 'Show as Code Editor':
                                # πŸ’» Show as Code Editor
                                items = get_documents(container)
                                for item in items:
                                    st.code(json.dumps(item, indent=2), language='python')
                            elif selected_view == 'Show as Edit and Save':
                                # ✏️ Show as Edit and Save
                                doc_str = st.text_area("Edit Document", value=json.dumps(selected_doc, indent=2), height=300)
                                if st.button("πŸ’Ύ Save"):
                                    try:
                                        updated_doc = json.loads(doc_str)
                                        success, message = update_record(container, updated_doc)
                                        if success:
                                            st.success(message)
                                        else:
                                            st.error(message)
                                    except json.JSONDecodeError as e:
                                        st.error(f"Invalid JSON: {str(e)} 🚫")
                            elif selected_view == 'Clone Document':
                                # 🧬 Clone Document
                                if st.button("πŸ“„ Clone Document"):
                                    cloned_doc = selected_doc.copy()
                                    timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
                                    cloned_doc['id'] = f"{cloned_doc['id']}_clone_{timestamp}"
                                    success, message = insert_record(container, cloned_doc)
                                    if success:
                                        st.success(f"Document cloned with new id: {cloned_doc['id']} πŸŽ‰")
                                    else:
                                        st.error(message)
                            elif selected_view == 'New Record':
                                # πŸ†• New Record
                                new_doc_str = st.text_area("New Document", value='{}', height=300)
                                if st.button("βž• Create New Document"):
                                    try:
                                        new_doc = json.loads(new_doc_str)
                                        if 'id' not in new_doc:
                                            new_doc['id'] = f"new_doc_{datetime.now().strftime('%Y%m%d%H%M%S')}"
                                        success, message = insert_record(container, new_doc)
                                        if success:
                                            st.success(f"New document created with id: {new_doc['id']} πŸŽ‰")
                                        else:
                                            st.error(message)
                                    except json.JSONDecodeError as e:
                                        st.error(f"Invalid JSON: {str(e)} 🚫")
                else:
                    st.sidebar.info("No documents found in this container. πŸ“­")
        
        # πŸŽ‰ Main content area
        st.subheader(f"πŸ“Š Container: {st.session_state.selected_container}")
        if st.session_state.selected_container:
            df = pd.DataFrame(documents)
            st.dataframe(df)
                
        # πŸ™ GitHub section
        st.subheader("πŸ™ GitHub Operations")
        github_token = os.environ.get("GITHUB")  # Read GitHub token from environment variable
        source_repo = st.text_input("Source GitHub Repository URL", value="https://github.com/AaronCWacker/AIExamples-8-24-Streamlit")
        new_repo_name = st.text_input("New Repository Name (for cloning)", value=f"AIExample-Clone-{datetime.now().strftime('%Y%m%d_%H%M%S')}")

        col1, col2 = st.columns(2)
        with col1:
            if st.button("πŸ“₯ Clone Repository"):
                if github_token and source_repo:
                    try:
                        local_path = f"./temp_repo_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
                        download_github_repo(source_repo, local_path)
                        zip_filename = f"{new_repo_name}.zip"
                        create_zip_file(local_path, zip_filename[:-4])
                        st.markdown(get_base64_download_link(zip_filename, zip_filename), unsafe_allow_html=True)
                        st.success("Repository cloned successfully! πŸŽ‰")
                    except Exception as e:
                        st.error(f"An error occurred: {str(e)} 😒")
                    finally:
                        if os.path.exists(local_path):
                            shutil.rmtree(local_path)
                        if os.path.exists(zip_filename):
                            os.remove(zip_filename)
                else:
                    st.error("Please ensure GitHub token is set in environment variables and source repository URL is provided. πŸ”‘β“")
    
        with col2:
            if st.button("πŸ“€ Push to New Repository"):
                if github_token and source_repo:
                    try:
                        g = Github(github_token)
                        new_repo = create_repo(g, new_repo_name)
                        local_path = f"./temp_repo_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
                        download_github_repo(source_repo, local_path)
                        push_to_github(local_path, new_repo, github_token)
                        st.success(f"Repository pushed successfully to {new_repo.html_url} πŸš€")
                    except Exception as e:
                        st.error(f"An error occurred: {str(e)} 😒")
                    finally:
                        if os.path.exists(local_path):
                            shutil.rmtree(local_path)
                else:
                    st.error("Please ensure GitHub token is set in environment variables and source repository URL is provided. πŸ”‘β“")

    except exceptions.CosmosHttpResponseError as e:
        st.error(f"Failed to connect to Cosmos DB. HTTP error: {str(e)}. Status code: {e.status_code} 🚨")
    except Exception as e:
        st.error(f"An unexpected error occurred: {str(e)} 😱")

# πŸšͺ Logout button
if st.session_state.logged_in and st.sidebar.button("πŸšͺ Logout"):
    st.session_state.logged_in = False
    st.session_state.selected_records.clear()
    st.session_state.client = None
    st.session_state.selected_database = None
    st.session_state.selected_container = None
    st.experimental_rerun()

if name == "main":
main()


Edit Save fails with An unexpected error occurred: Traceback (most recent call last): File "/home/user/app/app.py", line 119, in update_record container.upsert_item(body=updated_record) NameError: name 'container' is not defined

Also Clone document errors: An unexpected error occurred: name 'container' is not defined

Fix those two issues and show full code listing along with emoji comments and humorous comments to help others learn and humorous emoji based interface output so users feel like the use is fun. Show full code listing.


My app connects to CosmosDB and dynamically allows you to view db, Container, and Document the Three levels of data in Azure Cosmos DB. I want to add one more drop down list box which allows you to change your Viewer / Editor. I want to add these variations in a drop down on sidebar: 1. Show as Markdown - this retrieves all records in the container and shows them in series with ids and contents showing as st.markdown(). 2. Show as Code Editor - this retrieves all documents / records and shows the data in st.code with language of python. 3. Show as Edit and Save - This shows the data as a document in st input Text areas and appropriate editors where a Save emoji button then saves changes back to Cosmos and cofirms updates. 4. The last is to Clone the list which looks like generic editors in the Document view where the user can select Document and then click Clone emoji button which would save a duplicate record back to Cosmos and give it a timestamped name to avoid colliding with its parent record. 5. Allow New Record which dynamically names the document saves it then allows edits to save or change the name or content of the document. Assume defaults for display are always Markdown, but often the markdown which is unicode compliant might exist inside JSON or JSONL. Provide an emoji Export button which saves a zip file and allows download of all the files or records in the current container then gives you a base64 download link for the zip file

import streamlit as st
from azure.cosmos import CosmosClient, PartitionKey, exceptions
import os
import pandas as pd
import traceback
import requests
import shutil
import zipfile
from github import Github
from git import Repo
from datetime import datetime
import base64
import json

st.set_page_config(layout="wide")

Cosmos DB configuration

ENDPOINT = "https://acae-afd.documents.azure.com:443/"
SUBSCRIPTION_ID = "003fba60-5b3f-48f4-ab36-3ed11bc40816"
DATABASE_NAME = os.environ.get("COSMOS_DATABASE_NAME")
CONTAINER_NAME = os.environ.get("COSMOS_CONTAINER_NAME")
Key = os.environ.get("Key")

GitHub configuration

def download_github_repo(url, local_path):
if os.path.exists(local_path):
shutil.rmtree(local_path)
Repo.clone_from(url, local_path)

def create_zip_file(source_dir, output_filename):
shutil.make_archive(output_filename, 'zip', source_dir)

def create_repo(g, repo_name):
user = g.get_user()
return user.create_repo(repo_name)

def push_to_github(local_path, repo, github_token):
repo_url = f"https://{github_token}@github.com/{repo.full_name}.git"
local_repo = Repo(local_path)

if 'origin' in [remote.name for remote in local_repo.remotes]:
    origin = local_repo.remote('origin')
    origin.set_url(repo_url)
else:
    origin = local_repo.create_remote('origin', repo_url)

if not local_repo.heads:
    local_repo.git.checkout('-b', 'main')
    current_branch = 'main'
else:
    current_branch = local_repo.active_branch.name

local_repo.git.add(A=True)

if local_repo.is_dirty():
    local_repo.git.commit('-m', 'Initial commit')

origin.push(refspec=f'{current_branch}:{current_branch}')

def get_base64_download_link(file_path, file_name):
with open(file_path, "rb") as file:
contents = file.read()
base64_encoded = base64.b64encode(contents).decode()
return f'Download {file_name}'

New functions for dynamic sidebar

def get_databases(client):
return [db['id'] for db in client.list_databases()]

def get_containers(database):
return [container['id'] for container in database.list_containers()]

def get_documents(container, limit=1000):
query = "SELECT * FROM c"
items = list(container.query_items(query=query, enable_cross_partition_query=True, max_item_count=limit))
return items

Cosmos DB functions

def insert_record(record):
try:
response = container.create_item(body=record)
return True, response
except exceptions.CosmosHttpResponseError as e:
return False, f"HTTP error occurred: {str(e)}. Status code: {e.status_code}"
except Exception as e:
return False, f"An unexpected error occurred: {str(e)}"

def call_stored_procedure(record):
try:
response = container.scripts.execute_stored_procedure(
sproc="processPrompt",
params=[record],
partition_key=record['id']
)
return True, response
except exceptions.CosmosHttpResponseError as e:
error_message = f"HTTP error occurred: {str(e)}. Status code: {e.status_code}"
return False, error_message
except Exception as e:
error_message = f"An unexpected error occurred: {str(e)}"
return False, error_message

def fetch_all_records():
try:
query = "SELECT * FROM c"
items = list(container.query_items(query=query, enable_cross_partition_query=True))
return pd.DataFrame(items)
except exceptions.CosmosHttpResponseError as e:
st.error(f"HTTP error occurred while fetching records: {str(e)}. Status code: {e.status_code}")
return pd.DataFrame()
except Exception as e:
st.error(f"An unexpected error occurred while fetching records: {str(e)}")
return pd.DataFrame()

def update_record(updated_record):
try:
container.upsert_item(body=updated_record)
return True, f"Record with id {updated_record['id']} successfully updated."
except exceptions.CosmosHttpResponseError as e:
return False, f"HTTP error occurred: {str(e)}. Status code: {e.status_code}"
except Exception as e:
return False, f"An unexpected error occurred: {traceback.format_exc()}"

def delete_record(name, id):
try:
container.delete_item(item=id, partition_key=id)
return True, f"Successfully deleted record with name: {name} and id: {id}"
except exceptions.CosmosResourceNotFoundError:
return False, f"Record with id {id} not found. It may have been already deleted."
except exceptions.CosmosHttpResponseError as e:
return False, f"HTTP error occurred: {str(e)}. Status code: {e.status_code}"
except Exception as e:
return False, f"An unexpected error occurred: {traceback.format_exc()}"

New function to archive all databases and containers

def archive_all_data(client):
try:
base_dir = "./cosmos_archive"
if os.path.exists(base_dir):
shutil.rmtree(base_dir)
os.makedirs(base_dir)

    for database in client.list_databases():
        db_name = database['id']
        db_dir = os.path.join(base_dir, db_name)
        os.makedirs(db_dir)

        db_client = client.get_database_client(db_name)
        for container in db_client.list_containers():
            container_name = container['id']
            container_dir = os.path.join(db_dir, container_name)
            os.makedirs(container_dir)

            container_client = db_client.get_container_client(container_name)
            items = list(container_client.read_all_items())

            with open(os.path.join(container_dir, f"{container_name}.json"), 'w') as f:
                json.dump(items, f, indent=2)

    archive_name = f"cosmos_archive_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
    shutil.make_archive(archive_name, 'zip', base_dir)
    
    return get_base64_download_link(f"{archive_name}.zip", f"{archive_name}.zip")
except Exception as e:
    return f"An error occurred while archiving data: {str(e)}"

Modify the main app

def main():
st.title("🌟 Cosmos DB and GitHub Integration")

# Initialize session state
if 'logged_in' not in st.session_state:
    st.session_state.logged_in = False
if 'selected_records' not in st.session_state:
    st.session_state.selected_records = []
if 'client' not in st.session_state:
    st.session_state.client = None
if 'selected_database' not in st.session_state:
    st.session_state.selected_database = None
if 'selected_container' not in st.session_state:
    st.session_state.selected_container = None

# Login section
if not st.session_state.logged_in:
    st.subheader("πŸ” Login")
    #input_key = st.text_input("Enter your Cosmos DB key", type="password")
    input_key=Key

    # Cosmos DB configuration
    if st.button("πŸš€ Login"):
        if input_key:
            st.session_state.primary_key = input_key
            st.session_state.logged_in = True
            st.rerun()
        else:
            st.error("Invalid key. Please check your input.")
else:
    # Initialize Cosmos DB client
    try:
        if st.session_state.client is None:
            st.session_state.client = CosmosClient(ENDPOINT, credential=st.session_state.primary_key)
        
        # Sidebar for database, container, and document selection
        st.sidebar.title("πŸ—„οΈ Cosmos DB Navigator")
        
        databases = get_databases(st.session_state.client)
        selected_db = st.sidebar.selectbox("πŸ—ƒοΈ Select Database", databases)
        
        if selected_db != st.session_state.selected_database:
            st.session_state.selected_database = selected_db
            st.session_state.selected_container = None
            st.rerun()
        
        if st.session_state.selected_database:
            database = st.session_state.client.get_database_client(st.session_state.selected_database)
            containers = get_containers(database)
            selected_container = st.sidebar.selectbox("πŸ“ Select Container", containers)
            
            if selected_container != st.session_state.selected_container:
                st.session_state.selected_container = selected_container
                st.rerun()
            
            if st.session_state.selected_container:
                container = database.get_container_client(st.session_state.selected_container)
                
                limit_to_1000 = st.sidebar.checkbox("πŸ”’ Limit to top 1000 documents", value=True)
                documents = get_documents(container, limit=1000 if limit_to_1000 else None)
                
                if documents:
                    document_ids = [doc.get('id', 'Unknown') for doc in documents]
                    selected_document = st.sidebar.selectbox("πŸ“„ Select Document", document_ids)
                    
                    if selected_document:
                        st.subheader(f"πŸ“„ Document Details: {selected_document}")
                        selected_doc = next((doc for doc in documents if doc.get('id') == selected_document), None)
                        if selected_doc:
                            st.json(selected_doc)
                else:
                    st.sidebar.info("No documents found in this container.")
        
        # Main content area
        st.subheader(f"πŸ“Š Container: {st.session_state.selected_container}")
        if st.session_state.selected_container:
            df = pd.DataFrame(documents)
            st.dataframe(df)
                
        # GitHub section
        st.subheader("πŸ™ GitHub Operations")
        github_token = os.environ.get("GITHUB")  # Read GitHub token from environment variable
        source_repo = st.text_input("Source GitHub Repository URL", value="https://github.com/AaronCWacker/AIExamples-8-24-Streamlit")
        new_repo_name = st.text_input("New Repository Name (for cloning)", value=f"AIExample-Clone-{datetime.now().strftime('%Y%m%d_%H%M%S')}")
        
        col1, col2 = st.columns(2)
        with col1:
            if st.button("πŸ“₯ Clone Repository"):
                if github_token and source_repo:
                    try:
                        local_path = f"./temp_repo_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
                        download_github_repo(source_repo, local_path)
                        zip_filename = f"{new_repo_name}.zip"
                        create_zip_file(local_path, zip_filename[:-4])
                        st.markdown(get_base64_download_link(zip_filename, zip_filename), unsafe_allow_html=True)
                        st.success("Repository cloned successfully!")
                    except Exception as e:
                        st.error(f"An error occurred: {str(e)}")
                    finally:
                        if os.path.exists(local_path):
                            shutil.rmtree(local_path)
                        if os.path.exists(zip_filename):
                            os.remove(zip_filename)
                else:
                    st.error("Please ensure GitHub token is set in environment variables and source repository URL is provided.")
    
        with col2:
            if st.button("πŸ“€ Push to New Repository"):
                if github_token and source_repo:
                    try:
                        g = Github(github_token)
                        new_repo = create_repo(g, new_repo_name)
                        local_path = f"./temp_repo_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
                        download_github_repo(source_repo, local_path)
                        push_to_github(local_path, new_repo, github_token)
                        st.success(f"Repository pushed successfully to {new_repo.html_url}")
                    except Exception as e:
                        st.error(f"An error occurred: {str(e)}")
                    finally:
                        if os.path.exists(local_path):
                            shutil.rmtree(local_path)
                else:
                    st.error("Please ensure GitHub token is set in environment variables and source repository URL is provided.")

    except exceptions.CosmosHttpResponseError as e:
        st.error(f"Failed to connect to Cosmos DB. HTTP error: {str(e)}. Status code: {e.status_code}")
    except Exception as e:
        st.error(f"An unexpected error occurred: {str(e)}")

# Logout button
if st.session_state.logged_in and st.sidebar.button("πŸšͺ Logout"):
    st.session_state.logged_in = False
    st.session_state.selected_records.clear()
    st.session_state.client = None
    st.session_state.selected_database = None
    st.session_state.selected_container = None
    st.rerun()

if name == "main":
main()

Sign up or log in to comment