package eu.dnetlib.lbs.matchers;

import eu.dnetlib.lbs.elasticsearch.Event;
import eu.dnetlib.lbs.elasticsearch.Notification;
import eu.dnetlib.lbs.elasticsearch.NotificationRepository;
import eu.dnetlib.lbs.events.output.DispatcherManager;
import eu.dnetlib.lbs.properties.ElasticSearchProperties;
import eu.dnetlib.lbs.subscriptions.Subscription;
import eu.dnetlib.lbs.subscriptions.SubscriptionRepository;
import eu.dnetlib.lbs.utils.LbsQueue;
import eu.dnetlib.lbs.utils.QueueManager;
import eu.dnetlib.lbs.utils.ThreadManager;
import java.util.ArrayList;
import java.util.Date;
import java.util.Objects;
import java.util.stream.Stream;
import javax.annotation.PostConstruct;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.lucene.search.join.ScoreMode;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.data.elasticsearch.core.SearchHit;
import org.springframework.data.elasticsearch.core.SearchHitsIterator;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.NativeSearchQuery;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
import org.springframework.stereotype.Component;

@Component
/* loaded from: input_file:eu/dnetlib/lbs/matchers/SubscriptionEventMatcher.class */
public class SubscriptionEventMatcher implements Runnable {

    @Autowired
    private QueueManager queueManager;

    @Autowired
    private DispatcherManager dispatcherManager;

    @Autowired
    private SubscriptionRepository subscriptionRepo;

    @Autowired
    private ElasticSearchProperties elasticSearchProperties;
    private LbsQueue<Subscription, Subscription> queue;

    @Autowired
    private ElasticsearchOperations esOperations;

    @Autowired
    private NotificationRepository notificationRepository;

    @Autowired
    private ThreadManager threadManager;
    private static final Log log = LogFactory.getLog(SubscriptionEventMatcher.class);

    @PostConstruct
    public void init() {
        this.queue = this.queueManager.newQueue("subscr-events-matcher-queue", Subscription::isReady);
        this.threadManager.newThread("subscr-events-matcher", this);
    }

    public void startMatching(Subscription subscription) {
        if (this.queue.offer(subscription)) {
            log.info("Matching of subscription " + subscription.getSubscriptionId() + " in queue");
        } else {
            log.info("Subscription " + subscription.getSubscriptionId() + " not queued");
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        log.info("SubscriptionEventMatcher started: " + Thread.currentThread().getName());
        while (true) {
            try {
                Subscription takeOne = this.queue.takeOne();
                if (takeOne != null && Subscription.isReady(takeOne)) {
                    startSubscriptionEventsMatcher(takeOne);
                    takeOne.setLastNotificationDate(new Date());
                    this.subscriptionRepo.save(takeOne);
                }
            } catch (Throwable th) {
                log.error("Error iterating matching queue", th);
            }
        }
    }

    private void startSubscriptionEventsMatcher(Subscription subscription) {
        log.info("Start matching subscription: " + subscription);
        try {
            BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
            Stream filter = subscription.getConditionsAsList().stream().map((v0) -> {
                return v0.asQueryBuilder();
            }).filter((v0) -> {
                return Objects.nonNull(v0);
            });
            boolQuery.getClass();
            filter.forEach(boolQuery::must);
            NativeSearchQuery build = new NativeSearchQueryBuilder().withQuery(QueryBuilders.boolQuery().must(QueryBuilders.matchQuery("topic", subscription.getTopic())).must(QueryBuilders.rangeQuery("creationDate").from(Long.valueOf(subscription.getLastNotificationDate() != null ? subscription.getLastNotificationDate().getTime() : 0L))).must(QueryBuilders.nestedQuery("map", boolQuery, ScoreMode.None))).withSearchType(SearchType.DEFAULT).withPageable(PageRequest.of(0, 10)).build();
            ArrayList arrayList = new ArrayList();
            SearchHitsIterator searchForStream = this.esOperations.searchForStream(build, Event.class, IndexCoordinates.of(new String[]{this.elasticSearchProperties.getEventsIndexName()}));
            while (searchForStream.hasNext()) {
                Event event = (Event) ((SearchHit) searchForStream.next()).getContent();
                Notification notification = new Notification(subscription, event);
                if (isNotAlreadyNotified(notification)) {
                    this.notificationRepository.save(notification);
                    arrayList.add(event);
                }
            }
            log.info("End matching subscription: " + subscription.getSubscriptionId());
            this.dispatcherManager.dispatch(subscription, (Event[]) arrayList.toArray(new Event[arrayList.size()]));
        } catch (Throwable th) {
            log.error("Error matching subscription: " + subscription.getSubscriptionId(), th);
        }
    }

    private boolean isNotAlreadyNotified(Notification notification) {
        return !this.notificationRepository.findById(notification.getNotificationId()).isPresent();
    }
}
