from .logging_setup import logger from whisperx.utils import get_writer from .utils import remove_files, run_command, remove_directory_contents from typing import List import srt import re import os import copy import string import soundfile as sf from PIL import Image, ImageOps, ImageDraw, ImageFont punctuation_list = list( string.punctuation + "¡¿«»„”“”‚‘’「」『』《》()【】〈〉〔〕〖〗〘〙〚〛⸤⸥⸨⸩" ) symbol_list = punctuation_list + ["", "..", "..."] def extract_from_srt(file_path): with open(file_path, "r", encoding="utf-8") as file: srt_content = file.read() subtitle_generator = srt.parse(srt_content) srt_content_list = list(subtitle_generator) return srt_content_list def clean_text(text): # Remove content within square brackets text = re.sub(r'\[.*?\]', '', text) # Add pattern to remove content within tags text = re.sub(r'.*?', '', text) # Remove HTML tags text = re.sub(r'<.*?>', '', text) # Remove "♫" and "♪" content text = re.sub(r'♫.*?♫', '', text) text = re.sub(r'♪.*?♪', '', text) # Replace newline characters with an empty string text = text.replace("\n", ". ") # Remove double quotation marks text = text.replace('"', '') # Collapse multiple spaces and replace with a single space text = re.sub(r"\s+", " ", text) # Normalize spaces around periods text = re.sub(r"[\s\.]+(?=\s)", ". ", text) # Check if there are ♫ or ♪ symbols present if '♫' in text or '♪' in text: return "" text = text.strip() # Valid text return text if text not in symbol_list else "" def srt_file_to_segments(file_path, speaker=False): try: srt_content_list = extract_from_srt(file_path) except Exception as error: logger.error(str(error)) fixed_file = "fixed_sub.srt" remove_files(fixed_file) fix_sub = f'ffmpeg -i "{file_path}" "{fixed_file}" -y' run_command(fix_sub) srt_content_list = extract_from_srt(fixed_file) segments = [] for segment in srt_content_list: text = clean_text(str(segment.content)) if text: segments.append( { "text": text, "start": float(segment.start.total_seconds()), "end": float(segment.end.total_seconds()), } ) if not segments: raise Exception("No data found in srt subtitle file") if speaker: segments = [{**seg, "speaker": "SPEAKER_00"} for seg in segments] return {"segments": segments} # documents def dehyphenate(lines: List[str], line_no: int) -> List[str]: next_line = lines[line_no + 1] word_suffix = next_line.split(" ")[0] lines[line_no] = lines[line_no][:-1] + word_suffix lines[line_no + 1] = lines[line_no + 1][len(word_suffix):] return lines def remove_hyphens(text: str) -> str: """ This fails for: * Natural dashes: well-known, self-replication, use-cases, non-semantic, Post-processing, Window-wise, viewpoint-dependent * Trailing math operands: 2 - 4 * Names: Lopez-Ferreras, VGG-19, CIFAR-100 """ lines = [line.rstrip() for line in text.split("\n")] # Find dashes line_numbers = [] for line_no, line in enumerate(lines[:-1]): if line.endswith("-"): line_numbers.append(line_no) # Replace for line_no in line_numbers: lines = dehyphenate(lines, line_no) return "\n".join(lines) def pdf_to_txt(pdf_file, start_page, end_page): from pypdf import PdfReader with open(pdf_file, "rb") as file: reader = PdfReader(file) logger.debug(f"Total pages: {reader.get_num_pages()}") text = "" start_page_idx = max((start_page-1), 0) end_page_inx = min((end_page), (reader.get_num_pages())) document_pages = reader.pages[start_page_idx:end_page_inx] logger.info( f"Selected pages from {start_page_idx} to {end_page_inx}: " f"{len(document_pages)}" ) for page in document_pages: text += remove_hyphens(page.extract_text()) return text def docx_to_txt(docx_file): # https://github.com/AlJohri/docx2pdf update from docx import Document doc = Document(docx_file) text = "" for paragraph in doc.paragraphs: text += paragraph.text + "\n" return text def replace_multiple_elements(text, replacements): pattern = re.compile("|".join(map(re.escape, replacements.keys()))) replaced_text = pattern.sub( lambda match: replacements[match.group(0)], text ) # Remove multiple spaces replaced_text = re.sub(r"\s+", " ", replaced_text) return replaced_text def document_preprocessor(file_path, is_string, start_page, end_page): if not is_string: file_ext = os.path.splitext(file_path)[1].lower() if is_string: text = file_path elif file_ext == ".pdf": text = pdf_to_txt(file_path, start_page, end_page) elif file_ext == ".docx": text = docx_to_txt(file_path) elif file_ext == ".txt": with open( file_path, "r", encoding='utf-8', errors='replace' ) as file: text = file.read() else: raise Exception("Unsupported file format") # Add space to break segments more easily later replacements = { "、": "、 ", "。": "。 ", # "\n": " ", } text = replace_multiple_elements(text, replacements) # Save text to a .txt file # file_name = os.path.splitext(os.path.basename(file_path))[0] txt_file_path = "./text_preprocessor.txt" with open( txt_file_path, "w", encoding='utf-8', errors='replace' ) as txt_file: txt_file.write(text) return txt_file_path, text def split_text_into_chunks(text, chunk_size): words = re.findall(r"\b\w+\b", text) chunks = [] current_chunk = "" for word in words: if ( len(current_chunk) + len(word) + 1 <= chunk_size ): # Adding 1 for the space between words if current_chunk: current_chunk += " " current_chunk += word else: chunks.append(current_chunk) current_chunk = word if current_chunk: chunks.append(current_chunk) return chunks def determine_chunk_size(file_name): patterns = { re.compile(r".*-(Male|Female)$"): 1024, # by character re.compile(r".* BARK$"): 100, # t 64 256 re.compile(r".* VITS$"): 500, re.compile( r".+\.(wav|mp3|ogg|m4a)$" ): 150, # t 250 400 api automatic split re.compile(r".* VITS-onnx$"): 250, # automatic sentence split re.compile(r".* OpenAI-TTS$"): 1024 # max charaters 4096 } for pattern, chunk_size in patterns.items(): if pattern.match(file_name): return chunk_size # Default chunk size if the file doesn't match any pattern; max 1800 return 100 def plain_text_to_segments(result_text=None, chunk_size=None): if not chunk_size: chunk_size = 100 text_chunks = split_text_into_chunks(result_text, chunk_size) segments_chunks = [] for num, chunk in enumerate(text_chunks): chunk_dict = { "text": chunk, "start": (1.0 + num), "end": (2.0 + num), "speaker": "SPEAKER_00", } segments_chunks.append(chunk_dict) result_diarize = {"segments": segments_chunks} return result_diarize def segments_to_plain_text(result_diarize): complete_text = "" for seg in result_diarize["segments"]: complete_text += seg["text"] + " " # issue # Save text to a .txt file # file_name = os.path.splitext(os.path.basename(file_path))[0] txt_file_path = "./text_translation.txt" with open( txt_file_path, "w", encoding='utf-8', errors='replace' ) as txt_file: txt_file.write(complete_text) return txt_file_path, complete_text # doc to video COLORS = { "black": (0, 0, 0), "white": (255, 255, 255), "red": (255, 0, 0), "green": (0, 255, 0), "blue": (0, 0, 255), "yellow": (255, 255, 0), "light_gray": (200, 200, 200), "light_blue": (173, 216, 230), "light_green": (144, 238, 144), "light_yellow": (255, 255, 224), "light_pink": (255, 182, 193), "lavender": (230, 230, 250), "peach": (255, 218, 185), "light_cyan": (224, 255, 255), "light_salmon": (255, 160, 122), "light_green_yellow": (173, 255, 47), } BORDER_COLORS = ["dynamic"] + list(COLORS.keys()) def calculate_average_color(img): # Resize the image to a small size for faster processing img_small = img.resize((50, 50)) # Calculate the average color average_color = img_small.convert("RGB").resize((1, 1)).getpixel((0, 0)) return average_color def add_border_to_image( image_path, target_width, target_height, border_color=None ): img = Image.open(image_path) # Calculate the width and height for the new image with borders original_width, original_height = img.size original_aspect_ratio = original_width / original_height target_aspect_ratio = target_width / target_height # Resize the image to fit the target resolution retaining aspect ratio if original_aspect_ratio > target_aspect_ratio: # Image is wider, calculate new height new_height = int(target_width / original_aspect_ratio) resized_img = img.resize((target_width, new_height)) else: # Image is taller, calculate new width new_width = int(target_height * original_aspect_ratio) resized_img = img.resize((new_width, target_height)) # Calculate padding for borders padding = (0, 0, 0, 0) if resized_img.size[0] != target_width or resized_img.size[1] != target_height: if original_aspect_ratio > target_aspect_ratio: # Add borders vertically padding = (0, (target_height - resized_img.size[1]) // 2, 0, (target_height - resized_img.size[1]) // 2) else: # Add borders horizontally padding = ((target_width - resized_img.size[0]) // 2, 0, (target_width - resized_img.size[0]) // 2, 0) # Add borders with specified color if not border_color or border_color == "dynamic": border_color = calculate_average_color(resized_img) else: border_color = COLORS.get(border_color, (0, 0, 0)) bordered_img = ImageOps.expand(resized_img, padding, fill=border_color) bordered_img.save(image_path) return image_path def resize_and_position_subimage( subimage, max_width, max_height, subimage_position, main_width, main_height ): subimage_width, subimage_height = subimage.size # Resize subimage if it exceeds maximum dimensions if subimage_width > max_width or subimage_height > max_height: # Calculate scaling factor width_scale = max_width / subimage_width height_scale = max_height / subimage_height scale = min(width_scale, height_scale) # Resize subimage subimage = subimage.resize( (int(subimage_width * scale), int(subimage_height * scale)) ) # Calculate position to place the subimage if subimage_position == "top-left": subimage_x = 0 subimage_y = 0 elif subimage_position == "top-right": subimage_x = main_width - subimage.width subimage_y = 0 elif subimage_position == "bottom-left": subimage_x = 0 subimage_y = main_height - subimage.height elif subimage_position == "bottom-right": subimage_x = main_width - subimage.width subimage_y = main_height - subimage.height else: raise ValueError( "Invalid subimage_position. Choose from 'top-left', 'top-right'," " 'bottom-left', or 'bottom-right'." ) return subimage, subimage_x, subimage_y def create_image_with_text_and_subimages( text, subimages, width, height, text_color, background_color, output_file ): # Create an image with the specified resolution and background color image = Image.new('RGB', (width, height), color=background_color) # Initialize ImageDraw object draw = ImageDraw.Draw(image) # Load a font font = ImageFont.load_default() # You can specify your font file here # Calculate text size and position text_bbox = draw.textbbox((0, 0), text, font=font) text_width = text_bbox[2] - text_bbox[0] text_height = text_bbox[3] - text_bbox[1] text_x = (width - text_width) / 2 text_y = (height - text_height) / 2 # Draw text on the image draw.text((text_x, text_y), text, fill=text_color, font=font) # Paste subimages onto the main image for subimage_path, subimage_position in subimages: # Open the subimage subimage = Image.open(subimage_path) # Convert subimage to RGBA mode if it doesn't have an alpha channel if subimage.mode != 'RGBA': subimage = subimage.convert('RGBA') # Resize and position the subimage subimage, subimage_x, subimage_y = resize_and_position_subimage( subimage, width / 4, height / 4, subimage_position, width, height ) # Paste the subimage onto the main image image.paste(subimage, (int(subimage_x), int(subimage_y)), subimage) image.save(output_file) return output_file def doc_to_txtximg_pages( document, width, height, start_page, end_page, bcolor ): from pypdf import PdfReader images_folder = "pdf_images/" os.makedirs(images_folder, exist_ok=True) remove_directory_contents(images_folder) # First image text_image = os.path.basename(document)[:-4] subimages = [("./assets/logo.jpeg", "top-left")] text_color = (255, 255, 255) if bcolor == "black" else (0, 0, 0) # w|b background_color = COLORS.get(bcolor, (255, 255, 255)) # dynamic white first_image = "pdf_images/0000_00_aaa.png" create_image_with_text_and_subimages( text_image, subimages, width, height, text_color, background_color, first_image ) reader = PdfReader(document) logger.debug(f"Total pages: {reader.get_num_pages()}") start_page_idx = max((start_page-1), 0) end_page_inx = min((end_page), (reader.get_num_pages())) document_pages = reader.pages[start_page_idx:end_page_inx] logger.info( f"Selected pages from {start_page_idx} to {end_page_inx}: " f"{len(document_pages)}" ) data_doc = {} for i, page in enumerate(document_pages): count = 0 images = [] for image_file_object in page.images: img_name = f"{images_folder}{i:04d}_{count:02d}_{image_file_object.name}" images.append(img_name) with open(img_name, "wb") as fp: fp.write(image_file_object.data) count += 1 img_name = add_border_to_image(img_name, width, height, bcolor) data_doc[i] = { "text": remove_hyphens(page.extract_text()), "images": images } return data_doc def page_data_to_segments(result_text=None, chunk_size=None): if not chunk_size: chunk_size = 100 segments_chunks = [] time_global = 0 for page, result_data in result_text.items(): # result_image = result_data["images"] result_text = result_data["text"] text_chunks = split_text_into_chunks(result_text, chunk_size) if not text_chunks: text_chunks = [" "] for chunk in text_chunks: chunk_dict = { "text": chunk, "start": (1.0 + time_global), "end": (2.0 + time_global), "speaker": "SPEAKER_00", "page": page, } segments_chunks.append(chunk_dict) time_global += 1 result_diarize = {"segments": segments_chunks} return result_diarize def update_page_data(result_diarize, doc_data): complete_text = "" current_page = result_diarize["segments"][0]["page"] text_page = "" for seg in result_diarize["segments"]: text = seg["text"] + " " # issue complete_text += text page = seg["page"] if page == current_page: text_page += text else: doc_data[current_page]["text"] = text_page # Next text_page = text current_page = page if doc_data[current_page]["text"] != text_page: doc_data[current_page]["text"] = text_page return doc_data def fix_timestamps_docs(result_diarize, audio_files): current_start = 0.0 for seg, audio in zip(result_diarize["segments"], audio_files): duration = round(sf.info(audio).duration, 2) seg["start"] = current_start current_start += duration seg["end"] = current_start return result_diarize def create_video_from_images( doc_data, result_diarize ): # First image path first_image = "pdf_images/0000_00_aaa.png" # Time segments and images max_pages_idx = len(doc_data) - 1 current_page = result_diarize["segments"][0]["page"] duration_page = 0.0 last_image = None for seg in result_diarize["segments"]: start = seg["start"] end = seg["end"] duration_seg = end - start page = seg["page"] if page == current_page: duration_page += duration_seg else: images = doc_data[current_page]["images"] if first_image: images = [first_image] + images first_image = None if not doc_data[min(max_pages_idx, (current_page+1))]["text"].strip(): images = images + doc_data[min(max_pages_idx, (current_page+1))]["images"] if not images and last_image: images = [last_image] # Calculate images duration time_duration_per_image = round((duration_page / len(images)), 2) doc_data[current_page]["time_per_image"] = time_duration_per_image # Next values doc_data[current_page]["images"] = images last_image = images[-1] duration_page = duration_seg current_page = page if "time_per_image" not in doc_data[current_page].keys(): images = doc_data[current_page]["images"] if first_image: images = [first_image] + images if not images: images = [last_image] time_duration_per_image = round((duration_page / len(images)), 2) doc_data[current_page]["time_per_image"] = time_duration_per_image # Timestamped image video. with open("list.txt", "w") as file: for i, page in enumerate(doc_data.values()): duration = page["time_per_image"] for img in page["images"]: if i == len(doc_data) - 1 and img == page["images"][-1]: # Check if it's the last item file.write(f"file {img}\n") file.write(f"outpoint {duration}") else: file.write(f"file {img}\n") file.write(f"outpoint {duration}\n") out_video = "video_from_images.mp4" remove_files(out_video) cm = f"ffmpeg -y -f concat -i list.txt -c:v libx264 -preset veryfast -crf 18 -pix_fmt yuv420p {out_video}" cm_alt = f"ffmpeg -f concat -i list.txt -c:v libx264 -r 30 -pix_fmt yuv420p -y {out_video}" try: run_command(cm) except Exception as error: logger.error(str(error)) remove_files(out_video) run_command(cm_alt) return out_video def merge_video_and_audio(video_doc, final_wav_file): fixed_audio = "fixed_audio.mp3" remove_files(fixed_audio) cm = f"ffmpeg -i {final_wav_file} -c:a libmp3lame {fixed_audio}" run_command(cm) vid_out = "video_book.mp4" remove_files(vid_out) cm = f"ffmpeg -i {video_doc} -i {fixed_audio} -c:v copy -c:a copy -map 0:v -map 1:a -shortest {vid_out}" run_command(cm) return vid_out # subtitles def get_subtitle( language, segments_data, extension, filename=None, highlight_words=False, ): if not filename: filename = "task_subtitle" is_ass_extension = False if extension == "ass": is_ass_extension = True extension = "srt" sub_file = filename + "." + extension support_name = filename + ".mp3" remove_files(sub_file) writer = get_writer(extension, output_dir=".") word_options = { "highlight_words": highlight_words, "max_line_count": None, "max_line_width": None, } # Get data subs subtitle_data = copy.deepcopy(segments_data) subtitle_data["language"] = ( "ja" if language in ["ja", "zh", "zh-TW"] else language ) # Clean if not highlight_words: subtitle_data.pop("word_segments", None) for segment in subtitle_data["segments"]: for key in ["speaker", "chars", "words"]: segment.pop(key, None) writer( subtitle_data, support_name, word_options, ) if is_ass_extension: temp_name = filename + ".ass" remove_files(temp_name) convert_sub = f'ffmpeg -i "{sub_file}" "{temp_name}" -y' run_command(convert_sub) sub_file = temp_name return sub_file def process_subtitles( deep_copied_result, align_language, result_diarize, output_format_subtitle, TRANSLATE_AUDIO_TO, ): name_ori = "sub_ori." name_tra = "sub_tra." remove_files( [name_ori + output_format_subtitle, name_tra + output_format_subtitle] ) writer = get_writer(output_format_subtitle, output_dir=".") word_options = { "highlight_words": False, "max_line_count": None, "max_line_width": None, } # original lang subs_copy_result = copy.deepcopy(deep_copied_result) subs_copy_result["language"] = ( "zh" if align_language == "zh-TW" else align_language ) for segment in subs_copy_result["segments"]: segment.pop("speaker", None) try: writer( subs_copy_result, name_ori[:-1] + ".mp3", word_options, ) except Exception as error: logger.error(str(error)) if str(error) == "list indices must be integers or slices, not str": logger.error( "Related to poor word segmentation" " in segments after alignment." ) subs_copy_result["segments"][0].pop("words") writer( subs_copy_result, name_ori[:-1] + ".mp3", word_options, ) # translated lang subs_tra_copy_result = copy.deepcopy(result_diarize) subs_tra_copy_result["language"] = ( "ja" if TRANSLATE_AUDIO_TO in ["ja", "zh", "zh-TW"] else align_language ) subs_tra_copy_result.pop("word_segments", None) for segment in subs_tra_copy_result["segments"]: for key in ["speaker", "chars", "words"]: segment.pop(key, None) writer( subs_tra_copy_result, name_tra[:-1] + ".mp3", word_options, ) return name_tra + output_format_subtitle def linguistic_level_segments( result_base, linguistic_unit="word", # word or char ): linguistic_unit = linguistic_unit[:4] linguistic_unit_key = linguistic_unit + "s" result = copy.deepcopy(result_base) if linguistic_unit_key not in result["segments"][0].keys(): raise ValueError("No alignment detected, can't process") segments_by_unit = [] for segment in result["segments"]: segment_units = segment[linguistic_unit_key] # segment_speaker = segment.get("speaker", "SPEAKER_00") for unit in segment_units: text = unit[linguistic_unit] if "start" in unit.keys(): segments_by_unit.append( { "start": unit["start"], "end": unit["end"], "text": text, # "speaker": segment_speaker, } ) elif not segments_by_unit: pass else: segments_by_unit[-1]["text"] += text return {"segments": segments_by_unit} def break_aling_segments( result: dict, break_characters: str = "", # ":|,|.|" ): result_align = copy.deepcopy(result) break_characters_list = break_characters.split("|") break_characters_list = [i for i in break_characters_list if i != ''] if not break_characters_list: logger.info("No valid break characters were specified.") return result logger.info(f"Redivide text segments by: {str(break_characters_list)}") # create new with filters normal = [] def process_chars(chars, letter_new_start, num, text): start_key, end_key = "start", "end" start_value = end_value = None for char in chars: if start_key in char: start_value = char[start_key] break for char in reversed(chars): if end_key in char: end_value = char[end_key] break if not start_value or not end_value: raise Exception( f"Unable to obtain a valid timestamp for chars: {str(chars)}" ) return { "start": start_value, "end": end_value, "text": text, "words": chars, } for i, segment in enumerate(result_align['segments']): logger.debug(f"- Process segment: {i}, text: {segment['text']}") # start = segment['start'] letter_new_start = 0 for num, char in enumerate(segment['chars']): if char["char"] is None: continue # if "start" in char: # start = char["start"] # if "end" in char: # end = char["end"] # Break by character if char['char'] in break_characters_list: text = segment['text'][letter_new_start:num+1] logger.debug( f"Break in: {char['char']}, position: {num}, text: {text}" ) chars = segment['chars'][letter_new_start:num+1] if not text: logger.debug("No text") continue if num == 0 and not text.strip(): logger.debug("blank space in start") continue if len(text) == 1: logger.debug(f"Short char append, num: {num}") normal[-1]["text"] += text normal[-1]["words"].append(chars) continue # logger.debug(chars) normal_dict = process_chars(chars, letter_new_start, num, text) letter_new_start = num+1 normal.append(normal_dict) # If we reach the end of the segment, add the last part of chars. if num == len(segment["chars"]) - 1: text = segment['text'][letter_new_start:num+1] # If remain text len is not default len text if num not in [len(text)-1, len(text)] and text: logger.debug(f'Remaining text: {text}') if not text: logger.debug("No remaining text.") continue if len(text) == 1: logger.debug(f"Short char append, num: {num}") normal[-1]["text"] += text normal[-1]["words"].append(chars) continue chars = segment['chars'][letter_new_start:num+1] normal_dict = process_chars(chars, letter_new_start, num, text) letter_new_start = num+1 normal.append(normal_dict) # Rename char to word for item in normal: words_list = item['words'] for word_item in words_list: if 'char' in word_item: word_item['word'] = word_item.pop('char') # Convert to dict default break_segments = {"segments": normal} msg_count = ( f"Segment count before: {len(result['segments'])}, " f"after: {len(break_segments['segments'])}." ) logger.info(msg_count) return break_segments