fork download
  1. /* package whatever; // don't place package name! */
  2.  
  3. import java.util.concurrent.atomic.AtomicInteger;
  4. import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
  5.  
  6. /* special for pr */
  7. class ConcurrentNonBlockingQueue<T> {
  8.  
  9. static class Node<T> {
  10. private volatile T obj;
  11. private volatile Node<T> next;
  12.  
  13. private static final AtomicReferenceFieldUpdater<Node, Node> nextUpd = AtomicReferenceFieldUpdater.newUpdater(
  14. Node.class, Node.class, "next");
  15.  
  16. private static final AtomicReferenceFieldUpdater<Node, Object> objUpd = AtomicReferenceFieldUpdater.newUpdater(
  17. Node.class, Object.class, "obj");
  18.  
  19. boolean updateNext(Node<T> expected, Node<T> update) {
  20. return nextUpd.compareAndSet(this, expected, update);
  21. }
  22.  
  23. void setObj(T obj) {
  24. objUpd.set(this, obj);
  25. }
  26.  
  27. public Node(T obj, Node<T> next) {
  28. this.obj = obj;
  29. this.next = next;
  30. }
  31.  
  32. Node<T> next() {
  33. return next;
  34. }
  35.  
  36. T getObj() {
  37. return obj;
  38. }
  39. }
  40.  
  41. private volatile Node<T> head = new Node<T>(null, null);
  42. private volatile Node<T> tail = head;
  43.  
  44. private static final AtomicReferenceFieldUpdater<ConcurrentNonBlockingQueue, Node> tailUpdater = AtomicReferenceFieldUpdater
  45. .newUpdater(ConcurrentNonBlockingQueue.class, Node.class, "tail");
  46.  
  47. private static final AtomicReferenceFieldUpdater<ConcurrentNonBlockingQueue, Node> headUpdater = AtomicReferenceFieldUpdater
  48. .newUpdater(ConcurrentNonBlockingQueue.class, Node.class, "head");
  49.  
  50. public ConcurrentNonBlockingQueue() {}
  51.  
  52. public boolean add(T t) {
  53. Node<T> newNode = new Node<T>(t, null);
  54. while (true) {
  55. Node<T> tailnode = this.tail;
  56. Node<T> tailnext = tailnode.next();
  57. if (tailnode == this.tail && tailnext == null) {
  58. if (tailnode.updateNext(tailnext, newNode)) {
  59. this.tail = newNode;
  60. return true;
  61. }
  62. }
  63. }
  64. }
  65.  
  66. public T pop() {
  67. while(true) {
  68. Node<T> headnode = this.head;
  69. Node<T> tailnode = this.tail;
  70. Node<T> headnext = head.next();
  71. if ( headnode == this.head && headnode != tailnode ) {
  72. if ( headUpdater.compareAndSet(this, headnode, headnext) ) {
  73. T obj = headnext.getObj();
  74. headnext.setObj(null);
  75. return obj;
  76. }
  77. } else {
  78. if (headnext == null)
  79. return null;
  80. else
  81. tailUpdater.compareAndSet(this, tailnode, headnext);
  82. }
  83. }
  84. }
  85.  
  86. public static void main(String[] args) throws InterruptedException {
  87.  
  88. final AtomicInteger inc = new AtomicInteger(0);
  89. final ConcurrentNonBlockingQueue<Integer> queue = new ConcurrentNonBlockingQueue<Integer>();
  90.  
  91. Thread t1 = new Thread() {
  92. public void run() {
  93. for ( int i = 0; i < 10; i++ ) {
  94. queue.add(inc.incrementAndGet());
  95. }
  96. }
  97. };
  98.  
  99. Thread t2 = new Thread() {
  100. public void run() {
  101. for ( int i = 0; i < 10; i++ ) {
  102. queue.add(inc.incrementAndGet());
  103. }
  104. }
  105. };
  106.  
  107. Thread t3 = new Thread() {
  108. public void run() {
  109. for ( int i = 0; i < 20; i++ ) {
  110. System.out.println(queue.pop());
  111. }
  112. }
  113. };
  114.  
  115. Thread t4 = new Thread() {
  116. public void run() {
  117. for ( int i = 0; i < 20; i++ ) {
  118. System.out.println(queue.pop());
  119. }
  120. }
  121. };
  122.  
  123. t1.start();
  124. t1.join();
  125. t3.start();
  126. t3.join();
  127. t4.start();
  128. t2.start();
  129. }
  130. }
Success #stdin #stdout 0.08s 380864KB
stdin
Standard input is empty
stdout
1
2
3
4
5
6
7
8
9
10
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null