1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package eu.ehri.project.oaipmh;
21
22 import com.google.common.base.Joiner;
23 import com.google.common.base.Predicate;
24 import com.google.common.base.Splitter;
25 import com.google.common.collect.ImmutableList;
26 import com.google.common.collect.Iterables;
27 import eu.ehri.project.api.Api;
28 import eu.ehri.project.api.QueryApi;
29 import eu.ehri.project.definitions.Ontology;
30 import eu.ehri.project.exceptions.ItemNotFound;
31 import eu.ehri.project.models.Country;
32 import eu.ehri.project.models.DocumentaryUnit;
33 import eu.ehri.project.models.EntityClass;
34 import eu.ehri.project.models.Repository;
35 import eu.ehri.project.models.base.Accessible;
36 import eu.ehri.project.models.events.SystemEvent;
37 import eu.ehri.project.models.events.Version;
38 import eu.ehri.project.oaipmh.errors.OaiPmhError;
39 import eu.ehri.project.utils.LanguageHelpers;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
42
43 import java.time.ZonedDateTime;
44 import java.util.List;
45 import java.util.Optional;
46 import java.util.stream.Stream;
47 import java.util.stream.StreamSupport;
48
49
50
51
52
53 public class OaiPmhData {
54
55 private static final Logger log = LoggerFactory.getLogger(OaiPmhData.class);
56 private static final Splitter hierarchySplitter = Splitter.on('-');
57 private static final Joiner hierarchyJoiner = Joiner.on('-');
58 private static final Splitter setSpecSplitter = Splitter.on(':');
59 private static final Joiner setSpecJoiner = Joiner.on(':');
60
61 private final Api api;
62
63 private OaiPmhData(Api api) {
64 this.api = api;
65 }
66
67 public static OaiPmhData create(Api api) {
68 return new OaiPmhData(api);
69 }
70
71 QueryApi.Page<OaiPmhSet> getSets(OaiPmhState state) {
72 Stream<OaiPmhSet> setStream = getSets();
73 long count = state.hasLimit() ? getSets().count() : -1L;
74
75 Stream<OaiPmhSet> offsetStream = setStream.skip(state.getOffset());
76 Stream<OaiPmhSet> limitedSetStream = state.hasLimit()
77 ? offsetStream.limit(state.getLimit())
78 : offsetStream;
79 return new QueryApi.Page<>(limitedSetStream::iterator,
80 state.getOffset(), state.getLimit(), count);
81 }
82
83 QueryApi.Page<DocumentaryUnit> getFilteredDocumentaryUnits(OaiPmhState state) throws OaiPmhError {
84 String defaultTimestamp = api.actionManager().getEventRoot().getTimestamp();
85 Iterable<DocumentaryUnit> filtered = Iterables.filter(
86 getDocumentaryUnits(state.getSetSpec()), timeFilterItems(state.getFrom(), state.getUntil(), defaultTimestamp));
87 return api.query().setOffset(state.getOffset()).setLimit(state.getLimit())
88 .page(filtered, DocumentaryUnit.class);
89 }
90
91 OaiPmhRecordResult getRecord(OaiPmhState state) throws OaiPmhError {
92 try {
93 return OaiPmhRecordResult.of(api.detail(state.getIdentifier(), DocumentaryUnit.class));
94 } catch (ItemNotFound e) {
95 Optional<Version> deletedOpt = api.versionManager().versionAtDeletion(state.getIdentifier());
96 if (deletedOpt.isPresent()) {
97 return OaiPmhRecordResult.deleted(getDeletedRecord(deletedOpt.get()));
98 } else {
99 return OaiPmhRecordResult.invalid();
100 }
101 }
102 }
103
104 Iterable<OaiPmhDeleted> getFilteredDeletedDocumentaryUnits(final OaiPmhState state) {
105 String setSpec = state.getSetSpec();
106 if (setSpec == null || setSpec.trim().isEmpty()) {
107 return getDeletedDocumentaryUnits(state.getFrom(), state.getUntil());
108 } else {
109 List<String> specParts = Splitter.on(':').splitToList(setSpec);
110 return Iterables.filter(getDeletedDocumentaryUnits(state.getFrom(), state.getUntil()), d -> {
111 List<String> sets = d.getSets();
112 if (specParts.size() == 1 && sets.get(0).equals(specParts.get(0))) {
113 return true;
114 } else if (specParts.size() == 2 && sets.get(1).equals(specParts.get(1))) {
115 return true;
116 }
117 return false;
118 });
119 }
120 }
121
122 private OaiPmhDeleted getDeletedRecord(Version version) {
123
124
125 log.trace("Calculating deleted item for {}", version.getEntityId());
126 List<String> countryAndRepo = hierarchySplitter.limit(2).splitToList(version.getEntityId());
127 List<String> sets = ImmutableList.of(countryAndRepo.get(0),
128 setSpecJoiner.join(countryAndRepo.get(0), hierarchyJoiner.join(countryAndRepo)));
129 ZonedDateTime deletedAt = ZonedDateTime.parse(version.getTriggeringEvent().getTimestamp());
130 return new OaiPmhDeleted(version.getEntityId(), deletedAt, sets);
131 }
132
133 String getEarliestTimestamp() {
134 return api.actionManager().getEventRoot().getTimestamp();
135 }
136
137 private Iterable<DocumentaryUnit> getDocumentaryUnits(String setSpec) throws OaiPmhError {
138 return (setSpec == null || setSpec.trim().isEmpty())
139 ? getDocumentaryUnits()
140 : getDocumentaryUnitsFromSpecs(setSpec);
141 }
142
143 private Iterable<DocumentaryUnit> getDocumentaryUnitsFromSpecs(String setSpec) throws OaiPmhError {
144 try {
145 assert setSpec != null;
146 List<String> specParts = setSpecSplitter.splitToList(setSpec);
147 return specParts.size() == 1
148 ? api.detail(specParts.get(0), Country.class).getTopLevelDocumentaryUnits()
149 : api.detail(specParts.get(1), Repository.class).getTopLevelDocumentaryUnits();
150 } catch (ItemNotFound e) {
151
152 throw new OaiPmhError(ErrorCode.badArgument, "Invalid set spec: " + setSpec);
153 }
154 }
155
156 private Iterable<DocumentaryUnit> getDocumentaryUnits() {
157 QueryApi.Page<Country> countries = getQuery().page(EntityClass.COUNTRY, Country.class);
158 return Iterables.concat(Iterables.transform(countries, Country::getTopLevelDocumentaryUnits));
159 }
160
161 private QueryApi getQuery() {
162 return api.query().setStream(true).setLimit(-1)
163 .orderBy(Ontology.IDENTIFIER_KEY, QueryApi.Sort.ASC);
164 }
165
166 private Iterable<OaiPmhDeleted> getDeletedDocumentaryUnits(String from, String until) {
167 Iterable<OaiPmhDeleted> transform = Iterables.transform(
168 api.versionManager().versionsAtDeletion(EntityClass.DOCUMENTARY_UNIT, from, until),
169 this::getDeletedRecord);
170 return transform;
171 }
172
173 private Stream<OaiPmhSet> getSets() {
174 Stream<Country> stream = streamOf(getQuery().page(EntityClass.COUNTRY, Country.class));
175 return stream
176 .filter(c -> c.getTopLevelDocumentaryUnits().iterator().hasNext())
177 .flatMap(ct -> {
178 String countryId = ct.getId();
179 String countryName = LanguageHelpers.countryCodeToName(countryId);
180 OaiPmhSet countrySet = new OaiPmhSet(countryId, countryName,
181 "All items in repositories within country: " + countryName);
182 Stream<OaiPmhSet> repoSets = streamOf(getQuery().page(ct.getRepositories(), Repository.class))
183 .filter(r -> r.getTopLevelDocumentaryUnits().iterator().hasNext())
184 .map(r -> {
185 String repoName = r.getDescriptions().iterator().hasNext()
186 ? r.getDescriptions().iterator().next().getName()
187 : null;
188 return new OaiPmhSet(countryId + ":" + r.getId(), repoName, "All items within repository: " + repoName);
189 });
190 return Stream.concat(Stream.of(countrySet), repoSets);
191 });
192 }
193
194
195
196 private static <T> Stream<T> streamOf(Iterable<T> it) {
197 return StreamSupport.stream(it.spliterator(), false);
198 }
199
200 private static <E extends Accessible> Predicate<E> timeFilterItems(String from, String until, String defaultTimestamp) {
201 return d -> {
202 String ts = Optional.ofNullable(d.getLatestEvent())
203 .map(SystemEvent::getTimestamp).orElse(defaultTimestamp);
204 return filterByTimestamp(from, until, ts);
205 };
206 }
207
208 private static boolean filterByTimestamp(String from, String until, String timestamp) {
209 return (from == null || from.compareTo(timestamp) < 0)
210 && (until == null || until.compareTo(timestamp) >= 0);
211 }
212 }