View Javadoc

1   package eu.ehri.project.importers.json;
2   
3   import com.fasterxml.jackson.databind.RuntimeJsonMappingException;
4   import com.google.common.collect.Lists;
5   import com.tinkerpop.blueprints.CloseableIterable;
6   import com.tinkerpop.frames.FramedGraph;
7   import eu.ehri.project.acl.SystemScope;
8   import eu.ehri.project.core.GraphManager;
9   import eu.ehri.project.core.GraphManagerFactory;
10  import eu.ehri.project.definitions.EventTypes;
11  import eu.ehri.project.exceptions.DeserializationError;
12  import eu.ehri.project.exceptions.ItemNotFound;
13  import eu.ehri.project.exceptions.SerializationError;
14  import eu.ehri.project.exceptions.ValidationError;
15  import eu.ehri.project.importers.ImportCallback;
16  import eu.ehri.project.importers.ImportLog;
17  import eu.ehri.project.models.base.Accessible;
18  import eu.ehri.project.models.base.Actioner;
19  import eu.ehri.project.models.base.Entity;
20  import eu.ehri.project.models.base.PermissionScope;
21  import eu.ehri.project.persistence.ActionManager;
22  import eu.ehri.project.persistence.Bundle;
23  import eu.ehri.project.persistence.BundleManager;
24  import eu.ehri.project.persistence.Mutation;
25  import eu.ehri.project.persistence.Serializer;
26  import org.slf4j.Logger;
27  import org.slf4j.LoggerFactory;
28  
29  import java.io.InputStream;
30  import java.util.Collections;
31  import java.util.List;
32  import java.util.Optional;
33  
34  
35  /**
36   * Batch operations for JSON data in Bundle format.
37   */
38  public class BatchOperations {
39      private static final Logger logger = LoggerFactory.getLogger(BatchOperations.class);
40      private final FramedGraph<?> graph;
41      private final ActionManager actionManager;
42      private final BundleManager dao;
43      private final Serializer serializer;
44      private final GraphManager manager;
45      private final PermissionScope scope;
46      private final boolean version;
47      private final boolean tolerant;
48      private final List<ImportCallback> callbacks;
49  
50      /**
51       * Constructor.
52       *
53       * @param graph    the graph object
54       * @param scopeOpt a nullable scope entity
55       * @param version  whether to created versioned for changed items
56       * @param tolerant whether to allow individual validation errors
57       *                 without failing the entire batch
58       */
59      public BatchOperations(FramedGraph<?> graph, PermissionScope scopeOpt, boolean version, boolean tolerant,
60              List<ImportCallback> callbacks) {
61          this.graph = graph;
62          this.scope = Optional.ofNullable(scopeOpt).orElse(SystemScope.getInstance());
63          this.version = version;
64          this.tolerant = tolerant;
65          this.callbacks = callbacks;
66  
67          this.manager = GraphManagerFactory.getInstance(graph);
68          this.actionManager = new ActionManager(graph, scope);
69          this.dao = new BundleManager(graph, scope.idPath());
70          this.serializer = new Serializer.Builder(graph).dependentOnly().build();
71      }
72  
73      /**
74       * Simple constructor, with system scope, version creation
75       * activated and tolerant mode off.
76       *
77       * @param graph the graph object
78       */
79      public BatchOperations(FramedGraph<?> graph) {
80          this(graph, SystemScope.getInstance(), true, false, Collections.emptyList());
81      }
82  
83      /**
84       * Toggle tolerant mode, which will prevent the entire batch
85       * failing if a single item fails to validate.
86       *
87       * @param tolerant true or false
88       * @return a new batch operation manager
89       */
90      public BatchOperations setTolerant(boolean tolerant) {
91          return new BatchOperations(graph, scope, version, tolerant, callbacks);
92      }
93  
94      /**
95       * Toggle versioning for item updates.
96       *
97       * @param versioning true or false
98       * @return a new batch operation manager
99       */
100     public BatchOperations setVersioning(boolean versioning) {
101         return new BatchOperations(graph, scope, versioning, tolerant, callbacks);
102     }
103 
104     /**
105      * Set the permission scope.
106      *
107      * @param scope a permission scope frame
108      * @return a new batch operation manager
109      */
110     public BatchOperations setScope(PermissionScope scope) {
111         return new BatchOperations(graph, scope, version, tolerant, callbacks);
112     }
113 
114     /**
115      * Add import callbacks to the importer. Note: order of execution
116      * is undefined.
117      *
118      * @param callbacks one or more ImportCallback instances
119      * @return a new batch operation manager
120      */
121     public BatchOperations withCallbacks(ImportCallback ...callbacks) {
122         List<ImportCallback> newCallbacks = Lists.newArrayList(callbacks);
123         newCallbacks.addAll(this.callbacks);
124         return new BatchOperations(graph,  scope, version, tolerant, newCallbacks);
125     }
126 
127     /**
128      * Create or update a batch of items.
129      *
130      * @param inputStream an input stream containing a JSON list of
131      *                    bundles corresponding to the items to update
132      *                    or create.
133      * @param actioner    the current user
134      * @param logMessage  a log message
135      * @return an import log
136      */
137     public ImportLog batchImport(InputStream inputStream, Actioner actioner, Optional<String> logMessage)
138             throws DeserializationError, ItemNotFound, ValidationError {
139         ActionManager.EventContext ctx = actionManager.newEventContext(actioner,
140                 EventTypes.modification, logMessage);
141         ImportLog log = new ImportLog(logMessage.orElse(null));
142         try (CloseableIterable<Bundle> bundleIter = Bundle.bundleStream(inputStream)) {
143             for (Bundle bundle : bundleIter) {
144                 try {
145                     Mutation<Accessible> mutation = dao.createOrUpdate(bundle, Accessible.class);
146                     switch (mutation.getState()) {
147                         case UPDATED:
148                             log.addUpdated();
149                             ctx.addSubjects(mutation.getNode());
150                             if (version) {
151                                 mutation.getPrior().ifPresent(b ->
152                                         ctx.createVersion(mutation.getNode(), b));
153                             }
154                             break;
155                         case CREATED:
156                             log.addCreated();
157                             ctx.addSubjects(mutation.getNode());
158                             break;
159                         default:
160                             log.addUnchanged();
161                     }
162                     for (ImportCallback callback : callbacks) {
163                         callback.itemImported(mutation);
164                     }
165                 } catch (ValidationError e) {
166                     if (!tolerant) {
167                         throw e;
168                     } else {
169                         log.addError(bundle.getId(), e.getMessage());
170                         logger.warn("Validation error patching {}: {}", bundle.getId(), e);
171                     }
172                 }
173             }
174             if (log.hasDoneWork()) {
175                 ctx.commit();
176             }
177             return log;
178         } catch (RuntimeJsonMappingException e) {
179             throw new DeserializationError("Error reading JSON stream:", e);
180         }
181     }
182 
183     /**
184      * Update a batch of items.
185      *
186      * @param inputStream an input stream containing a JSON list of
187      *                    bundles corresponding to the items to update.
188      * @param actioner    the current user
189      * @param logMessage  a log message
190      * @return an import log
191      */
192     public ImportLog batchUpdate(InputStream inputStream, Actioner actioner, Optional<String> logMessage)
193             throws DeserializationError, ItemNotFound, ValidationError {
194         ActionManager.EventContext ctx = actionManager.newEventContext(actioner,
195                 EventTypes.modification, logMessage);
196         ImportLog log = new ImportLog(logMessage.orElse(null));
197         try (CloseableIterable<Bundle> bundleIter = Bundle.bundleStream(inputStream)) {
198             for (Bundle bundle : bundleIter) {
199                 Entity entity = manager.getEntity(bundle.getId(), bundle.getType().getJavaClass());
200                 Bundle oldBundle = serializer.entityToBundle(entity);
201                 Bundle newBundle = oldBundle.mergeDataWith(bundle);
202                 try {
203                     Mutation<Accessible> update = dao.update(newBundle, Accessible.class);
204                     switch (update.getState()) {
205                         case UPDATED:
206                             log.addUpdated();
207                             ctx.addSubjects(update.getNode());
208                             if (version) {
209                                 ctx.createVersion(entity, oldBundle);
210                             }
211                             break;
212                         case UNCHANGED:
213                             log.addUnchanged();
214                             break;
215                         default:
216                             throw new RuntimeException("Unexpected status in batch update: " + update.getState());
217                     }
218                 } catch (ValidationError e) {
219                     if (!tolerant) {
220                         throw e;
221                     } else {
222                         log.addError(entity.getId(), e.getMessage());
223                         logger.warn("Validation error patching {}: {}", entity.getId(), e);
224                     }
225                 }
226             }
227             if (log.hasDoneWork()) {
228                 ctx.commit();
229             }
230             return log;
231         } catch (SerializationError serializationError) {
232             throw new RuntimeException(serializationError);
233         } catch (RuntimeJsonMappingException e) {
234             throw new DeserializationError("Error reading JSON stream:", e);
235         }
236     }
237 
238     /**
239      * Delete a batch of items by ID.
240      *
241      * @param ids        a list of item IDs
242      * @param actioner   the current user
243      * @param logMessage an optional log message
244      * @return the number of items deleted
245      */
246     public int batchDelete(List<String> ids, Actioner actioner, Optional<String> logMessage)
247             throws ItemNotFound {
248         int done = 0;
249         if (!ids.isEmpty()) {
250             try {
251                 ActionManager.EventContext ctx = actionManager.newEventContext(actioner,
252                         EventTypes.deletion, logMessage);
253                 for (String id : ids) {
254                     Entity entity = manager.getEntity(id, Entity.class);
255                     ctx = ctx.addSubjects(entity.as(Accessible.class));
256                     if (version) {
257                         ctx = ctx.createVersion(entity);
258                     }
259                 }
260                 ctx.commit();
261                 for (String id : ids) {
262                     dao.delete(serializer.entityToBundle(manager.getEntity(id, Entity.class)));
263                     done++;
264                 }
265             } catch (SerializationError serializationError) {
266                 throw new RuntimeException(serializationError);
267             }
268         }
269         return done;
270     }
271 }