fork(4) download
  1. import java.util.concurrent.*;
  2.  
  3. class CSPExample {
  4.  
  5. public static void main(String[] args) throws InterruptedException, ExecutionException {
  6.  
  7. // First we create a queue with capacity 1. This means that if there
  8. // is already an item in the queue waiting to be consumed, any other
  9. // threads wanting to add an item are blocked until it is consumed.
  10. // Also, if the consumer tries to get an element out of the queue, but
  11. // there aren't any items, the consumer will block.
  12. BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);
  13.  
  14. // We create an executor service, which runs our number generators and
  15. // our summing process. They all get the same queue so they can
  16. // communicate with each other via messages.
  17. ExecutorService threadPool = Executors.newFixedThreadPool(4);
  18.  
  19. // The NumberGenerators generate different numbers, and wait for different
  20. // periods of time.
  21. threadPool.submit(new NumberGenerator(5, 700, queue));
  22. threadPool.submit(new NumberGenerator(7, 650, queue));
  23. threadPool.submit(new NumberGenerator(11, 400, queue));
  24.  
  25. // The SummingProcess returns the final sum, so we can get a Future<T>
  26. // that represents the answer at a future time and wait for it to finish.
  27. Future<Integer> totalSum = threadPool.submit(new SummingProcess(queue));
  28.  
  29. // Waits for the SummingProcess to finish, after it's sum is > 100
  30. Integer sumResult = totalSum.get();
  31.  
  32. System.out.println("Done! Sum was " + sumResult);
  33.  
  34. // Interrupts the other threads for shutdown. You can also shutdown
  35. // threads more gracefully with shutdown() and awaitTermination(),
  36. // but here we just want them to exit immediately.
  37. threadPool.shutdownNow();
  38. }
  39.  
  40. private static final class NumberGenerator implements Runnable {
  41.  
  42. private final int theNumberToGenerate;
  43. private final int sleepPeriod;
  44. private final BlockingQueue<Integer> queue;
  45.  
  46. public NumberGenerator(int theNumberToGenerate, int sleepPeriod, BlockingQueue<Integer> queue) {
  47. this.theNumberToGenerate = theNumberToGenerate;
  48. this.sleepPeriod = sleepPeriod;
  49. this.queue = queue;
  50. }
  51.  
  52. @Override
  53. public void run() {
  54. try {
  55. // Produce numbers indefinitely
  56. while (true) {
  57. Thread.sleep(sleepPeriod);
  58.  
  59. // puts the integer into the queue, waiting as necessary for
  60. // there to be space.
  61. queue.put(theNumberToGenerate);
  62. }
  63. } catch (InterruptedException e) {
  64. // Allow our thread to be interrupted
  65. Thread.currentThread().interrupt();
  66. }
  67. }
  68. }
  69.  
  70. private static final class SummingProcess implements Callable<Integer> {
  71.  
  72. private final BlockingQueue<Integer> queue;
  73.  
  74. public SummingProcess(BlockingQueue<Integer> queue) {
  75. this.queue = queue;
  76. }
  77.  
  78. @Override
  79. public Integer call() {
  80. try {
  81. int sum = 0;
  82. while (sum < 100) {
  83. // take() gets the next item from the queue, waiting as necessary
  84. // for there to be elements.
  85. int nextInteger = queue.take();
  86. sum += nextInteger;
  87. System.out.println("Got " + nextInteger + ", total is " + sum);
  88. }
  89. return sum;
  90. } catch (InterruptedException e) {
  91. // Allow our thread to be interrupted
  92. Thread.currentThread().interrupt();
  93. return -1; // this will never run, but the compiler needs it
  94. }
  95. }
  96. }
  97. }
  98.  
Success #stdin #stdout 0.13s 321856KB
stdin
Standard input is empty
stdout
Got 11, total is 11
Got 7, total is 18
Got 5, total is 23
Got 11, total is 34
Got 11, total is 45
Got 7, total is 52
Got 5, total is 57
Got 11, total is 68
Got 7, total is 75
Got 11, total is 86
Got 5, total is 91
Got 11, total is 102
Done! Sum was 102