efdemo001A / app.py
allinaigc's picture
Upload app.py
f7167fd verified
"""
1. 支持所有多个文件类型的大模型批处理功能,可以清洗文本数据,提取关键信息,并生成标准格式的输出文件。
1. 支持多种文件类型的上传,包括pdf, docx, xlsx, csv, json等。
1. Streamlit不支持上传一个文件夹,可以用ctrl+A上传所有文件。会自动显示上传的文件名字。
错误信息:
1. 如果上传的单个文件中的内容超过大模型的上下文,可能会报错。需要确认文件内容,或者需要更换长文大模型。
1. 纯txt文件中可能会因为Encoding(utf-8)的问题而导致部分失败,PDF和JSON文件可以基本保证成功。
"""
##TODO: 1. 每次调用新的csv文件。
# -*- coding: utf-8 -*-
import streamlit as st
import openai
import os
import numpy as np
import pandas as pd
import json
import csv
import tempfile
from tempfile import NamedTemporaryFile
import pathlib
from pathlib import Path
import re
from re import sub
from itertools import product
import time
from time import sleep
from datetime import datetime
import streamlit_authenticator as stauth
# from langchain_community.vectorstores import FAISS
# from langchain.embeddings.huggingface import HuggingFaceEmbeddings
# from langchain_core.output_parsers import StrOutputParser
# from langchain_core.runnables import RunnablePassthrough
from langchain.llms.base import LLM
from langchain.llms.utils import enforce_stop_tokens
from typing import Dict, List, Optional, Tuple, Union
import requests
import streamlit as st
# import rag_reponse_002
import dashscope
from dotenv import load_dotenv
from datetime import datetime
import pytz
from pytz import timezone
from datetime import date
import qwen_response
from save_info import save_csv_info
import streamlit_ext as ste ##TODO: 为了点击download button后保持页面。
import create_newfile
## 获得程序运行的当前时间
def get_current_time():
beijing_tz = timezone('Asia/Shanghai')
beijing_time = datetime.now(beijing_tz)
current_time = beijing_time.strftime('%H:%M:%S')
return current_time
load_dotenv()
### 设置openai的API key
os.environ["OPENAI_API_KEY"] = os.environ['user_token']
openai.api_key = os.environ['user_token']
bing_search_api_key = os.environ['bing_api_key']
dashscope.api_key = os.environ['dashscope_api_key']
### Streamlit页面设定。
st.set_page_config(layout="wide", page_icon="🌀", page_title="人工智能大模型的智能信息探索平台")
st.title("EF教育人工智能大模型文本清洗与挖掘平台")
# st.title("人工智能大模型文本清洗与挖掘平台(可内网部署)")
# st.subheader("Large Language Model-based Knowledge Base QA System")
# st.warning("_声明:内容由人工智能生成,仅供参考。如果您本人使用或对外传播本服务生成的输出,您应当主动核查输出内容的真实性、准确性,避免传播虚假信息。_")
# st.info("_声明:内容由人工智能生成,仅供参考。如果您本人使用或对外传播本服务生成的输出,您应当主动核查输出内容的真实性、准确性,避免传播虚假信息。_")
st.write("_声明:内容由人工智能生成,仅供参考。如果您本人使用或对外传播本服务生成的输出,您应当主动核查输出内容的真实性、准确性,避免传播虚假信息。_")
# st.divider()
### authentication with a local yaml file.
import yaml
from yaml.loader import SafeLoader
with open('./config.yaml') as file:
config = yaml.load(file, Loader=SafeLoader)
authenticator = stauth.Authenticate(
config['credentials'],
config['cookie']['name'],
config['cookie']['key'],
config['cookie']['expiry_days'],
config['preauthorized']
)
user, authentication_status, username = authenticator.login('main')
# user, authentication_status, username = authenticator.login('用户登录', 'main')
## 清楚所有对话记录。
def clear_all():
st.session_state.conversation = None
st.session_state.chat_history = None
st.session_state.messages = []
message_placeholder = st.empty()
## 只用这一个就可以了。
# st.rerun()
return None
if authentication_status:
with st.sidebar:
st.markdown(
"""
<style>
[data-testid="stSidebar"][aria-expanded="true"]{
min-width: 400px;
max-width: 400px;
}
""",
unsafe_allow_html=True,
)
### siderbar的题目。
### siderbar的题目。
# st.header(f'**大语言模型专家系统工作设定区**')
st.header(f'**欢迎 **{username}** 使用本系统** ')
st.write(f'_Large Language Model Expert System Environment_')
# st.write(f'_Welcome and Hope U Enjoy Staying Here_')
authenticator.logout('登出', 'sidebar')
### 提交请求
submit_btn = st.sidebar.button("开始执行操作", use_container_width=True, type='primary')
### 清除记录,重启一轮新对话。
st.sidebar.button("清除记录,重启一个新任务", on_click=clear_all, use_container_width=True, type='secondary')
### 定义任务目标
task_goal = st.selectbox(label="**任务目标**", options=['文本内容清洗', '人才资料甄选', '销售机会挖掘', '日志文件分析', '舆情内容分析', '潜在投诉挖掘', '合规稽核纠偏'], index=0)
match task_goal:
case '文本内容清洗':
prompt_sys = """你是一个文本内容清洗专家。你需要完成我给你的任务。"""
case '人才资料甄选':
prompt_sys = """你是一个人才资料甄选专家。你需要完成我给你的任务。"""
case '销售机会挖掘':
prompt_sys = """你是一个销售机会挖掘专家。你需要完成我给你的任务。"""
case '日志文件分析':
prompt_sys = """你是一个日志文件分析专家。你需要完成我给你的任务。"""
case '舆情内容分析':
prompt_sys = """你是一个舆情内容分析专家。你需要完成我给你的任务。"""
case '潜在投诉挖掘':
prompt_sys = """你是一个潜在投诉挖掘专家。你需要完成我给你的任务。"""
case '合规稽核纠偏':
prompt_sys = """你是一个合规稽核纠偏专家。你需要完成我给你的任务。"""
## 在sidebar上的三个分页显示,用st.tabs实现。
tab_1, tab_2, tab_4 = st.tabs(['使用须知', '模型参数', '系统角色设定'])
# tab_1, tab_2, tab_3, tab_4 = st.tabs(['使用须知', '模型参数', '提示词模板', '系统角色设定'])
# with st.expander(label='**使用须知**', expanded=False):
with tab_1:
# st.markdown("#### 快速上手指南")
with st.text(body="说明"):
st.markdown("* 重启一个新任务时,只需要刷新页面(按Ctrl/Command + R)即可。")
with st.text(body="说明"):
st.markdown("* 为了保护数据与隐私,所有对话均不会被保存,刷新页面立即删除。敬请放心。")
# with st.text(body="说明"):
# st.markdown("* “GPT-4”回答质量极佳,但速度缓慢,建议适当使用。")
with st.text(body="说明"):
st.markdown("* 现有仅限一次任务执行,将不会保持之前的任务结果记录。")
with st.text(body="说明"):
st.markdown("""* 系统的工作流程如下:
1. 用户提交待处理的文件。
1. 系统将问题转换为机器可理解的格式。
1. 系统使用大语言模型来进行全量信息探索。
1. 系统使用内置的智能系统来进行基于文本高维特征的探索。
1. 系统返回完整且准确的答案。""")
## 大模型参数
# with st.expander(label='**大语言模型参数**', expanded=True):
with tab_2:
max_tokens = st.slider(label='Max_Token(生成结果时最大字数)', min_value=100, max_value=8096, value=4096,step=100)
temperature = st.slider(label='Temperature (温度)', min_value=0.0, max_value=1.0, value=0.8, step=0.1)
top_p = st.slider(label='Top_P (核采样)', min_value=0.0, max_value=1.0, value=0.6, step=0.1)
frequency_penalty = st.slider(label='Frequency Penalty (重复度惩罚因子)', min_value=-2.0, max_value=2.0, value=1.0, step=0.1)
presence_penalty = st.slider(label='Presence Penalty (控制主题的重复度)', min_value=-2.0, max_value=2.0, value=1.0, step=0.1)
# with tab_3:
# # st.markdown("#### Prompt提示词参考资料")
# # with st.expander(label="**大语言模型基础提示词Prompt示例**", expanded=False):
# st.code(
# body="我是一个企业主,我需要关注哪些“存货”相关的数据资源规则?", language='plaintext')
# st.code(
# body="作为零售商,了解哪些关键的库存管理指标对我至关重要?", language='plaintext')
# st.code(body="企业主在监控库存时,应如何确保遵守行业法规和最佳实践?",
# language='plaintext')
# st.code(body="在数字化时代,我应该关注哪些技术工具或平台来优化我的库存数据流程?", language='plaintext')
# st.code(body="我应该如何定期审查和分析这些库存数据以提高运营效率?", language='plaintext')
# st.code(body="如何设置预警系统来避免过度库存或缺货情况?", language='plaintext')
with tab_4:
st.text_area(label='系统角色设定', value='你是一个人工智能,你需要回答我提出的问题,或者完成我交代的任务。你需要使用我提问的语言(如中文、英文)来回答。', height=200, label_visibility='hidden')
elif authentication_status == False:
st.error('⛔ 用户名或密码错误!')
elif authentication_status == None:
st.warning('⬆ 请先登录!')
### 上传文件的模块
def upload_file(uploaded_file):
if uploaded_file is not None:
# filename = uploaded_file.name
# st.write(filename) # print out the whole file name to validate. not to show in the final version.
try:
# if '.pdf' in filename: ### original code here.
if '.pdf' in uploaded_file.name:
pdf_filename = uploaded_file.name ### original code here.
# print('PDF file:', pdf_filename)
# with st.status('正在为您解析新知识库...', expanded=False, state='running') as status:
spinner = st.spinner('正在为您解析新知识库...请耐心等待')
with spinner:
uploaded_file_name = "File_provided"
temp_dir = tempfile.TemporaryDirectory()
# ! working.
uploaded_file_path = pathlib.Path(temp_dir.name) / uploaded_file_name
with open(pdf_filename, 'wb') as output_temporary_file:
# with open(f'./{username}_upload.pdf', 'wb') as output_temporary_file: ### original code here. 可能会造成在引用信息来源时文件名不对的问题。
# ! 必须用这种格式读入内容,然后才可以写入temporary文件夹中。
# output_temporary_file.write(uploaded_file.getvalue())
output_temporary_file.write(uploaded_file.getvalue())
return pdf_filename
else:
# if '.csv' in filename: ### original code here.
if '.csv' in uploaded_file.name:
print('start the csv file processing...')
csv_filename = uploaded_file.name
filename = uploaded_file.name
csv_file = pd.read_csv(uploaded_file)
csv_file.to_csv(f'./{username}/{username}_upload.csv', encoding='utf-8', index=False)
st.write(csv_file[:3]) # 这里只是显示文件,后面需要定位文件所在的绝对路径。
elif '.txt' in uploaded_file.name:
print('start the txt file processing...')
txt_filename = uploaded_file.name
filename = uploaded_file.name
txt_file = uploaded_file.getvalue()
file = open(f"{txt_filename}", 'rb')
content = file.read()
content = file.split(b'\n')
# with open(file=f'./{username}/{username}_upload.txt', mode='rb', encoding='utf-8') as output_temporary_file:
# output_temporary_file.write(txt_file)
st.write(file)
### json格式文件
elif '.json' in uploaded_file.name:
print('start the json file processing...')
json_filename = uploaded_file.name
json_file = uploaded_file.getvalue()
# file = open(f"{txt_filename}", 'rb')
# content = file.read()
# content = file.split(b'\n')
# with open(file=f'./{username}/{username}_upload.txt', mode='rb', encoding='utf-8') as output_temporary_file:
# output_temporary_file.write(txt_file)
# st.write(json_filename)
# else:
# xls_file = pd.read_excel(uploaded_file)
# xls_file.to_csv(f'./{username}_upload.csv', index=False)
# st.write(xls_file[:3])
print('end the file processing...')
# uploaded_file_name = "File_provided"
# temp_dir = tempfile.TemporaryDirectory()
# ! working.
# uploaded_file_path = pathlib.Path(temp_dir.name) / uploaded_file_name
# with open('./upload.csv', 'wb') as output_temporary_file:
# with open(f'./{username}_upload.csv', 'wb') as output_temporary_file:
# print(f'./{name}_upload.csv')
# ! 必须用这种格式读入内容,然后才可以写入temporary文件夹中。
# output_temporary_file.write(uploaded_file.getvalue())
# st.write(uploaded_file_path) #* 可以查看文件是否真实存在,然后是否可以
except Exception as e:
# st.write(e)
pass
## 以下代码是为了解决上传文件后,文件路径和文件名不对的问题。
# uploaded_file_name = "File_provided"
# temp_dir = tempfile.TemporaryDirectory()
# # ! working.
# uploaded_file_path = pathlib.Path(temp_dir.name) / uploaded_file_name
# # with open('./upload.csv', 'wb') as output_temporary_file:
# with open(f'./{name}_upload.csv', 'wb') as output_temporary_file:
# # print(f'./{name}_upload.csv')
# # ! 必须用这种格式读入内容,然后才可以写入temporary文件夹中。
# # output_temporary_file.write(uploaded_file.getvalue())
# output_temporary_file.write(uploaded_file.getvalue())
# # st.write(uploaded_file_path) # * 可以查看文件是否真实存在,然后是否可以
# # st.write('Now file saved successfully.')
# return pdf_filename, csv_filename
return None
## streamlit中显示上传文件的模块
try:
### 目前docx模块在huggingface的python3.10报错。暂时不支持docx文件。
uploaded_file = st.file_uploader(
"选择需要处理的文件(注:可一次选择多个文件)", type=(["txt", "PDF", "CSV", "xlsx","xls","json"]), accept_multiple_files=True)
## 获得上传所有文件的大小。
uploaded_filesize = round(sum(file.size for file in uploaded_file) / 1000, 2)
#总共上传了{}个文件。'.format(len(uploaded_file))) ### 显示上传了多少文件。
# 默认状态下没有上传文件,None,会报错。需要判断。
if uploaded_file:
## 显示上传文件的信息。
metric_col1, metric_col2, metric_col3, metric_col4 = st.columns(4)
metric_col1.metric(label='上传的文件数', value=f"{len(uploaded_file)}个", delta=None)
metric_col2.metric(label='上传文件的大小', value=f"{uploaded_filesize} KB", delta=None)
metric_col3.metric(label='上传文件的时间', value=f"{get_current_time()}", delta=None)
metric_col4.metric(label='当前日期', value=f"{str(date.today())}", delta=None)
# # uploaded_file_path = upload_file(uploaded_file)
# upload_file(uploaded_file)
except Exception as e:
print(e)
pass
st.divider()
### 提示词部分。
with st.expander(label='**标准模块**', expanded=True):
col_1, col_2, col_3, col_4 = st.columns(4)
with col_1:
prompt_input = st.text_area(label='**原始文件的说明**', value="""内容是一通完整的邀约中心电话记录。其中说话人2是公司邀约专员,说话人1是客户。""", height=200, label_visibility='visible')
with col_2:
prompt_caution = st.text_area(label='**注意事项**', value='忽略所有语法错误和错别字。',height=200, label_visibility='visible')
with col_3:
prompt_rule = st.text_area(label='**规则定义**', value='无特定规则要求。',height=200, label_visibility='visible')
with col_4:
prompt_output = st.text_area(label='**输出结果的要求**', value=
"""【邀约的整体情况】用一句话来简单概述判断这一通话的整体情况。
【客户的关注点】 客户关注点是哪些?
【是否邀约成功】判断邀约是否成功,即客户是否愿意来参加活动。
""", height=200, label_visibility='visible')
# st.write(f"{prompt_explain} {prompt_notice} {prompt_rule} {prompt_ouput}")
### 专业模式部分。
with st.expander(label='**定制模块**', expanded=False):
# settings_col_1, settings_col_2, settings_col_3, settings_col_4, settings_col_5 = st.columns(5)
settings_col_1, settings_col_2, settings_col_3 = st.columns(3)
settings_col_1.toggle('高质量模式', value=False, key='high_end_mode')
settings_col_2.toggle('长文模式', value=False, key='length_context_mode')
settings_col_3.toggle('强力模式', value=False, key='powerful_mode')
# with settings_col_1:
# settings_col_1.toggle('高质量模式', value=False, key='high_end_mode')
# with settings_col_2:
# settings_col_2.toggle('长文模式', value=False, key='length_context_mode')
# with settings_col_3:
# settings_col_3.toggle('强力模式', value=False, key='powerful_mode')
### rag设定
rag_col_1, rag_col_2 = st.columns(2)
### 爬虫网站
target_url = rag_col_1.text_input('目标网址', value='https://www.123abc.com', label_visibility='visible', disabled=True)
### 加载知识来源
target_database = rag_col_2.multiselect(label='信息增强', options=['互联网', '知识库', '规则库', '案例库'], default=['规则库'], disabled=True)
### 各类高级设定
advance_col_1, advance_col_2, advance_col_3, advance_col_4, advance_col_5 = st.columns(5)
with advance_col_1:
prompt_explain = st.text_area(label='筛选规则定制', value='', height=200, label_visibility='visible', disabled=True)
with advance_col_2:
prompt_notice = st.text_area(label='敏感词制定', value='',height=200, label_visibility='visible', disabled=True)
with advance_col_3:
prompt_rule = st.text_area(label='过滤名单', value='',height=200, label_visibility='visible', disabled=True)
with advance_col_4:
prompt_ouput = st.text_area(label='词云频率设定', value='',height=200, label_visibility='visible', disabled=True)
with advance_col_5:
prompt_ouput = st.text_area(label='其他设定', value='',height=200, label_visibility='visible', disabled=True)
# st.write(f"{prompt_explain} {prompt_notice} {prompt_rule} {prompt_ouput}")
### 以下是工作区,包括进度等。
st.divider()
### prompt区
# prompt_sys = """你是一个法律专家。你需要完成我给你的任务。"""
# prompt_input = """我给你的数据中包括了一通完整的法律援助中心的电话记录,其中的数据格式说明如下:
# 1. "input"和“instruction”都代表了用户的问题,可以忽略“instruction”部分的内容。
# 2. “output”是法律顾问的回答内容。"""
# prompt_caution = """你忽略所有语法错误和错别字。"""
# prompt_output = """现在,我需要你帮忙我整理这通电话的内容,用如下格式(你只需要提供以下格式要求的内容,不需要输出任何其他说明或者解释。):
# 【用户问题】用一段话来总结用户的核心问题。
# 【法律顾问回答】用一段话来总结法律顾问的回答要点。
# 【整体情况】用一句话来简单概述判断这一通话的整体情况。
# 【是否解决】判断问题是否解决。
import st_data_parser
### 用LLM总结语料的函数 , output_filepath是每次都会创建一个新的csv结果文件。
def llm_summary(file, file_content, output_filepath):
# call_content = file_content
# call_content = pd.read_json(call_content.decode('utf-8'))
### 通过自定义的data_parser解析文件内容。
print('file:', file)
call_content = st_data_parser.parser(file=file)
print('call_content:', call_content)
### a simple user prompt test.
## user_prompt = f"""
# 我给你的数据中包括了一通完整的法律援助中心的电话记录,其中的数据格式说明如下:
# 1. "input"和“instruction”都代表了用户的问题,可以忽略“instruction”部分的内容。
# 2. “output”是法律顾问的回答内容。
# 现在,我需要你帮忙我整理这通电话的内容,用如下格式(你只需要提供以下格式要求的内容,不需要输出任何其他说明或者解释。):
# 【用户问题】用一段话来总结用户的核心问题。
# 【法律顾问回答】用一段话来总结法律顾问的回答要点。
# 【整体情况】用一句话来简单概述判断这一通话的整体情况。
# 【是否解决】判断问题是否解决。
# 数据内容如下:{call_content}
# """
user_prompt = prompt_sys + prompt_rule + prompt_input + prompt_caution + """我需要你帮忙我整理这通电话的内容,用如下格式(你只需要提供以下格式要求的内容,不需要输出任何其他说明或者解释。):\n""" + prompt_output + f"""数据内容如下:f{call_content}"""
print("---"*30)
print('user_prompt:', user_prompt)
llm_output = qwen_response.call_with_messages(prompt=user_prompt)
# summary = chatgpt.chatgpt(user_prompt=user_prompt)
# print(summary)
## 将文件编号和summary存入summary.csv文件
# summary_csv = pd.read_csv('./summary_qwen.csv')
summary_csv = pd.read_csv('./summary_qwen.csv', encoding='utf-8')
# summary_csv = pd.read_csv('./summary_qwen.csv', encoding='utf-8', low_memory=True)
print('summary_csv:', summary_csv)
print("---"*30)
filename = os.path.basename(file.name) # Get the filename from the file path
# filename = os.path.basename(file_path) # Get the filename from the file path
filename_without_extension = os.path.splitext(filename)[0]
### 每次运行都创建一个新的oupout文件。
# output_filepath = create_newfile.new_output_file(username)
final_output_filepath = f'{output_filepath}'
# final_filepath = f'./{username}/output.csv'
# final_filepath = './summary_qwen.csv'
save_csv_info(filepath=final_output_filepath, ID=filename_without_extension, output=llm_output)
return None
### 主程序main
def main(uploaded_file=uploaded_file):
output_filepath = create_newfile.new_output_file(username)
## 在存在上传文件时,启动LLM程序。
if uploaded_file and submit_btn:
### Progress bar, 进度条设定
progress_bar_text = "**正在处理您的任务...**"
progress_bar = st.progress(0, text=progress_bar_text)
###记录运行时间
start_time = time.time()
### 记录正常运行的文件数,和缺失的文件数
success_files = []
fail_files = []
for i, file in enumerate(uploaded_file):
print(f'正在处理第{i+1}个文件', file.name)
try:
llm_summary(file=file, file_content=file.read(), output_filepath=output_filepath) ## 核心程序。
success_files.append(file.name)
except Exception as e:
print(e)
fail_files.append(file.name)
pass
progress_bar.progress((i+1)/len(uploaded_file), text=progress_bar_text) ## 展示进度条。
progress_bar = st.empty ## 重置进度条。
final_data = pd.read_csv(f'{output_filepath}', encoding='utf-8') ## 在ste.download_button处需要先获得data内容,这里将data进行赋值。
# final_data = pd.read_csv(f'./{username}/output.csv', encoding='utf-8') ## 在ste.download_button处需要先获得data内容,这里将data进行赋值。
end_time = time.time()
run_time = round((end_time - start_time),2)
### 输出结果。
st.success(f"任务结束!请点击下方按钮保存结果文件。总运行时长{run_time}秒。成功处理了{len(success_files)}个文件;未完成文件数{len(fail_files)}个。", icon='💯')
if len(fail_files) > 0:
failed_file_elements = ', '.join(fail_files) ## 无法直接放入f""中。
st.warning(f"未完成的文件如下:{failed_file_elements}") ## 一次性打印出每个列表元素。1
# st.download_button(label='下载输出文件!', data=final_data, file_name='summary_qwen.csv', mime='text/csv')
ste.download_button(
label="点击下载结果文件",
data=final_data,
file_name='final_data.csv',
mime='text/csv',
)
return None
if __name__ == '__main__':
main()