fork download
  1. from multiprocessing import Process, Queue, Event
  2. import time
  3. import uuid
  4. import os
  5.  
  6.  
  7. NB_WORKERS = 3
  8. NB_MAILS_PER_5_SECONDS = 2
  9. MAIL_MANAGEMENT_DURATION_SECONDS = .2 # reduced so we do not timeout
  10.  
  11. class StopSentinel:
  12. pass
  13.  
  14. def list_new_mails_id():
  15. for i in range(NB_MAILS_PER_5_SECONDS):
  16. # fake mailbox msg list
  17. yield str(uuid.uuid1())
  18.  
  19. def mail_worker(msg_ids, event):
  20. pid = os.getpid()
  21. print(f"Starting worker PID = {pid} and queue={msg_ids} queue size={msg_ids.qsize()} ...")
  22. while True:
  23. if event.is_set():
  24. break # terminate
  25. # prevent lost messages by preventing the main
  26. print(f"[{pid}] Waiting a mail to manage...")
  27. msg_id = msg_ids.get()
  28. if isinstance(msg_id, StopSentinel):
  29. break
  30. print(f"[{pid}] managing mail msg_id = {msg_id} ...")
  31. # here should read mail msg_id and remove it from mailbox when finish
  32. print(f"[{pid}] --> fake duration of {MAIL_MANAGEMENT_DURATION_SECONDS}s")
  33. time.sleep(MAIL_MANAGEMENT_DURATION_SECONDS)
  34.  
  35.  
  36. if __name__ == "__main__":
  37. msg_id_queue = Queue()
  38. events = [Event() for _ in range(NB_WORKERS)]
  39. processes = [Process(target=mail_worker, args=(msg_id_queue, events[i])) for i in range(NB_WORKERS)]
  40. for p in processes:
  41. p.start()
  42.  
  43. for iteration_number in range(5): # so we eventually terminate
  44. for msg_id in list_new_mails_id():
  45. msg_id_queue.put(msg_id)
  46. print("\nWaiting for new mails to come...\n")
  47. time.sleep(.2) # Reduced so we do not timeout
  48. # Kill some processes from time to time
  49. if iteration_number == 1:
  50. # Kill the first process
  51. print('terminating process 0')
  52. events[0].set()
  53. elif iteration_number == 2:
  54. # Kill the first process
  55. print('terminating process 1')
  56. events[1].set()
  57.  
  58. # Put enough stop sentinels in the queue (extras are okay)
  59. stop_sentinel = StopSentinel()
  60. for _ in range(NB_WORKERS):
  61. msg_id_queue.put(stop_sentinel)
  62. for p in processes:
  63. p.join()# your code goes here
  64. print('Done!')
Success #stdin #stdout 0.08s 12908KB
stdin
Standard input is empty
stdout
Starting worker PID = 25186 and queue=<multiprocessing.queues.Queue object at 0x14f3f7f09520> queue size=0 ...
[25186] Waiting a mail to manage...
[25186] managing mail msg_id = 58b33cca-c0f6-11ed-9fad-1b0a51668dc0 ...
[25186] --> fake duration of 0.2s
Starting worker PID = 25187 and queue=<multiprocessing.queues.Queue object at 0x14f3f7f09520> queue size=1 ...
[25187] Waiting a mail to manage...
[25187] managing mail msg_id = 5894ac06-c0f6-11ed-9fad-1b0a51668dc0 ...
[25187] --> fake duration of 0.2s
[25187] Waiting a mail to manage...
[25187] managing mail msg_id = 58d1f35e-c0f6-11ed-9fad-1b0a51668dc0 ...
[25187] --> fake duration of 0.2s
Starting worker PID = 25188 and queue=<multiprocessing.queues.Queue object at 0x14f3f7f09520> queue size=2 ...
[25188] Waiting a mail to manage...
[25188] managing mail msg_id = 58948ce4-c0f6-11ed-9fad-1b0a51668dc0 ...
[25188] --> fake duration of 0.2s
[25188] Waiting a mail to manage...
[25188] managing mail msg_id = 58b34116-c0f6-11ed-9fad-1b0a51668dc0 ...
[25188] --> fake duration of 0.2s
[25188] Waiting a mail to manage...
[25188] managing mail msg_id = 58d1f6f6-c0f6-11ed-9fad-1b0a51668dc0 ...
[25188] --> fake duration of 0.2s
[25188] Waiting a mail to manage...
[25188] managing mail msg_id = 58f08eb8-c0f6-11ed-9fad-1b0a51668dc0 ...
[25188] --> fake duration of 0.2s
[25188] Waiting a mail to manage...
[25188] managing mail msg_id = 58f091ec-c0f6-11ed-9fad-1b0a51668dc0 ...
[25188] --> fake duration of 0.2s
[25188] Waiting a mail to manage...
[25188] managing mail msg_id = 590f1c48-c0f6-11ed-9fad-1b0a51668dc0 ...
[25188] --> fake duration of 0.2s
[25188] Waiting a mail to manage...
[25188] managing mail msg_id = 590f2058-c0f6-11ed-9fad-1b0a51668dc0 ...
[25188] --> fake duration of 0.2s
[25188] Waiting a mail to manage...

Waiting for new mails to come...


Waiting for new mails to come...

terminating process 0

Waiting for new mails to come...

terminating process 1

Waiting for new mails to come...


Waiting for new mails to come...

Done!