/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.broker.matchers;

import eu.dnetlib.broker.common.elasticsearch.Event;
import eu.dnetlib.broker.common.elasticsearch.Notification;
import eu.dnetlib.broker.common.elasticsearch.NotificationRepository;
import eu.dnetlib.broker.common.properties.ElasticSearchProperties;
import eu.dnetlib.broker.common.subscriptions.MapCondition;
import eu.dnetlib.broker.common.subscriptions.Subscription;
import eu.dnetlib.broker.common.subscriptions.SubscriptionRepository;
import eu.dnetlib.broker.events.output.DispatcherManager;
import eu.dnetlib.broker.utils.LbsQueue;
import eu.dnetlib.broker.utils.QueueManager;
import eu.dnetlib.broker.utils.ThreadManager;
import java.util.ArrayList;
import java.util.Date;
import java.util.Objects;
import javax.annotation.PostConstruct;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.lucene.search.join.ScoreMode;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.Operator;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Profile;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.data.elasticsearch.core.SearchHit;
import org.springframework.data.elasticsearch.core.SearchHitsIterator;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.NativeSearchQuery;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
import org.springframework.data.elasticsearch.core.query.Query;
import org.springframework.stereotype.Component;

@Profile(value={"!openaire"})
@Component
public class SubscriptionEventMatcher
implements Runnable {
    @Autowired
    private QueueManager queueManager;
    @Autowired
    private DispatcherManager dispatcherManager;
    @Autowired
    private SubscriptionRepository subscriptionRepo;
    @Autowired
    private ElasticSearchProperties elasticSearchProperties;
    private LbsQueue<Subscription, Subscription> queue;
    @Autowired
    private ElasticsearchOperations esOperations;
    @Autowired
    private NotificationRepository notificationRepository;
    @Autowired
    private ThreadManager threadManager;
    private static final Log log = LogFactory.getLog(SubscriptionEventMatcher.class);

    @PostConstruct
    public void init() {
        this.queue = this.queueManager.newQueue("subscr-events-matcher-queue", Subscription::isReady);
        this.threadManager.newThread("subscr-events-matcher", (Runnable)this);
    }

    public void startMatching(Subscription s) {
        if (this.queue.offer((Object)s)) {
            log.info((Object)("Matching of subscription " + s.getSubscriptionId() + " in queue"));
        } else {
            log.info((Object)("Subscription " + s.getSubscriptionId() + " not queued"));
        }
    }

    @Override
    public void run() {
        log.info((Object)("SubscriptionEventMatcher started: " + Thread.currentThread().getName()));
        while (true) {
            try {
                while (true) {
                    Subscription s;
                    if ((s = (Subscription)this.queue.takeOne()) == null || !Subscription.isReady((Subscription)s)) {
                        continue;
                    }
                    this.startSubscriptionEventsMatcher(s);
                    s.setLastNotificationDate(new Date());
                    this.subscriptionRepo.save((Object)s);
                }
            }
            catch (Throwable e) {
                log.error((Object)"Error iterating matching queue", e);
                continue;
            }
            break;
        }
    }

    private void startSubscriptionEventsMatcher(Subscription s) {
        log.info((Object)("Start matching subscription: " + s));
        try {
            BoolQueryBuilder mapQuery = QueryBuilders.boolQuery();
            s.getConditionsAsList().stream().map(MapCondition::asQueryBuilder).filter(Objects::nonNull).forEach(arg_0 -> ((BoolQueryBuilder)mapQuery).must(arg_0));
            NativeSearchQuery searchQuery = new NativeSearchQueryBuilder().withQuery((QueryBuilder)QueryBuilders.boolQuery().must((QueryBuilder)QueryBuilders.matchQuery((String)"topic", (Object)s.getTopic()).operator(Operator.AND)).must((QueryBuilder)QueryBuilders.rangeQuery((String)"creationDate").from((Object)(s.getLastNotificationDate() != null ? s.getLastNotificationDate().getTime() : 0L))).must((QueryBuilder)QueryBuilders.nestedQuery((String)"map", (QueryBuilder)mapQuery, (ScoreMode)ScoreMode.None))).withSearchType(SearchType.DEFAULT).withPageable((Pageable)PageRequest.of((int)0, (int)10)).build();
            ArrayList<Event> events = new ArrayList<Event>();
            SearchHitsIterator it = this.esOperations.searchForStream((Query)searchQuery, Event.class, IndexCoordinates.of((String[])new String[]{this.elasticSearchProperties.getEventsIndexName()}));
            while (it.hasNext()) {
                Event e = (Event)((SearchHit)it.next()).getContent();
                Notification n = new Notification(s, e);
                if (!this.isNotAlreadyNotified(n)) continue;
                this.notificationRepository.save((Object)n);
                events.add(e);
            }
            log.info((Object)("End matching subscription: " + s.getSubscriptionId()));
            this.dispatcherManager.dispatch(s, events.toArray(new Event[events.size()]));
        }
        catch (Throwable e) {
            log.error((Object)("Error matching subscription: " + s.getSubscriptionId()), e);
        }
    }

    private boolean isNotAlreadyNotified(Notification n) {
        return !this.notificationRepository.findById((Object)n.getNotificationId()).isPresent();
    }
}

