1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  
20  package eu.ehri.project.persistence;
21  
22  import com.google.common.collect.Sets;
23  import com.tinkerpop.blueprints.Direction;
24  import com.tinkerpop.blueprints.Edge;
25  import com.tinkerpop.blueprints.Vertex;
26  import com.tinkerpop.frames.FramedGraph;
27  import eu.ehri.project.acl.SystemScope;
28  import eu.ehri.project.core.GraphManager;
29  import eu.ehri.project.core.GraphManagerFactory;
30  import eu.ehri.project.definitions.EventTypes;
31  import eu.ehri.project.definitions.Ontology;
32  import eu.ehri.project.exceptions.ItemNotFound;
33  import eu.ehri.project.exceptions.SerializationError;
34  import eu.ehri.project.exceptions.ValidationError;
35  import eu.ehri.project.models.EntityClass;
36  import eu.ehri.project.models.base.Accessible;
37  import eu.ehri.project.models.base.Actioner;
38  import eu.ehri.project.models.base.Entity;
39  import eu.ehri.project.models.events.EventLink;
40  import eu.ehri.project.models.events.SystemEvent;
41  import eu.ehri.project.models.events.SystemEventQueue;
42  import eu.ehri.project.models.events.Version;
43  import org.apache.commons.lang3.tuple.ImmutablePair;
44  import org.apache.commons.lang3.tuple.Pair;
45  import org.joda.time.DateTime;
46  import org.joda.time.Seconds;
47  import org.joda.time.format.ISODateTimeFormat;
48  import org.slf4j.Logger;
49  import org.slf4j.LoggerFactory;
50  
51  import java.util.Collections;
52  import java.util.Iterator;
53  import java.util.Optional;
54  import java.util.Set;
55  
56  
57  
58  
59  
60  
61  
62  
63  
64  
65  
66  
67  
68  
69  
70  
71  
72  
73  
74  
75  
76  
77  
78  
79  
80  
81  
82  
83  
84  
85  
86  
87  public final class ActionManager {
88  
89      
90      
91      private static final Logger logger = LoggerFactory.getLogger(ActionManager.class);
92  
93      public static final String GLOBAL_EVENT_ROOT = "globalEventRoot";
94      public static final String DEBUG_TYPE = "_debugType";
95      public static final String EVENT_LINK = "EventLink";
96      public static final String LINK_TYPE = "_linkType";
97  
98      private final FramedGraph<?> graph;
99      private final GraphManager manager;
100     private final Entity scope;
101     private final Serializer versionSerializer;
102     private final BundleManager dao;
103 
104     
105 
106 
107 
108 
109     public ActionManager(FramedGraph<?> graph, Entity scope) {
110         this.graph = graph;
111         this.manager = GraphManagerFactory.getInstance(graph);
112         this.scope = Optional.ofNullable(scope).orElse(SystemScope.getInstance());
113         this.versionSerializer = new Serializer.Builder(graph).dependentOnly().build();
114         this.dao = new BundleManager(graph);
115     }
116 
117     
118 
119 
120 
121 
122     public ActionManager(FramedGraph<?> graph) {
123         this(graph, SystemScope.getInstance());
124     }
125 
126     
127 
128 
129 
130     public class EventContext {
131         private final Actioner actioner;
132         private final EventTypes actionType;
133         private final Optional<String> logMessage;
134         private final Set<Pair<Entity, Bundle>> toVersion;
135         private final Set<Accessible> subjects;
136         private final String timestamp;
137 
138         
139 
140 
141 
142 
143 
144 
145         EventContext(Actioner actioner,
146                 EventTypes type,
147                 String timestamp, Optional<String> logMessage,
148                 Set<Pair<Entity, Bundle>> toVersion) {
149             this.actionType = type;
150             this.actioner = actioner;
151             this.logMessage = logMessage;
152             this.toVersion = toVersion;
153             this.subjects = Sets.newHashSet();
154             this.timestamp = timestamp;
155         }
156 
157         
158 
159 
160 
161 
162         public Actioner getActioner() {
163             return this.actioner;
164         }
165 
166         
167 
168 
169 
170 
171         public Optional<String> getLogMessage() {
172             return this.logMessage;
173         }
174 
175         
176 
177 
178 
179 
180         public Set<Accessible> getSubjects() {
181             return subjects;
182         }
183 
184         
185 
186 
187 
188 
189 
190         public EventContext createVersion(Entity frame) {
191             try {
192                 Bundle bundle = versionSerializer.entityToBundle(frame);
193                 return createVersion(frame, bundle);
194             } catch (SerializationError serializationError) {
195                 throw new RuntimeException(serializationError);
196             }
197         }
198 
199         
200 
201 
202 
203 
204 
205 
206 
207 
208         public EventContext createVersion(Entity frame, Bundle bundle) {
209             Bundle versionBundle = Bundle.Builder.withClass(EntityClass.VERSION)
210                     .addDataValue(Ontology.VERSION_ENTITY_ID, frame.getId())
211                     .addDataValue(Ontology.VERSION_ENTITY_CLASS, frame.getType())
212                     .addDataValue(Ontology.VERSION_ENTITY_DATA, bundle.toJson())
213                     .build();
214             toVersion.add(new ImmutablePair<>(frame, versionBundle));
215             return this;
216         }
217 
218         
219 
220 
221 
222 
223 
224         public EventContext addSubjects(Accessible... entities) {
225             Collections.addAll(subjects, entities);
226             return this;
227         }
228 
229         
230 
231 
232 
233 
234         public EventTypes getEventType() {
235             return actionType;
236         }
237 
238         
239 
240 
241         public SystemEvent commit() {
242             Vertex vertex = getLinkNode(Ontology.ACTIONER_HAS_LIFECYCLE_ACTION);
243             replaceAtHead(actioner.asVertex(), vertex,
244                     Ontology.ACTIONER_HAS_LIFECYCLE_ACTION,
245                     Ontology.ACTIONER_HAS_LIFECYCLE_ACTION, Direction.OUT);
246             SystemEvent systemEvent = createGlobalEvent(timestamp, actionType, logMessage);
247             addActionerLink(systemEvent.asVertex(), vertex);
248 
249             for (Entity entity : subjects) {
250                 Vertex subjectVertex = getLinkNode(
251                         Ontology.ENTITY_HAS_LIFECYCLE_EVENT);
252                 replaceAtHead(entity.asVertex(), subjectVertex,
253                         Ontology.ENTITY_HAS_LIFECYCLE_EVENT,
254                         Ontology.ENTITY_HAS_LIFECYCLE_EVENT, Direction.OUT);
255                 addSubjectLink(systemEvent.asVertex(), subjectVertex);
256             }
257 
258             
259             if (!toVersion.isEmpty()) {
260                 try {
261                     for (Pair<Entity, Bundle> entityBundle : toVersion) {
262                         Entity subject = entityBundle.getKey();
263                         Bundle version = entityBundle.getValue();
264                         Version ev = dao.create(version, Version.class);
265                         replaceAtHead(subject.asVertex(), ev.asVertex(),
266                                 Ontology.ENTITY_HAS_PRIOR_VERSION,
267                                 Ontology.ENTITY_HAS_PRIOR_VERSION, Direction.OUT);
268                         graph.addEdge(null, ev.asVertex(),
269                                 systemEvent.asVertex(), Ontology.VERSION_HAS_EVENT);
270                     }
271                 } catch (ValidationError validationError) {
272                     throw new RuntimeException(validationError);
273                 }
274             }
275 
276             return systemEvent;
277         }
278     }
279 
280     public SystemEventQueue getEventRoot() {
281         try {
282             return manager.getEntity(GLOBAL_EVENT_ROOT, EntityClass.SYSTEM, SystemEventQueue.class);
283         } catch (ItemNotFound itemNotFound) {
284             throw new RuntimeException("Fatal error: system node (id: 'system') was not found. " +
285                     "Perhaps the graph was incorrectly initialised?");
286         }
287     }
288 
289     
290 
291 
292 
293 
294     public SystemEvent getLatestGlobalEvent() {
295         Iterable<SystemEvent> latest = getEventRoot().getSystemEvents();
296         return latest.iterator().hasNext() ? latest.iterator().next() : null;
297     }
298 
299     
300 
301 
302 
303 
304     public Iterable<SystemEvent> getLatestGlobalEvents() {
305         try {
306             SystemEventQueue queue = manager.getEntity(
307                     GLOBAL_EVENT_ROOT, EntityClass.SYSTEM, SystemEventQueue.class);
308             return queue.getSystemEvents();
309         } catch (ItemNotFound itemNotFound) {
310             throw new RuntimeException("Couldn't find system event queue!");
311         }
312     }
313 
314     
315 
316 
317 
318 
319 
320 
321 
322     public EventContext newEventContext(Actioner user, EventTypes type, Optional<String> logMessage) {
323         return new EventContext(user, type, getTimestamp(), logMessage,
324                 Sets.<Pair<Entity, Bundle>>newHashSet());
325     }
326 
327     
328 
329 
330 
331 
332 
333 
334     public EventContext newEventContext(Actioner user, EventTypes type) {
335         return new EventContext(user, type, getTimestamp(), Optional.<String>empty(),
336                 Sets.<Pair<Entity, Bundle>>newHashSet());
337     }
338 
339     
340 
341 
342 
343 
344 
345 
346 
347     public EventContext newEventContext(Accessible subject, Actioner user,
348             EventTypes type) {
349         return newEventContext(subject, user, type, Optional.<String>empty());
350     }
351 
352     
353 
354 
355 
356 
357 
358 
359 
360 
361     public EventContext newEventContext(Accessible subject, Actioner user,
362             EventTypes type, Optional<String> logMessage) {
363         EventContext context = newEventContext(user, type, logMessage);
364         context.addSubjects(subject);
365         return context;
366     }
367 
368     
369 
370 
371 
372 
373 
374     public ActionManager setScope(Entity frame) {
375         return new ActionManager(graph,
376                 Optional.ofNullable(frame).orElse(SystemScope.getInstance()));
377     }
378 
379 
380     
381 
382 
383 
384 
385 
386 
387 
388 
389 
390 
391 
392 
393 
394 
395 
396 
397 
398 
399     public static boolean canAggregate(SystemEvent event1, SystemEvent event2, int timeDiffInSeconds) {
400         
401         
402         
403         EventTypes eventType1 = event1.getEventType();
404         EventTypes eventType2 = event2.getEventType();
405         if (eventType1 != null && eventType2 != null && !eventType1.equals(eventType2)) {
406             return false;
407         }
408 
409         String logMessage1 = event1.getLogMessage();
410         String logMessage2 = event2.getLogMessage();
411         if (logMessage1 != null && logMessage2 != null && !logMessage1.equals(logMessage2)) {
412             return false;
413         }
414 
415         if (timeDiffInSeconds > -1) {
416             DateTime event1Time = DateTime.parse(event1.getTimestamp());
417             DateTime event2Time = DateTime.parse(event2.getTimestamp());
418             int timeDiff = Seconds.secondsBetween(event1Time, event2Time).getSeconds();
419             if (timeDiff >= timeDiffInSeconds) {
420                 return false;
421             }
422         }
423 
424         Entity eventScope1 = event1.getEventScope();
425         Entity eventScope2 = event2.getEventScope();
426         if (eventScope1 != null && eventScope2 != null && !eventScope1.equals(eventScope2)) {
427             return false;
428         }
429 
430         Accessible entity1 = event1.getFirstSubject();
431         Accessible entity2 = event2.getFirstSubject();
432         if (entity1 != null && entity2 != null && !entity1.equals(entity2)) {
433             return false;
434         }
435 
436         Actioner actioner1 = event1.getActioner();
437         Actioner actioner2 = event2.getActioner();
438         if (actioner1 != null && actioner2 != null && !actioner1.equals(actioner2)) {
439             return false;
440         }
441 
442         
443         return true;
444     }
445 
446     
447 
448 
449 
450 
451 
452 
453 
454     public static boolean sequentialWithSameAccessor(SystemEvent first, SystemEvent second) {
455         Vertex firstVertex = first.asVertex();
456         Vertex secondVertex = second.asVertex();
457 
458         for (Vertex link1 : firstVertex.getVertices(Direction.IN, Ontology.ACTION_HAS_EVENT)) {
459             for (Vertex link2 : secondVertex.getVertices(Direction.IN, Ontology.ACTION_HAS_EVENT)) {
460                 for (Edge chain : link2.getEdges(Direction.OUT, Ontology.ACTIONER_HAS_LIFECYCLE_ACTION)) {
461                     return chain.getVertex(Direction.IN).equals(link1);
462                 }
463             }
464         }
465         return false;
466     }
467 
468     public static boolean sameAs(SystemEvent event1, SystemEvent event2) {
469         return canAggregate(event1, event2, -1);
470     }
471 
472     
473 
474     private SystemEvent createGlobalEvent(String timestamp, EventTypes type, Optional<String> logMessage) {
475         
476         
477         
478         try {
479             logger.trace("Creating global event root");
480             Vertex system = manager.getVertex(GLOBAL_EVENT_ROOT);
481             Bundle ge = Bundle.Builder.withClass(EntityClass.SYSTEM_EVENT)
482                     .addDataValue(Ontology.EVENT_TYPE, type.toString())
483                     .addDataValue(Ontology.EVENT_TIMESTAMP, timestamp)
484                     .addDataValue(Ontology.EVENT_LOG_MESSAGE, logMessage.orElse(""))
485                     .build();
486             SystemEvent ev = dao.create(ge, SystemEvent.class);
487             if (!scope.equals(SystemScope.getInstance())) {
488                 ev.setEventScope(scope);
489             }
490             replaceAtHead(system, ev.asVertex(), Ontology.ACTIONER_HAS_LIFECYCLE_ACTION + "Stream", Ontology.ACTIONER_HAS_LIFECYCLE_ACTION, Direction.OUT);
491             return ev;
492         } catch (ItemNotFound e) {
493             e.printStackTrace();
494             throw new RuntimeException("Fatal error: system node (id: 'system') was not found. " +
495                     "Perhaps the graph was incorrectly initialised?");
496         } catch (ValidationError e) {
497             e.printStackTrace();
498             throw new RuntimeException(
499                     "Unexpected validation error creating action", e);
500         }
501     }
502 
503     
504 
505 
506 
507     private Vertex getLinkNode(String linkType) {
508         try {
509             return dao.create(Bundle.Builder.withClass(EntityClass.EVENT_LINK)
510                             .addDataValue(DEBUG_TYPE, EVENT_LINK)
511                             .addDataValue(LINK_TYPE, linkType).build(),
512                     EventLink.class).asVertex();
513         } catch (ValidationError e) {
514             throw new RuntimeException(e);
515         }
516     }
517 
518     
519 
520 
521 
522 
523 
524     private void addSubjectLink(Vertex event, Vertex subjectLinkNode) {
525         graph.addEdge(null, subjectLinkNode, event, Ontology.ENTITY_HAS_EVENT);
526     }
527 
528     private void addActionerLink(Vertex event, Vertex actionerLinkNode) {
529         graph.addEdge(null, actionerLinkNode, event, Ontology.ACTION_HAS_EVENT);
530     }
531 
532     
533 
534 
535 
536 
537 
538 
539 
540 
541 
542     private void replaceAtHead(Vertex head, Vertex newHead, String headRelation,
543             String relation, Direction direction) {
544         Iterator<Vertex> iter = head.getVertices(direction, headRelation).iterator();
545         if (iter.hasNext()) {
546             Vertex current = iter.next();
547             for (Edge e : head.getEdges(direction, headRelation)) {
548                 graph.removeEdge(e);
549             }
550             graph.addEdge(null, newHead, current, relation);
551 
552         }
553         graph.addEdge(null, head, newHead, headRelation);
554     }
555 
556     
557 
558 
559 
560 
561     public static String getTimestamp() {
562         return ISODateTimeFormat.dateTime().print(DateTime.now());
563     }
564 }