package eu.dnetlib.oai.mongo;

import com.google.common.base.Stopwatch;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import com.mongodb.BasicDBObject;
import com.mongodb.BasicDBObjectBuilder;
import com.mongodb.DBObject;
import com.mongodb.WriteConcern;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.IndexOptions;
import com.mongodb.client.model.Sorts;
import com.mongodb.client.model.UpdateOptions;
import eu.dnetlib.cql.CqlTranslator;
import eu.dnetlib.oai.PublisherField;
import eu.dnetlib.oai.PublisherStore;
import eu.dnetlib.oai.RecordChangeDetector;
import eu.dnetlib.oai.conf.OAIConfigurationReader;
import eu.dnetlib.oai.info.RecordInfo;
import eu.dnetlib.oai.info.SetInfo;
import eu.dnetlib.oai.parser.PublisherRecordParser;
import eu.dnetlib.oai.sets.MongoSetCollection;
import eu.dnetlib.rmi.provision.OaiPublisherRuntimeException;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Collection;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.bson.conversions.Bson;
import org.bson.types.Binary;

/* loaded from: input_file:eu/dnetlib/oai/mongo/MongoPublisherStore.class */
public class MongoPublisherStore implements PublisherStore<DNetOAIMongoCursor> {
    private static final Log log = LogFactory.getLog(MongoPublisherStore.class);
    private String id;
    private String metadataFormat;
    private String interpretation;
    private String layout;
    private List<PublisherField> mongoFields;
    private MongoCollection<DBObject> collection;
    private MongoCollection<DBObject> discardedCollection;
    private CqlTranslator cqlTranslator;
    private RecordInfoGenerator recordInfoGenerator;
    private MetadataExtractor metadataExtractor;
    private RecordChangeDetector recordChangeDetector;
    private MongoSetCollection mongoSetCollection;
    private String idScheme;
    private String idNamespace;
    private boolean alwaysNewRecord;

    public MongoPublisherStore() {
    }

    public MongoPublisherStore(String str, String str2, String str3, String str4, MongoCollection<DBObject> mongoCollection, List<PublisherField> list, CqlTranslator cqlTranslator, RecordInfoGenerator recordInfoGenerator, String str5, String str6, MetadataExtractor metadataExtractor, RecordChangeDetector recordChangeDetector, boolean z, MongoDatabase mongoDatabase) {
        this.id = str;
        this.metadataFormat = str2;
        this.interpretation = str3;
        this.layout = str4;
        this.collection = mongoCollection;
        this.discardedCollection = mongoDatabase.getCollection("discarded-" + mongoCollection.getNamespace().getCollectionName(), DBObject.class);
        this.mongoFields = list;
        this.cqlTranslator = cqlTranslator;
        this.recordInfoGenerator = recordInfoGenerator;
        this.idScheme = str5;
        this.idNamespace = str6;
        this.metadataExtractor = metadataExtractor;
        this.recordChangeDetector = recordChangeDetector;
        this.alwaysNewRecord = z;
    }

    @Override // eu.dnetlib.oai.PublisherStore
    public RecordInfo getRecord(String str) {
        return this.recordInfoGenerator.transformDBObject((DBObject) this.collection.find(Filters.eq(OAIConfigurationReader.ID_FIELD, str)).first(), true);
    }

    @Override // eu.dnetlib.oai.PublisherStore
    public RecordInfo getRecord(String str, Function<String, String> function) {
        RecordInfo record = getRecord(str);
        if (record != null) {
            record.setMetadata(function.apply(record.getMetadata()));
        }
        return record;
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // eu.dnetlib.oai.PublisherStore
    public DNetOAIMongoCursor getRecords(String str, boolean z, int i) {
        return new DNetOAIMongoCursor(loggedFindByQuery(str, i).iterator(), z, this.recordInfoGenerator, this.metadataExtractor);
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // eu.dnetlib.oai.PublisherStore
    public DNetOAIMongoCursor getRecords(String str, Function<String, String> function, boolean z, int i) {
        return new DNetOAIMongoCursor(loggedFindByQuery(str, i).iterator(), function, z, this.recordInfoGenerator, this.metadataExtractor);
    }

    private FindIterable<DBObject> loggedFindByQuery(String str, int i) {
        Bson parseQuery = parseQuery(str);
        long currentTimeMillis = System.currentTimeMillis();
        FindIterable<DBObject> limit = this.collection.find(parseQuery).sort(Sorts.orderBy(new Bson[]{Sorts.ascending(new String[]{"_id"})})).limit(i);
        log.debug("Query:" + parseQuery + "\ntime to get mongo iterable (ms): " + (System.currentTimeMillis() - currentTimeMillis));
        return limit;
    }

    private Bson parseQuery(String str) {
        try {
            return this.cqlTranslator.toMongo(str);
        } catch (Exception e) {
            throw new OaiPublisherRuntimeException(e);
        }
    }

    @Override // eu.dnetlib.oai.PublisherStore
    public List<PublisherField> getIndices() {
        return this.mongoFields;
    }

    @Override // eu.dnetlib.oai.PublisherStore
    public void ensureIndices() {
        IndexOptions background = new IndexOptions().background(true);
        Stopwatch createUnstarted = Stopwatch.createUnstarted();
        createUnstarted.start();
        Iterator<PublisherField> it = this.mongoFields.iterator();
        while (it.hasNext()) {
            BasicDBObject basicDBObject = new BasicDBObject(it.next().getFieldName(), 1);
            log.debug("Creating index on store " + this.id + " : " + basicDBObject);
            this.collection.createIndex(basicDBObject, background);
        }
        log.debug("Creating index over : datestamp");
        this.collection.createIndex(new BasicDBObject(OAIConfigurationReader.DATESTAMP_FIELD, 1), background);
        log.debug("Creating index over : lastCollectionDate");
        this.collection.createIndex(new BasicDBObject(OAIConfigurationReader.LAST_COLLECTION_DATE_FIELD, 1), background);
        createUnstarted.stop();
        log.info("All indexes have been updated in " + createUnstarted.elapsed(TimeUnit.MILLISECONDS) + " milliseconds");
    }

    public void createCompoundIndex(List<String> list) {
        if (list == null || list.isEmpty()) {
            log.fatal("No fields specified for the creation of the compound index");
        }
        BasicDBObjectBuilder start = BasicDBObjectBuilder.start();
        Iterator<String> it = list.iterator();
        while (it.hasNext()) {
            start.add(it.next(), 1);
        }
        BasicDBObject basicDBObject = start.get();
        log.info("Creating index " + basicDBObject + " on " + getId());
        getCollection().createIndex(basicDBObject, new IndexOptions().background(true));
    }

    private void dropDiscarded(String str) {
        if (StringUtils.isBlank(str)) {
            log.debug("Dropping discarded records from publisherStore " + this.id);
            this.discardedCollection.drop();
        } else {
            log.debug("Dropping discarded records for source " + str + " from publisherStore " + this.id);
            this.discardedCollection.deleteMany(Filters.eq(OAIConfigurationReader.SET_FIELD, str));
        }
    }

    @Override // eu.dnetlib.oai.PublisherStore
    public int feed(Iterable<String> iterable, String str) {
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(80);
        Object obj = new Object();
        dropDiscarded(str);
        Date date = new Date();
        Thread thread = new Thread(() -> {
            MongoCollection<DBObject> withWriteConcern = this.collection.withWriteConcern(WriteConcern.UNACKNOWLEDGED);
            while (true) {
                try {
                    Object take = arrayBlockingQueue.take();
                    if (take == obj) {
                        return;
                    } else {
                        safeFeedRecord((String) take, str, date, withWriteConcern);
                    }
                } catch (InterruptedException e) {
                    log.fatal("got exception in background thread", e);
                    throw new IllegalStateException(e);
                }
            }
        });
        thread.start();
        long time = date.getTime();
        try {
            log.info("feeding publisherStore " + this.id);
            Iterator<String> it = iterable.iterator();
            while (it.hasNext()) {
                arrayBlockingQueue.put(it.next());
            }
            arrayBlockingQueue.put(obj);
            log.info("finished feeding publisherStore " + this.id);
            thread.join();
            log.fatal("OAI STORE " + this.id + " FEEDING COMPLETED IN " + (System.currentTimeMillis() - time) + "ms");
            setDeletedFlags(date, str);
            if (StringUtils.isNotBlank(str)) {
                upsertSets(Lists.newArrayList(new String[]{str}));
            }
            return count();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    private void setDeletedFlags(final Date date, final String str) {
        final MongoCollection withWriteConcern = this.collection.withWriteConcern(WriteConcern.ACKNOWLEDGED);
        Thread thread = new Thread(new Runnable() { // from class: eu.dnetlib.oai.mongo.MongoPublisherStore.1
            @Override // java.lang.Runnable
            public void run() {
                Bson and = Filters.and(new Bson[]{Filters.eq(OAIConfigurationReader.DELETED_FIELD, false), Filters.lt(OAIConfigurationReader.LAST_COLLECTION_DATE_FIELD, date)});
                if (!StringUtils.isBlank(str)) {
                    and = Filters.and(new Bson[]{and, Filters.eq(OAIConfigurationReader.SET_FIELD, MongoPublisherStore.this.mongoSetCollection.normalizeSetSpec(str))});
                }
                MongoPublisherStore.log.debug("Delete flag query: " + and);
                BasicDBObject basicDBObject = new BasicDBObject("$set", BasicDBObjectBuilder.start(OAIConfigurationReader.DELETED_FIELD, true).append(OAIConfigurationReader.DATESTAMP_FIELD, date).append(OAIConfigurationReader.UPDATED_FIELD, true).get());
                MongoPublisherStore.log.debug("Updating as: " + basicDBObject.toString());
                MongoPublisherStore.log.debug("Deleted flags set for source: " + str + " #records = " + withWriteConcern.updateMany(and, basicDBObject, new UpdateOptions().upsert(false)).getModifiedCount());
            }
        });
        thread.start();
        try {
            thread.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    @Override // eu.dnetlib.oai.PublisherStore
    public void drop() {
        this.collection.drop();
    }

    @Override // eu.dnetlib.oai.PublisherStore
    public void drop(String str) {
        log.debug("Deleted by query: " + str + " #deleted: " + this.collection.deleteMany(parseQuery(str)).getDeletedCount());
    }

    @Override // eu.dnetlib.oai.PublisherStore
    public int count() {
        return (int) this.collection.count();
    }

    @Override // eu.dnetlib.oai.PublisherStore
    public int count(String str) {
        if (StringUtils.isBlank(str)) {
            return (int) this.collection.count();
        }
        return (int) this.collection.count(parseQuery(str));
    }

    public List<String> getDistinctSetNamesFromRecords() {
        log.info("Going to ask for all distinct sets in the oaistore " + this.id + ": this may take a long time...");
        return Lists.newArrayList(this.collection.distinct(OAIConfigurationReader.SET_FIELD, String.class));
    }

    private boolean safeFeedRecord(String str, String str2, Date date, MongoCollection<DBObject> mongoCollection) {
        try {
            if (str.isEmpty()) {
                return false;
            }
            return feedRecord(str, str2, date, mongoCollection);
        } catch (Throwable th) {
            log.error("Got unhandled exception while parsing record", th);
            this.discardedCollection.insertOne(new BasicDBObject(OAIConfigurationReader.SET_FIELD, str2).append(OAIConfigurationReader.BODY_FIELD, str));
            return false;
        }
    }

    private boolean feedRecord(String str, String str2, Date date, MongoCollection<DBObject> mongoCollection) {
        PublisherRecordParser publisherRecordParser = new PublisherRecordParser(this.mongoFields);
        log.debug("configured parser for fields: " + this.mongoFields);
        Multimap<String, String> parseRecord = publisherRecordParser.parseRecord(str, str2);
        if (!parseRecord.containsKey(OAIConfigurationReader.ID_FIELD)) {
            log.error("parsed record seems invalid -- no identifier property with name: objIdentifier");
            log.error("Extracted property map: \n" + parseRecord);
            log.debug("from: \n" + str);
            this.discardedCollection.insertOne(new BasicDBObject(OAIConfigurationReader.SET_FIELD, str2).append(OAIConfigurationReader.BODY_FIELD, str).append(OAIConfigurationReader.DATESTAMP_FIELD, date));
            return false;
        }
        String oAIIdentifier = getOAIIdentifier((String) parseRecord.get(OAIConfigurationReader.ID_FIELD).iterator().next());
        if (isNewRecord(oAIIdentifier)) {
            feedNew(oAIIdentifier, str, parseRecord, date, mongoCollection);
            return true;
        }
        if (isChanged(oAIIdentifier, str)) {
            updateRecord(oAIIdentifier, str, parseRecord, date, mongoCollection);
            return false;
        }
        handleRecord(oAIIdentifier, date, mongoCollection);
        return false;
    }

    private BasicDBObject createBasicObject(String str, String str2, Multimap<String, String> multimap) {
        BasicDBObject basicDBObject = new BasicDBObject();
        for (String str3 : multimap.keySet()) {
            if (str3.equals(OAIConfigurationReader.ID_FIELD)) {
                basicDBObject.put(str3, str);
            } else {
                Collection collection = multimap.get(str3);
                if (str3.equals(OAIConfigurationReader.SET_FIELD)) {
                    basicDBObject.put(str3, (Iterable) collection.stream().map(str4 -> {
                        return this.mongoSetCollection.normalizeSetSpec(str4);
                    }).collect(Collectors.toList()));
                } else {
                    PublisherField publisherField = (PublisherField) Iterables.find(this.mongoFields, publisherField2 -> {
                        return publisherField2.getFieldName().equals(str3);
                    }, (Object) null);
                    if (publisherField == null) {
                        log.warn("Expected field to index: " + str3 + " could not be found, but we keep going...");
                    }
                    if (publisherField == null || publisherField.isRepeatable()) {
                        basicDBObject.put(str3, collection);
                    } else if (collection != null && !collection.isEmpty()) {
                        basicDBObject.put(str3, collection.iterator().next());
                    }
                }
            }
        }
        try {
            basicDBObject.put(OAIConfigurationReader.BODY_FIELD, createCompressRecord(str2));
            basicDBObject.put(OAIConfigurationReader.DELETED_FIELD, false);
            return basicDBObject;
        } catch (IOException e) {
            throw new OaiPublisherRuntimeException(e);
        }
    }

    public Binary createCompressRecord(String str) throws IOException {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        ZipOutputStream zipOutputStream = new ZipOutputStream(byteArrayOutputStream);
        zipOutputStream.putNextEntry(new ZipEntry(OAIConfigurationReader.BODY_FIELD));
        zipOutputStream.write(str.getBytes());
        zipOutputStream.closeEntry();
        zipOutputStream.flush();
        zipOutputStream.close();
        return new Binary(byteArrayOutputStream.toByteArray());
    }

    private void feedNew(String str, String str2, Multimap<String, String> multimap, Date date, MongoCollection<DBObject> mongoCollection) {
        log.debug("New record received. Assigned oai id: " + str);
        BasicDBObject createBasicObject = createBasicObject(str, str2, multimap);
        createBasicObject.put(OAIConfigurationReader.LAST_COLLECTION_DATE_FIELD, date);
        createBasicObject.put(OAIConfigurationReader.DATESTAMP_FIELD, date);
        createBasicObject.put(OAIConfigurationReader.UPDATED_FIELD, false);
        mongoCollection.insertOne(createBasicObject);
        upsertSets(multimap.get(OAIConfigurationReader.SET_FIELD));
    }

    private void updateRecord(String str, String str2, Multimap<String, String> multimap, Date date, MongoCollection<DBObject> mongoCollection) {
        log.debug("updating record " + str);
        BasicDBObject createBasicObject = createBasicObject(str, str2, multimap);
        createBasicObject.put(OAIConfigurationReader.LAST_COLLECTION_DATE_FIELD, date);
        createBasicObject.put(OAIConfigurationReader.DATESTAMP_FIELD, date);
        createBasicObject.put(OAIConfigurationReader.UPDATED_FIELD, true);
        mongoCollection.replaceOne(Filters.eq(OAIConfigurationReader.ID_FIELD, str), createBasicObject, new UpdateOptions().upsert(true));
        upsertSets(multimap.get(OAIConfigurationReader.SET_FIELD));
    }

    public void upsertSets(Iterable<String> iterable) {
        if (iterable != null) {
            for (String str : iterable) {
                if (StringUtils.isNotBlank(str)) {
                    SetInfo setInfo = new SetInfo();
                    String normalizeSetSpec = this.mongoSetCollection.normalizeSetSpec(str);
                    setInfo.setSetSpec(normalizeSetSpec);
                    setInfo.setSetName(str);
                    setInfo.setSetDescription("This set contains metadata records whose provenance is " + str);
                    setInfo.setEnabled(true);
                    setInfo.setQuery("(set = \"" + normalizeSetSpec + "\") ");
                    this.mongoSetCollection.upsertSet(setInfo, false, getCollection().getNamespace().getDatabaseName());
                }
            }
        }
    }

    private void handleRecord(String str, Date date, MongoCollection<DBObject> mongoCollection) {
        log.debug("handling unchanged record " + str);
        mongoCollection.updateOne(Filters.eq(OAIConfigurationReader.ID_FIELD, str), new BasicDBObject("$set", new BasicDBObject(OAIConfigurationReader.LAST_COLLECTION_DATE_FIELD, date)), new UpdateOptions().upsert(true));
    }

    private boolean isNewRecord(String str) {
        return this.alwaysNewRecord || this.collection.count() == 0 || this.collection.find(Filters.eq(OAIConfigurationReader.ID_FIELD, str)).first() == null;
    }

    private boolean isChanged(String str, String str2) {
        RecordInfo record = getRecord(str);
        return record == null ? StringUtils.isBlank(str2) : this.recordChangeDetector.differs(record.getMetadata(), str2);
    }

    private String getOAIIdentifier(String str) {
        return this.idScheme + ":" + this.idNamespace + ":" + str;
    }

    public int hashCode() {
        return (31 * ((31 * ((31 * ((31 * ((31 * 1) + (this.collection == null ? 0 : this.collection.hashCode()))) + (this.id == null ? 0 : this.id.hashCode()))) + (this.interpretation == null ? 0 : this.interpretation.hashCode()))) + (this.layout == null ? 0 : this.layout.hashCode()))) + (this.metadataFormat == null ? 0 : this.metadataFormat.hashCode());
    }

    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || !(obj instanceof MongoPublisherStore)) {
            return false;
        }
        MongoPublisherStore mongoPublisherStore = (MongoPublisherStore) obj;
        if (this.collection == null) {
            if (mongoPublisherStore.collection != null) {
                return false;
            }
        } else if (!this.collection.equals(mongoPublisherStore.collection)) {
            return false;
        }
        if (this.id == null) {
            if (mongoPublisherStore.id != null) {
                return false;
            }
        } else if (!this.id.equals(mongoPublisherStore.id)) {
            return false;
        }
        if (this.interpretation == null) {
            if (mongoPublisherStore.interpretation != null) {
                return false;
            }
        } else if (!this.interpretation.equals(mongoPublisherStore.interpretation)) {
            return false;
        }
        if (this.layout == null) {
            if (mongoPublisherStore.layout != null) {
                return false;
            }
        } else if (!this.layout.equals(mongoPublisherStore.layout)) {
            return false;
        }
        return this.metadataFormat == null ? mongoPublisherStore.metadataFormat == null : this.metadataFormat.equals(mongoPublisherStore.metadataFormat);
    }

    public MongoCollection<DBObject> getCollection() {
        return this.collection;
    }

    public void setCollection(MongoCollection<DBObject> mongoCollection) {
        this.collection = mongoCollection;
    }

    public MongoCollection<DBObject> getDiscardedCollection() {
        return this.discardedCollection;
    }

    public void setDiscardedCollection(MongoCollection<DBObject> mongoCollection) {
        this.discardedCollection = mongoCollection;
    }

    public String getIdScheme() {
        return this.idScheme;
    }

    public void setIdScheme(String str) {
        this.idScheme = str;
    }

    public String getIdNamespace() {
        return this.idNamespace;
    }

    public void setIdNamespace(String str) {
        this.idNamespace = str;
    }

    public RecordInfoGenerator getRecordInfoGenerator() {
        return this.recordInfoGenerator;
    }

    public void setRecordInfoGenerator(RecordInfoGenerator recordInfoGenerator) {
        this.recordInfoGenerator = recordInfoGenerator;
    }

    public MetadataExtractor getMetadataExtractor() {
        return this.metadataExtractor;
    }

    public void setMetadataExtractor(MetadataExtractor metadataExtractor) {
        this.metadataExtractor = metadataExtractor;
    }

    public RecordChangeDetector getRecordChangeDetector() {
        return this.recordChangeDetector;
    }

    public void setRecordChangeDetector(RecordChangeDetector recordChangeDetector) {
        this.recordChangeDetector = recordChangeDetector;
    }

    @Override // eu.dnetlib.oai.PublisherStore
    public String getId() {
        return this.id;
    }

    public void setId(String str) {
        this.id = str;
    }

    @Override // eu.dnetlib.oai.PublisherStore
    public String getMetadataFormat() {
        return this.metadataFormat;
    }

    public void setMetadataFormat(String str) {
        this.metadataFormat = str;
    }

    @Override // eu.dnetlib.oai.PublisherStore
    public String getInterpretation() {
        return this.interpretation;
    }

    public void setInterpretation(String str) {
        this.interpretation = str;
    }

    @Override // eu.dnetlib.oai.PublisherStore
    public String getLayout() {
        return this.layout;
    }

    public void setLayout(String str) {
        this.layout = str;
    }

    public MongoSetCollection getMongoSetCollection() {
        return this.mongoSetCollection;
    }

    public void setMongoSetCollection(MongoSetCollection mongoSetCollection) {
        this.mongoSetCollection = mongoSetCollection;
    }

    public List<PublisherField> getMongoFields() {
        return this.mongoFields;
    }

    public void setMongoFields(List<PublisherField> list) {
        this.mongoFields = list;
    }

    public boolean isAlwaysNewRecord() {
        return this.alwaysNewRecord;
    }

    public void setAlwaysNewRecord(boolean z) {
        this.alwaysNewRecord = z;
    }

    @Override // eu.dnetlib.oai.PublisherStore
    public /* bridge */ /* synthetic */ DNetOAIMongoCursor getRecords(String str, Function function, boolean z, int i) {
        return getRecords(str, (Function<String, String>) function, z, i);
    }
}
