instance1 / Instance.py
ChandimaPrabath's picture
fix dir
78ec209
raw
history blame
7.39 kB
import os
import requests
import json
import time
from threading import Thread, Event
from requests.exceptions import RequestException
from tqdm import tqdm
import logging
CACHE_DIR = os.getenv("CACHE_DIR")
download_progress = {}
class Instance:
def __init__(self, id, url, cache_dir, token, repo, load_balancer_api, initial_delay=1):
self.version = "0.0.1 Alpha"
self.id = id
self.url = url
self.CACHE_DIR = cache_dir
self.TOKEN = token
self.REPO = repo
self.MUSIC_STORE = {}
self.download_threads = {}
self.file_structure = None
self.load_balancer_api = load_balancer_api
self.initial_delay = initial_delay
self.last_report_time = time.time()
self.re_register_event = Event()
if not os.path.exists(self.CACHE_DIR):
os.makedirs(self.CACHE_DIR)
self.register_to_load_balancer()
self.reload_file_structure()
registration_thread = Thread(target=self.monitor_registration)
registration_thread.daemon = True
registration_thread.start()
indexer_thread = Thread(target=self.get_file_structure_periodically)
indexer_thread.daemon = True
indexer_thread.start()
def reload_file_structure(self):
self.file_structure = self.load_balancer_api.get_file_structure()
logging.info("File structure reloaded successfully.")
def get_file_structure_periodically(self):
while True:
time.sleep(300) # Every 5 minutes
logging.info("Re-running indexer and reloading file structure.")
self.reload_file_structure()
def compile_report(self):
self.last_report_time = time.time()
cache_size = self.get_cache_size()
report = {
"instance_id": self.id,
"instance_url": self.url,
"music_store": self.MUSIC_STORE,
"cache_size": cache_size
}
return report
def register_to_load_balancer(self):
result = self.load_balancer_api.register_instance(self.id, self.url)
if result is not None:
logging.info(f'Registered instance {self.id} to load balancer.')
else:
logging.error(f'Failed to register instance {self.id} to load balancer.')
def monitor_registration(self):
while True:
if time.time() - self.last_report_time > 60:
logging.info('1 minute passed since last report. Re-registering...')
self.register_to_load_balancer()
self.last_report_time = time.time()
time.sleep(30)
def get_cache_size(self):
total_size = 0
for dirpath, dirnames, filenames in os.walk(CACHE_DIR):
for f in filenames:
fp = os.path.join(dirpath, f)
total_size += os.path.getsize(fp)
return {"cache_size": f"{total_size / (1024 * 1024 * 1024):.2f} GB"}
@staticmethod
def read_json(file_path):
if os.path.exists(file_path):
with open(file_path, 'r') as json_file:
return json.load(json_file)
return {}
def download_music(self, file_url, token, cache_path, music_id, title, chunk_size=100 * 1024 * 1024):
print(f"Downloading file from URL: {file_url} to {cache_path}")
headers = {'Authorization': f'Bearer {token}'}
try:
response = requests.get(file_url, headers=headers, stream=True)
response.raise_for_status()
total_size = int(response.headers.get('content-length', 0))
download_progress[music_id] = {"total": total_size, "downloaded": 0, "status": "Downloading", "start_time": time.time()}
os.makedirs(os.path.dirname(cache_path), exist_ok=True)
with open(cache_path, 'wb') as file, tqdm(total=total_size, unit='B', unit_scale=True, desc=cache_path) as pbar:
for data in response.iter_content(chunk_size=chunk_size):
file.write(data)
pbar.update(len(data))
download_progress[music_id]["downloaded"] += len(data)
print(f'File cached to {cache_path} successfully.')
self.MUSIC_STORE[title] = cache_path
download_progress[music_id]["status"] = "Completed"
except RequestException as e:
print(f"Error downloading file: {e}")
download_progress[music_id]["status"] = "Failed"
except IOError as e:
print(f"Error writing file {cache_path}: {e}")
download_progress[music_id]["status"] = "Failed"
finally:
if download_progress[music_id]["status"] != "Downloading":
download_progress[music_id]["end_time"] = time.time()
@staticmethod
def get_download_progress(id):
if id in download_progress:
total = download_progress[id]["total"]
downloaded = download_progress[id]["downloaded"]
status = download_progress[id].get("status", "In Progress")
progress = (downloaded / total) * 100 if total > 0 else 0
eta = None
if status == "Downloading" and downloaded > 0:
elapsed_time = time.time() - download_progress[id]["start_time"]
estimated_total_time = elapsed_time * (total / downloaded)
eta = estimated_total_time - elapsed_time
elif status == "Completed":
eta = 0
return {"total": total, "downloaded": downloaded, "progress": progress, "status": status, "eta": eta}
return {"total": 0, "downloaded": 0, "progress": 0, "status": "Not Found", "eta": None}
def load_json(self, file_path):
with open(file_path, 'r') as file:
return json.load(file)
def find_music_path(self, title):
"""Find the path of the music in the indexed data based on the title."""
for directory in self.file_structure:
if directory['type'] == 'directory':
for sub_directory in directory['contents']:
if sub_directory['type'] == 'file' and title.lower() in sub_directory['path'].lower():
return sub_directory['path']
return None
def get_music_id(self, title):
return title.replace(" ", "_").lower()
def bytes_to_human_readable(self, num, suffix="B"):
for unit in ["", "K", "M", "G", "T", "P", "E", "Z"]:
if abs(num) < 1024.0:
return f"{num:3.1f} {unit}{suffix}"
num /= 1024.0
return f"{num:.1f} Y{suffix}"
def register_to_load_balancer(self):
retries = 0
delay = self.initial_delay
max_delay = 120
while True:
try:
result = self.load_balancer_api.register_instance(self.id, self.url)
if result:
logging.info(f'Successfully registered instance {self.id} to load balancer.')
return result
except Exception as e:
logging.error(f'Error during registration: {e}')
retries += 1
logging.warning(f'Attempt {retries} to register instance {self.id} failed. Retrying in {delay} seconds...')
time.sleep(delay)
delay = min(delay * 2, max_delay)