1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package eu.ehri.project.importers.managers;
21
22 import com.google.common.base.Preconditions;
23 import com.tinkerpop.frames.FramedGraph;
24 import eu.ehri.project.definitions.EventTypes;
25 import eu.ehri.project.exceptions.ValidationError;
26 import eu.ehri.project.importers.ImportLog;
27 import eu.ehri.project.importers.base.ItemImporter;
28 import eu.ehri.project.importers.exceptions.InputParseError;
29 import eu.ehri.project.importers.exceptions.ModeViolation;
30 import eu.ehri.project.models.base.Accessible;
31 import eu.ehri.project.models.base.Actioner;
32 import eu.ehri.project.models.base.PermissionScope;
33 import eu.ehri.project.persistence.ActionManager;
34 import eu.ehri.project.persistence.Mutation;
35 import org.apache.commons.compress.archivers.ArchiveEntry;
36 import org.apache.commons.compress.archivers.ArchiveInputStream;
37 import org.apache.commons.io.input.BoundedInputStream;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40
41 import java.io.IOException;
42 import java.io.InputStream;
43 import java.nio.file.Files;
44 import java.nio.file.Paths;
45 import java.util.List;
46 import java.util.Optional;
47
48
49
50
51 public abstract class AbstractImportManager implements ImportManager {
52
53 private static final Logger logger = LoggerFactory.getLogger(AbstractImportManager.class);
54 protected final FramedGraph<?> framedGraph;
55 protected final PermissionScope permissionScope;
56 protected final Actioner actioner;
57 protected final boolean tolerant;
58 protected final boolean allowUpdates;
59
60
61
62 private String currentFile;
63 protected Integer currentPosition;
64 protected final Class<? extends ItemImporter> importerClass;
65
66
67
68
69
70
71
72
73
74
75
76
77
78 public AbstractImportManager(
79 FramedGraph<?> graph,
80 PermissionScope scope, Actioner actioner,
81 boolean tolerant,
82 boolean allowUpdates,
83 Class<? extends ItemImporter> importerClass) {
84 Preconditions.checkNotNull(scope, "Scope cannot be null");
85 this.framedGraph = graph;
86 this.permissionScope = scope;
87 this.actioner = actioner;
88 this.tolerant = tolerant;
89 this.allowUpdates = allowUpdates;
90 this.importerClass = importerClass;
91 }
92
93
94
95
96
97
98 public boolean isTolerant() {
99 return tolerant;
100 }
101
102 @Override
103 public ImportLog importFile(String filePath, String logMessage)
104 throws IOException, InputParseError, ValidationError {
105 try (InputStream ios = Files.newInputStream(Paths.get(filePath))) {
106 return importInputStream(ios, filePath, logMessage);
107 }
108 }
109
110 @Override
111 public ImportLog importInputStream(InputStream stream, String tag, String logMessage)
112 throws IOException, InputParseError, ValidationError {
113
114 Optional<String> msg = getLogMessage(logMessage);
115 ActionManager.EventContext action = new ActionManager(
116 framedGraph, permissionScope).newEventContext(actioner,
117 EventTypes.ingest, msg);
118
119 ImportLog log = new ImportLog(msg.orElse(null));
120
121
122 importInputStream(stream, tag, action, log);
123
124 if (log.hasDoneWork()) {
125 action.commit();
126 }
127
128 return log;
129 }
130
131 @Override
132 public ImportLog importFiles(List<String> filePaths, String logMessage)
133 throws IOException, ValidationError, InputParseError {
134 try {
135
136 Optional<String> msg = getLogMessage(logMessage);
137 ActionManager.EventContext action = new ActionManager(
138 framedGraph, permissionScope).newEventContext(actioner,
139 EventTypes.ingest, msg);
140 ImportLog log = new ImportLog(msg.orElse(null));
141 for (String path : filePaths) {
142 try {
143 currentFile = path;
144 try (InputStream stream = Files.newInputStream(Paths.get(path))) {
145 logger.info("Importing file: {}", path);
146 importInputStream(stream, currentFile, action, log);
147 }
148 } catch (ValidationError e) {
149 log.addError(formatErrorLocation(), e.getMessage());
150 if (!tolerant) {
151 throw e;
152 }
153 }
154 }
155
156
157
158 if (log.hasDoneWork()) {
159 action.commit();
160 }
161
162 return log;
163 } catch (Exception e) {
164 e.printStackTrace();
165 throw new RuntimeException(e);
166 }
167 }
168
169 @Override
170 public ImportLog importArchive(ArchiveInputStream stream, String logMessage)
171 throws IOException, InputParseError, ValidationError {
172 Optional<String> msg = getLogMessage(logMessage);
173 ActionManager.EventContext action = new ActionManager(
174 framedGraph, permissionScope).newEventContext(actioner,
175 EventTypes.ingest, msg);
176 ImportLog log = new ImportLog(msg.orElse(null));
177
178 ArchiveEntry entry;
179 while ((entry = stream.getNextEntry()) != null) {
180 try {
181 if (!entry.isDirectory()) {
182 currentFile = entry.getName();
183 BoundedInputStream boundedInputStream
184 = new BoundedInputStream(stream, entry.getSize());
185 boundedInputStream.setPropagateClose(false);
186 logger.info("Importing file: {}", currentFile);
187 importInputStream(boundedInputStream, currentFile, action, log);
188 }
189 } catch (InputParseError | ValidationError e) {
190 log.addError(formatErrorLocation(), e.getMessage());
191 if (!tolerant) {
192 throw e;
193 }
194 }
195 }
196
197
198
199 if (log.hasDoneWork()) {
200 action.commit();
201 }
202
203 return log;
204 }
205
206
207
208
209
210
211
212
213
214 protected abstract void importInputStream(InputStream stream,
215 String tag, ActionManager.EventContext context, ImportLog log)
216 throws IOException, ValidationError, InputParseError;
217
218
219
220
221
222
223
224
225
226 void defaultImportCallback(ImportLog log, ActionManager.EventContext context, Mutation<? extends Accessible> mutation) {
227 switch (mutation.getState()) {
228 case CREATED:
229 logger.info("Item created: {}", mutation.getNode().getId());
230 context.addSubjects(mutation.getNode());
231 log.addCreated();
232 break;
233 case UPDATED:
234 if (!allowUpdates) {
235 throw new ModeViolation(String.format(
236 "Item '%s' was updated but import manager does not allow updates",
237 mutation.getNode().getId()));
238 }
239 logger.info("Item updated: {}", mutation.getNode().getId());
240 context.addSubjects(mutation.getNode());
241 log.addUpdated();
242 break;
243 default:
244 log.addUnchanged();
245 }
246 }
247
248
249
250
251
252
253
254
255 void defaultErrorCallback(ImportLog log, Exception ex) {
256
257
258
259 if (ex instanceof ValidationError) {
260 ValidationError e = (ValidationError) ex;
261 log.addError(e.getBundle().getId(), e.getErrorSet().toString());
262 if (!isTolerant()) {
263 throw new RuntimeException(e);
264 }
265 } else {
266 throw new RuntimeException(ex);
267 }
268 }
269
270
271
272 private Optional<String> getLogMessage(String msg) {
273 return (msg == null || msg.trim().isEmpty())
274 ? Optional.empty()
275 : Optional.of(msg);
276 }
277
278 private String formatErrorLocation() {
279 return String.format("File: %s, XML document: %d", currentFile,
280 currentPosition);
281 }
282 }