fork download
  1. import os, pickle
  2. from multiprocessing import Pool, Lock, synchronize, get_context, Process
  3. import multiprocessing.queues
  4. import sys
  5. _is_windows= sys.platform == 'win32'
  6. if _is_windows:
  7. import _winapi
  8.  
  9. def work(q, reply_q):
  10. print("Worker: Main says", q.get())
  11. reply_q.put('haha')
  12.  
  13.  
  14. class DupSemLockHandle(object):
  15. """
  16. Picklable wrapper for a handle. Attempts to mirror how PipeConnection objects are pickled using appropriate api
  17. """
  18.  
  19. def __init__(self, handle, pid=None):
  20. if pid is None:
  21. # We just duplicate the handle in the current process and
  22. # let the receiving process steal the handle.
  23. pid = os.getpid()
  24. if _is_windows:
  25. proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False, pid)
  26. try:
  27. self._handle = _winapi.DuplicateHandle(
  28. _winapi.GetCurrentProcess(),
  29. handle, proc, 0, False, _winapi.DUPLICATE_SAME_ACCESS)
  30. finally:
  31. _winapi.CloseHandle(proc)
  32. else:
  33. self._handle = handle
  34. self._pid = pid
  35.  
  36. def detach(self):
  37. """
  38. Get the handle, typically from another process
  39. """
  40. # retrieve handle from process which currently owns it
  41. if self._pid == os.getpid():
  42. # The handle has already been duplicated for this process.
  43. return self._handle
  44.  
  45. if not _is_windows:
  46. return self._handle
  47.  
  48. # We must steal the handle from the process whose pid is self._pid.
  49. proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False,
  50. self._pid)
  51. try:
  52. return _winapi.DuplicateHandle(
  53. proc, self._handle, _winapi.GetCurrentProcess(),
  54. 0, False, _winapi.DUPLICATE_CLOSE_SOURCE | _winapi.DUPLICATE_SAME_ACCESS)
  55. finally:
  56. _winapi.CloseHandle(proc)
  57.  
  58.  
  59. def reduce_lock_connection(self):
  60. sl = self._semlock
  61. dh = DupSemLockHandle(sl.handle)
  62. return rebuild_lock_connection, (dh, type(self), (sl.kind, sl.maxvalue, sl.name))
  63.  
  64.  
  65. def rebuild_lock_connection(dh, t, state):
  66. handle = dh.detach() # Duplicated handle valid in current process's context
  67.  
  68. # Create a new instance without calling __init__ because we'll supply the state ourselves
  69. lck = t.__new__(t)
  70. lck.__setstate__((handle,)+state)
  71. return lck
  72.  
  73.  
  74. # Add our own reduce function to pickle SemLock and it's child classes
  75. synchronize.SemLock.__reduce__ = reduce_lock_connection
  76.  
  77.  
  78. class PicklableQueue(multiprocessing.queues.Queue):
  79. """
  80. A picklable Queue that skips the call to context.assert_spawning because it's no longer needed
  81. """
  82.  
  83. def __init__(self, *args, **kwargs):
  84. ctx = get_context()
  85. super().__init__(*args, **kwargs, ctx=ctx)
  86.  
  87. def __getstate__(self):
  88.  
  89. return (self._ignore_epipe, self._maxsize, self._reader, self._writer,
  90. self._rlock, self._wlock, self._sem, self._opid)
  91.  
  92. def is_locked(l):
  93. """
  94. Returns whether the given lock is acquired or not.
  95. """
  96. locked = l.acquire(block=False)
  97. if locked is False:
  98. return True
  99. else:
  100. l.release()
  101. return False
  102.  
  103.  
  104. if __name__ == '__main__':
  105. from multiprocessing import set_start_method
  106. set_start_method('spawn')
  107.  
  108. print(f'The platform is {sys.platform}.')
  109.  
  110. # Example that shows that you can now pickle/unpickle locks and they'll still point towards the same object
  111. l1 = Lock()
  112. p = pickle.dumps(l1)
  113. l2 = pickle.loads(p)
  114. print('before acquiring, l1 locked:', is_locked(l1), 'l2 locked', is_locked(l2))
  115. l2.acquire()
  116. print('after acquiring l1 locked:', is_locked(l1), 'l2 locked', is_locked(l2))
  117.  
  118. # Putting a queue to a queue:
  119. q1 = PicklableQueue()
  120. q1.put('Putting a queue to a queue!')
  121. q2 = PicklableQueue()
  122. q2.put(q1)
  123. print(q2.get().get())
  124.  
  125. # This should still work:
  126. q = PicklableQueue()
  127. reply_q = PicklableQueue()
  128. q.put('It still works!')
  129. p = Process(target=work, args=(q, reply_q))
  130. p.start()
  131. print(reply_q.get())
  132. p.join()
  133.  
  134. # Example that shows how you can pass a queue to Pool and it will work:
  135. pool = Pool(8)
  136. for _ in range(8):
  137. q.put('laugh')
  138. for _ in range(8):
  139. pool.apply_async(work, (q, reply_q))
  140. for _ in range(8):
  141. print(reply_q.get())
  142. pool.close()
  143. pool.join()
  144. print('All pool-submitted tasks are complete!')
Success #stdin #stdout 0.59s 15256KB
stdin
Standard input is empty
stdout
The platform is linux.
before acquiring, l1 locked: False l2 locked False
after acquiring l1 locked: True l2 locked True
Putting a queue to a queue!
Worker: Main says It still works!
haha
Worker: Main says laugh
Worker: Main says laugh
Worker: Main says laugh
Worker: Main says laugh
Worker: Main says laugh
Worker: Main says laugh
Worker: Main says laugh
Worker: Main says laugh
haha
haha
haha
haha
haha
haha
haha
haha
All pool-submitted tasks are complete!