1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package eu.ehri.project.core.impl;
21
22 import eu.ehri.project.core.Tx;
23 import eu.ehri.project.core.TxGraph;
24 import eu.ehri.project.core.impl.neo4j.Neo4j2Graph;
25 import org.neo4j.graphdb.GraphDatabaseService;
26 import org.neo4j.graphdb.Transaction;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29
30
31
32
33
34 public class TxNeo4jGraph extends Neo4j2Graph implements TxGraph {
35
36 public static class UnderlyingTxRemovedError extends RuntimeException {
37 UnderlyingTxRemovedError(String msg) {
38 super(msg);
39 }
40 }
41
42 public static final Logger logger = LoggerFactory.getLogger(TxNeo4jGraph.class);
43
44 public TxNeo4jGraph(String directory) {
45 super(directory);
46 }
47
48 public TxNeo4jGraph(GraphDatabaseService rawGraph) {
49 super(rawGraph);
50 }
51
52 private ThreadLocal<Neo4jTx> etx = new ThreadLocal<Neo4jTx>() {
53 @Override
54 public Neo4jTx initialValue() {
55 return null;
56 }
57 };
58
59 public Tx beginTx() {
60 logger.trace("Begin tx: {}", Thread.currentThread().getName());
61 if (this.tx.get() != null) {
62 RuntimeException e = new RuntimeException("Tried to begin a TX when one is already open.");
63 e.printStackTrace();
64 throw e;
65 }
66 Transaction tx = getRawGraph().beginTx();
67 this.tx.set(tx);
68 Neo4jTx t = new Neo4jTx();
69 etx.set(t);
70 return t;
71 }
72
73 @Override
74 public void commit() {
75 if (tx.get() == null) {
76 RuntimeException e = new RuntimeException("Attempting to commit null tx on: " + Thread.currentThread().getName());
77 e.printStackTrace();
78 throw e;
79 }
80 logger.trace("Committing TX on graph: {}", Thread.currentThread());
81 super.commit();
82 if (etx.get() != null) {
83 logger.warn("Restarting Neo4j TX on {}", Thread.currentThread());
84
85
86 tx.set(getRawGraph().beginTx());
87 }
88 }
89
90 @Override
91 public void shutdown() {
92 getRawGraph().shutdown();
93 }
94
95
96
97
98
99
100 @Override
101 public void autoStartTransaction(boolean forWrite) {
102
103 }
104
105
106
107
108
109
110
111
112 @Override
113 public void init() {
114
115 }
116
117
118
119
120
121
122 public boolean isInTransaction() {
123 return tx.get() != null;
124 }
125
126 public class Neo4jTx implements Tx {
127
128
129
130
131
132
133 Transaction underlying() {
134 return tx.get();
135 }
136
137 public void success() {
138 logger.trace("Successful TX {} on: {}", this, Thread.currentThread());
139 Transaction transaction = tx.get();
140 if (transaction == null) {
141 throw new UnderlyingTxRemovedError("Underlying transaction removed!");
142 }
143 transaction.success();
144 }
145
146 public void close() {
147 logger.trace("Closing TX {} on: {}", this, Thread.currentThread());
148 Transaction transaction = tx.get();
149 if (transaction == null) {
150 throw new UnderlyingTxRemovedError("Underlying transaction removed!");
151 }
152 transaction.close();
153 tx.remove();
154 etx.remove();
155 }
156
157 public void failure() {
158 logger.trace("Failed TX {} on: {}", this, Thread.currentThread());
159 Transaction transaction = tx.get();
160 if (transaction == null) {
161 throw new UnderlyingTxRemovedError("Underlying transaction removed!");
162 }
163 transaction.failure();
164 }
165 }
166 }