1 package eu.ehri.project.utils.pipes;
2
3 import com.google.common.collect.Lists;
4 import com.tinkerpop.pipes.AbstractPipe;
5 import com.tinkerpop.pipes.transform.TransformPipe;
6
7 import java.util.List;
8 import java.util.NoSuchElementException;
9
10
11
12
13
14 public class AggregatorPipe<E> extends AbstractPipe<E, List<E>> implements
15 TransformPipe<E, List<E>> {
16
17 public interface AggregatorFunction<T> {
18 boolean aggregate(T a, T b, int aggregateCount);
19 }
20
21 private final AggregatorFunction<E> function;
22
23 public AggregatorPipe(AggregatorFunction<E> function) {
24 this.function = function;
25 }
26
27 private final List<E> buffer = Lists.newArrayList();
28
29 @Override
30 protected List<E> processNextStart() throws NoSuchElementException {
31 while (true) {
32 try {
33 E next = this.starts.next();
34 if (buffer.isEmpty()) {
35 buffer.add(next);
36 } else {
37 int size = buffer.size();
38 if (function.aggregate(buffer.get(size - 1), next, size)) {
39 buffer.add(next);
40 } else {
41 List<E> copy = Lists.newArrayList(buffer);
42 buffer.clear();
43 buffer.add(next);
44 return copy;
45 }
46 }
47 } catch (NoSuchElementException e) {
48 if (!buffer.isEmpty()) {
49 List<E> copy = Lists.newArrayList(buffer);
50 buffer.clear();
51 return copy;
52 } else {
53 throw e;
54 }
55 }
56 }
57 }
58
59 @Override
60 public void reset() {
61 buffer.clear();
62 super.reset();
63 }
64 }