|
import os |
|
import requests |
|
from bs4 import BeautifulSoup |
|
from urllib.parse import urljoin |
|
import pandas as pd |
|
import numpy as np |
|
import zipfile |
|
import textract |
|
import gradio as gr |
|
import shutil |
|
from pypdf import PdfReader |
|
|
|
def browse_folder(url):
    """Return a gradio update listing the sub-folder links found at ``url``.

    If ``url`` already ends with ``docs`` / ``docs/`` (case-insensitive) no
    request is made and an empty choice list is returned.

    Otherwise the page is fetched and every ``<a href>`` whose target starts
    with ``url`` is offered as a choice, with a ``/`` appended.

    Raises:
        requests.HTTPError: if the page answers with an error status.
        requests.RequestException: on connection problems or timeout.
    """
    if url.lower().endswith(('docs', 'docs/')):
        return gr.update(choices=[])

    # A timeout keeps a stalled server from blocking the UI callback forever.
    response = requests.get(url, timeout=60)
    response.raise_for_status()

    soup = BeautifulSoup(response.text, 'html.parser')

    # Only links that extend the current URL are kept.
    sub_folders = [
        anchor['href'] + '/'
        for anchor in soup.find_all('a', href=True)
        if anchor['href'].startswith(url)
    ]

    return gr.update(choices=sub_folders)
|
|
|
|
|
|
|
def extract_statuses(url):
    """Download the Excel guide linked from ``url`` and list its TDoc statuses.

    The page at ``url`` is scanned for links ending in ``.xls`` / ``.xlsx``.
    Each candidate is saved as ``guide_status.xlsx`` and read with pandas.
    The first workbook that has a ``'TDoc Status'`` column wins: its unique
    values, minus ``'withdrawn'``, are returned as a gradio update that sets
    both the choices and the selected values.

    Returns:
        ``gr.update(...)`` when a status column was found, otherwise ``[]``.
    """
    # urljoin only treats the last path segment as a directory when the base
    # ends with '/', so normalise before resolving relative links.
    base_url = url if url.endswith('/') else url + '/'

    response = requests.get(url, timeout=60)
    soup = BeautifulSoup(response.content, 'html.parser')

    file_name = 'guide_status.xlsx'

    for link in soup.find_all('a'):
        href = link.get('href')
        if not href or not href.endswith(('.xls', '.xlsx')):
            continue

        # Handles absolute, root-relative and relative hrefs alike.
        excel_url = urljoin(base_url, href)
        excel_response = requests.get(excel_url, timeout=60)
        # Fail with a clear HTTP error instead of feeding an error page to pandas.
        excel_response.raise_for_status()

        with open(file_name, 'wb') as f:
            f.write(excel_response.content)

        df = pd.read_excel(file_name)

        if 'TDoc Status' in df.columns:
            unique_statuses = df['TDoc Status'].unique().tolist()
            print(f'Downloaded {file_name} and extracted statuses: {unique_statuses}')

            if 'withdrawn' in unique_statuses:
                unique_statuses.remove('withdrawn')
            return gr.update(choices=unique_statuses, value=unique_statuses)

        print(f"'TDoc Status' column not found in {file_name}")

    # No usable workbook was found on the page.
    return []
|
|
|
|
|
import os |
|
import requests |
|
from bs4 import BeautifulSoup |
|
import pandas as pd |
|
import gradio as gr |
|
|
|
|
|
def scrape(url, folder_name, status_list, sorted_files, progress=gr.Progress()):
    """Download the ``.zip`` files linked from ``url`` into ``folder_name``.

    Args:
        url: Page whose ``<a href>`` links are scanned for ``.zip`` targets.
        folder_name: Destination directory (created if missing).
        status_list: Unused here; kept for interface compatibility.
        sorted_files: Names used as a substring filter on the zip links. If it
            is empty, or if no link matches, every zip link is downloaded.
        progress: Callable used to report progress (fraction, ``desc=``).

    Returns:
        ``(True, n)`` where ``n`` is the number of download URLs attempted.
    """
    print("Downloading zip files directly from the URL...")
    response = requests.get(url, timeout=60)
    soup = BeautifulSoup(response.content, 'html.parser')

    zip_links = [a['href'] for a in soup.find_all('a', href=True) if a['href'].endswith('.zip')]

    # Entries read from a spreadsheet column may be NaN; a non-string on the
    # left of ``in`` against a str raises TypeError, so keep strings only.
    wanted_names = [name for name in sorted_files if isinstance(name, str)]

    if wanted_names:
        # any() keeps each link at most once even if several names match it.
        selected_links = [
            link for link in zip_links
            if any(name in link for name in wanted_names)
        ]
        if selected_links:
            zip_links = selected_links

    # urljoin needs a trailing '/' on the base to treat it as a directory.
    base_url = url if url.endswith('/') else url + '/'
    status_filenames = [urljoin(base_url, link) for link in zip_links]
    print(f"Filenames from URL: {status_filenames}")

    download_directory = folder_name
    os.makedirs(download_directory, exist_ok=True)

    # Downloads occupy the 5%..45% band of the overall progress bar.
    pourcentss = 0.05
    step = 0.4 / max(len(status_filenames), 1)

    for file_url in status_filenames:
        filename = os.path.basename(file_url)
        save_path = os.path.join(download_directory, filename)
        progress(pourcentss, desc='Downloading')
        pourcentss += step

        try:
            with requests.get(file_url, stream=True, timeout=60) as r:
                r.raise_for_status()
                with open(save_path, 'wb') as f:
                    for chunk in r.iter_content(chunk_size=8192):
                        f.write(chunk)
        except requests.exceptions.HTTPError as e:
            print(f"HTTP error occurred while downloading {file_url}: {e}")
        except requests.exceptions.RequestException as e:
            # Connection resets / timeouts on one file must not abort the batch.
            print(f"Error occurred while downloading {file_url}: {e}")

    return True, len(status_filenames)
|
|
|
|
|
|
|
|
|
|
|
|
|
def extractZip(url):
    """Unpack every ``.zip`` of the downloads folder into the extraction folder.

    Both folder names derive from the third-from-last ``/``-separated
    component of ``url``: ``<name>_downloads`` is read and
    ``<name>_extraction`` is written. An existing extraction folder is
    deleted first. Each archive goes into its own sub-folder named after the
    archive without its extension.

    An archive that cannot be extracted is reported and skipped. A missing
    downloads folder is reported and the function returns without raising.
    """
    prefix = url.split("/")[-3]
    extract_directory = prefix + "_extraction"
    download_directory = prefix + "_downloads"

    # Start from a clean slate so results of a previous run never mix in.
    if os.path.exists(extract_directory):
        shutil.rmtree(extract_directory)

    if not os.path.isdir(download_directory):
        print(f"Dossier {download_directory} introuvable")
        return

    for zip_file in os.listdir(download_directory):
        if not zip_file.endswith(".zip"):
            continue

        zip_path = os.path.join(download_directory, zip_file)
        extract_dir = os.path.join(extract_directory, os.path.splitext(zip_file)[0])
        os.makedirs(extract_dir, exist_ok=True)

        try:
            with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                zip_ref.extractall(extract_dir)
            print(f"Extraction terminée pour {zip_file}")
        except Exception as exc:
            # Corrupt or unsupported archives are skipped; keep the reason visible.
            print(f"Erreur: Extraction {zip_file}: {exc}")

    print("Toutes les extractions sont terminées !")
|
|
|
|
|
def excel3gpp(url):
    """Download the first Excel file linked from ``url`` as ``guide.xlsx``.

    The page is scanned for ``<a href>`` targets ending in ``.xlsx`` or
    ``.xls``. The first match is fetched and written to ``guide.xlsx`` in the
    current working directory, replacing any previous copy. Nothing is
    written when the page has no such link.

    Raises:
        requests.HTTPError: if the page or the workbook answers with an
            error status.
    """
    response = requests.get(url, timeout=60)
    response.raise_for_status()

    soup = BeautifulSoup(response.text, 'html.parser')

    excel_links = [
        a['href'] for a in soup.find_all('a', href=True)
        if a['href'].endswith(('.xlsx', '.xls'))
    ]
    if not excel_links:
        return

    # URLs are not filesystem paths: resolve with urljoin, not os.path.join.
    # The base needs a trailing '/' to be treated as a directory.
    base_url = url if url.endswith('/') else url + '/'
    excel_url = urljoin(base_url, excel_links[0])

    excel_response = requests.get(excel_url, timeout=60)
    excel_response.raise_for_status()

    filepath = 'guide.xlsx'
    # Opening with 'wb' truncates, so an older guide is replaced.
    with open(filepath, 'wb') as f:
        f.write(excel_response.content)
    print(f'Excel file downloaded and saved as: {filepath}')
|
|
|
|
|
|
|
def replace_line_breaks(text):
    """Return ``text`` with every newline replaced by the two characters ``/n``.

    NOTE: the marker is not escaped, so a text that already contains a
    literal ``/n`` cannot be told apart from a converted newline afterwards.
    """
    return text.replace("\n", "/n")
|
|
|
def remod_text(text):
    """Return ``text`` with every ``/n`` marker turned back into a newline.

    NOTE: any literal ``/n`` already present in ``text`` is converted too.
    """
    return text.replace("/n", "\n")
|
|
|
def update_excel(data, excel_file, url):
    """Append ``data`` rows to the workbook ``excel_file`` (created if absent).

    Args:
        data: Rows of eight values in the order URL, File, Type, Title,
            Source, Related WIs, Status, Content.
        excel_file: Path of the workbook to create or extend.
        url: Unused; kept for interface compatibility.

    Read/write failures are caught and printed, so this never raises for
    I/O problems.
    """
    import re  # local import keeps this block self-contained

    new_df_columns = ["URL", "File", "Type", "Title", "Source", "Related WIs", "Status", "Content"]

    # Extracted text can contain control characters that the Excel writer
    # rejects, which would make the whole batch fail. Strip them up front.
    illegal_chars = re.compile(r"[\x00-\x08\x0b\x0c\x0e-\x1f]")
    clean_rows = [
        [illegal_chars.sub("", cell) if isinstance(cell, str) else cell for cell in row]
        for row in data
    ]
    temp_df = pd.DataFrame(clean_rows, columns=new_df_columns)

    try:
        if os.path.exists(excel_file):
            old_df = pd.read_excel(excel_file)
            df = pd.concat([old_df, temp_df], axis=0, ignore_index=True)
        else:
            df = temp_df

        df.to_excel(excel_file, index=False)
    except Exception as e:
        print(f"Error updating Excel file: {e}")
|
|
|
# File extensions whose text is extracted (matching is case-sensitive).
_DOCUMENT_EXTENSIONS = (".pptx", ".ppt", ".pdf", ".docx", ".doc", ".DOCX")

# Keywords that trigger a context capture inside PDF files.
_PDF_KEYWORDS = ("objective", "introduction", "summary", "scope")


def _load_guide_lookup(guide_file):
    """Return ``{TDoc: (Source, Related WIs, TDoc Status)}`` from the guide workbook."""
    if not os.path.exists(guide_file):
        print(f"File {guide_file} not found. Skipping operations that require this file.")
        return {}
    try:
        guide_df = pd.read_excel(guide_file, usecols=['Source', 'TDoc', 'Related WIs', 'TDoc Status'])
    except Exception as e:
        print(f"An error occurred while processing {guide_file}: {e}")
        return {}
    return {
        tdoc: (source, related_wis, tdoc_status)
        for tdoc, source, related_wis, tdoc_status in zip(
            guide_df['TDoc'], guide_df['Source'], guide_df['Related WIs'], guide_df['TDoc Status']
        )
    }


def _classify_document(file_name, title, preamble):
    """Return the category label for one document from its name, title and pre-title text."""
    if file_name.endswith(".pdf"):
        return "pdf"
    if file_name.endswith((".ppt", ".pptx")):
        return "ppt"
    if "CHANGE REQUEST" in preamble:
        return "CR"
    if "Discussion" in title:
        return "DISCUSSION"
    if "WID" in title:
        return "WID"
    if "SID" in title:
        return "SID"
    if "LS" in title:
        return "LS"
    preamble_words = preamble.split()
    if preamble_words and preamble_words[-1] == 'pCR':
        return "pCR"
    if "Pseudo-CR" in title:
        return "pCR"
    return "Other"


def _extract_discussion_summary(cleaned_text):
    """Return the introduction/conclusion digest of a discussion document."""
    marker = "Document for:"
    marker_pos = cleaned_text.find(marker)
    if marker_pos == -1:
        return "The phrase 'Document for:' was not found."

    # The document purpose is whatever follows the marker on the same line.
    value_start = marker_pos + len(marker)
    line_end = cleaned_text.find("\n", value_start)
    if line_end == -1:
        line_end = len(cleaned_text)
    purpose = cleaned_text[value_start:line_end].strip()

    if purpose.lower() == "discussion":
        intro_start = cleaned_text.find("Introduction")
        intro_end = cleaned_text.find("Discussion", intro_start)

        if intro_start != -1 and intro_end != -1:
            result_intro = cleaned_text[intro_start + len("Introduction"):intro_end].strip()
        else:
            result_intro = "Introduction section not found."

        # str.find with a start of -1 would only look at the last character,
        # so fall back to the beginning when the introduction was not located.
        search_from = max(intro_end, 0)
        conclusion_start = cleaned_text.find("Conclusion", search_from)
        proposal_pos = cleaned_text.find(
            "Proposal", conclusion_start if conclusion_start != -1 else search_from
        )

        if conclusion_start != -1 and proposal_pos != -1:
            result_conclusion = cleaned_text[conclusion_start + len("Conclusion"):proposal_pos].strip()
        elif conclusion_start != -1:
            result_conclusion = cleaned_text[conclusion_start + len("Conclusion"):].strip()
        elif proposal_pos != -1:
            result_conclusion = cleaned_text[proposal_pos + len("Proposal"):].strip()
        else:
            result_conclusion = "Conclusion/Proposal section not found."

        return f"Introduction: {result_intro}\nConclusion/Proposal: {result_conclusion}"

    if purpose.lower() == "information":
        result_info = ''
        info_start = cleaned_text.find(purpose)
        if info_start != -1:
            result_info = cleaned_text[info_start + len("Information"):].strip()
        return f"Discussion:{result_info}"

    return "The word after 'Document for:' is not 'Discussion', 'DISCUSSION', 'Information', or 'INFORMATION'."


def _collect_pdf_keyword_passages(reader, limit=1000):
    """Return ``[page_index, passage, keyword]`` for every keyword hit in the PDF.

    A passage is the keyword line plus the lines before it (until more than
    ``limit`` characters are gathered) and the lines after it (until more
    than ``limit`` words are gathered). Context may cross page boundaries.
    """
    # Flatten the document once so context can cross pages without
    # re-extracting page text for every hit.
    flat_lines = []
    for page_index in range(len(reader.pages)):
        page_text = reader.get_page(page_index).extract_text() or ""
        flat_lines.extend((page_index, line) for line in page_text.split("\n"))

    passages = []
    for position, (page_index, line) in enumerate(flat_lines):
        lowered = line.lower()
        for key in _PDF_KEYWORDS:
            if key not in lowered:
                continue

            # Walk backwards from the keyword line (inclusive).
            before = []
            gathered_chars = 0
            cursor = position
            while cursor >= 0 and gathered_chars <= limit:
                before.append(flat_lines[cursor][1])
                gathered_chars += len(flat_lines[cursor][1])
                cursor -= 1
            before.reverse()

            # Walk forwards from the line after the keyword.
            after = []
            gathered_words = 0
            cursor = position + 1
            while cursor < len(flat_lines) and gathered_words <= limit:
                after.append(flat_lines[cursor][1])
                gathered_words += len(flat_lines[cursor][1].split())
                cursor += 1

            passage = ' '.join(before) + ' ' + ''.join(' ' + item for item in after)
            passages.append([page_index, passage, key])
    return passages


def _extract_document_content(category, sections, cleaned_text, pdf_reader):
    """Return the category-specific summary text stored in the ``Content`` column."""
    parts = []
    # Text that follows the first "Title:" marker (empty when there is none).
    body = sections[1] if len(sections) > 1 else ""

    if category == "CR":
        reason_for_change = body.split("Reason for change", 1)[-1].split("Summary of change")[0].strip()
        summary_of_change = body.split("Summary of change", 1)[-1].split("Consequences if not")[0].strip()
        parts.append(f"Reason for change: {reason_for_change}")
        parts.append(f"Summary of change: {summary_of_change}")
    elif category == "pCR":
        introduction = body.split("Introduction", 1)[-1].split("First Change")[0].strip()
        parts.append(f"Introduction: {introduction}")
    elif category == "LS":
        overall_review = body.split("Overall description", 1)[-1].strip()
        parts.append(f"Overall review: {overall_review}")
    elif category in ("WID", "SID"):
        objective = ""
        start_index = cleaned_text.find("Objective")
        end_index = cleaned_text.find("Expected Output and Time scale")
        if start_index != -1 and end_index != -1:
            objective = cleaned_text[start_index + len("Objective"):end_index].strip()
        parts.append(f"Objective: {objective}")
    elif category == "DISCUSSION":
        parts.append(_extract_discussion_summary(cleaned_text))
    elif category == "pdf":
        try:
            for page_number, passage, key in _collect_pdf_keyword_passages(pdf_reader):
                parts.append(f'PDF Page number {page_number} extracted text from the KEYWORD {key} : \n')
                parts.append(passage)
        except Exception as e:
            print(f"Error occured while extracting PDF content : {e}")

    return "\n".join(parts)


def extractionPrincipale(url, excel_file=None, status_list=None, progress=gr.Progress()):
    """Download, unpack and summarise the documents of ``url`` into a workbook.

    Steps:
      1. Remove the previous ``<name>_downloads`` folder and
         ``<name>_status.xlsx`` workbook, where ``<name>`` is the
         third-from-last ``/``-separated component of ``url``.
      2. If ``guide.xlsx`` exists, keep only the TDocs whose status is in
         ``status_list`` as a download filter.
      3. Download the zip files (``scrape``), unpack them (``extractZip``)
         and refresh ``guide.xlsx`` (``excel3gpp``).
      4. Extract text from each supported file, classify it, build a
         category-specific summary and write the rows to the workbook in
         batches of 20.

    Args:
        url: Page listing the zip files.
        excel_file: Unused; kept for interface compatibility.
        status_list: TDoc statuses to keep.
        progress: Callable used to report progress (fraction, ``desc=``).

    Returns:
        The workbook file name, or ``None`` when downloading failed or
        nothing was extracted.
    """
    prefix = url.split("/")[-3]

    folder_name = prefix + "_downloads"
    if os.path.exists(folder_name):
        shutil.rmtree(folder_name)

    temp_excel = prefix + "_status.xlsx"
    if os.path.exists(temp_excel):
        os.remove(temp_excel)

    progress(0.0, desc='Downloading')

    # NOTE(review): guide.xlsx is read here before excel3gpp() refreshes it
    # below, so this filter may be based on a guide left by a previous run.
    # Confirm whether that ordering is intended.
    sorted_files = []
    try:
        guide_file = 'guide.xlsx'
        if os.path.exists(guide_file):
            dfStatus = pd.read_excel(guide_file)

            # Filter only when the selection differs in size from the full status set.
            if len(dfStatus['TDoc Status'].unique().tolist()) != len(status_list):
                doc_statuses = dict(zip(dfStatus['TDoc'].tolist(), dfStatus['TDoc Status'].tolist()))
                sorted_files = [
                    tdoc for tdoc, tdoc_status in doc_statuses.items()
                    if tdoc_status in status_list
                ]
        print(sorted_files)
    except Exception as e:
        print(f"Not able to retrieve informations from 'guide.xlsx' ")

    result, count = scrape(url, folder_name, status_list, sorted_files)
    if result:
        print("Success")
    else:
        return None

    progress(0.4, desc='Extraction')
    extractZip(url)
    progress(0.5, desc='Extraction 2')
    excel3gpp(url)
    progress(0.6, desc='Creating Excel File')

    extract_directory = prefix + "_extraction"
    if not os.path.isdir(extract_directory):
        # Nothing was unpacked (e.g. no zip link matched): nothing to summarise.
        print(f"Directory {extract_directory} not found. Nothing to process.")
        return None

    # Read the guide once instead of once per processed file.
    guide_lookup = _load_guide_lookup('guide.xlsx')

    pourcents2 = 0.6
    progress_step = 0.4 / count if count else 0.0
    data = []
    errors_count = 0
    processed_count = 0

    for folder in os.listdir(extract_directory):
        folder_path = os.path.join(extract_directory, folder)
        if not os.path.isdir(folder_path):
            continue

        for file in os.listdir(folder_path):
            progress(min(pourcents2, 0.99), desc='Creating Excel File')
            pourcents2 += progress_step

            if file == "__MACOSX":
                continue
            if not file.endswith(_DOCUMENT_EXTENSIONS):
                continue

            file_path = os.path.join(folder_path, file)
            try:
                text = textract.process(file_path).decode('utf-8')
                pdf_reader = PdfReader(file_path) if file.endswith(".pdf") else None
            except Exception as e:
                print(f"Error processing {file_path}: {e}")
                errors_count += 1
                continue

            # Drop blank lines and table-border pipes left by the extractor.
            cleaned_text = '\n'.join(
                line.strip('|').strip() for line in text.split('\n') if line.strip()
            )

            # Title and pre-title text are recomputed for every file so that
            # nothing leaks from the previously processed document.
            title = ""
            debut = ""
            sections = cleaned_text.split("Title:")
            if len(sections) > 1:
                title = sections[1].strip().split("\n")[0].strip()
                debut = sections[0].strip()

            category = _classify_document(file, title, debut)
            contenu = _extract_document_content(category, sections, cleaned_text, pdf_reader)

            row = [url + "/" + folder + '.zip', folder, category, title, "", "", "", contenu]
            if folder in guide_lookup:
                row[4], row[5], row[6] = guide_lookup[folder]
            data.append(row)

            processed_count += 1

            # Flush periodically so a late failure does not lose everything.
            if processed_count % 20 == 0:
                update_excel(data, temp_excel, url)
                print(f"Updated after processing {processed_count} files.")
                data = []

    if data:
        update_excel(data, temp_excel, url)
        print(f"Final update after processing all files.")

    if errors_count:
        print(f"{errors_count} file(s) could not be processed.")

    return temp_excel
|
|
|
|