import json
import logging
import os
import re
import time
from threading import Event, Thread
from urllib.parse import quote

from indexer import indexer
from tvdb import fetch_and_cache_json
from utils import convert_to_gb
from api import InstancesAPI

CACHE_DIR = os.getenv("CACHE_DIR")
download_progress = {}

# A four-digit year wrapped in parentheses, e.g. "Some Title (1999)".
_YEAR_IN_PARENS = re.compile(r'\((\d{4})\)')


class LoadBalancer:
    """Aggregate the media catalogues reported by worker instances and route downloads.

    Construction indexes the local cache, loads the resulting file structure,
    and starts two daemon threads: one polls the registered instances for
    reports, the other watches ``index_file`` for modifications.
    """

    def __init__(self, cache_dir, index_file, token, repo, polling_interval=4, max_retries=3, initial_delay=1):
        """Set up state, index the cache, and start the background threads.

        Args:
            cache_dir (str): Directory for cached data; created if missing.
            index_file (str): Path of the JSON file describing the indexed file structure.
            token: Stored as ``self.TOKEN``; not used within this class.
            repo: Stored as ``self.REPO``; not used within this class.
            polling_interval (float): Seconds between report polls.
            max_retries (int): Stored; not used within this class.
            initial_delay (float): Stored; not used within this class.

        Raises:
            FileNotFoundError: If ``index_file`` does not exist after indexing.
        """
        self.version = "0.0.2.10 V Beta"
        self.instances = []
        self.instances_health = {}
        self.polling_interval = polling_interval
        self.max_retries = max_retries
        self.initial_delay = initial_delay
        self.stop_event = Event()
        # The same list object is handed to InstancesAPI; presumably it keeps the
        # reference so register/remove calls are visible to it — verify in api.py.
        self.instances_api = InstancesAPI(self.instances)
        self.CACHE_DIR = cache_dir
        self.INDEX_FILE = index_file
        self.TOKEN = token
        self.REPO = repo
        self.FILM_STORE = {}
        self.TV_STORE = {}
        self.file_structure = None
        # None makes the first pass of check_file_updates treat the index as
        # changed, which is what launches the initial metadata prefetch.
        self.index_file_last_modified = None

        # Ensure CACHE_DIR exists
        if not os.path.exists(self.CACHE_DIR):
            os.makedirs(self.CACHE_DIR)

        # Index the file structure initially
        indexer()

        # Load the file structure JSON
        self.load_file_structure()

        # Start polling and file checking in separate daemon threads
        polling_thread = Thread(target=self.start_polling, daemon=True)
        polling_thread.start()

        file_checking_thread = Thread(target=self.check_file_updates, daemon=True)
        file_checking_thread.start()

    def load_file_structure(self):
        """Load ``INDEX_FILE`` as JSON into ``self.file_structure``.

        Raises:
            FileNotFoundError: If ``INDEX_FILE`` does not exist.
        """
        if not os.path.exists(self.INDEX_FILE):
            raise FileNotFoundError(f"{self.INDEX_FILE} not found. Please make sure the file exists.")
        with open(self.INDEX_FILE, 'r') as f:
            self.file_structure = json.load(f)
        logging.info("File structure loaded successfully.")

    def check_file_updates(self):
        """Re-index and reload whenever ``INDEX_FILE``'s modification time changes.

        Runs until ``stop_event`` is set, checking every 120 seconds. On each
        detected change it re-runs the indexer, reloads the file structure,
        records the new mtime, and (re)starts the metadata prefetch thread.
        It first waits for any previous prefetch run to finish.
        Errors are logged so that one bad pass does not stop future checks.
        """
        while not self.stop_event.is_set():
            try:
                if self.index_file_last_modified != os.path.getmtime(self.INDEX_FILE):
                    logging.info(f"{self.INDEX_FILE} has been updated. Re-indexing...")
                    indexer()  # Re-run the indexer
                    self.load_file_structure()  # Reload the file structure
                    # Re-read the mtime: the indexer run above may itself have rewritten the file.
                    self.index_file_last_modified = os.path.getmtime(self.INDEX_FILE)

                    # Restart prefetching thread. join() blocks this loop until the
                    # previous prefetch run has completed.
                    if hasattr(self, 'prefetch_thread') and self.prefetch_thread.is_alive():
                        self.prefetch_thread.join()
                    self.prefetch_thread = Thread(target=self.start_prefetching, daemon=True)
                    self.prefetch_thread.start()
            except Exception:
                # Keep the watcher alive; e.g. the index file being mid-rewrite or
                # an indexer failure should not permanently disable update checks.
                logging.exception("Error while checking for index file updates.")
            # Check every 2 minutes; wait() returns early once stop_event is set.
            self.stop_event.wait(120)

    def register_instance(self, instance_url):
        """Add ``instance_url`` to the polled instances unless already present."""
        if instance_url not in self.instances:
            self.instances.append(instance_url)
            logging.info(f"Registered instance {instance_url}")
        else:
            logging.info(f"Instance {instance_url} is already registered.")

    def remove_instance(self, instance_url):
        """Remove ``instance_url`` and its health record, if registered."""
        if instance_url in self.instances:
            self.instances.remove(instance_url)
            self.instances_health.pop(instance_url, None)
            logging.info(f"Removed instance {instance_url}")
        else:
            logging.info(f"Instance {instance_url} not found for removal.")

    def get_reports(self):
        """Poll all instances and rebuild ``FILM_STORE`` / ``TV_STORE`` from their reports.

        An instance with no entry in the fetched reports is treated as failed
        and deregistered. Each store is built in a temporary dict and only
        assigned once every report has been processed.
        """
        reports = self.instances_api.fetch_reports()

        # Initialize temporary JSON data holders
        temp_film_store = {}
        temp_tv_store = {}

        for instance_url in self.instances[:]:  # Copy list to avoid modification during iteration
            if instance_url in reports:
                report = reports[instance_url]
                logging.info(f"Report from {instance_url}: {report}")
                self.process_report(instance_url, report, temp_film_store, temp_tv_store)
            else:
                logging.error(f"Failed to get report from {instance_url}. Removing instance.")
                self.remove_instance(instance_url)

        self.FILM_STORE = temp_film_store
        self.TV_STORE = temp_tv_store

    def process_report(self, instance_url, report, temp_film_store, temp_tv_store):
        """Merge one instance's report into the temporary stores and update its health.

        Each film and episode title is mapped to a URL on ``instance_url``.
        Path segments are percent-encoded with ``urllib.parse.quote`` so that
        titles containing characters such as ``?``, ``#`` or ``%`` still
        produce valid URLs.

        Args:
            instance_url (str): Base URL of the reporting instance.
            report (dict): Report with optional ``film_store``, ``tv_store`` and ``cache_size`` keys.
            temp_film_store (dict): ``{title: url}``, mutated in place.
            temp_tv_store (dict): ``{title: {season: {episode: url}}}``, mutated in place.
        """
        film_store = report.get('film_store', {})
        tv_store = report.get('tv_store', {})
        cache_size = report.get('cache_size')

        logging.info(f"Processing report from {instance_url}")

        # Update temporary film store (only the titles are needed, not the paths)
        for title in film_store:
            temp_film_store[title] = f"{instance_url}/api/film/{quote(title)}"

        # Update temporary TV store
        for title, seasons in tv_store.items():
            show_entry = temp_tv_store.setdefault(title, {})
            for season, episodes in seasons.items():
                season_entry = show_entry.setdefault(season, {})
                for episode in episodes:
                    season_entry[episode] = (
                        f"{instance_url}/api/tv/{quote(title)}/{quote(season)}/{quote(episode)}"
                    )

        logging.info("Film and TV Stores processed successfully.")
        self.update_instances_health(instance=instance_url, cache_size=cache_size)

    def start_polling(self):
        """Thread target: call ``get_reports`` every ``polling_interval`` seconds until stopped.

        A failing poll cycle is logged and retried on the next interval instead
        of terminating the polling thread.
        """
        logging.info("Starting polling.")
        while not self.stop_event.is_set():
            try:
                self.get_reports()
            except Exception:
                logging.exception("Error while polling instance reports.")
            # wait() instead of sleep() so stop_polling takes effect promptly.
            self.stop_event.wait(self.polling_interval)
        logging.info("Polling stopped.")

    def stop_polling(self):
        """Signal the polling and file-checking loops to exit."""
        logging.info("Stopping polling.")
        self.stop_event.set()

    def start_prefetching(self):
        """Thread target: run ``prefetch_metadata`` (synchronously, in the calling thread)."""
        self.prefetch_metadata()

    #################################################################

    def update_instances_health(self, instance, cache_size):
        """Record the cache usage reported by ``instance``.

        Args:
            instance (str): Instance base URL.
            cache_size (dict | None): The report's ``cache_size`` entry. Its
                ``"cache_size"`` key holds the used-space string (e.g. "3.33 GB").
                If it is missing or malformed, a warning is logged and the
                health record is left unchanged.
        """
        try:
            used = cache_size["cache_size"]
        except (TypeError, KeyError):
            logging.warning(f"Report from {instance} has no usable cache size ({cache_size!r}); health not updated.")
            return
        self.instances_health[instance] = {"used": used, "total": "50 GB"}
        logging.info(f"Updated instance {instance} with cache size {cache_size}")

    def _find_best_instance(self):
        """Return the URL of the instance with the most free space, or None.

        Free space is ``convert_to_gb(total) - convert_to_gb(used)``. Only
        instances whose free space exceeds -1 can be chosen. Ties go to the
        instance that appears first in ``instances_health``.
        """
        best_instance = None
        max_free_space = -1
        # Iterate a snapshot: the polling thread may add/remove entries concurrently,
        # which would otherwise raise "dictionary changed size during iteration".
        for instance_url, space_info in list(self.instances_health.items()):
            free_space = convert_to_gb(space_info['total']) - convert_to_gb(space_info['used'])
            if free_space > max_free_space:
                max_free_space = free_space
                best_instance = instance_url
        return best_instance

    def download_film_to_best_instance(self, title):
        """
        Downloads a film to the instance with the most free space in self.instances_health.
        The instances_health looks like this:
        {
            "https://unicone-studio-instance1.hf.space": {
                "total": "50 GB",
                "used": "3.33 GB"
            }
        }
        Args:
            title (str): The title of the film.

        Returns:
            dict: ``{"film_id", "status", "progress_url"}`` on success, or
            ``{"error": ...}`` when no instance is suitable.
        """
        best_instance = self._find_best_instance()
        if not best_instance:
            logging.error("No suitable instance found for downloading the film.")
            return {"error": "No suitable instance found for downloading the film."}

        result = self.instances_api.download_film(best_instance, title)
        film_id = result["film_id"]
        return {
            "film_id": film_id,
            "status": result["status"],
            "progress_url": f'{best_instance}/api/progress/{film_id}',
        }

    def download_episode_to_best_instance(self, title, season, episode):
        """
        Downloads an episode to the instance with the most free space in self.instances_health.
        The instances_health looks like this:
        {
            "https://unicone-studio-instance1.hf.space": {
                "total": "50 GB",
                "used": "3.33 GB"
            }
        }
        Args:
            title (str): The title of the TV show.
            season (str): The season of the TV show.
            episode (str): The title of the episode.

        Returns:
            dict: ``{"episode_id", "status", "progress_url"}`` on success, or
            ``{"error": ...}`` when no instance is suitable.
        """
        best_instance = self._find_best_instance()
        if not best_instance:
            logging.error("No suitable instance found for downloading the episode.")
            return {"error": "No suitable instance found for downloading the episode."}

        result = self.instances_api.download_episode(best_instance, title, season, episode)
        episode_id = result["episode_id"]
        return {
            "episode_id": episode_id,
            "status": result["status"],
            "progress_url": f'{best_instance}/api/progress/{episode_id}',
        }

    #################################################################

    def find_movie_path(self, title):
        """Return the path of the first file under ``films/<dir>/`` whose path contains
        ``title`` (case-insensitive), or None."""
        for directory in self.file_structure:
            if directory['type'] == 'directory' and directory['path'] == 'films':
                for sub_directory in directory['contents']:
                    if sub_directory['type'] == 'directory':
                        for item in sub_directory['contents']:
                            if item['type'] == 'file' and title.lower() in item['path'].lower():
                                return item['path']
        return None

    def find_tv_path(self, title):
        """Return the path of the first TV show directory whose path contains
        ``title`` (case-insensitive), or None."""
        show = self.get_tv_structure(title)
        return show['path'] if show is not None else None

    def get_tv_structure(self, title):
        """Return the directory node of the first TV show whose path contains
        ``title`` (case-insensitive), or None."""
        for directory in self.file_structure:
            if directory['type'] == 'directory' and directory['path'] == 'tv':
                for sub_directory in directory['contents']:
                    if sub_directory['type'] == 'directory' and title.lower() in sub_directory['path'].lower():
                        return sub_directory
        return None

    def get_film_id(self, title):
        """Generate a film ID based on the title: spaces to underscores, lowercased."""
        return title.replace(" ", "_").lower()

    @staticmethod
    def _split_title_year(original_title):
        """Split a directory name into ``(title, year)``; ``year`` is None when absent.

        A parenthesised year anywhere in the name wins (text before it becomes
        the title). Otherwise a trailing space-separated four-digit word is used.
        """
        match = _YEAR_IN_PARENS.search(original_title)
        if match:
            return original_title[:match.start()].strip(), int(match.group(1))
        parts = original_title.rsplit(' ', 1)
        # isdecimal (not isdigit): isdigit accepts e.g. superscripts, which int() rejects.
        if len(parts) > 1 and parts[-1].isdecimal() and len(parts[-1]) == 4:
            return parts[0].strip(), int(parts[-1])
        return original_title, None

    def prefetch_metadata(self):
        """Prefetch metadata for every entry directly inside each top-level directory.

        Entries under a top-level path starting with 'tv' are fetched as
        'series', everything else as 'movie'. A failure for one entry is logged
        and the remaining entries are still processed.
        """
        for item in self.file_structure:
            if 'contents' not in item:
                continue
            media_type = 'series' if item['path'].startswith('tv') else 'movie'
            for sub_item in item['contents']:
                original_title = sub_item['path'].split('/')[-1]
                title, year = self._split_title_year(original_title)
                try:
                    fetch_and_cache_json(original_title, title, media_type, year)
                except Exception:
                    logging.exception(f"Failed to prefetch metadata for {original_title}")

    def get_all_tv_shows(self):
        """Get all TV shows from the indexed cache structure JSON file.

        Returns:
            dict: ``{show_title: [{"season", "episode", "path"}, ...]}`` built
            from ``tv/<show>/<season>/<file>`` entries.
        """
        tv_shows = {}
        for directory in self.file_structure:
            if directory['type'] == 'directory' and directory['path'] == 'tv':
                for sub_directory in directory['contents']:
                    if sub_directory['type'] == 'directory':
                        show_title = sub_directory['path'].split('/')[-1]
                        tv_shows[show_title] = []
                        for season_directory in sub_directory['contents']:
                            if season_directory['type'] == 'directory':
                                season = season_directory['path'].split('/')[-1]
                                for episode in season_directory['contents']:
                                    if episode['type'] == 'file':
                                        tv_shows[show_title].append({
                                            "season": season,
                                            "episode": episode['path'].split('/')[-1],
                                            "path": episode['path'],
                                        })
        return tv_shows

    def get_all_films(self):
        """Get all films from the indexed cache structure JSON file.

        Returns:
            list[str]: Paths of the directories directly under ``films``.
        """
        films = []
        for directory in self.file_structure:
            if directory['type'] == 'directory' and directory['path'] == 'films':
                for sub_directory in directory['contents']:
                    if sub_directory['type'] == 'directory':
                        films.append(sub_directory['path'])
        return films