View Javadoc

1   /*
2    * Copyright 2015 Data Archiving and Networked Services (an institute of
3    * Koninklijke Nederlandse Akademie van Wetenschappen), King's College London,
4    * Georg-August-Universitaet Goettingen Stiftung Oeffentlichen Rechts
5    *
6    * Licensed under the EUPL, Version 1.1 or – as soon they will be approved by
7    * the European Commission - subsequent versions of the EUPL (the "Licence");
8    * You may not use this work except in compliance with the Licence.
9    * You may obtain a copy of the Licence at:
10   *
11   * https://joinup.ec.europa.eu/software/page/eupl
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the Licence is distributed on an "AS IS" basis,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the Licence for the specific language governing
17   * permissions and limitations under the Licence.
18   */
19  
20  package eu.ehri.project.importers.managers;
21  
22  import com.google.common.base.Preconditions;
23  import com.tinkerpop.frames.FramedGraph;
24  import eu.ehri.project.definitions.EventTypes;
25  import eu.ehri.project.exceptions.ValidationError;
26  import eu.ehri.project.importers.ImportLog;
27  import eu.ehri.project.importers.base.ItemImporter;
28  import eu.ehri.project.importers.exceptions.InputParseError;
29  import eu.ehri.project.importers.exceptions.ModeViolation;
30  import eu.ehri.project.models.base.Accessible;
31  import eu.ehri.project.models.base.Actioner;
32  import eu.ehri.project.models.base.PermissionScope;
33  import eu.ehri.project.persistence.ActionManager;
34  import eu.ehri.project.persistence.Mutation;
35  import org.apache.commons.compress.archivers.ArchiveEntry;
36  import org.apache.commons.compress.archivers.ArchiveInputStream;
37  import org.apache.commons.io.input.BoundedInputStream;
38  import org.slf4j.Logger;
39  import org.slf4j.LoggerFactory;
40  
41  import java.io.IOException;
42  import java.io.InputStream;
43  import java.nio.file.Files;
44  import java.nio.file.Paths;
45  import java.util.List;
46  import java.util.Optional;
47  
48  /**
49   * Base ImportManager.
50   */
51  public abstract class AbstractImportManager implements ImportManager {
52  
53      private static final Logger logger = LoggerFactory.getLogger(AbstractImportManager.class);
54      protected final FramedGraph<?> framedGraph;
55      protected final PermissionScope permissionScope;
56      protected final Actioner actioner;
57      protected final boolean tolerant;
58      protected final boolean allowUpdates;
59  
60      // Ugly stateful variables for tracking import state
61      // and reporting errors usefully...
62      private String currentFile;
63      protected Integer currentPosition;
64      protected final Class<? extends ItemImporter> importerClass;
65  
66      /**
67       * Constructor.
68       *
69       * @param graph         the framed graph
70       * @param scope         the permission scope
71       * @param actioner      the actioner
72       * @param tolerant      allow individual items to fail validation without
73       *                      failing an entire batch
74       * @param allowUpdates  allow this import manager to update data items as well
75       *                      as create them
76       * @param importerClass the class of the item importer object
77       */
78      public AbstractImportManager(
79              FramedGraph<?> graph,
80              PermissionScope scope, Actioner actioner,
81              boolean tolerant,
82              boolean allowUpdates,
83              Class<? extends ItemImporter> importerClass) {
84          Preconditions.checkNotNull(scope, "Scope cannot be null");
85          this.framedGraph = graph;
86          this.permissionScope = scope;
87          this.actioner = actioner;
88          this.tolerant = tolerant;
89          this.allowUpdates = allowUpdates;
90          this.importerClass = importerClass;
91      }
92  
93      /**
94       * Determine if the importer is in tolerant mode.
95       *
96       * @return a boolean value
97       */
98      public boolean isTolerant() {
99          return tolerant;
100     }
101 
102     @Override
103     public ImportLog importFile(String filePath, String logMessage)
104             throws IOException, InputParseError, ValidationError {
105         try (InputStream ios = Files.newInputStream(Paths.get(filePath))) {
106             return importInputStream(ios, filePath, logMessage);
107         }
108     }
109 
110     @Override
111     public ImportLog importInputStream(InputStream stream, String tag, String logMessage)
112             throws IOException, InputParseError, ValidationError {
113         // Create a new action for this import
114         Optional<String> msg = getLogMessage(logMessage);
115         ActionManager.EventContext action = new ActionManager(
116                 framedGraph, permissionScope).newEventContext(actioner,
117                 EventTypes.ingest, msg);
118         // Create a manifest to store the results of the import.
119         ImportLog log = new ImportLog(msg.orElse(null));
120 
121         // Do the import...
122         importInputStream(stream, tag, action, log);
123         // If nothing was imported, remove the action...
124         if (log.hasDoneWork()) {
125             action.commit();
126         }
127 
128         return log;
129     }
130 
131     @Override
132     public ImportLog importFiles(List<String> filePaths, String logMessage)
133             throws IOException, ValidationError, InputParseError {
134         try {
135 
136             Optional<String> msg = getLogMessage(logMessage);
137             ActionManager.EventContext action = new ActionManager(
138                     framedGraph, permissionScope).newEventContext(actioner,
139                     EventTypes.ingest, msg);
140             ImportLog log = new ImportLog(msg.orElse(null));
141             for (String path : filePaths) {
142                 try {
143                     currentFile = path;
144                     try (InputStream stream = Files.newInputStream(Paths.get(path))) {
145                         logger.info("Importing file: {}", path);
146                         importInputStream(stream, currentFile, action, log);
147                     }
148                 } catch (ValidationError e) {
149                     log.addError(formatErrorLocation(), e.getMessage());
150                     if (!tolerant) {
151                         throw e;
152                     }
153                 }
154             }
155 
156             // Only mark the transaction successful if we're
157             // actually accomplished something.
158             if (log.hasDoneWork()) {
159                 action.commit();
160             }
161 
162             return log;
163         } catch (Exception e) {
164             e.printStackTrace();
165             throw new RuntimeException(e);
166         }
167     }
168 
169     @Override
170     public ImportLog importArchive(ArchiveInputStream stream, String logMessage)
171             throws IOException, InputParseError, ValidationError {
172         Optional<String> msg = getLogMessage(logMessage);
173         ActionManager.EventContext action = new ActionManager(
174                 framedGraph, permissionScope).newEventContext(actioner,
175                 EventTypes.ingest, msg);
176         ImportLog log = new ImportLog(msg.orElse(null));
177 
178         ArchiveEntry entry;
179         while ((entry = stream.getNextEntry()) != null) {
180             try {
181                 if (!entry.isDirectory()) {
182                     currentFile = entry.getName();
183                     BoundedInputStream boundedInputStream
184                             = new BoundedInputStream(stream, entry.getSize());
185                     boundedInputStream.setPropagateClose(false);
186                     logger.info("Importing file: {}", currentFile);
187                     importInputStream(boundedInputStream, currentFile, action, log);
188                 }
189             } catch (InputParseError | ValidationError e) {
190                 log.addError(formatErrorLocation(), e.getMessage());
191                 if (!tolerant) {
192                     throw e;
193                 }
194             }
195         }
196 
197         // Only mark the transaction successful if we're
198         // actually accomplished something.
199         if (log.hasDoneWork()) {
200             action.commit();
201         }
202 
203         return log;
204     }
205 
206     /**
207      * Import an InputStream with an event context.
208      *
209      * @param stream  the InputStream to import
210      * @param tag        an optional tag identifying the source of the stream
211      * @param context the event that this import is part of
212      * @param log     an import log to write to
213      */
214     protected abstract void importInputStream(InputStream stream,
215             String tag, ActionManager.EventContext context, ImportLog log)
216             throws IOException, ValidationError, InputParseError;
217 
218     /**
219      * A default handler for import callbacks which adds the item to the
220      * log and event context.
221      *
222      * @param log      an import log
223      * @param context  an event context
224      * @param mutation the item mutation
225      */
226     void defaultImportCallback(ImportLog log, ActionManager.EventContext context, Mutation<? extends Accessible> mutation) {
227         switch (mutation.getState()) {
228             case CREATED:
229                 logger.info("Item created: {}", mutation.getNode().getId());
230                 context.addSubjects(mutation.getNode());
231                 log.addCreated();
232                 break;
233             case UPDATED:
234                 if (!allowUpdates) {
235                     throw new ModeViolation(String.format(
236                             "Item '%s' was updated but import manager does not allow updates",
237                             mutation.getNode().getId()));
238                 }
239                 logger.info("Item updated: {}", mutation.getNode().getId());
240                 context.addSubjects(mutation.getNode());
241                 log.addUpdated();
242                 break;
243             default:
244                 log.addUnchanged();
245         }
246     }
247 
248     /**
249      * A default handler for error callbacks which adds the error to
250      * the log and throws it if the importer is not in tolerant mode.
251      *
252      * @param log an import log
253      * @param ex  the propagated exception
254      */
255     void defaultErrorCallback(ImportLog log, Exception ex) {
256         // Otherwise, check if we had a validation error that was
257         // thrown for an individual item and only re-throw if
258         // tolerant is off.
259         if (ex instanceof ValidationError) {
260             ValidationError e = (ValidationError) ex;
261             log.addError(e.getBundle().getId(), e.getErrorSet().toString());
262             if (!isTolerant()) {
263                 throw new RuntimeException(e);
264             }
265         } else {
266             throw new RuntimeException(ex);
267         }
268     }
269 
270     // Helpers
271 
272     private Optional<String> getLogMessage(String msg) {
273         return (msg == null || msg.trim().isEmpty())
274                 ? Optional.empty()
275                 : Optional.of(msg);
276     }
277 
278     private String formatErrorLocation() {
279         return String.format("File: %s, XML document: %d", currentFile,
280                 currentPosition);
281     }
282 }