1 package eu.ehri.project.importers.json;
2
3 import com.fasterxml.jackson.databind.RuntimeJsonMappingException;
4 import com.google.common.collect.Lists;
5 import com.tinkerpop.blueprints.CloseableIterable;
6 import com.tinkerpop.frames.FramedGraph;
7 import eu.ehri.project.acl.SystemScope;
8 import eu.ehri.project.core.GraphManager;
9 import eu.ehri.project.core.GraphManagerFactory;
10 import eu.ehri.project.definitions.EventTypes;
11 import eu.ehri.project.exceptions.DeserializationError;
12 import eu.ehri.project.exceptions.ItemNotFound;
13 import eu.ehri.project.exceptions.SerializationError;
14 import eu.ehri.project.exceptions.ValidationError;
15 import eu.ehri.project.importers.ImportCallback;
16 import eu.ehri.project.importers.ImportLog;
17 import eu.ehri.project.models.base.Accessible;
18 import eu.ehri.project.models.base.Actioner;
19 import eu.ehri.project.models.base.Entity;
20 import eu.ehri.project.models.base.PermissionScope;
21 import eu.ehri.project.persistence.ActionManager;
22 import eu.ehri.project.persistence.Bundle;
23 import eu.ehri.project.persistence.BundleManager;
24 import eu.ehri.project.persistence.Mutation;
25 import eu.ehri.project.persistence.Serializer;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28
29 import java.io.InputStream;
30 import java.util.Collections;
31 import java.util.List;
32 import java.util.Optional;
33
34
35
36
37
38 public class BatchOperations {
39 private static final Logger logger = LoggerFactory.getLogger(BatchOperations.class);
40 private final FramedGraph<?> graph;
41 private final ActionManager actionManager;
42 private final BundleManager dao;
43 private final Serializer serializer;
44 private final GraphManager manager;
45 private final PermissionScope scope;
46 private final boolean version;
47 private final boolean tolerant;
48 private final List<ImportCallback> callbacks;
49
50
51
52
53
54
55
56
57
58
59 public BatchOperations(FramedGraph<?> graph, PermissionScope scopeOpt, boolean version, boolean tolerant,
60 List<ImportCallback> callbacks) {
61 this.graph = graph;
62 this.scope = Optional.ofNullable(scopeOpt).orElse(SystemScope.getInstance());
63 this.version = version;
64 this.tolerant = tolerant;
65 this.callbacks = callbacks;
66
67 this.manager = GraphManagerFactory.getInstance(graph);
68 this.actionManager = new ActionManager(graph, scope);
69 this.dao = new BundleManager(graph, scope.idPath());
70 this.serializer = new Serializer.Builder(graph).dependentOnly().build();
71 }
72
73
74
75
76
77
78
79 public BatchOperations(FramedGraph<?> graph) {
80 this(graph, SystemScope.getInstance(), true, false, Collections.emptyList());
81 }
82
83
84
85
86
87
88
89
90 public BatchOperations setTolerant(boolean tolerant) {
91 return new BatchOperations(graph, scope, version, tolerant, callbacks);
92 }
93
94
95
96
97
98
99
100 public BatchOperations setVersioning(boolean versioning) {
101 return new BatchOperations(graph, scope, versioning, tolerant, callbacks);
102 }
103
104
105
106
107
108
109
110 public BatchOperations setScope(PermissionScope scope) {
111 return new BatchOperations(graph, scope, version, tolerant, callbacks);
112 }
113
114
115
116
117
118
119
120
121 public BatchOperations withCallbacks(ImportCallback ...callbacks) {
122 List<ImportCallback> newCallbacks = Lists.newArrayList(callbacks);
123 newCallbacks.addAll(this.callbacks);
124 return new BatchOperations(graph, scope, version, tolerant, newCallbacks);
125 }
126
127
128
129
130
131
132
133
134
135
136
137 public ImportLog batchImport(InputStream inputStream, Actioner actioner, Optional<String> logMessage)
138 throws DeserializationError, ItemNotFound, ValidationError {
139 ActionManager.EventContext ctx = actionManager.newEventContext(actioner,
140 EventTypes.modification, logMessage);
141 ImportLog log = new ImportLog(logMessage.orElse(null));
142 try (CloseableIterable<Bundle> bundleIter = Bundle.bundleStream(inputStream)) {
143 for (Bundle bundle : bundleIter) {
144 try {
145 Mutation<Accessible> mutation = dao.createOrUpdate(bundle, Accessible.class);
146 switch (mutation.getState()) {
147 case UPDATED:
148 log.addUpdated();
149 ctx.addSubjects(mutation.getNode());
150 if (version) {
151 mutation.getPrior().ifPresent(b ->
152 ctx.createVersion(mutation.getNode(), b));
153 }
154 break;
155 case CREATED:
156 log.addCreated();
157 ctx.addSubjects(mutation.getNode());
158 break;
159 default:
160 log.addUnchanged();
161 }
162 for (ImportCallback callback : callbacks) {
163 callback.itemImported(mutation);
164 }
165 } catch (ValidationError e) {
166 if (!tolerant) {
167 throw e;
168 } else {
169 log.addError(bundle.getId(), e.getMessage());
170 logger.warn("Validation error patching {}: {}", bundle.getId(), e);
171 }
172 }
173 }
174 if (log.hasDoneWork()) {
175 ctx.commit();
176 }
177 return log;
178 } catch (RuntimeJsonMappingException e) {
179 throw new DeserializationError("Error reading JSON stream:", e);
180 }
181 }
182
183
184
185
186
187
188
189
190
191
192 public ImportLog batchUpdate(InputStream inputStream, Actioner actioner, Optional<String> logMessage)
193 throws DeserializationError, ItemNotFound, ValidationError {
194 ActionManager.EventContext ctx = actionManager.newEventContext(actioner,
195 EventTypes.modification, logMessage);
196 ImportLog log = new ImportLog(logMessage.orElse(null));
197 try (CloseableIterable<Bundle> bundleIter = Bundle.bundleStream(inputStream)) {
198 for (Bundle bundle : bundleIter) {
199 Entity entity = manager.getEntity(bundle.getId(), bundle.getType().getJavaClass());
200 Bundle oldBundle = serializer.entityToBundle(entity);
201 Bundle newBundle = oldBundle.mergeDataWith(bundle);
202 try {
203 Mutation<Accessible> update = dao.update(newBundle, Accessible.class);
204 switch (update.getState()) {
205 case UPDATED:
206 log.addUpdated();
207 ctx.addSubjects(update.getNode());
208 if (version) {
209 ctx.createVersion(entity, oldBundle);
210 }
211 break;
212 case UNCHANGED:
213 log.addUnchanged();
214 break;
215 default:
216 throw new RuntimeException("Unexpected status in batch update: " + update.getState());
217 }
218 } catch (ValidationError e) {
219 if (!tolerant) {
220 throw e;
221 } else {
222 log.addError(entity.getId(), e.getMessage());
223 logger.warn("Validation error patching {}: {}", entity.getId(), e);
224 }
225 }
226 }
227 if (log.hasDoneWork()) {
228 ctx.commit();
229 }
230 return log;
231 } catch (SerializationError serializationError) {
232 throw new RuntimeException(serializationError);
233 } catch (RuntimeJsonMappingException e) {
234 throw new DeserializationError("Error reading JSON stream:", e);
235 }
236 }
237
238
239
240
241
242
243
244
245
246 public int batchDelete(List<String> ids, Actioner actioner, Optional<String> logMessage)
247 throws ItemNotFound {
248 int done = 0;
249 if (!ids.isEmpty()) {
250 try {
251 ActionManager.EventContext ctx = actionManager.newEventContext(actioner,
252 EventTypes.deletion, logMessage);
253 for (String id : ids) {
254 Entity entity = manager.getEntity(id, Entity.class);
255 ctx = ctx.addSubjects(entity.as(Accessible.class));
256 if (version) {
257 ctx = ctx.createVersion(entity);
258 }
259 }
260 ctx.commit();
261 for (String id : ids) {
262 dao.delete(serializer.entityToBundle(manager.getEntity(id, Entity.class)));
263 done++;
264 }
265 } catch (SerializationError serializationError) {
266 throw new RuntimeException(serializationError);
267 }
268 }
269 return done;
270 }
271 }