import java.util.concurrent.*;
class CSPExample {
// First we create a queue with capacity 1. This means that if there
// is already an item in the queue waiting to be consumed, any other
// threads wanting to add an item are blocked until it is consumed.
// Also, if the consumer tries to get an element out of the queue, but
// there aren't any items, the consumer will block.
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);
// We create an executor service, which runs our number generators and
// our summing process. They all get the same queue so they can
// communicate with each other via messages.
ExecutorService threadPool = Executors.newFixedThreadPool(4);
// The NumberGenerators generate different numbers, and wait for different
// periods of time.
threadPool.submit(new NumberGenerator(5, 700, queue));
threadPool.submit(new NumberGenerator(7, 650, queue));
threadPool.submit(new NumberGenerator(11, 400, queue));
// The SummingProcess returns the final sum, so we can get a Future<T>
// that represents the answer at a future time and wait for it to finish.
Future<Integer> totalSum = threadPool.submit(new SummingProcess(queue));
// Waits for the SummingProcess to finish, after it's sum is > 100
Integer sumResult
= totalSum.
get();
System.
out.
println("Done! Sum was " + sumResult
);
// Interrupts the other threads for shutdown. You can also shutdown
// threads more gracefully with shutdown() and awaitTermination(),
// but here we just want them to exit immediately.
threadPool.shutdownNow();
}
private static final class NumberGenerator
implements Runnable {
private final int theNumberToGenerate;
private final int sleepPeriod;
private final BlockingQueue<Integer> queue;
public NumberGenerator(int theNumberToGenerate, int sleepPeriod, BlockingQueue<Integer> queue) {
this.theNumberToGenerate = theNumberToGenerate;
this.sleepPeriod = sleepPeriod;
this.queue = queue;
}
@Override
public void run() {
try {
// Produce numbers indefinitely
while (true) {
// puts the integer into the queue, waiting as necessary for
// there to be space.
queue.put(theNumberToGenerate);
}
// Allow our thread to be interrupted
Thread.
currentThread().
interrupt(); }
}
}
private static final class SummingProcess implements Callable<Integer> {
private final BlockingQueue<Integer> queue;
public SummingProcess(BlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
try {
int sum = 0;
while (sum < 100) {
// take() gets the next item from the queue, waiting as necessary
// for there to be elements.
int nextInteger = queue.take();
sum += nextInteger;
System.
out.
println("Got " + nextInteger
+ ", total is " + sum
); }
return sum;
// Allow our thread to be interrupted
Thread.
currentThread().
interrupt(); return -1; // this will never run, but the compiler needs it
}
}
}
}