We use heapq to make a priority queue:
# using heapq from the standard library
import heapq
class PriorityQueue:
def __init__(self):
self._queue = []
def push(self, item, priority):
heapq.heappush(self._queue, (priority, item))
def pop(self):
if not self.is_empty():
return heapq.heappop(self._queue)[1]
else:
raise IndexError("pop from an empty priority queue")
def is_empty(self):
return len(self._queue) == 0
# example usage
priority_queue = PriorityQueue()
priority_queue.push("Task 1", 3)
priority_queue.push("Task 2", 1)
priority_queue.push("Task 3", 2)
while not priority_queue.is_empty():
print(priority_queue.pop())
For a second based timer:
# priority queue based timer
import heapq
import time
class Timer:
def __init__(self):
self._queue = []
def schedule(self, callback, delay):
execution_time = time.time() + delay
heapq.heappush(self._queue, (execution_time, callback))
def run_pending(self):
current_time = time.time()
while self._queue and self._queue[0][0] <= current_time:
_, callback = heapq.heappop(self._queue)
callback()
def is_empty(self):
return len(self._queue) == 0
def callback1():
print("Callback 1 executed")
def callback2():
print("Callback 2 executed")
timer = Timer()
timer.schedule(callback1, 2)
timer.schedule(callback2, 7)
while not timer.is_empty():
timer.run_pending()
time.sleep(1)
For a millisecond based timer:
# priority queue based timer
import heapq
import time
class Timer:
def __init__(self):
self._queue = []
def schedule(self, callback, delay):
ms = time.time_ns()/10**6
execution_time = ms + delay
heapq.heappush(self._queue, (execution_time, callback))
def run_pending(self):
ms = time.time_ns()/10**6
while self._queue and self._queue[0][0] <= ms:
_, callback = heapq.heappop(self._queue)
callback()
def is_empty(self):
return len(self._queue) == 0
def callback1():
print("Callback 1 executed")
def callback2():
print("Callback 2 executed")
timer = Timer()
timer.schedule(callback1, 200)
timer.schedule(callback2, 700)
timer.schedule(callback2, 2700)
while not timer.is_empty():
timer.run_pending()
time.sleep(1/1000)
Note that I don't know how accurate these are.