package org.gcube.data.harmonization.occurrence.queue;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.gcube.common.core.utils.logging.GCUBELog;
import org.gcube.data.harmonization.occurrence.services.ServiceContext;

/* loaded from: input_file:org/gcube/data/harmonization/occurrence/queue/QueueReceiver.class */
/**
 * Base thread that consumes messages from a queue and hands each one to the
 * subclass for processing.
 *
 * <p>Usage: call {@link #bind(Channel, String)} first, then start the thread.
 * For every delivery the body is stored in {@link #currentMessage} and
 * {@link #handleMessage()} is invoked; on success the delivery is acknowledged,
 * on failure {@link #rollback()} is invoked. {@link #cleanup()} runs exactly
 * once after every processed delivery, whether it succeeded or failed.
 *
 * <p>The exchange and the queue share the same name, see
 * {@link #getExchangeName()}.
 */
public abstract class QueueReceiver extends Thread {
    private static final GCUBELog logger = new GCUBELog(QueueReceiver.class);

    /** Deliveries are acknowledged explicitly after successful handling. */
    private static final boolean AUTO_ACK = false;

    /** Maximum number of unacknowledged deliveries requested via basicQos. */
    private static final int PREFETCH_COUNT = 1;

    /** Loop flag; cleared by {@link #stopListening()}. */
    protected AtomicBoolean continueListening = new AtomicBoolean(true);

    /** Number of deliveries processed so far (successful or not). */
    protected AtomicInteger readCount = new AtomicInteger(0);

    /** Number of deliveries whose processing raised an exception. */
    protected AtomicInteger errorCount = new AtomicInteger(0);

    private Channel channel;
    private String routingKey;

    /** Body of the delivery currently being handled, decoded as a string. */
    protected String currentMessage;

    /**
     * Declares the topic exchange and the queue (both named
     * {@link #getExchangeName()}), binds them with the given routing key and
     * sets the prefetch count on the channel.
     *
     * @param channel the channel used for all subsequent broker operations
     * @param str     the routing key used to bind the queue to the exchange
     * @throws IOException if any of the broker operations fails
     */
    public void bind(Channel channel, String str) throws IOException {
        this.channel = channel;
        this.routingKey = str;
        String name = getExchangeName();
        this.channel.exchangeDeclare(name, "topic");
        this.channel.queueDeclare(name, true, false, false, (Map<String, Object>) null);
        this.channel.queueBind(name, name, this.routingKey);
        this.channel.basicQos(PREFETCH_COUNT);
    }

    /**
     * Returns the name used both for the exchange and for the queue.
     *
     * @return the name reported by the service context
     */
    public String getExchangeName() {
        return ServiceContext.getContext().getName();
    }

    /**
     * Consumes deliveries until {@link #stopListening()} is called, the thread
     * is interrupted, or an unrecoverable error occurs (which is logged as
     * fatal and ends the loop).
     */
    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        logger.debug("Started listening to " + this.routingKey);
        QueueingConsumer queueingConsumer = new QueueingConsumer(this.channel);
        try {
            this.channel.basicConsume(getExchangeName(), AUTO_ACK, queueingConsumer);
            while (this.continueListening.get()) {
                QueueingConsumer.Delivery delivery;
                try {
                    // Fetching is kept outside the per-message handling so that a
                    // failure to obtain a delivery is not treated as a failed message.
                    delivery = queueingConsumer.nextDelivery();
                } catch (InterruptedException e) {
                    // Preserve the interrupt status and stop consuming.
                    Thread.currentThread().interrupt();
                    logger.debug("Interrupted while waiting for messages on " + this.routingKey);
                    break;
                }
                processDelivery(delivery);
            }
        } catch (Exception e2) {
            logger.fatal("Unable to consume messages from queue ", e2);
        }
    }

    /**
     * Handles a single delivery: stores its body, invokes the subclass handler
     * and acknowledges it. On failure the error is logged and counted and
     * {@link #rollback()} is invoked. {@link #cleanup()} and the read counter
     * update run exactly once in every case.
     *
     * @param delivery the delivery to process
     * @throws Exception if {@link #rollback()} itself fails
     */
    private void processDelivery(QueueingConsumer.Delivery delivery) throws Exception {
        try {
            // NOTE(review): decoded with the platform default charset, as before;
            // confirm the producer's encoding before pinning an explicit charset.
            this.currentMessage = new String(delivery.getBody());
            handleMessage();
            this.channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        } catch (Exception e) {
            logger.error("Failed to process message received on " + this.routingKey, e);
            // Count the failure first so it is recorded even if rollback() throws.
            this.errorCount.incrementAndGet();
            // NOTE(review): the failed delivery is neither acknowledged nor rejected
            // here; with a prefetch count of 1 the broker may withhold further
            // deliveries on this channel. Confirm the intended requeue/discard policy.
            rollback();
        } finally {
            cleanup();
            this.readCount.incrementAndGet();
        }
    }

    /**
     * Requests the consuming loop to stop. The flag is checked only between
     * deliveries, so the request takes effect after the current wait or
     * message handling completes.
     */
    public void stopListening() {
        this.continueListening.set(false);
    }

    /**
     * Returns the number of deliveries processed so far.
     *
     * @return the current value of the read counter
     */
    public int getComputedCount() {
        return this.readCount.intValue();
    }

    /**
     * Processes the message held in {@link #currentMessage}.
     *
     * @throws Exception if the message cannot be processed
     */
    public abstract void handleMessage() throws Exception;

    /**
     * Invoked when processing of the current message raised an exception.
     *
     * @throws Exception if the rollback itself fails; this ends the consuming loop
     */
    public abstract void rollback() throws Exception;

    /** Invoked once after every processed delivery, successful or not. */
    public abstract void cleanup();
}
