View Javadoc

1   /*
2    * Copyright 2015 Data Archiving and Networked Services (an institute of
3    * Koninklijke Nederlandse Akademie van Wetenschappen), King's College London,
4    * Georg-August-Universitaet Goettingen Stiftung Oeffentlichen Rechts
5    *
6    * Licensed under the EUPL, Version 1.1 or – as soon they will be approved by
7    * the European Commission - subsequent versions of the EUPL (the "Licence");
8    * You may not use this work except in compliance with the Licence.
9    * You may obtain a copy of the Licence at:
10   *
11   * https://joinup.ec.europa.eu/software/page/eupl
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the Licence is distributed on an "AS IS" basis,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the Licence for the specific language governing
17   * permissions and limitations under the Licence.
18   */
19  
20  package eu.ehri.project.persistence;
21  
22  import com.google.common.collect.Sets;
23  import com.tinkerpop.blueprints.Direction;
24  import com.tinkerpop.blueprints.Edge;
25  import com.tinkerpop.blueprints.Vertex;
26  import com.tinkerpop.frames.FramedGraph;
27  import eu.ehri.project.acl.SystemScope;
28  import eu.ehri.project.core.GraphManager;
29  import eu.ehri.project.core.GraphManagerFactory;
30  import eu.ehri.project.definitions.EventTypes;
31  import eu.ehri.project.definitions.Ontology;
32  import eu.ehri.project.exceptions.ItemNotFound;
33  import eu.ehri.project.exceptions.SerializationError;
34  import eu.ehri.project.exceptions.ValidationError;
35  import eu.ehri.project.models.EntityClass;
36  import eu.ehri.project.models.base.Accessible;
37  import eu.ehri.project.models.base.Actioner;
38  import eu.ehri.project.models.base.Entity;
39  import eu.ehri.project.models.events.EventLink;
40  import eu.ehri.project.models.events.SystemEvent;
41  import eu.ehri.project.models.events.SystemEventQueue;
42  import eu.ehri.project.models.events.Version;
43  import org.apache.commons.lang3.tuple.ImmutablePair;
44  import org.apache.commons.lang3.tuple.Pair;
45  import org.joda.time.DateTime;
46  import org.joda.time.Seconds;
47  import org.joda.time.format.ISODateTimeFormat;
48  import org.slf4j.Logger;
49  import org.slf4j.LoggerFactory;
50  
51  import java.util.Collections;
52  import java.util.Iterator;
53  import java.util.Optional;
54  import java.util.Set;
55  
56  /**
57   * Class for dealing with actions.
58   * <p>
59   * Events are captured as a linked list with new events placed
60   * at the head. The head event is connected to a node called the
61   * {@link eu.ehri.project.models.events.SystemEventQueue}. Events
62   * can have subjects (the thing the event is happening to) and
63   * actioners (the person initiating the event.) A subject's events
64   * and an actioner's actions likewise form a linked list so it is
65   * possible to fetch new events easily and prevent having to sort
66   * by timestamp, etc. Schematically, the graph thus formed looks
67   * something like:
68   * <p>
69   * <pre>
70   * <code>
71   * Actioner              SystemEventQueue             Subject
72   * \/                        \/                      \/
73   * [lifecycleAction]     [lifecycleActionStream]     [lifecycleEvent]
74   * |                         |                       |
75   * e3--[actionHasEvent]-&gt;-- Event 3 ---[hasEvent]--&lt;--e3
76   * \/                        \/                      \/
77   * [lifecycleAction]         [lifecycleAction]       [lifecycleEvent]
78   * |                         |                       |
79   * e2--[actionHasEvent]-&gt;-- Event 2 ---[hasEvent]--&lt;--e2
80   * \/                        \/                      \/
81   * [lifecycleAction]         [lifecycleAction]       [lifecycleEvent]
82   * |                         |                       |
83   * e1--[actionHasEvent]-&gt;-- Event 1 ---[hasEvent]--&lt;--e1
84   * </code>
85   * </pre>
86   */
87  public final class ActionManager {
88  
89      // Name of the global event root node, from whence event
90      // streams propagate.
91      private static final Logger logger = LoggerFactory.getLogger(ActionManager.class);
92  
93      public static final String GLOBAL_EVENT_ROOT = "globalEventRoot";
94      public static final String DEBUG_TYPE = "_debugType";
95      public static final String EVENT_LINK = "EventLink";
96      public static final String LINK_TYPE = "_linkType";
97  
98      private final FramedGraph<?> graph;
99      private final GraphManager manager;
100     private final Entity scope;
101     private final Serializer versionSerializer;
102     private final BundleManager dao;
103 
104     /**
105      * Constructor with scope.
106      *
107      * @param graph The framed graph
108      */
109     public ActionManager(FramedGraph<?> graph, Entity scope) {
110         this.graph = graph;
111         this.manager = GraphManagerFactory.getInstance(graph);
112         this.scope = Optional.ofNullable(scope).orElse(SystemScope.getInstance());
113         this.versionSerializer = new Serializer.Builder(graph).dependentOnly().build();
114         this.dao = new BundleManager(graph);
115     }
116 
117     /**
118      * Constructor.
119      *
120      * @param graph The framed graph
121      */
122     public ActionManager(FramedGraph<?> graph) {
123         this(graph, SystemScope.getInstance());
124     }
125 
126     /**
127      * EventContext is a handle to a particular action to which additional
128      * subjects can be added.
129      */
130     public class EventContext {
131         private final Actioner actioner;
132         private final EventTypes actionType;
133         private final Optional<String> logMessage;
134         private final Set<Pair<Entity, Bundle>> toVersion;
135         private final Set<Accessible> subjects;
136         private final String timestamp;
137 
138         /**
139          * Create a new event context.
140          *
141          * @param actioner   The actioner
142          * @param type       The event type
143          * @param logMessage An optional log message
144          */
145         EventContext(Actioner actioner,
146                 EventTypes type,
147                 String timestamp, Optional<String> logMessage,
148                 Set<Pair<Entity, Bundle>> toVersion) {
149             this.actionType = type;
150             this.actioner = actioner;
151             this.logMessage = logMessage;
152             this.toVersion = toVersion;
153             this.subjects = Sets.newHashSet();
154             this.timestamp = timestamp;
155         }
156 
157         /**
158          * Get the event actioner.
159          *
160          * @return The actioner
161          */
162         public Actioner getActioner() {
163             return this.actioner;
164         }
165 
166         /**
167          * Get the event context log message.
168          *
169          * @return The optional log message
170          */
171         public Optional<String> getLogMessage() {
172             return this.logMessage;
173         }
174 
175         /**
176          * Return the subjects of this event.
177          *
178          * @return a set of subjects
179          */
180         public Set<Accessible> getSubjects() {
181             return subjects;
182         }
183 
184         /**
185          * Create a snapshot of the given node's data.
186          *
187          * @param frame The subject node
188          * @return This event context
189          */
190         public EventContext createVersion(Entity frame) {
191             try {
192                 Bundle bundle = versionSerializer.entityToBundle(frame);
193                 return createVersion(frame, bundle);
194             } catch (SerializationError serializationError) {
195                 throw new RuntimeException(serializationError);
196             }
197         }
198 
199         /**
200          * Create a snapshot of the given node using the provided data. This is
201          * useful when the node has already been changed at this point the snapshot
202          * is taken.
203          *
204          * @param frame  The subject node
205          * @param bundle A bundle of the node's data
206          * @return This event context
207          */
208         public EventContext createVersion(Entity frame, Bundle bundle) {
209             Bundle versionBundle = Bundle.Builder.withClass(EntityClass.VERSION)
210                     .addDataValue(Ontology.VERSION_ENTITY_ID, frame.getId())
211                     .addDataValue(Ontology.VERSION_ENTITY_CLASS, frame.getType())
212                     .addDataValue(Ontology.VERSION_ENTITY_DATA, bundle.toJson())
213                     .build();
214             toVersion.add(new ImmutablePair<>(frame, versionBundle));
215             return this;
216         }
217 
218         /**
219          * Add subjects to an event.
220          *
221          * @param entities A set of event subjects
222          * @return This event context
223          */
224         public EventContext addSubjects(Accessible... entities) {
225             Collections.addAll(subjects, entities);
226             return this;
227         }
228 
229         /**
230          * Get the type of this event context.
231          *
232          * @return The event context type
233          */
234         public EventTypes getEventType() {
235             return actionType;
236         }
237 
238         /**
239          * Flush this event log to the graph.
240          */
241         public SystemEvent commit() {
242             Vertex vertex = getLinkNode(Ontology.ACTIONER_HAS_LIFECYCLE_ACTION);
243             replaceAtHead(actioner.asVertex(), vertex,
244                     Ontology.ACTIONER_HAS_LIFECYCLE_ACTION,
245                     Ontology.ACTIONER_HAS_LIFECYCLE_ACTION, Direction.OUT);
246             SystemEvent systemEvent = createGlobalEvent(timestamp, actionType, logMessage);
247             addActionerLink(systemEvent.asVertex(), vertex);
248 
249             for (Entity entity : subjects) {
250                 Vertex subjectVertex = getLinkNode(
251                         Ontology.ENTITY_HAS_LIFECYCLE_EVENT);
252                 replaceAtHead(entity.asVertex(), subjectVertex,
253                         Ontology.ENTITY_HAS_LIFECYCLE_EVENT,
254                         Ontology.ENTITY_HAS_LIFECYCLE_EVENT, Direction.OUT);
255                 addSubjectLink(systemEvent.asVertex(), subjectVertex);
256             }
257 
258             // Create the version.
259             if (!toVersion.isEmpty()) {
260                 try {
261                     for (Pair<Entity, Bundle> entityBundle : toVersion) {
262                         Entity subject = entityBundle.getKey();
263                         Bundle version = entityBundle.getValue();
264                         Version ev = dao.create(version, Version.class);
265                         replaceAtHead(subject.asVertex(), ev.asVertex(),
266                                 Ontology.ENTITY_HAS_PRIOR_VERSION,
267                                 Ontology.ENTITY_HAS_PRIOR_VERSION, Direction.OUT);
268                         graph.addEdge(null, ev.asVertex(),
269                                 systemEvent.asVertex(), Ontology.VERSION_HAS_EVENT);
270                     }
271                 } catch (ValidationError validationError) {
272                     throw new RuntimeException(validationError);
273                 }
274             }
275 
276             return systemEvent;
277         }
278     }
279 
280     public SystemEventQueue getEventRoot() {
281         try {
282             return manager.getEntity(GLOBAL_EVENT_ROOT, EntityClass.SYSTEM, SystemEventQueue.class);
283         } catch (ItemNotFound itemNotFound) {
284             throw new RuntimeException("Fatal error: system node (id: 'system') was not found. " +
285                     "Perhaps the graph was incorrectly initialised?");
286         }
287     }
288 
289     /**
290      * Get the latest global event.
291      *
292      * @return The latest event node
293      */
294     public SystemEvent getLatestGlobalEvent() {
295         Iterable<SystemEvent> latest = getEventRoot().getSystemEvents();
296         return latest.iterator().hasNext() ? latest.iterator().next() : null;
297     }
298 
299     /**
300      * Get an iterable of global events in most-recent-first order.
301      *
302      * @return A iterable of event nodes
303      */
304     public Iterable<SystemEvent> getLatestGlobalEvents() {
305         try {
306             SystemEventQueue queue = manager.getEntity(
307                     GLOBAL_EVENT_ROOT, EntityClass.SYSTEM, SystemEventQueue.class);
308             return queue.getSystemEvents();
309         } catch (ItemNotFound itemNotFound) {
310             throw new RuntimeException("Couldn't find system event queue!");
311         }
312     }
313 
314     /**
315      * Create an action node describing something that user U has done.
316      *
317      * @param user       The actioner
318      * @param type       The event type
319      * @param logMessage An optional log message
320      * @return An EventContext object
321      */
322     public EventContext newEventContext(Actioner user, EventTypes type, Optional<String> logMessage) {
323         return new EventContext(user, type, getTimestamp(), logMessage,
324                 Sets.<Pair<Entity, Bundle>>newHashSet());
325     }
326 
327     /**
328      * Create an action node describing something that user U has done.
329      *
330      * @param user The actioner
331      * @param type The event type
332      * @return An EventContext object
333      */
334     public EventContext newEventContext(Actioner user, EventTypes type) {
335         return new EventContext(user, type, getTimestamp(), Optional.<String>empty(),
336                 Sets.<Pair<Entity, Bundle>>newHashSet());
337     }
338 
339     /**
340      * Create an action for the given subject, user, and type.
341      *
342      * @param subject The subject node
343      * @param user    The actioner
344      * @param type    The event type
345      * @return An EventContext object
346      */
347     public EventContext newEventContext(Accessible subject, Actioner user,
348             EventTypes type) {
349         return newEventContext(subject, user, type, Optional.<String>empty());
350     }
351 
352     /**
353      * Create an action node that describes what user U has done with subject S
354      * via logMessage log.
355      *
356      * @param subject    The subjject node
357      * @param user       The actioner
358      * @param logMessage A log message
359      * @return An EventContext object
360      */
361     public EventContext newEventContext(Accessible subject, Actioner user,
362             EventTypes type, Optional<String> logMessage) {
363         EventContext context = newEventContext(user, type, logMessage);
364         context.addSubjects(subject);
365         return context;
366     }
367 
368     /**
369      * Set the scope of this action.
370      *
371      * @param frame The current permission scope
372      * @return A new ActionManager instance.
373      */
374     public ActionManager setScope(Entity frame) {
375         return new ActionManager(graph,
376                 Optional.ofNullable(frame).orElse(SystemScope.getInstance()));
377     }
378 
379 
380     /**
381      * Determine if two events are the same according to the following
382      * definition:
383      * <p>
384      * <ol>
385      * <li>They have the same scope and subject</li>
386      * <li>They have the same actioner</li>
387      * <li>They are both of the same type</li>
388      * <li>They have the same log message, if any</li>
389      * </ol>
390      * <p>
391      * This function allows filtering an event stream for duplicates,
392      * like someone repeatedly updating the same item.
393      *
394      * @param event1            the first event
395      * @param event2            the second event
396      * @param timeDiffInSeconds the elapsed time between the events
397      * @return whether or not the events are effectively the same.
398      */
399     public static boolean canAggregate(SystemEvent event1, SystemEvent event2, int timeDiffInSeconds) {
400         // NB: Fetching all these props and relations is potentially quite
401         // costly, so we want to short-circuit and return early is possible,
402         // starting with the least-costly to fetch attributes.
403         EventTypes eventType1 = event1.getEventType();
404         EventTypes eventType2 = event2.getEventType();
405         if (eventType1 != null && eventType2 != null && !eventType1.equals(eventType2)) {
406             return false;
407         }
408 
409         String logMessage1 = event1.getLogMessage();
410         String logMessage2 = event2.getLogMessage();
411         if (logMessage1 != null && logMessage2 != null && !logMessage1.equals(logMessage2)) {
412             return false;
413         }
414 
415         if (timeDiffInSeconds > -1) {
416             DateTime event1Time = DateTime.parse(event1.getTimestamp());
417             DateTime event2Time = DateTime.parse(event2.getTimestamp());
418             int timeDiff = Seconds.secondsBetween(event1Time, event2Time).getSeconds();
419             if (timeDiff >= timeDiffInSeconds) {
420                 return false;
421             }
422         }
423 
424         Entity eventScope1 = event1.getEventScope();
425         Entity eventScope2 = event2.getEventScope();
426         if (eventScope1 != null && eventScope2 != null && !eventScope1.equals(eventScope2)) {
427             return false;
428         }
429 
430         Accessible entity1 = event1.getFirstSubject();
431         Accessible entity2 = event2.getFirstSubject();
432         if (entity1 != null && entity2 != null && !entity1.equals(entity2)) {
433             return false;
434         }
435 
436         Actioner actioner1 = event1.getActioner();
437         Actioner actioner2 = event2.getActioner();
438         if (actioner1 != null && actioner2 != null && !actioner1.equals(actioner2)) {
439             return false;
440         }
441 
442         // Okay, fall through...
443         return true;
444     }
445 
446     /**
447      * Test if two events are sequential with the same actioner. This is used
448      * for aggregating repeated events.
449      *
450      * @param first  the first temporal event
451      * @param second the second temporal event
452      * @return whether events can be aggregated by user
453      */
454     public static boolean sequentialWithSameAccessor(SystemEvent first, SystemEvent second) {
455         Vertex firstVertex = first.asVertex();
456         Vertex secondVertex = second.asVertex();
457 
458         for (Vertex link1 : firstVertex.getVertices(Direction.IN, Ontology.ACTION_HAS_EVENT)) {
459             for (Vertex link2 : secondVertex.getVertices(Direction.IN, Ontology.ACTION_HAS_EVENT)) {
460                 for (Edge chain : link2.getEdges(Direction.OUT, Ontology.ACTIONER_HAS_LIFECYCLE_ACTION)) {
461                     return chain.getVertex(Direction.IN).equals(link1);
462                 }
463             }
464         }
465         return false;
466     }
467 
468     public static boolean sameAs(SystemEvent event1, SystemEvent event2) {
469         return canAggregate(event1, event2, -1);
470     }
471 
472     // Helpers.
473 
474     private SystemEvent createGlobalEvent(String timestamp, EventTypes type, Optional<String> logMessage) {
475         // Create a global event and insert it at the head of the system queue. The
476         // relationship from the *system* node to the new latest action is
477         // *type* Stream.
478         try {
479             logger.trace("Creating global event root");
480             Vertex system = manager.getVertex(GLOBAL_EVENT_ROOT);
481             Bundle ge = Bundle.Builder.withClass(EntityClass.SYSTEM_EVENT)
482                     .addDataValue(Ontology.EVENT_TYPE, type.toString())
483                     .addDataValue(Ontology.EVENT_TIMESTAMP, timestamp)
484                     .addDataValue(Ontology.EVENT_LOG_MESSAGE, logMessage.orElse(""))
485                     .build();
486             SystemEvent ev = dao.create(ge, SystemEvent.class);
487             if (!scope.equals(SystemScope.getInstance())) {
488                 ev.setEventScope(scope);
489             }
490             replaceAtHead(system, ev.asVertex(), Ontology.ACTIONER_HAS_LIFECYCLE_ACTION + "Stream", Ontology.ACTIONER_HAS_LIFECYCLE_ACTION, Direction.OUT);
491             return ev;
492         } catch (ItemNotFound e) {
493             e.printStackTrace();
494             throw new RuntimeException("Fatal error: system node (id: 'system') was not found. " +
495                     "Perhaps the graph was incorrectly initialised?");
496         } catch (ValidationError e) {
497             e.printStackTrace();
498             throw new RuntimeException(
499                     "Unexpected validation error creating action", e);
500         }
501     }
502 
503     /**
504      * Create a link vertex. This we stamp with a descriptive
505      * type purely for debugging purposes.
506      */
507     private Vertex getLinkNode(String linkType) {
508         try {
509             return dao.create(Bundle.Builder.withClass(EntityClass.EVENT_LINK)
510                             .addDataValue(DEBUG_TYPE, EVENT_LINK)
511                             .addDataValue(LINK_TYPE, linkType).build(),
512                     EventLink.class).asVertex();
513         } catch (ValidationError e) {
514             throw new RuntimeException(e);
515         }
516     }
517 
518     /**
519      * Add a subjectLinkNode node to an event.
520      *
521      * @param event           The event node
522      * @param subjectLinkNode The subjectLinkNode node
523      */
524     private void addSubjectLink(Vertex event, Vertex subjectLinkNode) {
525         graph.addEdge(null, subjectLinkNode, event, Ontology.ENTITY_HAS_EVENT);
526     }
527 
528     private void addActionerLink(Vertex event, Vertex actionerLinkNode) {
529         graph.addEdge(null, actionerLinkNode, event, Ontology.ACTION_HAS_EVENT);
530     }
531 
532     /**
533      * Given a vertex <em>head</em> that forms that start of a chain <em>relation</em> with
534      * direction <em>direction</em>, insert vertex <em>insert</em> <strong>after</strong>
535      * the head of the chain.
536      *
537      * @param head      The current vertex queue head node
538      * @param newHead   The replacement head node
539      * @param relation  The relationship between them
540      * @param direction The direction of the relationship
541      */
542     private void replaceAtHead(Vertex head, Vertex newHead, String headRelation,
543             String relation, Direction direction) {
544         Iterator<Vertex> iter = head.getVertices(direction, headRelation).iterator();
545         if (iter.hasNext()) {
546             Vertex current = iter.next();
547             for (Edge e : head.getEdges(direction, headRelation)) {
548                 graph.removeEdge(e);
549             }
550             graph.addEdge(null, newHead, current, relation);
551 
552         }
553         graph.addEdge(null, head, newHead, headRelation);
554     }
555 
556     /**
557      * Get the current time as a timestamp.
558      *
559      * @return The current time in ISO DateTime format.
560      */
561     public static String getTimestamp() {
562         return ISODateTimeFormat.dateTime().print(DateTime.now());
563     }
564 }