#
# Copyright 2023 Venafi, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#  http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Gradio app that answers questions about TLS Protect Cloud data.

The app downloads account data into in-memory pandas dataframes, describes
their structure to an OpenAI chat model, asks the model for a Python script
that answers the user's prompt, and executes that script locally.
"""

import datetime
import io
import json
import logging
import os

import gradio as gr
import matplotlib.pyplot as plt
import numpy as np
import openai
import pandas as pd
import plotly
import requests

# NOTE: several of the imports above are not referenced directly in this file.
# They are kept on purpose: generated scripts are executed with this module's
# globals, so they can use these names without importing them.

logger = logging.getLogger(__name__)

# Module-level placeholders kept for backward compatibility.
data_structure_overview = ''
data_relationships_overview = ''
fig = ''
output_string = ''

_OPENAI_MODEL = "gpt-3.5-turbo"
_API_BASE_URL = "https://api.venafi.cloud"
# Without a timeout `requests` may block forever on an unresponsive server.
_REQUEST_TIMEOUT_SECONDS = 120
_CERT_REQUEST_PAGE_SIZE = 1000

_NOT_LOADED_MESSAGE = (
    "No data is loaded yet. Open the 'API Keys' tab and load your "
    "TLS Protect Cloud data first."
)

# Columns kept from the applications endpoint; missing ones are added as NaN.
_APPLICATION_COLUMNS = [
    'id', 'name', 'description', 'fullyQualifiedDomainNames', 'ipRanges',
    'ports', 'modificationDate', 'creationDate', 'ownership.owningUsers',
    'ownership.owningTeams',
]

_RELATIONSHIPS_HEADER = "The dataframes relate to eachother in the following manner:"

# (dataframe A, dataframe B, sentence sent to the model when both are loaded).
# The dataframe names in each sentence must match the variable names that are
# actually handed to the generated script.
_RELATIONSHIPS = (
    ('users_df', 'applications_df',
     "The column values in the 'user_id' column in users_df match the column "
     "values in the 'ownerId' column in applications_df."),
    ('teams_df', 'machines_df',
     "The column values in the 'team_id' column in teams_df match the column "
     "values in the 'owningTeamId' column in machines_df."),
    ('certificate_requests_df', 'users_df',
     "The column values in the 'certificateOwnerUserId' column in "
     "certificate_requests_df match the column values in the 'user_id' "
     "column in users_df."),
    ('certificate_requests_df', 'issuing_templates_df',
     "The column values in the 'certificateIssuingTemplateId' column in "
     "certificate_requests_df match the column values in the "
     "'issuing_template_id' column in issuing_templates_df."),
    ('machine_identities_df', 'certificates_df',
     "The column values in the 'certificateId' column in "
     "machine_identities_df match the column values in the 'certificateId' "
     "column in certificates_df."),
    ('machine_identities_df', 'machines_df',
     "The column values in the 'machine_id' column in machine_identities_df "
     "match the column values in the 'machine_id' column in machines_df."),
)

_INTRO_MARKDOWN = (
    "**Vikram Explorer** is an entirely new way to get answers and insights "
    "to solve machine identity management problems. Release of this "
    "opensource project under Apache 2.0 license is part of Venafi Athena "
    "for Community initiatives."
)

_GET_STARTED_MARKDOWN = """
This is an experimental opensource project. It combines TLS Protect Cloud’s modern APIs and data with the intelligence of OpenAI ChatGPT to answer questions and give new insights. The project uses in-memory data analysis and on-the-fly AI code generation to answer questions, and keep data outside of OpenAI. At just over 300 lines of code, it's simplicity shows the future potential for AI.

To get started, navigate to the API Keys tab. This will connect to your TLS Protect Cloud and OpenAI accounts. Signup for TLS Protect Cloud at [venafi.com/signup](https://venafi.com/signup).

Use of this project either when run locally in your environment or on Hugging Face may introduce risks. Running this project accesses data from your TLS Protect Cloud account. While this project does not store your TLS Protect Cloud data or send that data to OpenAI, the prompts entered are sent to OpenAI for Python code generation. As with every opensource project, application, or online service that uses your API keys, you are strongly recommended to rotate your API keys after use. Venafi does not track use of this project on Hugging Face or collect or process any data of project users.
"""

_REMINDER_MARKDOWN = (
    "Reminder: This is an experimental project to test new capabilities. "
    "Take care and understand risks of using API keys and accessing data "
    "especially if running from Hugging Face. Rotate keys after use."
)


# ---------------------------------------------------------------------------
# Prompt handling
# ---------------------------------------------------------------------------

def _require_loaded_data(passed_args):
    """Raise a user-visible error unless `passed_args` holds loaded data."""
    if not isinstance(passed_args, dict) or 'data_structure_overview' not in passed_args:
        raise gr.Error(_NOT_LOADED_MESSAGE)


def _extract_python_code(content):
    """Return the body of the last ```python fenced block in `content`.

    When there is no fence the text up to the first ``` (or all of it) is
    returned, which matches how the reply was parsed before.
    """
    text = (content or '').strip()
    return text.split('```python')[-1].split('```')[0]


def _run_generated_script(instructions, passed_args, result_name):
    """Ask the model for a script, execute it and return `result_name`.

    Args:
        instructions: user-message strings appended after the two dataframe
            overview messages, in order.
        passed_args: session data produced by `getData`.
        result_name: variable the generated script is expected to assign.

    Returns:
        The value bound to `result_name` after execution, or '' when the
        script never assigned it.
    """
    messages = [
        {"role": "user", "content": passed_args['data_structure_overview']},
        {"role": "user", "content": passed_args.get('data_relationships_overview', '')},
    ]
    messages.extend({"role": "user", "content": text} for text in instructions)

    output = openai.chat.completions.create(
        model=_OPENAI_MODEL, temperature=0.0, messages=messages)
    script = _extract_python_code(output.choices[0].message.content)

    # Run in a shallow copy so names rebound by the generated script (for
    # example an overwritten dataframe variable) do not leak into the session
    # state and affect later prompts.
    namespace = dict(passed_args)
    namespace[result_name] = ''
    # NOTE(security): this executes model-generated code with full access to
    # this process. It is the core of the app's design, but it must only be
    # run in an environment where that risk is acceptable.
    exec(script, globals(), namespace)
    return namespace[result_name]


def prompt_analyze_reporting(prompt, passed_args):
    """Generate a plotly chart that answers `prompt`.

    Args:
        prompt: the user's request in natural language.
        passed_args: session data produced by `getData`.

    Returns:
        The object the generated script bound to `fig`.

    Raises:
        gr.Error: if no data has been loaded, or the script produced no chart.
    """
    _require_loaded_data(passed_args)
    instructions = [
        "Do not attempt to use .csv files in your code.",
        "Only use plotly to output charts, graphs, or figures. Do not use "
        "matplotlib or other charting libraries. Name the chart object as "
        "'fig' but do not show it",
        f"Create a python script to: {prompt}",
    ]
    chart = _run_generated_script(instructions, passed_args, 'fig')
    if isinstance(chart, str) and not chart:
        # The placeholder was never replaced; handing '' to the plot output
        # would only fail later with a less helpful error.
        raise gr.Error(
            "The generated code did not produce a chart named 'fig'. "
            "Try rephrasing your prompt.")
    return chart


def prompt_analyze_questions(prompt, passed_args):
    """Answer `prompt` with text computed from the loaded dataframes.

    Args:
        prompt: the user's question in natural language.
        passed_args: session data produced by `getData`.

    Returns:
        The value the generated script bound to `output_string`.

    Raises:
        gr.Error: if no data has been loaded.
    """
    _require_loaded_data(passed_args)
    instructions = [
        "Do not attempt to use .csv files in your code.",
        "Do not attempt to create charts or visualize the question with "
        "graphics. Only provide string responses.",
        # The tab that creates charts is labelled "Visualize Answers".
        "If you are asked to create visualizations or graphs, create a "
        "python script to store a string variable named output_string with "
        "the text 'Sorry, I cannot create reporting. Select the Visualize "
        "Answers tab to create reports.'",
        f"Create a python script to: {prompt}",
        "Store the final response as a string variable named output_string",
    ]
    return _run_generated_script(instructions, passed_args, 'output_string')


# ---------------------------------------------------------------------------
# Data retrieval
# ---------------------------------------------------------------------------

def _request_json(method, url, tlspc_api_key, description, payload=None):
    """Call the API and return the decoded JSON body.

    Raises:
        RuntimeError: if the response status is not 200.
    """
    response = requests.request(
        method,
        url,
        headers={"accept": "application/json", "tppl-api-key": tlspc_api_key},
        json=payload,
        timeout=_REQUEST_TIMEOUT_SECONDS,
    )
    if response.status_code != 200:
        raise RuntimeError(
            'Error retrieving ' + description + ':' + "\n"
            + response.text + "\n=============\n")
    return response.json()


def _fetch_certificate_requests(tlspc_api_key):
    """Return every certificate request record, following pagination."""
    url = _API_BASE_URL + "/outagedetection/v1/certificaterequestssearch"
    # NOTE(review): assumes zero-based page numbers, consistent with the
    # certificate search below - confirm against the API documentation.
    payload = {"paging": {"pageNumber": 0, "pageSize": _CERT_REQUEST_PAGE_SIZE}}
    records = []
    while True:
        data = _request_json("POST", url, tlspc_api_key, 'certificate requests', payload)
        page = data.get('certificateRequests') or []
        records.extend(page)
        total = data.get('numFound')
        # Stop on an empty page or once everything reported has been read.
        if not page or (total is not None and len(records) >= total):
            return records
        payload['paging']['pageNumber'] += 1


def _fetch_certificates(tlspc_api_key):
    """Return every certificate record, following pagination."""
    url = (_API_BASE_URL + "/outagedetection/v1/certificatesearch"
           "?ownershipTree=true&excludeSupersededInstances=false")
    payload = {"paging": {"pageNumber": 0}}
    records = []
    while True:
        data = _request_json("POST", url, tlspc_api_key, 'certificates', payload)
        page = data.get('certificates') or []
        records.extend(page)
        # The empty-page check prevents an endless loop should 'count' stay
        # positive while no further records are returned.
        if not page or data.get('count', 0) < 1:
            return records
        payload['paging']['pageNumber'] += 1


def _build_frame(records, renames, drop_company_id=True, max_columns=None):
    """Flatten JSON records into a dataframe and apply the column renames."""
    frame = pd.json_normalize(records).convert_dtypes()
    if max_columns is not None:
        frame = frame.iloc[:, :max_columns]
    frame = frame.rename(columns=renames)
    if drop_company_id:
        # errors='ignore': a missing column must not discard the whole table.
        frame = frame.drop(columns=['companyId'], errors='ignore')
    return frame


def _build_applications_frame(applications):
    """Build the applications dataframe with one row per application owner."""
    application_df = pd.json_normalize(applications).convert_dtypes()
    for column in _APPLICATION_COLUMNS:
        if column not in application_df.columns:
            application_df[column] = np.nan
    trimmed = application_df[_APPLICATION_COLUMNS]

    # Flatten application owners and re-merge.
    owners = pd.json_normalize(
        applications, record_path=['ownerIdsAndTypes'], meta=['id']).convert_dtypes()
    merged = pd.merge(trimmed, owners, on='id')
    return merged.rename(columns={
        'id': 'application_id',
        'creationDate': 'application_creationDate',
        'modificationDate': 'application_modificationDate',
    })


def _describe_structure(dfs):
    """Return the prompt text listing each dataframe and its column dtypes."""
    overview = 'I have multiple python pandas dataframes.'
    for name, frame in dfs.items():
        dtypes = frame.dtypes.apply(lambda dtype: dtype.name).to_dict()
        overview += ('\nOne is named ' + name + ' and contains '
                     + name.split('df')[0].replace('_', ' ')
                     + ' information and has the following structure: '
                     + str(dtypes))
    return overview


def _describe_relationships(dfs):
    """Return the prompt text describing joins between loaded dataframes.

    Returns '' when no relationship applies, to avoid confusing the model
    with a header that is followed by nothing.
    """
    sentences = [text for left, right, text in _RELATIONSHIPS
                 if left in dfs and right in dfs]
    if not sentences:
        return ''
    return "\n".join([_RELATIONSHIPS_HEADER] + sentences)


def getData(tlspc_api_key, openai_api_key):
    """Load TLS Protect Cloud data and prepare the model prompts.

    Each endpoint is loaded on a best-effort basis: some API calls may fail
    if customers haven't fully configured their environment, so a failing
    endpoint is logged and skipped instead of aborting the whole load.

    Args:
        tlspc_api_key: TLS Protect Cloud API key.
        openai_api_key: OpenAI API key, stored on the `openai` module.

    Returns:
        A `(status message, passed_args)` tuple. `passed_args` maps dataframe
        names to dataframes and also holds 'data_structure_overview' and
        'data_relationships_overview'. It is an empty dict when nothing could
        be loaded.
    """
    # NOTE: this sets the key process-wide, so it is shared by every session
    # served by this process.
    openai.api_key = openai_api_key

    def simple_table(path, json_key, renames, max_columns=None):
        records = _request_json(
            "GET", _API_BASE_URL + path, tlspc_api_key, json_key)[json_key]
        return _build_frame(records, renames, max_columns=max_columns)

    loaders = {
        'certificate_requests_df': lambda: _build_frame(
            _fetch_certificate_requests(tlspc_api_key),
            {'id': 'cert_request_id', 'creationDate': 'cert_request_creationDate'}),
        'certificates_df': lambda: _build_frame(
            _fetch_certificates(tlspc_api_key),
            {'id': 'certificateId'},
            drop_company_id=False),
        'applications_df': lambda: _build_applications_frame(
            _request_json(
                "GET", _API_BASE_URL + "/outagedetection/v1/applications",
                tlspc_api_key, 'applications')['applications']),
        'users_df': lambda: simple_table(
            "/v1/users", 'users', {'id': 'user_id'}),
        'teams_df': lambda: simple_table(
            "/v1/teams", 'teams',
            {'id': 'team_id', 'modificationDate': 'teams_modificationDate'}),
        'machines_df': lambda: simple_table(
            "/v1/machines", 'machines',
            {'id': 'machine_id',
             'creationDate': 'machine_creationDate',
             'modificationDate': 'machine_modificationDate'}),
        'machine_identities_df': lambda: simple_table(
            "/v1/machineidentities", 'machineIdentities',
            {'machineId': 'machine_id',
             'id': 'machine_identity_id',
             'creationDate': 'machine_identity_creationDate',
             'modificationDate': 'machine_identities_modificationDate'},
            max_columns=7),
        'issuing_templates_df': lambda: simple_table(
            "/v1/certificateissuingtemplates", 'certificateIssuingTemplates',
            {'id': 'issuing_template_id',
             'creationDate': 'issuing_template_creationDate'}),
    }

    dfs = {}
    for name, load in loaders.items():
        try:
            frame = load()
        except Exception:
            # Deliberate best-effort boundary: record the failure, keep going.
            logger.warning("Skipping %s: data could not be loaded", name, exc_info=True)
            continue
        # An empty table gives the model nothing to work with.
        if isinstance(frame, pd.DataFrame) and not frame.empty:
            dfs[name] = frame

    if not dfs:
        return ('No data could be loaded. Please check your TLS Protect Cloud '
                'API key and try again.'), {}

    passed_args = dict(dfs)
    passed_args['data_structure_overview'] = _describe_structure(dfs)
    passed_args['data_relationships_overview'] = _describe_relationships(dfs)
    return 'Data successfully loaded!', passed_args


# ---------------------------------------------------------------------------
# User facing application
# ---------------------------------------------------------------------------

with gr.Blocks(theme='aliabid94/new-theme') as demo:
    gr.Image('V Experimental.svg', height=50, width=225,
             show_share_button=False, show_download_button=False,
             show_label=False, container=False)
    gr.Markdown(_INTRO_MARKDOWN)

    with gr.Tab('Get Started!'):
        gr.Markdown(_GET_STARTED_MARKDOWN)

    with gr.Tab("API Keys"):
        tlspc_api_key = gr.Textbox(
            label='Please provide your TLS Protect Cloud API Key:',
            type='password')
        openai_api_key = gr.Textbox(
            label='Please provide your OpenAI API Key:',
            type='password',
            placeholder='Note: To use the OpenAI API, you need a paid account')
        api_key_output = gr.Textbox(label='Result')
        load_button = gr.Button('Load TLS Protect Cloud Data')

    with gr.Tab("Answer Questions"):
        prompt_questions = gr.Textbox(
            label='Input prompt here:',
            placeholder="Try something like 'What is the name of the issuing template that has been used to request the most certificates?'")
        text_output = gr.Textbox(label='Response:')
        text_button = gr.Button("Submit")

    with gr.Tab("Visualize Answers"):
        prompt_reporting = gr.Textbox(
            label='Input prompt here:',
            placeholder="Try something like 'Plot a line chart of certificate issuances over time'")
        chart_output = gr.Plot(label='Output:')
        chart_button = gr.Button("Submit")

    gr.Markdown(_REMINDER_MARKDOWN)

    # Per-session data; starts as an empty dict so the prompt handlers can
    # detect that nothing has been loaded yet.
    passed_args = gr.State({})

    text_button.click(prompt_analyze_questions,
                      inputs=[prompt_questions, passed_args],
                      outputs=text_output)
    chart_button.click(prompt_analyze_reporting,
                       inputs=[prompt_reporting, passed_args],
                       outputs=chart_output)
    load_button.click(getData,
                      inputs=[tlspc_api_key, openai_api_key],
                      outputs=[api_key_output, passed_args])

demo.launch(show_error=True)