[tor-commits] [metrics-lib/master] Throw out Java's BlockingQueue implementation.

karsten at torproject.org karsten at torproject.org
Fri Dec 16 08:00:05 UTC 2011


commit 9acd48f52eac5f21831b27a8f94cbf59610ee7cc
Author: Karsten Loesing <karsten.loesing at gmx.net>
Date:   Fri Dec 16 08:57:54 2011 +0100

    Throw out Java's BlockingQueue implementation.
    
    The problem is that Java's BlockingQueue implementation doesn't support
    end-of-stream objects to signal that the producer won't add more objects
    to the queue.  We need to implement that ourselves, and it turns out it's
    harder than expected to get it right.  We can just implement our own
    thread handling.
---
 .../descriptor/impl/BlockingIteratorImpl.java      |   65 +++++++++-----------
 1 files changed, 30 insertions(+), 35 deletions(-)

diff --git a/src/org/torproject/descriptor/impl/BlockingIteratorImpl.java b/src/org/torproject/descriptor/impl/BlockingIteratorImpl.java
index 3dddf9e..ed79797 100644
--- a/src/org/torproject/descriptor/impl/BlockingIteratorImpl.java
+++ b/src/org/torproject/descriptor/impl/BlockingIteratorImpl.java
@@ -2,78 +2,73 @@
  * See LICENSE for licensing information */
 package org.torproject.descriptor.impl;
 
+import java.util.LinkedList;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.Queue;
 
 /* Provide an iterator for a queue of objects and block when there are
  * currently no objects in the queue.  Allow the producer to signal that
  * there won't be further objects and unblock any waiting consumers. */
 public class BlockingIteratorImpl<T> implements Iterator<T> {
 
-  /* Define an internal class encapsulating queue elements as a workaround
-   * for adding an end-of-stream object (containing a null reference) to
-   * the queue when the producer runs out of objects. */
-  private static class QueueElement<T> {
-    private T object;
-  }
-
-  /* Blocking queue containing produced elemnts (or the end-of-stream
-   * object) waiting for consumers. */
-  private BlockingQueue<QueueElement<T>> queue =
-      new LinkedBlockingQueue<QueueElement<T>>();
+  /* Queue containing produced elemnts waiting for consumers. */
+  private Queue<T> queue = new LinkedList<T>();
 
   /* Restrict object construction to the impl package. */
   protected BlockingIteratorImpl() {
   }
 
   /* Add an object to the queue. */
-  protected void add(T object) {
-    /* TODO This check is invalid.  But should we make sure that no
-     * objects get added after sending the end-of-stream object somehow?
+  protected synchronized void add(T object) {
     if (this.outOfDescriptors) {
       throw new IllegalStateException("Internal erorr: Adding results to "
           + "descriptor queue not allowed after sending end-of-stream "
           + "object.");
-    }*/
-    QueueElement<T> element = new QueueElement<T>();
-    element.object = object;
-    this.queue.add(element);
+    }
+    this.queue.offer(object);
+    notifyAll();
   }
 
   /* Signalize that there won't be any further objects to be enqueued. */
   private boolean outOfDescriptors = false;
-  protected void setOutOfDescriptors() {
+  protected synchronized void setOutOfDescriptors() {
+    if (this.outOfDescriptors) {
+      throw new IllegalStateException("Internal erorr: Sending "
+          + "end-of-stream object only permitted once.");
+    }
     this.outOfDescriptors = true;
-    this.add(null);
+    notifyAll();
   }
 
   /* Return whether there are more objects.  Block if there are currently
    * no objects, but the producer hasn't signalized that there won't be
    * further objects. */
-  public boolean hasNext() {
-    QueueElement<T> nextElement = this.queue.peek();
-    return ((nextElement != null && nextElement.object != null) ||
-        (nextElement == null && !this.outOfDescriptors));
+  public synchronized boolean hasNext() {
+    while (!this.outOfDescriptors && this.queue.isEmpty()) {
+      try {
+        wait();
+      } catch (InterruptedException e) {
+      }
+    }
+    return this.queue.peek() != null;
   }
 
   /* Return the next object in the queue or throw an exception when there
    * are no further objects.  Block if there are currently no objects, but
    * the producer hasn't signalized that there won't be further
    * objects. */
-  public T next() {
-    QueueElement<T> nextElement = this.queue.peek();
-    try {
-      nextElement = this.queue.take();
-    } catch (InterruptedException e) {
-      /* TODO How should we handle this? */
+  public synchronized T next() {
+    while (!this.outOfDescriptors && this.queue.isEmpty()) {
+      try {
+        wait();
+      } catch (InterruptedException e) {
+      }
     }
-    if (nextElement == null || nextElement.object == null) {
+    if (this.queue.peek() == null) {
       throw new NoSuchElementException();
     }
-    T result = nextElement.object;
-    return result;
+    return this.queue.remove();
   }
 
   /* Don't support explicitly removing objects.  They are removed



More information about the tor-commits mailing list