View Javadoc

1   package eu.ehri.project.importers.ead;
2   
3   import com.google.common.base.Preconditions;
4   import com.google.common.collect.BiMap;
5   import com.google.common.collect.HashBiMap;
6   import com.google.common.collect.LinkedHashMultimap;
7   import com.google.common.collect.Lists;
8   import com.google.common.collect.Multimap;
9   import com.google.common.collect.Sets;
10  import com.tinkerpop.frames.FramedGraph;
11  import eu.ehri.project.api.Api;
12  import eu.ehri.project.core.GraphManager;
13  import eu.ehri.project.core.GraphManagerFactory;
14  import eu.ehri.project.definitions.Entities;
15  import eu.ehri.project.definitions.EventTypes;
16  import eu.ehri.project.definitions.Ontology;
17  import eu.ehri.project.exceptions.DeserializationError;
18  import eu.ehri.project.exceptions.ItemNotFound;
19  import eu.ehri.project.exceptions.PermissionDenied;
20  import eu.ehri.project.exceptions.SerializationError;
21  import eu.ehri.project.exceptions.ValidationError;
22  import eu.ehri.project.importers.ImportLog;
23  import eu.ehri.project.importers.exceptions.InputParseError;
24  import eu.ehri.project.importers.managers.ImportManager;
25  import eu.ehri.project.importers.managers.SaxImportManager;
26  import eu.ehri.project.models.Annotation;
27  import eu.ehri.project.models.DocumentaryUnit;
28  import eu.ehri.project.models.Link;
29  import eu.ehri.project.models.Repository;
30  import eu.ehri.project.models.base.Actioner;
31  import eu.ehri.project.models.base.Annotatable;
32  import eu.ehri.project.models.base.PermissionScope;
33  import eu.ehri.project.persistence.ActionManager;
34  import eu.ehri.project.persistence.Bundle;
35  import eu.ehri.project.persistence.Serializer;
36  import org.apache.commons.compress.archivers.ArchiveException;
37  import org.neo4j.helpers.collection.Iterables;
38  import org.slf4j.Logger;
39  import org.slf4j.LoggerFactory;
40  
41  import java.io.IOException;
42  import java.util.List;
43  import java.util.Map;
44  import java.util.Objects;
45  import java.util.Optional;
46  import java.util.Set;
47  import java.util.function.BiFunction;
48  
49  
50  /**
51   * Synchronise items in a repository or fonds from EAD data.
52   */
53  public class EadSync {
54  
55      private static final Logger logger = LoggerFactory.getLogger(EadSync.class);
56  
57      private final FramedGraph<?> graph;
58      private final Api api;
59      private final PermissionScope scope;
60      private final Actioner actioner;
61      private final SaxImportManager importManager;
62      private final GraphManager manager;
63      private final Serializer depSerializer;
64  
65      public EadSync(
66              FramedGraph<?> graph,
67              Api api,
68              PermissionScope scope,
69              Actioner actioner,
70              SaxImportManager importManager) {
71          this.graph = graph;
72          this.api = api;
73          this.scope = scope;
74          this.actioner = actioner;
75          this.importManager = importManager;
76          this.manager = GraphManagerFactory.getInstance(graph);
77          this.depSerializer = api.serializer().withDependentOnly(true);
78      }
79  
80      /**
81       * An operation that performs the actual ingest, given the {@link ImportManager}
82       * and returning an {@link ImportLog}. This is a function because the actual
83       * ingest may depend on situational details of the data of no interest to us
84       * here.
85       */
86      public interface EadIngestOperation {
87          ImportLog runIngest(ImportManager manager) throws ValidationError, DeserializationError, IOException;
88      }
89  
90      /**
91       * Signal that something has gone wrong with the sync operation.
92       */
93      public class EadSyncError extends Exception {
94          EadSyncError(String message, Throwable underlying) {
95              super(message, underlying);
96          }
97  
98          EadSyncError(String message) {
99              super(message);
100         }
101     }
102 
103     /**
104      * Synchronise an archival scope from EAD files. The actual ingest
105      * operation is delegated to the given {@link SaxImportManager} instance.
106      * <p>
107      * For sync to work correctly the local (scoped) item identifiers
108      * <strong>must</strong> be unique in the scope in which sync is
109      * taking place.
110      *
111      * @param op         the ingest operation
112      * @param logMessage a log message that will be attached to the delete event
113      * @return a {@link SyncLog} instance
114      * @throws EadSyncError if local identifiers are not unique
115      */
116     public SyncLog sync(EadIngestOperation op, Set<String> excludes, String logMessage)
117             throws ValidationError, DeserializationError, IOException, EadSyncError {
118 
119         // Get a mapping of graph ID to local ID within the scope,
120         // Pre-sync, ALL of the local IDs must be unique (and
121         // the BiMap will error if they aren't.)
122         BiMap<String, String> oldGraphToLocal = HashBiMap.create();
123         try {
124             for (DocumentaryUnit unit : itemsInScope(scope)) {
125                 oldGraphToLocal.put(unit.getId(), unit.getIdentifier());
126             }
127         } catch (IllegalArgumentException e) {
128             throw new EadSyncError("Local identifiers are not unique", e);
129         }
130 
131         // Remove anything specifically excluded. This would typically
132         // be items in the scope that are not being synced in this operation.
133         excludes.forEach(oldGraphToLocal::remove);
134 
135         logger.debug("Items in scope prior to sync: {}", oldGraphToLocal.size());
136 
137         // Keep a mapping of new graph to local IDs in the ingest operation
138         BiMap<String, String> newGraphToLocal = HashBiMap.create();
139 
140         // Add a callback to the import manager so we can collect the
141         // IDs of new items and run the ingest operation.
142         ImportManager manager = importManager.withCallback(m -> {
143             DocumentaryUnit doc = m.getNode().as(DocumentaryUnit.class);
144             newGraphToLocal.put(doc.getId(), doc.getIdentifier());
145         });
146         // Actually run the ingest...
147         ImportLog log = op.runIngest(manager);
148 
149         // Find moved items... this gets us a map of old graph ID to new graph ID
150         BiMap<String, String> movedGraphIds = findMovedItems(scope, oldGraphToLocal);
151 
152         // All-new items are those in the new set but not the old set.
153         Set<String> allNewGraphIds = Sets.difference(newGraphToLocal.keySet(), oldGraphToLocal.keySet());
154 
155         // Items to be deleted are in the old set but not in the new set.
156         Set<String> allDeletedGraphIds = Sets.difference(oldGraphToLocal.keySet(), newGraphToLocal.keySet());
157 
158         // These are just the created or deleted items, minus those that have moved.
159         Set<String> createdIds = Sets.difference(allNewGraphIds, movedGraphIds.values());
160         Set<String> deletedIds = Sets.difference(allDeletedGraphIds, movedGraphIds.keySet());
161 
162         // Transfer user-generated annotations and links between moved items...
163         transferUserGeneratedContent(movedGraphIds, logMessage);
164 
165         // Delete items that have been deleted or moved...
166         deleteDeadOrMoved(allDeletedGraphIds, logMessage);
167 
168         logger.debug("Created items: {}, Deleted items: {}, Moved items: {}",
169                 createdIds.size(), deletedIds.size(), movedGraphIds.size());
170 
171         return new SyncLog(log, createdIds, deletedIds, movedGraphIds);
172     }
173 
174     private void transferUserGeneratedContent(BiMap<String, String> movedGraphIds, String logMessage) {
175         if (!movedGraphIds.isEmpty()) {
176             try {
177                 int modified = 0;
178                 Api api = this.api.enableLogging(false);
179                 ActionManager actionManager = api.actionManager().setScope(scope);
180                 ActionManager.EventContext ctx = actionManager
181                         .newEventContext(actioner, EventTypes.modification, Optional.ofNullable(logMessage));
182 
183                 for (Map.Entry<String, String> entry : movedGraphIds.entrySet()) {
184                     DocumentaryUnit from = api.detail(entry.getKey(), DocumentaryUnit.class);
185                     DocumentaryUnit to = api.detail(entry.getValue(), DocumentaryUnit.class);
186                     boolean changed = transferUserGeneratedContent(from, to);
187                     if (changed) {
188                         ctx.addSubjects(to);
189                         modified++;
190                     }
191                 }
192 
193                 if (modified > 0) {
194                     ctx.commit();
195                 }
196 
197                 logger.debug("Transferred user-generated content from {} items...", modified);
198             } catch (SerializationError | ItemNotFound e) {
199                 throw new RuntimeException("Unexpected error when transferring user generated content", e);
200             }
201         }
202     }
203 
204     private boolean transferUserGeneratedContent(DocumentaryUnit from, DocumentaryUnit to)
205             throws SerializationError, ItemNotFound {
206         int moved = 0;
207         List<Link> links = Lists.newArrayList(from.getLinks());
208         for (Link link : links) {
209             if (link.getLinkBodies().iterator().hasNext()) {
210                 // Skip links with a body...
211                 continue;
212             }
213             logger.debug("Moving link from {} to {}...", from.getId(), to.getId());
214             to.addLink(link);
215             moved++;
216         }
217         List<Annotation> annotations = Lists.newArrayList(from.getAnnotations());
218         for (Annotation annotation : annotations) {
219             logger.debug("Moving annotation from {} to {}...", from.getId(), to.getId());
220             to.addAnnotation(annotation);
221             for (Annotatable part : annotation.getTargetParts()) {
222                 findPart(part, to).ifPresent(altPart -> {
223                     logger.debug("Found equivalent target part: {}", altPart.getId());
224                     altPart.addAnnotationPart(annotation);
225                 });
226             }
227             moved++;
228         }
229         return moved > 0;
230     }
231 
232     private Optional<Annotatable> findPart(Annotatable orig, DocumentaryUnit newParent)
233             throws SerializationError, ItemNotFound {
234         Bundle newParentBundle = depSerializer.entityToBundle(newParent);
235         Bundle dep = depSerializer.entityToBundle(orig);
236 
237         BiFunction<Bundle, Bundle, Boolean> isEquivalentDescription =
238                 (Bundle a, Bundle b) -> Objects.equals(a.getType(), b.getType())
239                         && Objects.equals(a.getDataValue(Ontology.LANGUAGE), b.getDataValue(Ontology.LANGUAGE))
240                         && Objects.equals(a.getDataValue(Ontology.IDENTIFIER_KEY), b.getDataValue(Ontology.IDENTIFIER_KEY));
241 
242         Optional<Bundle> bundle = newParentBundle.find(b ->
243                 b.equals(dep) || isEquivalentDescription.apply(dep, b));
244         return bundle.isPresent()
245                 ? Optional.of(manager.getEntity(bundle.get().getId(), Annotatable.class))
246                 : Optional.empty();
247     }
248 
249     private void deleteDeadOrMoved(Set<String> toDeleteGraphIds, String logMessage) throws ValidationError {
250         if (!toDeleteGraphIds.isEmpty()) {
251             try {
252                 Api api = this.api.enableLogging(false);
253                 ActionManager actionManager = api.actionManager().setScope(scope);
254                 ActionManager.EventContext ctx = actionManager
255                         .newEventContext(actioner, EventTypes.deletion, Optional.ofNullable(logMessage));
256                 for (String id : toDeleteGraphIds) {
257                     DocumentaryUnit item = api.detail(id, DocumentaryUnit.class);
258                     ctx.addSubjects(item);
259                     ctx.createVersion(item);
260                 }
261                 ctx.commit();
262                 for (String id : toDeleteGraphIds) {
263                     api.delete(id);
264                 }
265                 logger.debug("Finished deleting {} items...", toDeleteGraphIds.size());
266             } catch (ItemNotFound | SerializationError | PermissionDenied e) {
267                 throw new RuntimeException("Unexpected error when deleting item", e);
268             }
269         }
270     }
271 
272     private BiMap<String, String> findMovedItems(PermissionScope scope, Map<String, String> lookup)
273             throws EadSyncError {
274         BiMap<String, String> moved = HashBiMap.create();
275 
276         logger.debug("Starting moved item scan...");
277         long start = System.nanoTime();
278         // NB: This method of finding moved items uses a lot of memory for big
279         // repositories, but is dramatically faster than the alternative, which
280         // is for every item to loop over every _other_ item and check if it has the
281         // same local ID and a different graph one. This becomes impractically slow
282         // with many thousands of items.
283         // Here we use a multimap and look for items where one local ID maps to
284         // two graph IDs.
285         Multimap<String, String> localToGraph = LinkedHashMultimap.create();
286         for (DocumentaryUnit unit : itemsInScope(scope)) {
287             localToGraph.put(unit.getIdentifier(), unit.getId());
288         }
289 
290         localToGraph.asMap().forEach((localId, graphIds) -> {
291             List<String> ids = Lists.newArrayList(graphIds);
292             if (ids.size() > 1) {
293                 Preconditions.checkState(ids.size() == 2,
294                         "Unexpected situation in EAD sync. Item " + localId +
295                                 " cannot be unique since after sync ingest there are it exists in more than two places: " + ids);
296                 String first = ids.get(0);
297                 String second = ids.get(1);
298                 if (lookup.containsKey(first)) {
299                     moved.put(first, second);
300                 } else if (lookup.containsKey(second)) {
301                     moved.put(second, first);
302                 } else {
303                     throw new RuntimeException(
304                             "Unexpected situation: 'moved' item not found in before-set... " + localId);
305                 }
306             }
307         });
308         long end = System.nanoTime();
309         logger.debug("Completed moved item scan in {} milli secs", (end - start) / 1_000_000);
310 
311         return moved;
312     }
313 
314     private Iterable<DocumentaryUnit> itemsInScope(PermissionScope scope) throws EadSyncError {
315         switch (scope.getType()) {
316             case Entities.DOCUMENTARY_UNIT:
317                 // If the scope is a doc unit, the full set of items includes itself
318                 return Iterables.append(scope.as(DocumentaryUnit.class),
319                         scope.as(DocumentaryUnit.class).getAllChildren());
320             case Entities.REPOSITORY:
321                 return scope.as(Repository.class).getAllDocumentaryUnits();
322             default:
323                 throw new EadSyncError("Scope must be a repository or a documentary unit");
324         }
325     }
326 }