ChandimaPrabath committed
Commit e1cb714
1 Parent(s): e0663d3

revert to 0.0.2.7 V Beta

Files changed (3):
  1. LoadBalancer.py +64 -52
  2. hf_scrapper.py +21 -21
  3. indexer.py +20 -34
LoadBalancer.py CHANGED
@@ -1,10 +1,11 @@
 import os
 import json
-import asyncio
-import logging
-import re
 from indexer import indexer
+import re
 from tvdb import fetch_and_cache_json
+from threading import Event, Thread
+import time
+import logging
 from utils import convert_to_gb
 from api import InstancesAPI
 
@@ -14,13 +15,13 @@ download_progress = {}
 
 class LoadBalancer:
     def __init__(self, cache_dir, index_file, token, repo, polling_interval=4, max_retries=3, initial_delay=1):
-        self.version = "0.0.2.9 V Beta"
+        self.version = "0.0.2.7 V Beta"
         self.instances = []
         self.instances_health = {}
         self.polling_interval = polling_interval
         self.max_retries = max_retries
         self.initial_delay = initial_delay
-        self.stop_event = asyncio.Event()
+        self.stop_event = Event()
         self.instances_api = InstancesAPI(self.instances)
         self.CACHE_DIR = cache_dir
         self.INDEX_FILE = index_file
@@ -39,13 +40,18 @@ class LoadBalancer:
         indexer()
 
         # Load the file structure JSON
-        asyncio.run(self.load_file_structure())
+        self.load_file_structure()
+
+        # Start polling and file checking in separate threads
+        polling_thread = Thread(target=self.start_polling)
+        polling_thread.daemon = True
+        polling_thread.start()
 
-        # Start polling and file checking in separate tasks
-        asyncio.create_task(self.start_polling())
-        asyncio.create_task(self.check_file_updates())
+        file_checking_thread = Thread(target=self.check_file_updates)
+        file_checking_thread.daemon = True
+        file_checking_thread.start()
 
-    async def load_file_structure(self):
+    def load_file_structure(self):
         if not os.path.exists(self.INDEX_FILE):
             raise FileNotFoundError(f"{self.INDEX_FILE} not found. Please make sure the file exists.")
 
@@ -53,21 +59,23 @@ class LoadBalancer:
             self.file_structure = json.load(f)
             logging.info("File structure loaded successfully.")
 
-    async def check_file_updates(self):
+    def check_file_updates(self):
         while not self.stop_event.is_set():
             if self.index_file_last_modified != os.path.getmtime(self.INDEX_FILE):
                 logging.info(f"{self.INDEX_FILE} has been updated. Re-indexing...")
                 indexer()  # Re-run the indexer
-                await self.load_file_structure()  # Reload the file structure
+                self.load_file_structure()  # Reload the file structure
                 self.index_file_last_modified = os.path.getmtime(self.INDEX_FILE)
 
-                # Restart prefetching task
-                if hasattr(self, 'prefetch_task') and not self.prefetch_task.done():
-                    await self.prefetch_task
+                # Restart prefetching thread
+                if hasattr(self, 'prefetch_thread') and self.prefetch_thread.is_alive():
+                    self.prefetch_thread.join()
 
-                self.prefetch_task = asyncio.create_task(self.start_prefetching())
+                self.prefetch_thread = Thread(target=self.start_prefetching)
+                self.prefetch_thread.daemon = True
+                self.prefetch_thread.start()
 
-            await asyncio.sleep(120)  # Check every 2 minutes
+            time.sleep(120)  # Check every 2 minutes
 
     def register_instance(self, instance_url):
         if instance_url not in self.instances:
@@ -84,8 +92,8 @@ class LoadBalancer:
         else:
             logging.info(f"Instance {instance_url} not found for removal.")
 
-    async def get_reports(self):
-        reports = await self.instances_api.fetch_reports()
+    def get_reports(self):
+        reports = self.instances_api.fetch_reports()
 
         # Initialize temporary JSON data holders
         temp_film_store = {}
@@ -129,27 +137,30 @@ class LoadBalancer:
             logging.info("Film and TV Stores processed successfully.")
             self.update_instances_health(instance=instance_url, cache_size=cache_size)
 
-    async def start_polling(self):
+    def start_polling(self):
         logging.info("Starting polling.")
         while not self.stop_event.is_set():
-            await self.get_reports()
-            await asyncio.sleep(self.polling_interval)
+            self.get_reports()
+            time.sleep(self.polling_interval)
         logging.info("Polling stopped.")
 
-    async def stop_polling(self):
+    def stop_polling(self):
         logging.info("Stopping polling.")
         self.stop_event.set()
 
-    async def start_prefetching(self):
-        """Start the metadata prefetching."""
-        await self.prefetch_metadata()
+    def start_prefetching(self):
+        """Start the metadata prefetching in a separate thread."""
+        self.prefetch_metadata()
+
+    #################################################################
 
     def update_instances_health(self, instance, cache_size):
-        self.instances_health[instance] = {"used": cache_size["cache_size"],
+        self.instances_health[instance] = {"used":cache_size["cache_size"],
                                            "total": "50 GB"}
         logging.info(f"Updated instance {instance} with cache size {cache_size}")
 
-    async def download_film_to_best_instance(self, title):
+
+    def download_film_to_best_instance(self, title):
         """
         Downloads a film to the first instance that has more free space on the self.instance_health list variable.
         The instance_health looks like this:
@@ -176,14 +187,14 @@ class LoadBalancer:
                 best_instance = instance_url
 
         if best_instance:
-            result = await self.instances_api.download_film(best_instance, title)
+            result = self.instances_api.download_film(best_instance, title)
             film_id = result["film_id"]
             status = result["status"]
             progress_url = f'{best_instance}/api/progress/{film_id}'
             response = {
-                "film_id": film_id,
-                "status": status,
-                "progress_url": progress_url
+                "film_id":film_id,
+                "status":status,
+                "progress_url":progress_url
             }
 
             return response
@@ -191,9 +202,9 @@ class LoadBalancer:
             logging.error("No suitable instance found for downloading the film.")
             return {"error": "No suitable instance found for downloading the film."}
 
-    async def download_episode_to_best_instance(self, title, season, episode):
+    def download_episode_to_best_instance(self, title, season, episode):
         """
-        Downloads an episode to the first instance that has more free space on the self.instance_health list variable.
+        Downloads a episode to the first instance that has more free space on the self.instance_health list variable.
         The instance_health looks like this:
         {
             "https://unicone-studio-instance1.hf.space": {
@@ -202,9 +213,9 @@ class LoadBalancer:
             }
         }
         Args:
-            title (str): The title of the TV show.
-            season (str): The season of the TV show.
-            episode (str): The episode of the TV show.
+            title (str): The title of the Tv show.
+            season (str): The season of the Tv show.
+            episode (str): The title of the Tv show.
         """
         best_instance = None
         max_free_space = -1
@@ -220,22 +231,23 @@ class LoadBalancer:
                 best_instance = instance_url
 
         if best_instance:
-            result = await self.instances_api.download_episode(best_instance, title, season, episode)
+            result = self.instances_api.download_episode(best_instance, title, season, episode)
             episode_id = result["episode_id"]
             status = result["status"]
             progress_url = f'{best_instance}/api/progress/{episode_id}'
             response = {
-                "episode_id": episode_id,
-                "status": status,
-                "progress_url": progress_url
+                "episode_id":episode_id,
+                "status":status,
+                "progress_url":progress_url
             }
 
             return response
         else:
-            logging.error("No suitable instance found for downloading the episode.")
-            return {"error": "No suitable instance found for downloading the episode."}
+            logging.error("No suitable instance found for downloading the film.")
+            return {"error": "No suitable instance found for downloading the film."}
 
-    async def find_movie_path(self, title):
+    #################################################################
+    def find_movie_path(self, title):
         """Find the path of the movie in the JSON data based on the title."""
         for directory in self.file_structure:
             if directory['type'] == 'directory' and directory['path'] == 'films':
@@ -246,7 +258,7 @@ class LoadBalancer:
                     return item['path']
         return None
 
-    async def find_tv_path(self, title):
+    def find_tv_path(self, title):
         """Find the path of the TV show in the JSON data based on the title."""
         for directory in self.file_structure:
             if directory['type'] == 'directory' and directory['path'] == 'tv':
@@ -255,7 +267,7 @@ class LoadBalancer:
                     return sub_directory['path']
         return None
 
-    async def get_tv_structure(self, title):
+    def get_tv_structure(self, title):
         """Find the path of the TV show in the JSON data based on the title."""
         for directory in self.file_structure:
             if directory['type'] == 'directory' and directory['path'] == 'tv':
@@ -264,11 +276,11 @@ class LoadBalancer:
                     return sub_directory
         return None
 
-    async def get_film_id(self, title):
+    def get_film_id(self, title):
         """Generate a film ID based on the title."""
         return title.replace(" ", "_").lower()
 
-    async def prefetch_metadata(self):
+    def prefetch_metadata(self):
         """Prefetch metadata for all items in the file structure."""
         for item in self.file_structure:
             if 'contents' in item:
@@ -291,9 +303,9 @@ class LoadBalancer:
                     title = parts[0].strip()
                     year = int(parts[-1])
 
-                    await fetch_and_cache_json(original_title, title, media_type, year)
+                    fetch_and_cache_json(original_title, title, media_type, year)
 
-    async def get_all_tv_shows(self):
+    def get_all_tv_shows(self):
         """Get all TV shows from the indexed cache structure JSON file."""
         tv_shows = {}
         for directory in self.file_structure:
@@ -314,7 +326,7 @@ class LoadBalancer:
                 })
         return tv_shows
 
-    async def get_all_films(self):
+    def get_all_films(self):
         """Get all films from the indexed cache structure JSON file."""
         films = []
         for directory in self.file_structure:
@@ -322,4 +334,4 @@ class LoadBalancer:
             for sub_directory in directory['contents']:
                 if sub_directory['type'] == 'directory':
                     films.append(sub_directory['path'])
-        return films
+        return films
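
The substance of this revert is a swap of asyncio primitives for their threading equivalents: asyncio.Event becomes threading.Event, asyncio.create_task becomes a daemon Thread, and await asyncio.sleep becomes time.sleep. Below is a minimal, self-contained sketch of that polling pattern; the Poller class and the one-second interval are illustrative stand-ins, not code from this repository.

import time
import logging
from threading import Event, Thread

logging.basicConfig(level=logging.INFO)

class Poller:
    def __init__(self, polling_interval=4):
        self.polling_interval = polling_interval
        self.stop_event = Event()  # set() from any thread stops the loop

    def start(self):
        # Daemon threads are killed when the main process exits, so the
        # application never hangs waiting for the poll loop on shutdown.
        thread = Thread(target=self._poll, daemon=True)
        thread.start()
        return thread

    def _poll(self):
        while not self.stop_event.is_set():
            logging.info("polling...")  # stand-in for get_reports()
            time.sleep(self.polling_interval)
        logging.info("Polling stopped.")

poller = Poller(polling_interval=1)
thread = poller.start()
time.sleep(3)
poller.stop_event.set()
thread.join()

One trade-off of sleeping inside the loop, as the reverted code does: stop_event.set() only takes effect after the current sleep finishes, so shutdown can lag by up to one full interval. Using stop_event.wait(self.polling_interval) instead would wake the loop immediately.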
hf_scrapper.py CHANGED
@@ -1,19 +1,17 @@
 import os
+import requests
 import json
-import aiohttp
-import asyncio
-import aiofiles
 import urllib.request
-from aiohttp import ClientSession, ClientTimeout
-from aiohttp.client_exceptions import ClientError
-from tqdm.asyncio import tqdm
+import time
+from requests.exceptions import RequestException
+from tqdm import tqdm
 
 CACHE_DIR = os.getenv("CACHE_DIR")
 CACHE_JSON_PATH = os.path.join(CACHE_DIR, "cached_films.json")
 
 download_progress = {}
 
-async def get_system_proxies():
+def get_system_proxies():
     """
     Retrieves the system's HTTP and HTTPS proxies.
 
@@ -31,7 +29,7 @@ async def get_system_proxies():
         print(f"Error getting system proxies: {e}")
         return {}
 
-async def get_file_structure(repo, token, path="", proxies=None):
+def get_file_structure(repo, token, path="", proxies=None):
     """
     Fetches the file structure of a specified Hugging Face repository.
 
@@ -46,18 +44,16 @@ async def get_file_structure(repo, token, path="", proxies=None):
     """
     api_url = f"https://huggingface.co/api/models/{repo}/tree/main/{path}"
    headers = {'Authorization': f'Bearer {token}'}
-    timeout = ClientTimeout(total=10)
-    async with ClientSession(timeout=timeout) as session:
-        print(f"Fetching file structure from URL: {api_url} with proxies: {proxies}")
-        try:
-            async with session.get(api_url, headers=headers, proxy=proxies.get("http")) as response:
-                response.raise_for_status()
-                return await response.json()
-        except ClientError as e:
-            print(f"Error fetching file structure: {e}")
-            return []
+    print(f"Fetching file structure from URL: {api_url} with proxies: {proxies}")
+    try:
+        response = requests.get(api_url, headers=headers, proxies=proxies)
+        response.raise_for_status()
+        return response.json()
+    except RequestException as e:
+        print(f"Error fetching file structure: {e}")
+        return []
 
-async def write_file_structure_to_json(file_structure, file_path):
+def write_file_structure_to_json(file_structure, file_path):
     """
     Writes the file structure to a JSON file.
 
@@ -66,8 +62,12 @@ async def write_file_structure_to_json(file_structure, file_path):
         file_path (str): The path where the JSON file will be saved.
     """
     try:
-        async with aiofiles.open(file_path, 'w') as json_file:
-            await json_file.write(json.dumps(file_structure, indent=2))
+        with open(file_path, 'w') as json_file:
+            json.dump(file_structure, json_file, indent=2)
         print(f'File structure written to {file_path}')
     except IOError as e:
         print(f"Error writing file structure to JSON: {e}")
+
+if __name__ == "__main__":
+    file_url = "https://huggingface.co/Unicone-Studio/jellyfin_media/resolve/main/films/Funky%20Monkey%202004/Funky%20Monkey%20(2004)%20Web-dl%201080p.mp4"
+    token = os.getenv("TOKEN")
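
The hf_scrapper revert replaces the aiohttp session with a single blocking requests.get. A sketch of that call in isolation follows; note that the reverted code drops the async version's 10-second total timeout, so the timeout=10 below is an editorial assumption, and the repo argument is only a placeholder taken from the URL in the diff.

import os
import requests
from requests.exceptions import RequestException

def fetch_tree(repo, token, path="", proxies=None):
    # Same Hugging Face tree endpoint the diff calls.
    api_url = f"https://huggingface.co/api/models/{repo}/tree/main/{path}"
    headers = {"Authorization": f"Bearer {token}"}
    try:
        # requests takes a {"http": ..., "https": ...} proxies mapping,
        # which is the shape urllib.request.getproxies() returns.
        response = requests.get(api_url, headers=headers, proxies=proxies, timeout=10)
        response.raise_for_status()  # turn 4xx/5xx into an exception
        return response.json()
    except RequestException as e:
        print(f"Error fetching file structure: {e}")
        return []

if __name__ == "__main__":
    print(fetch_tree("Unicone-Studio/jellyfin_media", os.getenv("TOKEN")))

Without an explicit timeout, requests waits indefinitely on a stalled connection, which matters for anything called from a polling loop.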
indexer.py CHANGED
@@ -1,45 +1,31 @@
 import json
-import logging
-import asyncio
 from hf_scrapper import get_system_proxies, get_file_structure, write_file_structure_to_json
 from dotenv import load_dotenv
 import os
 
 load_dotenv()
 
-async def index_repository(token, repo, current_path="", proxies=None):
-    try:
-        file_structure = await get_file_structure(repo, token, current_path, proxies)
-        full_structure = []
-        for item in file_structure:
-            if item['type'] == 'directory':
-                sub_directory_structure = await index_repository(token, repo, item['path'], proxies)
-                full_structure.append({
-                    "type": "directory",
-                    "path": item['path'],
-                    "contents": sub_directory_structure
-                })
-            else:
-                full_structure.append(item)
-        return full_structure
-    except Exception as e:
-        logging.error(f"Error indexing repository: {e}")
-        raise
+def index_repository(token, repo, current_path="", proxies=None):
+    file_structure = get_file_structure(repo, token, current_path, proxies)
+    full_structure = []
+    for item in file_structure:
+        if item['type'] == 'directory':
+            sub_directory_structure = index_repository(token, repo, item['path'], proxies)
+            full_structure.append({
+                "type": "directory",
+                "path": item['path'],
+                "contents": sub_directory_structure
+            })
+        else:
+            full_structure.append(item)
+    return full_structure
 
-async def indexer():
+def indexer():
     token = os.getenv("TOKEN")
     repo = os.getenv("REPO")
     output_path = os.getenv("INDEX_FILE")
-
-    if not token or not repo or not output_path:
-        logging.error("Environment variables TOKEN, REPO, or INDEX_FILE are not set.")
-        return
-
-    proxies = await get_system_proxies()
-
-    try:
-        full_structure = await index_repository(token, repo, "", proxies)
-        await write_file_structure_to_json(full_structure, output_path)
-        logging.info(f"Full file structure for repository '{repo}' has been indexed and saved to {output_path}")
-    except Exception as e:
-        logging.error(f"Error during indexing: {e}")
+    proxies = get_system_proxies()
+    full_structure = index_repository(token, repo, "", proxies)
+    write_file_structure_to_json(full_structure, output_path)
+    print(f"Full file structure for repository '{repo}' has been indexed and saved to {output_path}")
+
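
index_repository builds a nested list of file and directory dicts, recursing into each directory's contents, and indexer() writes the result to INDEX_FILE. Below is a small illustrative consumer of that structure; the walk helper and the index.json fallback path are assumptions, while the node shape ("type", "path", "contents") comes from the diff above.

import json
import os

def walk(nodes, depth=0):
    # Depth-first traversal mirroring index_repository's recursion.
    for node in nodes:
        print("  " * depth + node["path"])
        if node.get("type") == "directory":
            walk(node.get("contents", []), depth + 1)

if __name__ == "__main__":
    # INDEX_FILE is the same env var indexer() writes to.
    with open(os.getenv("INDEX_FILE", "index.json")) as f:
        walk(json.load(f))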