|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import json
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import logging
|
|
import os
|
|
import re
|
|
import sys
|
|
from urllib.parse import urlparse, parse_qs
|
|
|
|
import unicodedata
|
|
|
|
import yt_dlp
|
|
|
|
from App_Function_Libraries.DB.DB_Manager import check_media_and_whisper_model
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Characters that are unsafe in file names: path/drive separators become '_',
# the remaining reserved characters are dropped entirely.
_TITLE_TRANSLATION = str.maketrans('/\\:', '___', '"*?<>|')


def normalize_title(title):
    """Return an ASCII-only, file-name-safe version of *title*.

    The title is decomposed with NFKD and every non-ASCII code point is
    discarded, so accented letters keep their base letter. Afterwards
    ``/``, ``\\`` and ``:`` are replaced by ``_`` and ``" * ? < > |`` are
    removed.

    Args:
        title: The title string to sanitise.

    Returns:
        The sanitised title as a ``str``.
    """
    ascii_title = unicodedata.normalize('NFKD', title).encode('ascii', 'ignore').decode('ascii')
    # One translate() pass replaces the former chain of nine .replace() calls.
    return ascii_title.translate(_TITLE_TRANSLATION)
|
|
|
|
def get_video_info(url: str) -> dict:
    """Fetch the yt-dlp info dictionary for *url* without downloading media.

    Args:
        url: Address of the video to inspect.

    Returns:
        Whatever ``YoutubeDL.extract_info`` returns for the URL, or ``None``
        when extraction raises (the error is logged, not re-raised).
    """
    options = dict(quiet=True, no_warnings=True, skip_download=True)
    with yt_dlp.YoutubeDL(options) as downloader:
        try:
            return downloader.extract_info(url, download=False)
        except Exception as err:
            logging.error(f"Error extracting video info: {err}")
    # Only reached when extraction failed and the error was logged above.
    return None
|
|
|
|
|
|
def get_youtube(video_url):
    """Return the yt-dlp info dictionary for *video_url* without downloading.

    Extraction errors are not caught here; they propagate to the caller.

    Args:
        video_url: Address of the video (or playlist) to inspect.

    Returns:
        The value returned by ``YoutubeDL.extract_info``.
    """
    options = dict(
        format='bestaudio[ext=m4a]',
        noplaylist=False,
        quiet=True,
        extract_flat=True,
    )
    with yt_dlp.YoutubeDL(options) as downloader:
        logging.debug("About to extract youtube info")
        extracted = downloader.extract_info(video_url, download=False)
        logging.debug("Youtube info successfully extracted")
    return extracted
|
|
|
|
|
|
def get_playlist_videos(playlist_url):
    """List the video URLs and the title of a playlist.

    Args:
        playlist_url: Address of the playlist to inspect.

    Returns:
        A ``(video_urls, playlist_title)`` tuple. ``video_urls`` is a list of
        the ``'url'`` values of the playlist entries; entries that are
        ``None`` or carry no URL are skipped. When the extraction result has
        no ``'entries'`` key (or is empty), ``([], None)`` is returned.
    """
    ydl_opts = {
        'extract_flat': True,
        'skip_download': True,
        'quiet': True,
    }

    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(playlist_url, download=False)

    if not info or 'entries' not in info:
        # Logged rather than printed, like every other message in this module.
        logging.warning("No videos found in the playlist.")
        return [], None

    # Guard against None entries and entries without a URL, mirroring the
    # filtering done by get_youtube_playlist_urls.
    video_urls = [
        entry['url']
        for entry in info['entries']
        if entry and entry.get('url')
    ]
    return video_urls, info.get('title')
|
|
|
|
|
|
def download_video(video_url, download_path, info_dict, download_video_flag, current_whisper_model):
    """Download a video (or only its audio track) with yt-dlp.

    Args:
        video_url: Address of the media to download.
        download_path: Directory in which the file is stored.
        info_dict: Info dictionary for the media; must contain ``'title'``
            and ``'ext'``.
        download_video_flag: When truthy, video and audio are downloaded and
            merged; otherwise only the ``m4a`` audio stream is requested.
            In both cases the output file is named ``<title>.mp4``.
        current_whisper_model: Passed through to
            ``check_media_and_whisper_model``.

    Returns:
        The path of the downloaded (or already existing) file, or ``None``
        when ``info_dict`` is unusable, the database check says the download
        should be skipped, the download raises, or the output file is missing
        afterwards.
    """
    # These names were module globals in the original implementation; they are
    # still published that way in case other code in the module reads them.
    global video_file_path, ffmpeg_path

    logging.debug("About to normalize downloaded video title")
    if not info_dict or 'title' not in info_dict or 'ext' not in info_dict:
        logging.error("info_dict is missing 'title' or 'ext'")
        return None

    normalized_video_title = normalize_title(info_dict['title'])

    should_download, reason = check_media_and_whisper_model(
        title=normalized_video_title,
        url=video_url,
        current_whisper_model=current_whisper_model
    )

    if not should_download:
        logging.info(f"Skipping download: {reason}")
        return None

    logging.info(f"Proceeding with download: {reason}")

    video_file_path = os.path.join(download_path, f"{normalized_video_title}.{info_dict['ext']}")

    if os.path.exists(video_file_path):
        logging.info(f"Video file already exists: {video_file_path}")
        return video_file_path

    if sys.platform.startswith('win'):
        ffmpeg_path = os.path.join(os.getcwd(), 'Bin', 'ffmpeg.exe')
    else:
        # Linux, macOS and any other platform: use ffmpeg from PATH. Previously
        # the variable was left unassigned on unrecognised platforms.
        ffmpeg_path = 'ffmpeg'

    # Both modes write to the same "<title>.mp4" target.
    video_file_path = os.path.join(download_path, f"{normalized_video_title}.mp4")

    if download_video_flag:
        ydl_opts = {
            'format': 'bestvideo[ext=mp4]+bestaudio[ext=m4a]',
            'outtmpl': video_file_path,
            'ffmpeg_location': ffmpeg_path
        }
    else:
        ydl_opts = {
            'format': 'bestaudio[ext=m4a]',
            'quiet': True,
            'outtmpl': video_file_path
        }

    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            logging.debug("yt_dlp: About to download video with youtube-dl")
            ydl.download([video_url])
            logging.debug("yt_dlp: Video successfully downloaded with youtube-dl")
            if os.path.exists(video_file_path):
                return video_file_path
            logging.error("yt_dlp: Video file not found after download")
            return None
    except Exception as e:
        logging.error(f"yt_dlp: Error downloading video: {e}")
        return None
|
|
|
|
|
|
def extract_video_info(url):
    """Extract the yt-dlp info dictionary for *url* and log a short summary.

    Args:
        url: Address of the video to inspect.

    Returns:
        The value returned by ``YoutubeDL.extract_info``, or ``None`` if
        anything in the extraction or summary step raises (the error is
        logged together with its traceback).
    """
    summary_keys = ('title', 'duration', 'upload_date')
    try:
        with yt_dlp.YoutubeDL({'quiet': True}) as downloader:
            info = downloader.extract_info(url, download=False)

            # Only a few fields are logged; the full dictionary is returned.
            log_info = {key: info.get(key) for key in summary_keys}
            logging.debug(f"Extracted info for {url}: {log_info}")

            return info
    except Exception as e:
        logging.error(f"Error extracting video info for {url}: {str(e)}", exc_info=True)
        return None
|
|
|
|
|
|
def get_youtube_playlist_urls(playlist_id):
    """Return the entry URLs of the YouTube playlist with the given ID.

    Args:
        playlist_id: Value of the ``list`` query parameter of the playlist.

    Returns:
        A list with the ``'url'`` value of every playlist entry that has one.
        The list is empty when the extraction result is empty or carries no
        ``'entries'``.
    """
    ydl_opts = {
        'extract_flat': True,
        'quiet': True,
    }

    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        result = ydl.extract_info(f'https://www.youtube.com/playlist?list={playlist_id}', download=False)

    # Tolerate a missing result, a missing/None 'entries' value and None
    # entries instead of raising.
    entries = (result or {}).get('entries') or []
    return [entry['url'] for entry in entries if entry and entry.get('url')]
|
|
|
|
|
|
def parse_and_expand_urls(urls):
    """Normalise a list of media URLs and expand YouTube playlists.

    Handling per URL:
      * ``youtube.com`` URL with a non-empty ``list`` query parameter: replaced
        by the URLs returned from ``get_youtube_playlist_urls``.
      * ``youtu.be`` short link: rewritten to
        ``https://www.youtube.com/watch?v=<path>``.
      * ``vimeo.com`` URL: rewritten to ``https://vimeo.com/<path>``.
      * anything else: kept unchanged.

    A URL whose processing raises is logged and left out of the result.

    Args:
        urls: Iterable of URL strings.

    Returns:
        The list of resulting URLs, in input order.
    """
    logging.info(f"Starting parse_and_expand_urls with input: {urls}")
    expanded_urls = []

    for url in urls:
        try:
            logging.info(f"Processing URL: {url}")
            parsed_url = urlparse(url)
            logging.debug(f"Parsed URL components: {parsed_url}")

            # Look up the real 'list' parameter. A plain substring test on the
            # query also matched values such as "feature=share_list", which
            # then raised KeyError and silently dropped the URL.
            playlist_ids = parse_qs(parsed_url.query).get('list')

            if 'youtube.com' in parsed_url.netloc and playlist_ids:
                playlist_id = playlist_ids[0]
                logging.info(f"Detected YouTube playlist with ID: {playlist_id}")
                playlist_urls = get_youtube_playlist_urls(playlist_id)
                logging.info(f"Expanded playlist URLs: {playlist_urls}")
                expanded_urls.extend(playlist_urls)

            elif 'youtu.be' in parsed_url.netloc:
                video_id = parsed_url.path.lstrip('/')
                full_url = f'https://www.youtube.com/watch?v={video_id}'
                logging.info(f"Expanded YouTube short URL to: {full_url}")
                expanded_urls.append(full_url)

            elif 'vimeo.com' in parsed_url.netloc:
                video_id = parsed_url.path.lstrip('/')
                full_url = f'https://vimeo.com/{video_id}'
                logging.info(f"Processed Vimeo URL: {full_url}")
                expanded_urls.append(full_url)

            else:
                logging.info(f"URL not recognized as special case, adding as-is: {url}")
                expanded_urls.append(url)

        except Exception as e:
            # Best effort: one bad URL must not abort the whole batch.
            logging.error(f"Error processing URL {url}: {str(e)}", exc_info=True)

    logging.info(f"Final expanded URLs: {expanded_urls}")
    return expanded_urls
|
|
|
|
|
|
def extract_metadata(url, use_cookies=False, cookies=None):
    """Extract a fixed set of metadata fields for *url* via yt-dlp.

    Args:
        url: Address of the media to inspect.
        use_cookies: When truthy (and ``cookies`` is non-empty), the cookies
            are sent along with the requests.
        cookies: Either a JSON string encoding an object of cookie
            name/value pairs, or such a mapping directly. Anything else is
            ignored with a warning.

    Returns:
        A dict with the keys ``title``, ``uploader``, ``upload_date``,
        ``view_count``, ``like_count``, ``duration``, ``tags`` and
        ``description`` (values may be ``None``), or ``None`` when extraction
        raises.
    """
    ydl_opts = {
        'quiet': True,
        'no_warnings': True,
        'extract_flat': True,
        'skip_download': True,
    }

    if use_cookies and cookies:
        cookie_dict = None
        if isinstance(cookies, dict):
            cookie_dict = cookies
        else:
            try:
                cookie_dict = json.loads(cookies)
            except (json.JSONDecodeError, TypeError):
                cookie_dict = None

        if isinstance(cookie_dict, dict) and cookie_dict:
            # yt-dlp's 'cookiefile' option expects a file name or text stream,
            # so the parsed name/value pairs cannot be assigned to it. They
            # are sent as a Cookie request header instead.
            # NOTE(review): yt-dlp appears to scope header cookies to the
            # requested URL's domain — confirm against the installed version.
            cookie_header = '; '.join(f"{name}={value}" for name, value in cookie_dict.items())
            ydl_opts['http_headers'] = {'Cookie': cookie_header}
        else:
            logging.warning("Invalid cookie format. Proceeding without cookies.")

    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        try:
            info = ydl.extract_info(url, download=False)
            metadata = {
                'title': info.get('title'),
                'uploader': info.get('uploader'),
                'upload_date': info.get('upload_date'),
                'view_count': info.get('view_count'),
                'like_count': info.get('like_count'),
                'duration': info.get('duration'),
                'tags': info.get('tags'),
                'description': info.get('description')
            }

            # Summary used for logging only. The keys always exist in
            # `metadata`, so `or` is what makes the placeholders apply to
            # missing (None) values.
            safe_metadata = {
                'title': metadata['title'] or 'No title',
                'duration': metadata['duration'] or 'Unknown duration',
                'upload_date': metadata['upload_date'] or 'Unknown upload date',
                'uploader': metadata['uploader'] or 'Unknown uploader'
            }

            logging.info(f"Successfully extracted metadata for {url}: {safe_metadata}")
            return metadata
        except Exception as e:
            logging.error(f"Error extracting metadata for {url}: {str(e)}", exc_info=True)
            return None
|
|
|
|
|
|
def _extract_youtube_video_id(url):
    """Return the 11-character video ID contained in *url*, or ``None``."""
    id_pattern = r'[0-9A-Za-z_-]{11}'

    try:
        parsed = urlparse(url)
    except ValueError:
        parsed = None

    if parsed is not None:
        # Preferred source: the actual "v" query parameter, wherever it sits
        # in the query string.
        for candidate in parse_qs(parsed.query).get('v', []):
            if re.fullmatch(id_pattern, candidate):
                return candidate

        # Path-based forms: youtu.be/<id>, /embed/<id>, /shorts/<id>, ...
        segments = [part for part in parsed.path.split('/') if part]
        candidate = None
        if parsed.netloc.lower().endswith('youtu.be') and segments:
            candidate = segments[0]
        elif len(segments) >= 2 and segments[0] in ('embed', 'shorts', 'live', 'v'):
            candidate = segments[1]
        if candidate and re.fullmatch(id_pattern, candidate):
            return candidate

    # Fallback: the original loose pattern, kept so inputs that used to work
    # (e.g. a bare video ID) still do.
    legacy_match = re.search(r'(?:v=|)([0-9A-Za-z_-]{11}).*', url)
    return legacy_match.group(1) if legacy_match else None


def generate_timestamped_url(url, hours, minutes, seconds):
    """Build a YouTube watch URL that starts playback at the given time.

    The video ID is taken from the ``v`` query parameter or from the path of
    ``youtu.be`` / ``embed`` / ``shorts`` style links. Previously the first
    11-character run anywhere in the URL was used, which picked up playlist
    IDs or parts of the host name.

    Args:
        url: A YouTube URL (or a bare 11-character video ID).
        hours: Hours part of the offset; anything accepted by ``int()``.
        minutes: Minutes part of the offset; anything accepted by ``int()``.
        seconds: Seconds part of the offset; anything accepted by ``int()``.

    Returns:
        ``https://www.youtube.com/watch?v=<id>&t=<total seconds>s``, or the
        string ``"Invalid YouTube URL"`` when no video ID can be found.

    Raises:
        ValueError, TypeError: If a time component cannot be converted by
            ``int()``.
    """
    if not isinstance(url, str):
        return "Invalid YouTube URL"

    video_id = _extract_youtube_video_id(url)
    if not video_id:
        return "Invalid YouTube URL"

    total_seconds = int(hours) * 3600 + int(minutes) * 60 + int(seconds)

    return f"https://www.youtube.com/watch?v={video_id}&t={total_seconds}s"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|