fork(1) download
  1. import os, pickle
  2. from multiprocessing import Pool, Lock, synchronize, get_context, Process
  3. import multiprocessing.queues
  4. import sys
  5. _is_windows= sys.platform == 'win32'
  6. if _is_windows:
  7. import _winapi
  8.  
  9. def work(q, reply_q):
  10. print("Worker: Main says", q.get())
  11. reply_q.put('haha')
  12.  
  13.  
  14. class DupSemLockHandle(object):
  15. """
  16. Picklable wrapper for a handle. Attempts to mirror how PipeConnection objects are pickled using appropriate api
  17. """
  18.  
  19. def __init__(self, handle, pid=None):
  20. if pid is None:
  21. # We just duplicate the handle in the current process and
  22. # let the receiving process steal the handle.
  23. pid = os.getpid()
  24. if _is_windows:
  25. proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False, pid)
  26. try:
  27. self._handle = _winapi.DuplicateHandle(
  28. _winapi.GetCurrentProcess(),
  29. handle, proc, 0, False, _winapi.DUPLICATE_SAME_ACCESS)
  30. finally:
  31. _winapi.CloseHandle(proc)
  32. else:
  33. self._handle = handle
  34. self._pid = pid
  35.  
  36. def detach(self):
  37. """
  38. Get the handle, typically from another process
  39. """
  40. # retrieve handle from process which currently owns it
  41. if self._pid == os.getpid():
  42. # The handle has already been duplicated for this process.
  43. return self._handle
  44.  
  45. if not _is_windows:
  46. return self._handle
  47.  
  48. # We must steal the handle from the process whose pid is self._pid.
  49. proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False,
  50. self._pid)
  51. try:
  52. return _winapi.DuplicateHandle(
  53. proc, self._handle, _winapi.GetCurrentProcess(),
  54. 0, False, _winapi.DUPLICATE_CLOSE_SOURCE | _winapi.DUPLICATE_SAME_ACCESS)
  55. finally:
  56. _winapi.CloseHandle(proc)
  57.  
  58.  
  59. def reduce_lock_connection(self):
  60. sl = self._semlock
  61. dh = DupSemLockHandle(sl.handle)
  62. return rebuild_lock_connection, (dh, type(self), (sl.kind, sl.maxvalue, sl.name))
  63.  
  64.  
  65. def rebuild_lock_connection(dh, t, state):
  66. handle = dh.detach() # Duplicated handle valid in current process's context
  67.  
  68. # Create a new instance without calling __init__ because we'll supply the state ourselves
  69. lck = t.__new__(t)
  70. lck.__setstate__((handle,)+state)
  71. return lck
  72.  
  73.  
  74. # Add our own reduce function to pickle SemLock and it's child classes
  75. synchronize.SemLock.__reduce__ = reduce_lock_connection
  76.  
  77.  
  78. class PicklableQueue(multiprocessing.queues.Queue):
  79. """
  80. A picklable Queue that skips the call to context.assert_spawning because it's no longer needed
  81. """
  82.  
  83. def __init__(self, *args, **kwargs):
  84. ctx = get_context()
  85. super().__init__(*args, **kwargs, ctx=ctx)
  86.  
  87. def __getstate__(self):
  88.  
  89. return (self._ignore_epipe, self._maxsize, self._reader, self._writer,
  90. self._rlock, self._wlock, self._sem, self._opid)
  91.  
  92. def is_locked(l):
  93. """
  94. Returns whether the given lock is acquired or not.
  95. """
  96. locked = l.acquire(block=False)
  97. if locked is False:
  98. return True
  99. else:
  100. l.release()
  101. return False
  102.  
  103.  
  104. if __name__ == '__main__':
  105. """
  106. from multiprocessing import set_start_method
  107. set_start_method('spawn')
  108. """
  109.  
  110. print(f'The platform is {sys.platform}.')
  111.  
  112. # Example that shows that you can now pickle/unpickle locks and they'll still point towards the same object
  113. l1 = Lock()
  114. p = pickle.dumps(l1)
  115. l2 = pickle.loads(p)
  116. print('before acquiring, l1 locked:', is_locked(l1), 'l2 locked', is_locked(l2))
  117. l2.acquire()
  118. print('after acquiring l1 locked:', is_locked(l1), 'l2 locked', is_locked(l2))
  119.  
  120. # Putting a queue to a queue:
  121. q1 = PicklableQueue()
  122. q1.put('Putting a queue to a queue!')
  123. q2 = PicklableQueue()
  124. q2.put(q1)
  125. print(q2.get().get())
  126.  
  127. # This should still work:
  128. q = PicklableQueue()
  129. reply_q = PicklableQueue()
  130. q.put('It still works!')
  131. p = Process(target=work, args=(q, reply_q))
  132. p.start()
  133. print(reply_q.get())
  134. p.join()
  135.  
  136. # Example that shows that you can now pickle/unpickle locks and they'll still point towards the same object
  137. # Example that shows how you can pass a queue to Pool and it will work
  138. with Pool(1) as pool:
  139. q.put('laugh')
  140. pool.apply(work, (q, reply_q))
  141. print(reply_q.get())
Success #stdin #stdout 0.07s 15128KB
stdin
Standard input is empty
stdout
The platform is linux.
before acquiring, l1 locked: False l2 locked False
after acquiring l1 locked: True l2 locked True
Putting a queue to a queue!
Worker: Main says It still works!
haha
Worker: Main says laugh
haha