View Javadoc

1   package eu.ehri.project.utils.pipes;
2   
3   import com.google.common.collect.Lists;
4   import com.tinkerpop.pipes.AbstractPipe;
5   import com.tinkerpop.pipes.transform.TransformPipe;
6   
7   import java.util.List;
8   import java.util.NoSuchElementException;
9   
10  /**
11   * Pipe processor to aggregate item streams into lists given a comparator
12   * function that can compare neighbouring items.
13   */
14  public class AggregatorPipe<E> extends AbstractPipe<E, List<E>> implements
15          TransformPipe<E, List<E>> {
16  
17      public interface AggregatorFunction<T> {
18          boolean aggregate(T a, T b, int aggregateCount);
19      }
20  
21      private final AggregatorFunction<E> function;
22  
23      public AggregatorPipe(AggregatorFunction<E> function) {
24          this.function = function;
25      }
26  
27      private final List<E> buffer = Lists.newArrayList();
28  
29      @Override
30      protected List<E> processNextStart() throws NoSuchElementException {
31          while (true) {
32              try {
33                  E next = this.starts.next();
34                  if (buffer.isEmpty()) {
35                      buffer.add(next);
36                  } else {
37                      int size = buffer.size();
38                      if (function.aggregate(buffer.get(size - 1), next, size)) {
39                          buffer.add(next);
40                      } else {
41                          List<E> copy = Lists.newArrayList(buffer);
42                          buffer.clear();
43                          buffer.add(next);
44                          return copy;
45                      }
46                  }
47              } catch (NoSuchElementException e) {
48                  if (!buffer.isEmpty()) {
49                      List<E> copy = Lists.newArrayList(buffer);
50                      buffer.clear();
51                      return copy;
52                  } else {
53                      throw e;
54                  }
55              }
56          }
57      }
58  
59      @Override
60      public void reset() {
61          buffer.clear();
62          super.reset();
63      }
64  }