sohojoe committed on
Commit
80eea9e
1 Parent(s): 31bfb8d

added pipeline test for a simple pipeline

Browse files
Files changed (1) hide show
  1. pipeline_test.py +82 -0
pipeline_test.py ADDED
@@ -0,0 +1,82 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import asyncio
2
+ import random
3
+ import time
4
+
5
+
6
class Job:
    """A unit of work flowing through the pipeline.

    Attributes:
        id: Ordering key; node3 uses it to re-sequence out-of-order results.
        data: Free-form string that each pipeline stage appends a marker to.
    """

    def __init__(self, id, data):
        # NOTE: `id` shadows the builtin, but it is part of the public
        # constructor signature, so the name is kept for compatibility.
        self.id = id
        self.data = data

    def __repr__(self):
        # Added for debuggability; does not affect pipeline behavior.
        return f"Job(id={self.id!r}, data={self.data!r})"
10
+
11
+
12
async def node1(worker_id: int, input_queue, output_queue):
    """Stage-1 worker: tag each incoming job and forward it downstream.

    Pulls jobs from `input_queue`, appends a stage-1 marker to `job.data`,
    and puts the job on `output_queue`. Loops forever; the caller is
    expected to cancel the task to stop it.
    """
    while True:
        current: Job = await input_queue.get()
        marker = f' (processed by node 1, worker {worker_id})'
        current.data = current.data + marker
        await output_queue.put(current)
17
+
18
async def node2(worker_id: int, input_queue, output_queue,
                min_sleep: float = 0.8, max_sleep: float = 1.2):
    """Stage-2 worker: simulate slow processing, then tag and forward jobs.

    Pulls jobs from `input_queue`, sleeps a uniformly random duration in
    [min_sleep, max_sleep) to emulate work, appends a stage-2 marker to
    `job.data`, and puts the job on `output_queue`. Loops forever; cancel
    the task to stop it.

    Args:
        worker_id: Identifier embedded in the marker text.
        input_queue: asyncio.Queue supplying jobs.
        output_queue: asyncio.Queue receiving processed jobs.
        min_sleep: Lower bound of the simulated work duration (seconds).
            Defaults preserve the original hard-coded 0.8–1.2 s range.
        max_sleep: Upper bound of the simulated work duration (seconds).
    """
    while True:
        job: Job = await input_queue.get()
        # Uniform in [min_sleep, max_sleep); equivalent to the original
        # 0.8 + 0.4 * random() with the default bounds.
        sleep_duration = min_sleep + (max_sleep - min_sleep) * random.random()
        await asyncio.sleep(sleep_duration)
        job.data += f' (processed by node 2, worker {worker_id})'
        await output_queue.put(job)
25
+
26
async def node3(worker_id: int, input_queue, job_sync):
    """Final stage: restore job order by id, stamp, print, and collect.

    Jobs can arrive out of order (stage 2 runs several workers with random
    delays), so early arrivals are parked in a dict keyed by job id until
    the next expected id shows up. Each released job gets a stage-3 marker,
    is printed, and is appended to `job_sync` in id order. Loops forever;
    cancel the task to stop it.
    """
    pending = {}
    expected = 0
    while True:
        arrived: Job = await input_queue.get()
        pending[arrived.id] = arrived
        # Release every consecutive job starting at the expected id.
        while expected in pending:
            ready = pending.pop(expected)
            ready.data += f' (processed by node 3, worker {worker_id})'
            print(f'{ready.id} - {ready.data}')
            expected += 1
            job_sync.append(ready)
39
+
40
async def main():
    """Run the three-stage demo pipeline over 100 jobs.

    Wiring: node1 (one task) tags jobs, node2 (5 workers) simulates slow
    work concurrently, node3 re-sequences results by id into `job_sync`.
    Completes once every job has reached `job_sync`, then cancels the
    forever-running stage tasks and waits for them to finish.
    """
    input_queue = asyncio.Queue()
    buffer_queue = asyncio.Queue()
    output_queue = asyncio.Queue()

    num_jobs = 100
    joe_source = [Job(i, "") for i in range(num_jobs)]
    job_sync = []  # filled by node3, in id order

    task1 = asyncio.create_task(node1(None, input_queue, buffer_queue))
    task3 = asyncio.create_task(node3(None, output_queue, job_sync))

    num_workers = 5
    tasks2 = [
        asyncio.create_task(node2(i + 1, buffer_queue, output_queue))
        for i in range(num_workers)
    ]

    for job in joe_source:
        await input_queue.put(job)

    try:
        # Poll until node3 has collected every job.
        while len(job_sync) < num_jobs:
            await asyncio.sleep(0.1)
    except asyncio.CancelledError:
        print("Pipeline cancelled")
    finally:
        # Always tear down the infinite stage tasks — not only on
        # cancellation — so the event loop can exit cleanly.
        task1.cancel()
        for task in tasks2:
            task.cancel()
        task3.cancel()
        await asyncio.gather(task1, *tasks2, task3, return_exceptions=True)
72
+
73
+
74
# Script entry point: time the full pipeline run. Guarded so importing
# this module (e.g. from a test) does not launch the pipeline.
if __name__ == "__main__":
    start_time = time.time()

    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl-C during the run: report and fall through to the timing print.
        print("Pipeline interrupted by user")

    end_time = time.time()
    print(f"Pipeline processed in {end_time - start_time} seconds.")