import os
import json
import time
import logging
from threading import Thread, Event

import requests
from requests.exceptions import RequestException
from tqdm import tqdm

# Optional default cache directory taken from the environment (may be None);
# each Instance receives its own cache_dir via __init__.
CACHE_DIR = os.getenv("CACHE_DIR")

# Module-level map of music_id -> download progress, shared across threads.
download_progress = {}


class Instance:
    def __init__(self, id, url, cache_dir, token, repo, load_balancer_api, initial_delay=1):
        self.version = "0.0.1 Alpha"
        self.id = id
        self.url = url
        self.CACHE_DIR = cache_dir
        self.TOKEN = token
        self.REPO = repo
        self.MUSIC_STORE = {}
        self.download_threads = {}
        self.file_structure = None
        self.load_balancer_api = load_balancer_api
        self.initial_delay = initial_delay
        self.last_report_time = time.time()
        self.re_register_event = Event()

        if not os.path.exists(self.CACHE_DIR):
            os.makedirs(self.CACHE_DIR)

        self.register_to_load_balancer()
        self.reload_file_structure()

        # Background threads: one re-registers with the load balancer when
        # reports go stale, the other refreshes the file index periodically.
        registration_thread = Thread(target=self.monitor_registration)
        registration_thread.daemon = True
        registration_thread.start()

        indexer_thread = Thread(target=self.get_file_structure_periodically)
        indexer_thread.daemon = True
        indexer_thread.start()

    def reload_file_structure(self):
        self.file_structure = self.load_balancer_api.get_file_structure()
        logging.info("File structure reloaded successfully.")

    def get_file_structure_periodically(self):
        while True:
            time.sleep(300)  # every 5 minutes
            logging.info("Re-running indexer and reloading file structure.")
            self.reload_file_structure()

    def compile_report(self):
        self.last_report_time = time.time()
        cache_size = self.get_cache_size()
        report = {
            "instance_id": self.id,
            "instance_url": self.url,
            "music_store": self.MUSIC_STORE,
            "cache_size": cache_size,
        }
        return report

    def monitor_registration(self):
        while True:
            if time.time() - self.last_report_time > 60:
                logging.info('1 minute passed since last report. Re-registering...')
                self.register_to_load_balancer()
                self.last_report_time = time.time()
            time.sleep(30)

    def get_cache_size(self):
        total_size = 0
        for dirpath, dirnames, filenames in os.walk(self.CACHE_DIR):
            for f in filenames:
                fp = os.path.join(dirpath, f)
                total_size += os.path.getsize(fp)
        return {"cache_size": f"{total_size / (1024 * 1024 * 1024):.2f} GB"}

    @staticmethod
    def read_json(file_path):
        if os.path.exists(file_path):
            with open(file_path, 'r') as json_file:
                return json.load(json_file)
        return {}

    def download_music(self, file_url, token, cache_path, music_id, title, chunk_size=100 * 1024 * 1024):
        print(f"Downloading file from URL: {file_url} to {cache_path}")
        headers = {'Authorization': f'Bearer {token}'}
        # Create the progress entry before the request so the except/finally
        # blocks can update it even if the request itself fails.
        download_progress[music_id] = {"total": 0, "downloaded": 0, "status": "Downloading", "start_time": time.time()}
        try:
            response = requests.get(file_url, headers=headers, stream=True)
            response.raise_for_status()
            total_size = int(response.headers.get('content-length', 0))
            download_progress[music_id]["total"] = total_size
            os.makedirs(os.path.dirname(cache_path), exist_ok=True)
            with open(cache_path, 'wb') as file, tqdm(total=total_size, unit='B', unit_scale=True, desc=cache_path) as pbar:
                for data in response.iter_content(chunk_size=chunk_size):  # 100 MB chunks by default
                    file.write(data)
                    pbar.update(len(data))
                    download_progress[music_id]["downloaded"] += len(data)
            print(f'File cached to {cache_path} successfully.')
            self.MUSIC_STORE[title] = cache_path
            download_progress[music_id]["status"] = "Completed"
        except RequestException as e:
            print(f"Error downloading file: {e}")
            download_progress[music_id]["status"] = "Failed"
        except IOError as e:
            print(f"Error writing file {cache_path}: {e}")
            download_progress[music_id]["status"] = "Failed"
        finally:
            if download_progress[music_id]["status"] != "Downloading":
                download_progress[music_id]["end_time"] = time.time()

    @staticmethod
    def get_download_progress(id):
        if id in download_progress:
            total = download_progress[id]["total"]
            downloaded = download_progress[id]["downloaded"]
            status = download_progress[id].get("status", "In Progress")
            progress = (downloaded / total) * 100 if total > 0 else 0
            eta = None
            if status == "Downloading" and downloaded > 0:
                elapsed_time = time.time() - download_progress[id]["start_time"]
                estimated_total_time = elapsed_time * (total / downloaded)
                eta = estimated_total_time - elapsed_time
            elif status == "Completed":
                eta = 0
            return {"total": total, "downloaded": downloaded, "progress": progress, "status": status, "eta": eta}
        return {"total": 0, "downloaded": 0, "progress": 0, "status": "Not Found", "eta": None}

    def load_json(self, file_path):
        with open(file_path, 'r') as file:
            return json.load(file)

    def find_music_path(self, title):
        """Find the path of the music in the indexed data based on the title."""
        for directory in self.file_structure:
            if directory['type'] == 'directory':
                for sub_directory in directory['contents']:
                    if sub_directory['type'] == 'file' and title.lower() in sub_directory['path'].lower():
                        return sub_directory['path']
        return None

    def get_music_id(self, title):
        return title.replace(" ", "_").lower()

    def bytes_to_human_readable(self, num, suffix="B"):
        for unit in ["", "K", "M", "G", "T", "P", "E", "Z"]:
            if abs(num) < 1024.0:
                return f"{num:3.1f} {unit}{suffix}"
            num /= 1024.0
        return f"{num:.1f} Y{suffix}"

    def register_to_load_balancer(self):
        # Register with the load balancer, retrying with exponential backoff
        # (capped at max_delay) until registration succeeds.
        retries = 0
        delay = self.initial_delay
        max_delay = 120
        while True:
            try:
                result = self.load_balancer_api.register_instance(self.id, self.url)
                if result:
                    logging.info(f'Successfully registered instance {self.id} to load balancer.')
                    return result
            except Exception as e:
                logging.error(f'Error during registration: {e}')
            retries += 1
            logging.warning(f'Attempt {retries} to register instance {self.id} failed. Retrying in {delay} seconds...')
            time.sleep(delay)
            delay = min(delay * 2, max_delay)
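

# --- Usage sketch (illustrative only) --------------------------------------
# A minimal example of wiring up an Instance. The _StubLoadBalancerAPI below
# is hypothetical: the real load_balancer_api object is assumed to expose
# register_instance(id, url) and get_file_structure() as used above, but its
# actual implementation lives elsewhere. The file-structure shape is inferred
# from find_music_path().
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    class _StubLoadBalancerAPI:
        """Hypothetical stand-in so the sketch is self-contained."""

        def register_instance(self, instance_id, instance_url):
            return {"instance_id": instance_id, "instance_url": instance_url}

        def get_file_structure(self):
            # Assumed shape: a list of directory entries, each with a
            # 'contents' list of file entries.
            return [{"type": "directory", "contents": [
                {"type": "file", "path": "music/example_song.mp3"},
            ]}]

    instance = Instance(
        id="instance-1",
        url="http://localhost:8000",
        cache_dir=CACHE_DIR or "./cache",
        token="example-token",
        repo="example/repo",
        load_balancer_api=_StubLoadBalancerAPI(),
    )
    print(instance.compile_report())
    print(instance.find_music_path("example_song"))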