package eu.dnetlib.lbs.events.output;

import eu.dnetlib.lbs.elasticsearch.Event;
import eu.dnetlib.lbs.subscriptions.Subscription;
import eu.dnetlib.lbs.utils.LbsQueue;
import eu.dnetlib.lbs.utils.QueueManager;
import eu.dnetlib.lbs.utils.ThreadManager;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.PostConstruct;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.annotation.Autowired;

/* loaded from: input_file:eu/dnetlib/lbs/events/output/AbstractNotificationDispatcher.class */
public abstract class AbstractNotificationDispatcher<T> implements NotificationDispatcher, BeanNameAware, Runnable {
    private String dispatcherName;

    @Autowired
    private QueueManager queueManager;

    @Autowired
    private ThreadManager threadManager;
    private LbsQueue<T, T> queue;
    private final AtomicLong count = new AtomicLong(0);
    private static final Log log = LogFactory.getLog(AbstractNotificationDispatcher.class);

    @PostConstruct
    public void init() {
        this.queue = this.queueManager.newQueue(this.dispatcherName + "-queue");
        this.threadManager.newThread(this.dispatcherName, this);
    }

    @Override // java.lang.Runnable
    public void run() {
        while (true) {
            for (T t : this.queue.takeList()) {
                if (t != null) {
                    try {
                        performAction(t);
                        this.count.incrementAndGet();
                    } catch (Throwable th) {
                        log.error("Error sending notification", th);
                        this.queue.offer(t);
                    }
                }
            }
        }
    }

    @Override // eu.dnetlib.lbs.events.output.NotificationDispatcher
    public void sendNotification(Subscription subscription, Event... eventArr) {
        try {
            this.queue.offer(prepareAction(subscription, eventArr));
        } catch (Exception e) {
            log.error("Error sending notification", e);
        }
    }

    protected abstract T prepareAction(Subscription subscription, Event... eventArr) throws Exception;

    protected abstract void performAction(T t) throws Exception;

    @Override // eu.dnetlib.lbs.events.output.NotificationDispatcher
    public String getDispatcherName() {
        return this.dispatcherName;
    }

    public void setDispatcherName(String str) {
        this.dispatcherName = str;
    }

    @Override // eu.dnetlib.lbs.events.output.NotificationDispatcher
    public long count() {
        return this.count.get();
    }

    @Override // eu.dnetlib.lbs.events.output.NotificationDispatcher
    public void resetCount() {
        this.count.set(0L);
    }

    public void setBeanName(String str) {
        if (StringUtils.isBlank(getDispatcherName())) {
            setDispatcherName(str);
        }
    }
}
