#
# Copyright 2023 Venafi, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#  http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Gradio application that answers questions about TLS Protect Cloud data.

The app downloads several TLS Protect Cloud collections into pandas
dataframes, describes those dataframes to an OpenAI chat model, asks the model
to write a Python script for the user's prompt, and executes that script to
obtain either a text answer or a plotly figure.
"""

import datetime
import io
import json
import logging
import os

import gradio as gr
import matplotlib.pyplot as plt
import numpy as np
import openai
import pandas as pd
import plotly
import requests

logger = logging.getLogger(__name__)

TLSPC_API_BASE = "https://api.venafi.cloud"
REQUEST_TIMEOUT_SECONDS = 60
CERT_REQUEST_PAGE_SIZE = 1000
CHAT_MODEL = "gpt-3.5-turbo"

APPLICATION_COLUMNS = [
    'id', 'name', 'description', 'fullyQualifiedDomainNames', 'ipRanges',
    'ports', 'modificationDate', 'creationDate', 'ownership.owningUsers',
    'ownership.owningTeams',
]

DATA_NOT_LOADED_MESSAGE = (
    "No data has been loaded yet. Open the 'API Keys' tab and load your "
    "TLS Protect Cloud data first."
)

DATA_RELATIONSHIPS_OVERVIEW = """The dataframes relate to each other in the following manner.
The column values in the 'user_id' column in users_df match the column values in the 'ownerId' column in application_df.
The column values in the 'team_id' column in teams_df match the column values in the 'owningTeamId' column in machines_df.
The column values in the 'certificateOwnerUserId' column in cert_requests_df match the column values in the 'user_id' column in users_df.
The column values in the 'certificateIssuingTemplateId' column in cert_requests_df match the column values in the 'issuing_template_id' column in issuing_templates_df.
"""

# Module-level state filled in by getData() and read by the prompt handlers.
data_structure_overview = ''
data_relationships_overview = ''
fig = ''
output_string = ''
# Most recent script returned by the model; kept at module level for debugging.
parsed_response = ''
# Dataframes by variable name, exposed to generated scripts under these names.
_dataframes = {}


def _extract_python_code(reply):
    """Return the body of the last ```python fenced block in *reply*.

    When the reply has no ```python fence, everything up to the first plain
    ``` fence (or the whole reply) is returned.
    """
    return reply.strip().split('```python')[-1].split('```')[0]


def _request_script(prompt, instructions, follow_ups=()):
    """Ask the chat model for a Python script that fulfils *prompt*.

    Args:
        prompt: The user's natural-language request.
        instructions: Constraint messages sent before the request itself.
        follow_ups: Messages sent after the request itself.

    Returns:
        The Python source extracted from the model's reply.
    """
    texts = [
        data_structure_overview,
        data_relationships_overview,
        "Do not attempt to use .csv files in your code.",
        *instructions,
        f"Create a python script to: {prompt}",
        *follow_ups,
    ]
    output = openai.ChatCompletion.create(
        model=CHAT_MODEL,
        temperature=0.0,
        messages=[{"role": "user", "content": text} for text in texts],
    )
    return _extract_python_code(output.choices[0].message.content)


def _run_generated_script(script, result_name):
    """Execute *script* and return the value it bound to *result_name*.

    Returns None when the script never assigns *result_name*.
    """
    # SECURITY: this executes model-generated code with the full privileges of
    # this process. Only run the app in an environment where that is
    # acceptable; a sandbox would be required for untrusted deployments.
    #
    # A single dict is used as the namespace so that functions defined by the
    # script can see the script's own top-level variables, and so that nothing
    # the script rebinds leaks into this module or into later runs.
    namespace = dict(globals())
    namespace.update(_dataframes)
    # Drop any earlier value so a script that sets nothing yields None.
    namespace.pop(result_name, None)
    exec(script, namespace)
    return namespace.get(result_name)


def prompt_analyze_reporting(prompt):
    """Generate and run a script that builds a plotly chart for *prompt*.

    Returns:
        The object the generated script stored in ``fig``.

    Raises:
        gr.Error: If no data is loaded, the script fails, or no ``fig`` is set.
    """
    global parsed_response
    if not data_structure_overview:
        raise gr.Error(DATA_NOT_LOADED_MESSAGE)

    parsed_response = _request_script(
        prompt,
        instructions=[
            "Only use plotly to output charts, graphs, or figures. "
            "Do not use matplotlib or other charting libraries. \n"
            "Name the chart object as 'fig'",
        ],
    )
    try:
        chart = _run_generated_script(parsed_response, 'fig')
    except Exception as err:
        logger.exception("Generated reporting script failed")
        raise gr.Error(f"The generated script failed to run: {err}") from err
    if chart is None:
        raise gr.Error("The generated script did not create a chart named 'fig'.")
    return chart


def prompt_analyze_questions(prompt):
    """Generate and run a script that answers *prompt* as text.

    Returns:
        The generated script's ``output_string`` as a string, or an
        explanatory message when no answer could be produced.
    """
    global parsed_response
    if not data_structure_overview:
        return DATA_NOT_LOADED_MESSAGE

    parsed_response = _request_script(
        prompt,
        instructions=[
            "Do not attempt to create charts or visualize the question with "
            "graphics. Only provide string responses.",
            "If you are asked to create visualizations or graphs, create a "
            "python script to store a string variable named output_string "
            "with the text \"Sorry, I cannot create reporting, select "
            "'Create Graphs' to create reports.\"",
        ],
        follow_ups=[
            "Store the final response as a string variable named output_string",
        ],
    )
    try:
        answer = _run_generated_script(parsed_response, 'output_string')
    except Exception as err:
        logger.exception("Generated question script failed")
        return f"The generated script failed to run: {err}"
    if answer is None:
        return "The generated script did not produce an answer. Please rephrase and try again."
    return str(answer)


# Data Retrieval
def _get_json(path, headers):
    """GET *path* from the TLS Protect Cloud API and return the decoded JSON."""
    response = requests.get(
        TLSPC_API_BASE + path, headers=headers, timeout=REQUEST_TIMEOUT_SECONDS
    )
    response.raise_for_status()
    return response.json()


def _to_frame(records, renames):
    """Normalize *records* into a dataframe, rename columns, drop companyId."""
    frame = pd.json_normalize(records).convert_dtypes()
    frame = frame.rename(columns=renames)
    return frame.drop(columns=['companyId'], errors='ignore')


def _post_cert_request_page(headers, payload):
    """POST one certificate-request search page and return the decoded JSON."""
    response = requests.post(
        url=TLSPC_API_BASE + "/outagedetection/v1/certificaterequestssearch",
        headers=headers,
        json=payload,
        timeout=REQUEST_TIMEOUT_SECONDS,
    )
    if response.status_code != 200:
        raise RuntimeError(
            'Error retrieving certificate requests:' + "\n" + response.text
            + "\n=============\n"
        )
    return response.json()


def _fetch_cert_requests(headers):
    """Return all certificate requests, following the search paging."""
    # NOTE(review): the first page is requested as pageNumber 1, as before.
    # Confirm against the API reference whether paging is zero-based; if it
    # is, the first page should be 0.
    page = 1
    payload = {"paging": {"pageNumber": page, "pageSize": CERT_REQUEST_PAGE_SIZE}}
    data = _post_cert_request_page(headers, payload)
    cert_requests = list(data['certificateRequests'])

    # Continue from the page after the one already fetched; re-requesting the
    # first page would append its records a second time.
    while data['numFound'] > page * CERT_REQUEST_PAGE_SIZE:
        page += 1
        remaining = max(0, data['numFound'] - page * CERT_REQUEST_PAGE_SIZE)
        print(f'Getting page {page}: Number remaining - {remaining}')
        payload['paging']['pageNumber'] = page
        data = _post_cert_request_page(headers, payload)
        cert_requests += data['certificateRequests']
    return cert_requests


def _load_applications(headers):
    """Return the applications dataframe with one row per application owner."""
    applications = _get_json("/outagedetection/v1/applications", headers)['applications']
    details = pd.json_normalize(applications).convert_dtypes()[APPLICATION_COLUMNS]
    # Flatten application owners and re-merge
    owners = pd.json_normalize(
        applications, record_path=['ownerIdsAndTypes'], meta=['id']
    ).convert_dtypes()
    merged = pd.merge(details, owners, on='id')
    return merged.rename(columns={
        'id': 'application_id',
        'creationDate': 'application_creationDate',
        'modificationDate': 'application_modificationDate',
    })


def _load_dataframes(headers):
    """Download every collection and return the dataframes keyed by name."""
    certs_df = _to_frame(
        _get_json(
            "/outagedetection/v1/certificates"
            "?ownershipTree=false&excludeSupersededInstances=false&limit=10000",
            headers,
        )['certificates'],
        {'id': 'certificateId'},
    )
    for column in ('validityStart', 'validityEnd'):
        certs_df[column] = pd.to_datetime(certs_df[column]).dt.date

    users_df = _to_frame(_get_json("/v1/users", headers)['users'], {'id': 'user_id'})

    teams_df = _to_frame(
        _get_json("/v1/teams", headers)['teams'],
        {'id': 'team_id', 'modificationDate': 'teams_modificationDate'},
    )

    machines_df = _to_frame(
        _get_json("/v1/machines", headers)['machines'],
        {
            'id': 'machine_id',
            'creationDate': 'machine_creationDate',
            'modificationDate': 'machine_modificationDate',
        },
    )

    # Only the first seven columns of the machine identities are kept.
    machine_identities_df = (
        pd.json_normalize(_get_json("/v1/machineidentities", headers)['machineIdentities'])
        .convert_dtypes()
        .iloc[:, :7]
        .rename(columns={
            'machineId': 'machine_id',
            'id': 'machine_identity_id',
            'creationDate': 'machine_identity_creationDate',
            'modificationDate': 'machine_identities_modificationDate',
        })
        .drop(columns=['companyId'], errors='ignore')
    )

    cert_requests_df = _to_frame(
        _fetch_cert_requests(headers),
        {'id': 'cert_request_id', 'creationDate': 'cert_request_creationDate'},
    )

    issuing_templates_df = _to_frame(
        _get_json("/v1/certificateissuingtemplates", headers)['certificateIssuingTemplates'],
        {'id': 'issuing_template_id', 'creationDate': 'issuing_template_creationDate'},
    )

    return {
        'application_df': _load_applications(headers),
        'users_df': users_df,
        'certs_df': certs_df,
        'teams_df': teams_df,
        'machines_df': machines_df,
        'machine_identities_df': machine_identities_df,
        'cert_requests_df': cert_requests_df,
        'issuing_templates_df': issuing_templates_df,
    }


def _describe(frame):
    """Return a ``{column: dtype name}`` mapping for *frame*."""
    return frame.dtypes.apply(lambda dtype: dtype.name).to_dict()


def _build_structure_overview(frames):
    """Build the prompt text describing each dataframe's columns and dtypes."""
    return (
        "I have multiple python pandas dataframes. One is named application_df which contains data on applications and has the following structure: "
        f"{_describe(frames['application_df'])}.\n"
        "Another python pandas dataframe is named users_df and contains user information and has the following structure: "
        f"{_describe(frames['users_df'])}.\n"
        "Another python pandas dataframe is named certs_df and contains certificate information and has the following structure: "
        f"{_describe(frames['certs_df'])}.\n"
        "Another python pandas dataframe is named teams_df and contains teams information and has the following structure: "
        f"{_describe(frames['teams_df'])}.\n"
        "Another python pandas dataframe is named machines_df and contains machine information and has the following structure: "
        f"{_describe(frames['machines_df'])}.\n"
        "Another python pandas dataframe is named machine_identities_df and contains machine identity information and has the following structure: "
        f"{_describe(frames['machine_identities_df'])}.\n"
        "Another python pandas dataframe is named cert_requests_df and contains certificate request information and has the following structure: "
        f"{_describe(frames['cert_requests_df'])}\n"
        "Another python pandas dataframe is named issuing_templates_df and contains issuing template information and has the following structure: "
        f"{_describe(frames['issuing_templates_df'])}\n"
    )


def getData(tlspc_api_key, openai_api_key):
    """Load TLS Protect Cloud data and prepare the prompt context.

    Args:
        tlspc_api_key: TLS Protect Cloud API key sent as ``tppl-api-key``.
        openai_api_key: Key stored on the ``openai`` module for later calls.

    Returns:
        A status message for the UI; failures are logged, not raised.
    """
    global data_structure_overview, data_relationships_overview
    try:
        # Store OpenAI API Key
        openai.api_key = openai_api_key
        headers = {"accept": "application/json", "tppl-api-key": tlspc_api_key}
        frames = _load_dataframes(headers)
        structure_overview = _build_structure_overview(frames)
    except Exception:
        # Keep the message generic for the UI; the traceback goes to the log.
        logger.exception("Failed to load TLS Protect Cloud data")
        return 'Error in loading data. Please try again.'

    # Publish only what the prompt handlers need. The API keys and other
    # locals are deliberately not exposed to generated scripts.
    _dataframes.clear()
    _dataframes.update(frames)
    data_structure_overview = structure_overview
    data_relationships_overview = DATA_RELATIONSHIPS_OVERVIEW
    return 'Data successfully loaded!'


# User facing application
with gr.Blocks(theme='aliabid94/new-theme') as demo:
    gr.Image(
        'https://design.venafi.com/dist/svg/logos/venafi/logo-venafi-combo.svg',
        height=50,
        width=150,
        show_share_button=False,
        show_download_button=False,
        show_label=False,
        container=False,
    )
    gr.Markdown("Get Answers to questions from your TLS Protect Cloud data or Generate Reporting with this Generative AI application from Venafi.")

    with gr.Tab('Read Me'):
        gr.Markdown("""
        # Welcome to Venafi Explorer!

        This is an experimental generative AI application for the Venafi Control Plane. \
        It leverages Venafi's proprietary data capture technology in combination with the OpenAI API to use natural language to provide answers and insights surrounding your Venafi Control Plane environment.\

        Please note to use Venafi Explorer you will need to have both a TLS Protect Cloud API key (Try it for free at venafi.com/signup/) as well as an OpenAI API Key. \

        To get started, navigate to the 'API Keys' tab to input your API keys and ingest data from your TLS Protect Cloud environment.
        """)

    with gr.Tab("API Keys"):
        tlspc_api_key = gr.Textbox(label='Please provide your TLS Protect Cloud API Key:', type='password')
        openai_api_key = gr.Textbox(
            label='Please provide your OpenAI API Key:',
            type='password',
            placeholder='Note: To use the OpenAI API, you need a paid account',
        )
        api_key_output = gr.Textbox(label='Result')
        load_button = gr.Button('Load TLS Protect Cloud Data')

    with gr.Tab("Answer Questions"):
        prompt_questions = gr.Textbox(
            label='Input prompt here:',
            placeholder="Try something like 'What is the name of the issuing template that has been used to request the most certificates?'",
        )
        text_output = gr.Textbox(label='Response:')
        text_button = gr.Button("Submit")

    with gr.Tab("Create Graphs"):
        prompt_reporting = gr.Textbox(
            label='Input prompt here:',
            placeholder="Try something like 'Plot a line chart of certificate issuances over time'",
        )
        chart_output = gr.Plot(label='Output:')
        chart_button = gr.Button("Submit")

    text_button.click(prompt_analyze_questions, inputs=prompt_questions, outputs=text_output)
    chart_button.click(prompt_analyze_reporting, inputs=prompt_reporting, outputs=chart_output)
    load_button.click(getData, inputs=[tlspc_api_key, openai_api_key], outputs=api_key_output)

# Launch only when run as a script so importing the module has no side effect.
if __name__ == "__main__":
    demo.launch()