from multiprocessing import Process, Queue, Event import time import uuid import os NB_WORKERS = 3 NB_MAILS_PER_5_SECONDS = 2 MAIL_MANAGEMENT_DURATION_SECONDS = .2 # reduced so we do not timeout class StopSentinel: pass def list_new_mails_id(): for i in range(NB_MAILS_PER_5_SECONDS): # fake mailbox msg list yield str(uuid.uuid1()) def mail_worker(msg_ids, event): pid = os.getpid() print(f"Starting worker PID = {pid} and queue={msg_ids} queue size={msg_ids.qsize()} ...") while True: if event.is_set(): break # terminate # prevent lost messages by preventing the main print(f"[{pid}] Waiting a mail to manage...") msg_id = msg_ids.get() if isinstance(msg_id, StopSentinel): break print(f"[{pid}] managing mail msg_id = {msg_id} ...") # here should read mail msg_id and remove it from mailbox when finish print(f"[{pid}] --> fake duration of {MAIL_MANAGEMENT_DURATION_SECONDS}s") time.sleep(MAIL_MANAGEMENT_DURATION_SECONDS) if __name__ == "__main__": msg_id_queue = Queue() events = [Event() for _ in range(NB_WORKERS)] processes = [Process(target=mail_worker, args=(msg_id_queue, events[i])) for i in range(NB_WORKERS)] for p in processes: p.start() for iteration_number in range(5): # so we eventually terminate for msg_id in list_new_mails_id(): msg_id_queue.put(msg_id) print("\nWaiting for new mails to come...\n") time.sleep(.2) # Reduced so we do not timeout # Kill some processes from time to time if iteration_number == 1: # Kill the first process print('terminating process 0') events[0].set() elif iteration_number == 2: # Kill the first process print('terminating process 1') events[1].set() # Put enough stop sentinels in the queue (extras are okay) stop_sentinel = StopSentinel() for _ in range(NB_WORKERS): msg_id_queue.put(stop_sentinel) for p in processes: p.join()# your code goes here print('Done!')
Standard input is empty
Starting worker PID = 25186 and queue=<multiprocessing.queues.Queue object at 0x14f3f7f09520> queue size=0 ... [25186] Waiting a mail to manage... [25186] managing mail msg_id = 58b33cca-c0f6-11ed-9fad-1b0a51668dc0 ... [25186] --> fake duration of 0.2s Starting worker PID = 25187 and queue=<multiprocessing.queues.Queue object at 0x14f3f7f09520> queue size=1 ... [25187] Waiting a mail to manage... [25187] managing mail msg_id = 5894ac06-c0f6-11ed-9fad-1b0a51668dc0 ... [25187] --> fake duration of 0.2s [25187] Waiting a mail to manage... [25187] managing mail msg_id = 58d1f35e-c0f6-11ed-9fad-1b0a51668dc0 ... [25187] --> fake duration of 0.2s Starting worker PID = 25188 and queue=<multiprocessing.queues.Queue object at 0x14f3f7f09520> queue size=2 ... [25188] Waiting a mail to manage... [25188] managing mail msg_id = 58948ce4-c0f6-11ed-9fad-1b0a51668dc0 ... [25188] --> fake duration of 0.2s [25188] Waiting a mail to manage... [25188] managing mail msg_id = 58b34116-c0f6-11ed-9fad-1b0a51668dc0 ... [25188] --> fake duration of 0.2s [25188] Waiting a mail to manage... [25188] managing mail msg_id = 58d1f6f6-c0f6-11ed-9fad-1b0a51668dc0 ... [25188] --> fake duration of 0.2s [25188] Waiting a mail to manage... [25188] managing mail msg_id = 58f08eb8-c0f6-11ed-9fad-1b0a51668dc0 ... [25188] --> fake duration of 0.2s [25188] Waiting a mail to manage... [25188] managing mail msg_id = 58f091ec-c0f6-11ed-9fad-1b0a51668dc0 ... [25188] --> fake duration of 0.2s [25188] Waiting a mail to manage... [25188] managing mail msg_id = 590f1c48-c0f6-11ed-9fad-1b0a51668dc0 ... [25188] --> fake duration of 0.2s [25188] Waiting a mail to manage... [25188] managing mail msg_id = 590f2058-c0f6-11ed-9fad-1b0a51668dc0 ... [25188] --> fake duration of 0.2s [25188] Waiting a mail to manage... Waiting for new mails to come... Waiting for new mails to come... terminating process 0 Waiting for new mails to come... terminating process 1 Waiting for new mails to come... Waiting for new mails to come... Done!