1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package eu.ehri.project.persistence;
21
22 import com.google.common.collect.Sets;
23 import com.tinkerpop.blueprints.Direction;
24 import com.tinkerpop.blueprints.Edge;
25 import com.tinkerpop.blueprints.Vertex;
26 import com.tinkerpop.frames.FramedGraph;
27 import eu.ehri.project.acl.SystemScope;
28 import eu.ehri.project.core.GraphManager;
29 import eu.ehri.project.core.GraphManagerFactory;
30 import eu.ehri.project.definitions.EventTypes;
31 import eu.ehri.project.definitions.Ontology;
32 import eu.ehri.project.exceptions.ItemNotFound;
33 import eu.ehri.project.exceptions.SerializationError;
34 import eu.ehri.project.exceptions.ValidationError;
35 import eu.ehri.project.models.EntityClass;
36 import eu.ehri.project.models.base.Accessible;
37 import eu.ehri.project.models.base.Actioner;
38 import eu.ehri.project.models.base.Entity;
39 import eu.ehri.project.models.events.EventLink;
40 import eu.ehri.project.models.events.SystemEvent;
41 import eu.ehri.project.models.events.SystemEventQueue;
42 import eu.ehri.project.models.events.Version;
43 import org.apache.commons.lang3.tuple.ImmutablePair;
44 import org.apache.commons.lang3.tuple.Pair;
45 import org.joda.time.DateTime;
46 import org.joda.time.Seconds;
47 import org.joda.time.format.ISODateTimeFormat;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
50
51 import java.util.Collections;
52 import java.util.Iterator;
53 import java.util.Optional;
54 import java.util.Set;
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87 public final class ActionManager {
88
89
90
91 private static final Logger logger = LoggerFactory.getLogger(ActionManager.class);
92
93 public static final String GLOBAL_EVENT_ROOT = "globalEventRoot";
94 public static final String DEBUG_TYPE = "_debugType";
95 public static final String EVENT_LINK = "EventLink";
96 public static final String LINK_TYPE = "_linkType";
97
98 private final FramedGraph<?> graph;
99 private final GraphManager manager;
100 private final Entity scope;
101 private final Serializer versionSerializer;
102 private final BundleManager dao;
103
104
105
106
107
108
109 public ActionManager(FramedGraph<?> graph, Entity scope) {
110 this.graph = graph;
111 this.manager = GraphManagerFactory.getInstance(graph);
112 this.scope = Optional.ofNullable(scope).orElse(SystemScope.getInstance());
113 this.versionSerializer = new Serializer.Builder(graph).dependentOnly().build();
114 this.dao = new BundleManager(graph);
115 }
116
117
118
119
120
121
122 public ActionManager(FramedGraph<?> graph) {
123 this(graph, SystemScope.getInstance());
124 }
125
126
127
128
129
130 public class EventContext {
131 private final Actioner actioner;
132 private final EventTypes actionType;
133 private final Optional<String> logMessage;
134 private final Set<Pair<Entity, Bundle>> toVersion;
135 private final Set<Accessible> subjects;
136 private final String timestamp;
137
138
139
140
141
142
143
144
145 EventContext(Actioner actioner,
146 EventTypes type,
147 String timestamp, Optional<String> logMessage,
148 Set<Pair<Entity, Bundle>> toVersion) {
149 this.actionType = type;
150 this.actioner = actioner;
151 this.logMessage = logMessage;
152 this.toVersion = toVersion;
153 this.subjects = Sets.newHashSet();
154 this.timestamp = timestamp;
155 }
156
157
158
159
160
161
162 public Actioner getActioner() {
163 return this.actioner;
164 }
165
166
167
168
169
170
171 public Optional<String> getLogMessage() {
172 return this.logMessage;
173 }
174
175
176
177
178
179
180 public Set<Accessible> getSubjects() {
181 return subjects;
182 }
183
184
185
186
187
188
189
190 public EventContext createVersion(Entity frame) {
191 try {
192 Bundle bundle = versionSerializer.entityToBundle(frame);
193 return createVersion(frame, bundle);
194 } catch (SerializationError serializationError) {
195 throw new RuntimeException(serializationError);
196 }
197 }
198
199
200
201
202
203
204
205
206
207
208 public EventContext createVersion(Entity frame, Bundle bundle) {
209 Bundle versionBundle = Bundle.Builder.withClass(EntityClass.VERSION)
210 .addDataValue(Ontology.VERSION_ENTITY_ID, frame.getId())
211 .addDataValue(Ontology.VERSION_ENTITY_CLASS, frame.getType())
212 .addDataValue(Ontology.VERSION_ENTITY_DATA, bundle.toJson())
213 .build();
214 toVersion.add(new ImmutablePair<>(frame, versionBundle));
215 return this;
216 }
217
218
219
220
221
222
223
224 public EventContext addSubjects(Accessible... entities) {
225 Collections.addAll(subjects, entities);
226 return this;
227 }
228
229
230
231
232
233
234 public EventTypes getEventType() {
235 return actionType;
236 }
237
238
239
240
241 public SystemEvent commit() {
242 Vertex vertex = getLinkNode(Ontology.ACTIONER_HAS_LIFECYCLE_ACTION);
243 replaceAtHead(actioner.asVertex(), vertex,
244 Ontology.ACTIONER_HAS_LIFECYCLE_ACTION,
245 Ontology.ACTIONER_HAS_LIFECYCLE_ACTION, Direction.OUT);
246 SystemEvent systemEvent = createGlobalEvent(timestamp, actionType, logMessage);
247 addActionerLink(systemEvent.asVertex(), vertex);
248
249 for (Entity entity : subjects) {
250 Vertex subjectVertex = getLinkNode(
251 Ontology.ENTITY_HAS_LIFECYCLE_EVENT);
252 replaceAtHead(entity.asVertex(), subjectVertex,
253 Ontology.ENTITY_HAS_LIFECYCLE_EVENT,
254 Ontology.ENTITY_HAS_LIFECYCLE_EVENT, Direction.OUT);
255 addSubjectLink(systemEvent.asVertex(), subjectVertex);
256 }
257
258
259 if (!toVersion.isEmpty()) {
260 try {
261 for (Pair<Entity, Bundle> entityBundle : toVersion) {
262 Entity subject = entityBundle.getKey();
263 Bundle version = entityBundle.getValue();
264 Version ev = dao.create(version, Version.class);
265 replaceAtHead(subject.asVertex(), ev.asVertex(),
266 Ontology.ENTITY_HAS_PRIOR_VERSION,
267 Ontology.ENTITY_HAS_PRIOR_VERSION, Direction.OUT);
268 graph.addEdge(null, ev.asVertex(),
269 systemEvent.asVertex(), Ontology.VERSION_HAS_EVENT);
270 }
271 } catch (ValidationError validationError) {
272 throw new RuntimeException(validationError);
273 }
274 }
275
276 return systemEvent;
277 }
278 }
279
280 public SystemEventQueue getEventRoot() {
281 try {
282 return manager.getEntity(GLOBAL_EVENT_ROOT, EntityClass.SYSTEM, SystemEventQueue.class);
283 } catch (ItemNotFound itemNotFound) {
284 throw new RuntimeException("Fatal error: system node (id: 'system') was not found. " +
285 "Perhaps the graph was incorrectly initialised?");
286 }
287 }
288
289
290
291
292
293
294 public SystemEvent getLatestGlobalEvent() {
295 Iterable<SystemEvent> latest = getEventRoot().getSystemEvents();
296 return latest.iterator().hasNext() ? latest.iterator().next() : null;
297 }
298
299
300
301
302
303
304 public Iterable<SystemEvent> getLatestGlobalEvents() {
305 try {
306 SystemEventQueue queue = manager.getEntity(
307 GLOBAL_EVENT_ROOT, EntityClass.SYSTEM, SystemEventQueue.class);
308 return queue.getSystemEvents();
309 } catch (ItemNotFound itemNotFound) {
310 throw new RuntimeException("Couldn't find system event queue!");
311 }
312 }
313
314
315
316
317
318
319
320
321
322 public EventContext newEventContext(Actioner user, EventTypes type, Optional<String> logMessage) {
323 return new EventContext(user, type, getTimestamp(), logMessage,
324 Sets.<Pair<Entity, Bundle>>newHashSet());
325 }
326
327
328
329
330
331
332
333
334 public EventContext newEventContext(Actioner user, EventTypes type) {
335 return new EventContext(user, type, getTimestamp(), Optional.<String>empty(),
336 Sets.<Pair<Entity, Bundle>>newHashSet());
337 }
338
339
340
341
342
343
344
345
346
347 public EventContext newEventContext(Accessible subject, Actioner user,
348 EventTypes type) {
349 return newEventContext(subject, user, type, Optional.<String>empty());
350 }
351
352
353
354
355
356
357
358
359
360
361 public EventContext newEventContext(Accessible subject, Actioner user,
362 EventTypes type, Optional<String> logMessage) {
363 EventContext context = newEventContext(user, type, logMessage);
364 context.addSubjects(subject);
365 return context;
366 }
367
368
369
370
371
372
373
374 public ActionManager setScope(Entity frame) {
375 return new ActionManager(graph,
376 Optional.ofNullable(frame).orElse(SystemScope.getInstance()));
377 }
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399 public static boolean canAggregate(SystemEvent event1, SystemEvent event2, int timeDiffInSeconds) {
400
401
402
403 EventTypes eventType1 = event1.getEventType();
404 EventTypes eventType2 = event2.getEventType();
405 if (eventType1 != null && eventType2 != null && !eventType1.equals(eventType2)) {
406 return false;
407 }
408
409 String logMessage1 = event1.getLogMessage();
410 String logMessage2 = event2.getLogMessage();
411 if (logMessage1 != null && logMessage2 != null && !logMessage1.equals(logMessage2)) {
412 return false;
413 }
414
415 if (timeDiffInSeconds > -1) {
416 DateTime event1Time = DateTime.parse(event1.getTimestamp());
417 DateTime event2Time = DateTime.parse(event2.getTimestamp());
418 int timeDiff = Seconds.secondsBetween(event1Time, event2Time).getSeconds();
419 if (timeDiff >= timeDiffInSeconds) {
420 return false;
421 }
422 }
423
424 Entity eventScope1 = event1.getEventScope();
425 Entity eventScope2 = event2.getEventScope();
426 if (eventScope1 != null && eventScope2 != null && !eventScope1.equals(eventScope2)) {
427 return false;
428 }
429
430 Accessible entity1 = event1.getFirstSubject();
431 Accessible entity2 = event2.getFirstSubject();
432 if (entity1 != null && entity2 != null && !entity1.equals(entity2)) {
433 return false;
434 }
435
436 Actioner actioner1 = event1.getActioner();
437 Actioner actioner2 = event2.getActioner();
438 if (actioner1 != null && actioner2 != null && !actioner1.equals(actioner2)) {
439 return false;
440 }
441
442
443 return true;
444 }
445
446
447
448
449
450
451
452
453
454 public static boolean sequentialWithSameAccessor(SystemEvent first, SystemEvent second) {
455 Vertex firstVertex = first.asVertex();
456 Vertex secondVertex = second.asVertex();
457
458 for (Vertex link1 : firstVertex.getVertices(Direction.IN, Ontology.ACTION_HAS_EVENT)) {
459 for (Vertex link2 : secondVertex.getVertices(Direction.IN, Ontology.ACTION_HAS_EVENT)) {
460 for (Edge chain : link2.getEdges(Direction.OUT, Ontology.ACTIONER_HAS_LIFECYCLE_ACTION)) {
461 return chain.getVertex(Direction.IN).equals(link1);
462 }
463 }
464 }
465 return false;
466 }
467
468 public static boolean sameAs(SystemEvent event1, SystemEvent event2) {
469 return canAggregate(event1, event2, -1);
470 }
471
472
473
474 private SystemEvent createGlobalEvent(String timestamp, EventTypes type, Optional<String> logMessage) {
475
476
477
478 try {
479 logger.trace("Creating global event root");
480 Vertex system = manager.getVertex(GLOBAL_EVENT_ROOT);
481 Bundle ge = Bundle.Builder.withClass(EntityClass.SYSTEM_EVENT)
482 .addDataValue(Ontology.EVENT_TYPE, type.toString())
483 .addDataValue(Ontology.EVENT_TIMESTAMP, timestamp)
484 .addDataValue(Ontology.EVENT_LOG_MESSAGE, logMessage.orElse(""))
485 .build();
486 SystemEvent ev = dao.create(ge, SystemEvent.class);
487 if (!scope.equals(SystemScope.getInstance())) {
488 ev.setEventScope(scope);
489 }
490 replaceAtHead(system, ev.asVertex(), Ontology.ACTIONER_HAS_LIFECYCLE_ACTION + "Stream", Ontology.ACTIONER_HAS_LIFECYCLE_ACTION, Direction.OUT);
491 return ev;
492 } catch (ItemNotFound e) {
493 e.printStackTrace();
494 throw new RuntimeException("Fatal error: system node (id: 'system') was not found. " +
495 "Perhaps the graph was incorrectly initialised?");
496 } catch (ValidationError e) {
497 e.printStackTrace();
498 throw new RuntimeException(
499 "Unexpected validation error creating action", e);
500 }
501 }
502
503
504
505
506
507 private Vertex getLinkNode(String linkType) {
508 try {
509 return dao.create(Bundle.Builder.withClass(EntityClass.EVENT_LINK)
510 .addDataValue(DEBUG_TYPE, EVENT_LINK)
511 .addDataValue(LINK_TYPE, linkType).build(),
512 EventLink.class).asVertex();
513 } catch (ValidationError e) {
514 throw new RuntimeException(e);
515 }
516 }
517
518
519
520
521
522
523
524 private void addSubjectLink(Vertex event, Vertex subjectLinkNode) {
525 graph.addEdge(null, subjectLinkNode, event, Ontology.ENTITY_HAS_EVENT);
526 }
527
528 private void addActionerLink(Vertex event, Vertex actionerLinkNode) {
529 graph.addEdge(null, actionerLinkNode, event, Ontology.ACTION_HAS_EVENT);
530 }
531
532
533
534
535
536
537
538
539
540
541
542 private void replaceAtHead(Vertex head, Vertex newHead, String headRelation,
543 String relation, Direction direction) {
544 Iterator<Vertex> iter = head.getVertices(direction, headRelation).iterator();
545 if (iter.hasNext()) {
546 Vertex current = iter.next();
547 for (Edge e : head.getEdges(direction, headRelation)) {
548 graph.removeEdge(e);
549 }
550 graph.addEdge(null, newHead, current, relation);
551
552 }
553 graph.addEdge(null, head, newHead, headRelation);
554 }
555
556
557
558
559
560
561 public static String getTimestamp() {
562 return ISODateTimeFormat.dateTime().print(DateTime.now());
563 }
564 }