"""Streamlit dashboard that signs a user in via MSAL and calls Microsoft Graph.

The sidebar lists Microsoft 365 products; each selected product contributes its
OAuth scopes to the sign-in request and, once an access token is held in the
Streamlit session, renders its integration panel.

NOTE(review): many emoji in the user-facing strings below look mis-encoded.
They are reproduced exactly as found because the intended glyphs cannot be
recovered from this file.
"""
import calendar
import os
from collections import defaultdict
from datetime import datetime, timedelta
from urllib.parse import quote

import msal
import requests
import streamlit as st

# Configuration
APPLICATION_ID_KEY = os.getenv('APPLICATION_ID_KEY')
CLIENT_SECRET_KEY = os.getenv('CLIENT_SECRET_KEY')
AUTHORITY_URL = 'https://login.microsoftonline.com/common'
REDIRECT_URI = 'https://huggingface.co/spaces/awacke1/MSGraphAPI'

GRAPH_BASE_URL = 'https://graph.microsoft.com/v1.0'
# Seconds to wait for Graph before giving up, so a stalled request cannot
# hang the Streamlit script run indefinitely.
REQUEST_TIMEOUT = 30

# 1. Configuration
Site_Name = 'π Event π Schedule and πAgenda πDashboard'
title = "π Event π Schedule and πAgenda πDashboard"
helpURL = 'https://huggingface.co/awacke1'
bugURL = 'https://huggingface.co/spaces/awacke1'
icons = 'π '

st.set_page_config(
    page_title=title,
    page_icon=icons,
    layout="wide",
    initial_sidebar_state="auto",
    menu_items={
        'Get Help': helpURL,
        'Report a bug': bugURL,
        'About': title
    }
)


def sticky_menu():
    """Render the sticky navigation menu markup.

    NOTE(review): the markup passed to ``st.markdown`` is only a newline, so
    the HTML/CSS of the menu appears to have been lost; restore it here.
    """
    st.markdown("\n", unsafe_allow_html=True)


# Product name -> OAuth scopes, web link, AI capabilities and Graph solution.
PRODUCT_SCOPES = {
    "π§ Outlook": {
        'scopes': ['Mail.Read', 'Mail.Send'],
        'link': 'https://outlook.office.com/mail/',
        'ai_capabilities': "π€βοΈ Smart email & scheduling",
        'graph_solution': "π¨π Mail, calendar & contacts API"
    },
    "π Calendar": {
        'scopes': ['Calendars.ReadWrite'],
        'link': 'https://outlook.office.com/calendar/',
        'ai_capabilities': "π€π Smart scheduling & reminders",
        'graph_solution': "π Calendar management API"
    },
    "π Tasks": {
        'scopes': ['Tasks.ReadWrite'],
        'link': 'https://to-do.office.com/tasks/',
        'ai_capabilities': "π€π Task prioritization",
        'graph_solution': "β Task & plan management"
    },
    "ποΈ OneDrive": {
        'scopes': ['Files.ReadWrite.All'],
        'link': 'https://onedrive.live.com/',
        'ai_capabilities': "π€π Smart file organization",
        'graph_solution': "π File & folder API"
    },
    "ποΈ SharePoint": {
        'scopes': ['Sites.Read.All', 'Sites.ReadWrite.All'],
        'link': 'https://www.microsoft.com/microsoft-365/sharepoint/collaboration',
        'ai_capabilities': "π€π Smart search & tagging",
        'graph_solution': "π Sites & lists API"
    },
    "π Teams": {
        'scopes': ['Team.ReadBasic.All', 'Channel.ReadBasic.All'],
        'link': 'https://teams.microsoft.com/',
        'ai_capabilities': "π€π¬ Meeting insights & summaries",
        'graph_solution': "π₯ Teams & chats API"
    },
    "π Microsoft Bookings": {
        'scopes': ['Bookings.Read.All', 'Bookings.ReadWrite.All'],
        'link': 'https://outlook.office.com/bookings/',
        'ai_capabilities': "π€π Smart scheduling",
        'graph_solution': "π Booking services API"
    },
    "π£οΈ Translator": {
        'scopes': ['Translation.Read'],
        'link': 'https://www.microsoft.com/translator/',
        'ai_capabilities': "π€π Real-time translation",
        'graph_solution': "π¨οΈ Translation services API"
    },
    "π Loop": {
        'scopes': ['Files.ReadWrite.All'],
        'link': 'https://loop.microsoft.com/',
        'ai_capabilities': "π€π Real-time collaboration AI",
        'graph_solution': "π Workspace API"
    },
    "π Azure OpenAI Service": {
        'scopes': ['AzureAIServices.ReadWrite.All'],
        'link': 'https://azure.microsoft.com/products/cognitive-services/openai-service/',
        'ai_capabilities': "π€π§ Custom AI model access",
        'graph_solution': "π AI model integration API"
    },
    "π§ Copilot": {
        'scopes': ['Cognitive.Read'],
        'link': 'https://www.microsoft.com/microsoft-365/copilot',
        'ai_capabilities': "π€π Cross-app AI assistance",
        'graph_solution': "π§ AI integration API"
    },
    "π OneNote": {
        'scopes': ['Notes.Read', 'Notes.Create'],
        'link': 'https://www.onenote.com/notebooks',
        'ai_capabilities': "π€π Content suggestion & OCR",
        'graph_solution': "π Notebook & page API"
    },
    "π Excel": {
        'scopes': ['Files.ReadWrite.All'],
        'link': 'https://www.office.com/launch/excel',
        'ai_capabilities': "π€π Data analysis & insights",
        'graph_solution': "π Workbook & chart API"
    },
    "π Word": {
        'scopes': ['Files.ReadWrite.All'],
        'link': 'https://www.office.com/launch/word',
        'ai_capabilities': "π€βοΈ Smart drafting & editing",
        'graph_solution': "π Document content API"
    },
    "π¬ Viva": {
        'scopes': ['Analytics.Read'],
        'link': 'https://www.microsoft.com/microsoft-viva',
        'ai_capabilities': "π€π Personalized insights",
        'graph_solution': "π Analytics & learning API"
    },
    "π Power Platform": {
        'scopes': ['Flow.Read.All'],
        'link': 'https://powerplatform.microsoft.com/',
        'ai_capabilities': "π€βοΈ AI-powered automation",
        'graph_solution': "π§ Workflow & app API"
    },
    "π‘ PowerPoint": {
        'scopes': ['Files.ReadWrite.All'],
        'link': 'https://www.office.com/launch/powerpoint',
        'ai_capabilities': "π€π¨ Design & coaching AI",
        'graph_solution': "π Presentation API"
    },
}

BASE_SCOPES = ['User.Read']

# HTTP verbs make_api_call accepts; only the first two carry a JSON body.
_BODY_METHODS = ('POST', 'PATCH')
_SUPPORTED_METHODS = ('GET', 'DELETE') + _BODY_METHODS


def get_msal_app():
    """Build the MSAL confidential client from the module-level configuration."""
    return msal.ConfidentialClientApplication(
        client_id=APPLICATION_ID_KEY,
        client_credential=CLIENT_SECRET_KEY,
        authority=AUTHORITY_URL
    )


def get_access_token(code):
    """Exchange an OAuth authorization code for an access token.

    The scopes come from ``st.session_state['request_scopes']`` and fall back
    to ``BASE_SCOPES``.

    Raises:
        RuntimeError: if the MSAL result carries no ``access_token``. The
            failure is not displayed here; the caller reports it, so the user
            sees a single error message instead of two.
    """
    result = get_msal_app().acquire_token_by_authorization_code(
        code=code,
        scopes=st.session_state.get('request_scopes', BASE_SCOPES),
        redirect_uri=REDIRECT_URI
    )
    if 'access_token' not in result:
        raise RuntimeError(f"Error acquiring token: {result.get('error_description')}")
    return result['access_token']


def make_api_call(access_token, endpoint, method='GET', data=None):
    """Call a Microsoft Graph v1.0 endpoint.

    Args:
        access_token: bearer token sent in the Authorization header.
        endpoint: path relative to the v1.0 root, e.g. ``'me/messages'``.
        method: one of ``'GET'``, ``'POST'``, ``'PATCH'`` or ``'DELETE'``.
        data: JSON-serialisable body, sent for POST and PATCH only.

    Returns:
        The decoded JSON body on a 2xx response, an empty dict when a 2xx
        response has no JSON body, or ``None`` on failure (after showing the
        error with ``st.error``). Test success with ``result is not None``.

    Raises:
        ValueError: if ``method`` is not supported.
    """
    if method not in _SUPPORTED_METHODS:
        raise ValueError(f"Unsupported method: {method}")

    headers = {'Authorization': f'Bearer {access_token}', 'Content-Type': 'application/json'}
    url = f'{GRAPH_BASE_URL}/{endpoint}'
    try:
        response = requests.request(
            method,
            url,
            headers=headers,
            json=data if method in _BODY_METHODS else None,
            timeout=REQUEST_TIMEOUT,
        )
    except requests.RequestException as exc:
        st.error(f"API call failed: {exc}")
        return None

    if not 200 <= response.status_code < 300:
        st.error(f"API call failed: {response.status_code} - {response.text}")
        return None

    # Some successful calls (e.g. 202/204 responses) carry no body at all.
    if not response.content:
        return {}
    try:
        return response.json()
    except ValueError:
        return {}


def _report_outcome(result, success_message, failure_message):
    """Show success or failure for a make_api_call result (None means failure)."""
    if result is not None:
        st.success(success_message)
    else:
        st.error(failure_message)


def _require_input(value, label):
    """Return True if the text field is filled in; otherwise warn and return False."""
    if value and value.strip():
        return True
    st.warning(f"Please enter a value for '{label}'.")
    return False


def _parse_graph_datetime(value):
    """Parse a Graph ``dateTime`` string into a naive ``datetime``.

    Drops a trailing ``Z`` and normalises the fractional seconds to exactly
    six digits, which ``datetime.fromisoformat`` accepts.
    """
    head, dot, fraction = value.rstrip('Z').partition('.')
    if dot:
        return datetime.fromisoformat(f"{head}.{fraction[:6].ljust(6, '0')}")
    return datetime.fromisoformat(head)


def _month_bounds(moment):
    """Return (first instant of moment's month, first instant of the next month)."""
    start = moment.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
    if start.month == 12:
        # December rolls over into January of the following year.
        return start, start.replace(year=start.year + 1, month=1)
    return start, start.replace(month=start.month + 1)


def handle_outlook_integration(access_token):
    """Render the Outlook panel: list recent mail, send, mark as read, delete."""
    st.subheader("π§ Outlook Integration")
    st.markdown(f"[Open Outlook]({PRODUCT_SCOPES['π§ Outlook']['link']})")

    # Read emails
    emails = make_api_call(access_token, 'me/messages?$top=10&$orderby=receivedDateTime desc')
    if emails and 'value' in emails:
        for email in emails['value']:
            # Not every message has a sender, so avoid indexing 'from' directly.
            address = (email.get('from') or {}).get('emailAddress') or {}
            sender = address.get('name', 'Unknown sender')
            with st.expander(f"From: {sender} - Subject: {email.get('subject')}"):
                st.write(f"Received: {email.get('receivedDateTime')}")
                st.write(f"Body: {email.get('bodyPreview')}")
    else:
        st.write("No emails found or unable to fetch emails.")

    # Create (Send) email
    st.write("Send a new email:")
    recipient = st.text_input("Recipient Email")
    subject = st.text_input("Subject")
    body = st.text_area("Body")
    if st.button("Send Email") and _require_input(recipient, "Recipient Email"):
        new_email = {
            "message": {
                "subject": subject,
                "body": {"contentType": "Text", "content": body},
                "toRecipients": [{"emailAddress": {"address": recipient}}]
            }
        }
        result = make_api_call(access_token, 'me/sendMail', method='POST', data=new_email)
        _report_outcome(result, "Email sent successfully!", "Failed to send email.")

    # Update email (mark as read)
    st.write("Mark an email as read:")
    email_id = st.text_input("Enter Email ID")
    if st.button("Mark as Read") and _require_input(email_id, "Enter Email ID"):
        result = make_api_call(
            access_token,
            f"me/messages/{quote(email_id, safe='')}",
            method='PATCH',
            data={"isRead": True},
        )
        _report_outcome(result, "Email marked as read!", "Failed to mark email as read.")

    # Delete email
    st.write("Delete an email:")
    email_id_to_delete = st.text_input("Enter Email ID to delete")
    if st.button("Delete Email") and _require_input(email_id_to_delete, "Enter Email ID to delete"):
        result = make_api_call(
            access_token,
            f"me/messages/{quote(email_id_to_delete, safe='')}",
            method='DELETE',
        )
        _report_outcome(result, "Email deleted successfully!", "Failed to delete email.")


def handle_calendar_integration(access_token):
    """Render the Calendar panel: month grid of events plus add/update/delete."""
    st.subheader("π Calendar Integration")
    st.markdown(f"[Open Calendar]({PRODUCT_SCOPES['π Calendar']['link']})")

    # Query from the first instant of this month up to the first instant of
    # the next one, so the last day is included and December does not wrap.
    now = datetime.now()
    start_of_month, start_of_next_month = _month_bounds(now)
    events = make_api_call(
        access_token,
        f"me/calendarView?startDateTime={start_of_month.isoformat()}"
        f"&endDateTime={start_of_next_month.isoformat()}&$orderby=start/dateTime"
    )

    if events and 'value' in events:
        st.write(f"Calendar for {now.strftime('%B %Y')}")

        # Group the entries by day first: each cell is written exactly once,
        # so later events cannot replace the day number or earlier events.
        events_by_day = defaultdict(list)
        for event in events['value']:
            start_date = _parse_graph_datetime(event['start']['dateTime'])
            if (start_date.year, start_date.month) != (now.year, now.month):
                # Skip events whose start lies outside the displayed month.
                continue
            events_by_day[start_date.day].append(
                f"{start_date.strftime('%H:%M')} - {event.get('subject')}"
            )

        for week in calendar.monthcalendar(now.year, now.month):
            cols = st.columns(7)
            for col, day in zip(cols, week):
                if day == 0:  # padding cell belonging to a neighbouring month
                    continue
                cell_lines = [f"**{day}**", *events_by_day.get(day, [])]
                # Two trailing spaces force a Markdown line break.
                col.write("  \n".join(cell_lines))
    else:
        st.write("No events found or unable to fetch events.")

    # Anchor for Add Event
    st.markdown("", unsafe_allow_html=True)

    # Create event β Add Event
    st.write("β Add New Event:")
    event_subject = st.text_input("Event Subject")
    event_date = st.date_input("Event Date")
    event_time = st.time_input("Event Time")
    if st.button("β Add Event"):
        event_start = datetime.combine(event_date, event_time)
        event_end = event_start + timedelta(hours=1)
        new_event = {
            "subject": event_subject,
            "start": {"dateTime": event_start.isoformat(), "timeZone": "UTC"},
            "end": {"dateTime": event_end.isoformat(), "timeZone": "UTC"}
        }
        result = make_api_call(access_token, 'me/events', method='POST', data=new_event)
        _report_outcome(result, "Event added successfully!", "Failed to add event.")

    # Update event
    st.write("Update an event:")
    event_id = st.text_input("Enter Event ID")
    new_subject = st.text_input("New Subject")
    if st.button("Update Event") and _require_input(event_id, "Enter Event ID"):
        result = make_api_call(
            access_token,
            f"me/events/{quote(event_id, safe='')}",
            method='PATCH',
            data={"subject": new_subject},
        )
        _report_outcome(result, "Event updated successfully!", "Failed to update event.")

    # Delete event
    st.write("Delete an event:")
    event_id_to_delete = st.text_input("Enter Event ID to delete")
    if st.button("Delete Event") and _require_input(event_id_to_delete, "Enter Event ID to delete"):
        result = make_api_call(
            access_token,
            f"me/events/{quote(event_id_to_delete, safe='')}",
            method='DELETE',
        )
        _report_outcome(result, "Event deleted successfully!", "Failed to delete event.")


def handle_tasks_integration(access_token):
    """Render the Tasks panel for the user's default To Do list."""
    st.subheader("π Tasks Integration")
    st.markdown(f"[Open Tasks]({PRODUCT_SCOPES['π Tasks']['link']})")

    # Read tasks
    default_list = None
    task_lists = make_api_call(access_token, 'me/todo/lists')
    if task_lists and 'value' in task_lists:
        default_list = next(
            (
                task_list for task_list in task_lists['value']
                if task_list.get('wellknownListName') == 'defaultList'
            ),
            None,
        )
        if default_list:
            tasks = make_api_call(access_token, f"me/todo/lists/{default_list['id']}/tasks")
            if tasks and 'value' in tasks:
                for task in tasks['value']:
                    st.write(f"Task: {task['title']}")
                    st.write(f"Status: {'Completed' if task['status'] == 'completed' else 'Not Completed'}")
                    st.write("---")
            else:
                st.write("No tasks found or unable to fetch tasks.")
        else:
            st.write("Default task list not found.")
    else:
        st.write("Unable to fetch task lists.")

    # Every operation below needs the default list's id; without it there is
    # nothing to target, so stop before rendering the forms.
    if not default_list:
        return
    tasks_endpoint = f"me/todo/lists/{default_list['id']}/tasks"

    # Create task
    st.write("Add a new task:")
    task_title = st.text_input("Task Title")
    if st.button("Add Task") and _require_input(task_title, "Task Title"):
        result = make_api_call(access_token, tasks_endpoint, method='POST', data={"title": task_title})
        _report_outcome(result, "Task added successfully!", "Failed to add task.")

    # Update task
    st.write("Update a task:")
    task_id = st.text_input("Enter Task ID")
    new_title = st.text_input("New Title")
    if st.button("Update Task") and _require_input(task_id, "Enter Task ID"):
        result = make_api_call(
            access_token,
            f"{tasks_endpoint}/{quote(task_id, safe='')}",
            method='PATCH',
            data={"title": new_title},
        )
        _report_outcome(result, "Task updated successfully!", "Failed to update task.")

    # Delete task
    st.write("Delete a task:")
    task_id_to_delete = st.text_input("Enter Task ID to delete")
    if st.button("Delete Task") and _require_input(task_id_to_delete, "Enter Task ID to delete"):
        result = make_api_call(
            access_token,
            f"{tasks_endpoint}/{quote(task_id_to_delete, safe='')}",
            method='DELETE',
        )
        _report_outcome(result, "Task deleted successfully!", "Failed to delete task.")


def _drive_item_path(path):
    """Return ``path`` URL-quoted with exactly one leading slash."""
    return '/' + quote(path.strip().lstrip('/'))


def _upload_text_file(access_token, path, content):
    """PUT ``content`` as UTF-8 text to a OneDrive path; return True on success.

    Both 200 and 201 count as success, so overwriting an existing file is not
    reported as a failure.
    """
    url = f"{GRAPH_BASE_URL}/me/drive/root:{_drive_item_path(path)}:/content"
    headers = {'Authorization': f'Bearer {access_token}', 'Content-Type': 'text/plain'}
    try:
        response = requests.put(
            url, headers=headers, data=content.encode('utf-8'), timeout=REQUEST_TIMEOUT
        )
    except requests.RequestException:
        return False
    return response.status_code in (200, 201)


def handle_onedrive_integration(access_token):
    """Render the OneDrive panel: list root items, create/update/delete files."""
    st.subheader("ποΈ OneDrive Integration")
    st.markdown(f"[Open OneDrive]({PRODUCT_SCOPES['ποΈ OneDrive']['link']})")

    # Read files
    files = make_api_call(access_token, 'me/drive/root/children')
    if files and 'value' in files:
        for file in files['value']:
            st.write(f"Name: {file['name']}")
            st.write(f"Type: {'Folder' if 'folder' in file else 'File'}")
            st.write(f"Last Modified: {file['lastModifiedDateTime']}")
            st.write("---")
    else:
        st.write("No files found or unable to fetch files.")

    # Create file
    st.write("Create a new text file:")
    file_name = st.text_input("File Name (include .txt extension)")
    file_content = st.text_area("File Content")
    if st.button("Create File") and _require_input(file_name, "File Name (include .txt extension)"):
        if _upload_text_file(access_token, file_name, file_content):
            st.success("File created successfully!")
        else:
            st.error("Failed to create file.")

    # Update file
    st.write("Update a file:")
    file_path = st.text_input("File Path (e.g., /Documents/file.txt)")
    new_content = st.text_area("New Content")
    if st.button("Update File") and _require_input(file_path, "File Path (e.g., /Documents/file.txt)"):
        if _upload_text_file(access_token, file_path, new_content):
            st.success("File updated successfully!")
        else:
            st.error("Failed to update file.")

    # Delete file
    st.write("Delete a file:")
    file_path_to_delete = st.text_input("File Path to delete (e.g., /Documents/file.txt)")
    # The input check also keeps an empty path from addressing the drive root.
    if st.button("Delete File") and _require_input(
        file_path_to_delete, "File Path to delete (e.g., /Documents/file.txt)"
    ):
        result = make_api_call(
            access_token,
            f"me/drive/root:{_drive_item_path(file_path_to_delete)}",
            method='DELETE',
        )
        _report_outcome(result, "File deleted successfully!", "Failed to delete file.")


def _render_coming_soon(product, label, feature):
    """Render the header, product link and 'coming soon' note of a stub panel."""
    st.subheader(f"{product} Integration")
    st.markdown(f"[Open {label}]({PRODUCT_SCOPES[product]['link']})")
    st.write(f"{feature} features coming soon.")


def handle_sharepoint_integration(access_token):
    """Render the SharePoint placeholder panel (``access_token`` is unused)."""
    _render_coming_soon("ποΈ SharePoint", "SharePoint", "Smart search & tagging")


def handle_teams_integration(access_token):
    """Render the Teams placeholder panel (``access_token`` is unused)."""
    _render_coming_soon("π Teams", "Teams", "Meeting insights & summaries")


def handle_bookings_integration(access_token):
    """Render the Microsoft Bookings placeholder panel (``access_token`` is unused)."""
    _render_coming_soon("π Microsoft Bookings", "Microsoft Bookings", "Smart scheduling")


def handle_translator_integration(access_token):
    """Render the Translator placeholder panel (``access_token`` is unused)."""
    _render_coming_soon("π£οΈ Translator", "Translator", "Real-time translation")


def handle_loop_integration(access_token):
    """Render the Loop placeholder panel (``access_token`` is unused)."""
    _render_coming_soon("π Loop", "Loop", "Real-time collaboration AI")


def handle_openai_service_integration(access_token):
    """Render the Azure OpenAI Service placeholder panel (``access_token`` is unused)."""
    _render_coming_soon("π Azure OpenAI Service", "Azure OpenAI Service", "Custom AI model access")


def handle_copilot_integration(access_token):
    """Render the Copilot placeholder panel (``access_token`` is unused)."""
    _render_coming_soon("π§ Copilot", "Copilot", "Cross-app AI assistance")


def handle_onenote_integration(access_token):
    """Render the OneNote placeholder panel (``access_token`` is unused)."""
    _render_coming_soon("π OneNote", "OneNote", "Content suggestion & OCR")


def handle_excel_integration(access_token):
    """Render the Excel placeholder panel (``access_token`` is unused)."""
    _render_coming_soon("π Excel", "Excel", "Data analysis & insights")


def handle_word_integration(access_token):
    """Render the Word placeholder panel (``access_token`` is unused)."""
    _render_coming_soon("π Word", "Word", "Smart drafting & editing")


def handle_viva_integration(access_token):
    """Render the Viva placeholder panel (``access_token`` is unused)."""
    _render_coming_soon("π¬ Viva", "Viva", "Personalized insights")


def handle_power_platform_integration(access_token):
    """Render the Power Platform placeholder panel (``access_token`` is unused)."""
    _render_coming_soon("π Power Platform", "Power Platform", "AI-powered automation")


def handle_powerpoint_integration(access_token):
    """Render the PowerPoint placeholder panel (``access_token`` is unused)."""
    _render_coming_soon("π‘ PowerPoint", "PowerPoint", "Design & coaching AI")


# Product name (a PRODUCT_SCOPES key) -> function that renders its panel.
PRODUCT_HANDLERS = {
    "π§ Outlook": handle_outlook_integration,
    "π Calendar": handle_calendar_integration,
    "π Tasks": handle_tasks_integration,
    "ποΈ OneDrive": handle_onedrive_integration,
    "ποΈ SharePoint": handle_sharepoint_integration,
    "π Teams": handle_teams_integration,
    "π Microsoft Bookings": handle_bookings_integration,
    "π£οΈ Translator": handle_translator_integration,
    "π Loop": handle_loop_integration,
    "π Azure OpenAI Service": handle_openai_service_integration,
    "π§ Copilot": handle_copilot_integration,
    "π OneNote": handle_onenote_integration,
    "π Excel": handle_excel_integration,
    "π Word": handle_word_integration,
    "π¬ Viva": handle_viva_integration,
    "π Power Platform": handle_power_platform_integration,
    "π‘ PowerPoint": handle_powerpoint_integration,
}


def main():
    """Run the app: product selection, sign-in, then the selected panels."""
    st.title("π¦ MS Graph API with AI & Cloud Integration for M365")

    # Sticky Menu
    sticky_menu()

    st.sidebar.title("π M365 Products")
    st.sidebar.write("Select products to integrate:")

    selected_products = []
    for product, details in PRODUCT_SCOPES.items():
        if st.sidebar.checkbox(product):
            selected_products.append(product)
            st.sidebar.write(f"AI Capabilities: {details['ai_capabilities']}")
            st.sidebar.write(f"Graph Solution: {details['graph_solution']}")

    # De-duplicate the scopes; sorting keeps the authorization URL stable
    # from one script run to the next.
    scope_set = set(BASE_SCOPES)
    for product in selected_products:
        scope_set.update(PRODUCT_SCOPES[product]['scopes'])
    request_scopes = sorted(scope_set)
    st.session_state['request_scopes'] = request_scopes

    if 'access_token' not in st.session_state:
        auth_url = get_msal_app().get_authorization_request_url(
            scopes=request_scopes,
            redirect_uri=REDIRECT_URI
        )
        st.write(f'π Please [click here]({auth_url}) to log in and authorize the app.')

        query_params = st.query_params
        if 'code' in query_params:
            code = query_params.get('code')
            st.write('π Authorization Code Obtained:', code[:10] + '...')

            try:
                access_token = get_access_token(code)
            except Exception as e:
                st.error(f"Error acquiring access token: {str(e)}")
                st.stop()
            st.session_state['access_token'] = access_token
            st.success("Access token acquired successfully!")
            st.rerun()

        # This used to read the first character of the 'code' parameter,
        # which can never name a section. The section to jump to now comes
        # from an optional 'section' query parameter, e.g. ?section=add-event.
        fragment = query_params.get('section')
        if fragment:
            st.write(f"Detected URL fragment: {fragment}")
            # Simulate the scrolling or jump based on fragment
            if fragment == 'add-event':
                st.markdown("", unsafe_allow_html=True)
    else:
        access_token = st.session_state['access_token']

        user_info = make_api_call(access_token, 'me')
        if user_info:
            st.sidebar.write(f"π Hello, {user_info.get('displayName', 'User')}!")

        if selected_products:
            for product in selected_products:
                handler = PRODUCT_HANDLERS.get(product)
                if handler is not None:
                    handler(access_token)
        else:
            st.write("No products selected. Please select products from the sidebar.")


if __name__ == "__main__":
    main()