/*
 * Decompiled with CFR 0.152.
 */
package org.gcube.data.harmonization.occurrence.queue;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.QueueingConsumer;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.gcube.common.core.utils.logging.GCUBELog;
import org.gcube.data.harmonization.occurrence.services.ServiceContext;

/**
 * Thread that consumes messages from a RabbitMQ queue and delegates the
 * processing of each message to the concrete subclass.
 *
 * <p>Usage: call {@link #bind(Channel, String)} first, then {@link #start()}.
 * For every delivery the body is decoded into {@link #currentMessage},
 * {@link #handleMessage()} is invoked and the delivery is acknowledged. If
 * handling fails, {@link #rollback()} is invoked and the delivery is rejected.
 * {@link #cleanup()} always runs once per delivery, whatever the outcome.
 *
 * <p>Messages are acknowledged manually and at most {@code PREFETCH_COUNT}
 * unacknowledged messages are requested from the broker, so every delivery
 * has to be settled (acked or rejected) before the next one can arrive.
 */
public abstract class QueueReceiver
extends Thread {
    private static final GCUBELog logger = new GCUBELog(QueueReceiver.class);
    /** Deliveries are acknowledged explicitly once they have been handled. */
    private static final boolean AUTO_ACK = false;
    /** Maximum number of unacknowledged deliveries requested from the broker. */
    private static final int PREFETCH_COUNT = 1;
    /**
     * Whether a message whose handling failed is put back on the queue.
     * {@code false} avoids redelivering the same failing message in a tight loop;
     * the failure is still visible through the log and {@link #errorCount}.
     */
    private static final boolean REQUEUE_ON_FAILURE = false;
    /** How long a single wait for a delivery lasts before the stop flag is re-checked. */
    private static final long POLL_TIMEOUT_MS = 1000L;
    /**
     * Charset used to decode message bodies, so decoding does not depend on the
     * platform default. NOTE(review): assumes publishers encode bodies as UTF-8.
     */
    private static final String MESSAGE_CHARSET = "UTF-8";

    /** Loop flag; cleared by {@link #stopListening()}. */
    protected AtomicBoolean continueListening = new AtomicBoolean(true);
    /** Number of deliveries processed so far, successful or not. */
    protected AtomicInteger readCount = new AtomicInteger(0);
    /** Number of deliveries whose handling failed and was rolled back. */
    protected AtomicInteger errorCount = new AtomicInteger(0);
    private Channel channel;
    private String routingKey;
    /** Body of the delivery currently being processed, decoded as text. */
    protected String currentMessage;

    /**
     * Declares the exchange and queue used by this receiver and binds them.
     *
     * <p>A {@code topic} exchange and a durable, non-exclusive, non-auto-delete
     * queue are declared, both named {@link #getExchangeName()}, and the queue is
     * bound to the exchange with the given routing key. Must be called before the
     * thread is started.
     *
     * @param channel    the channel to declare on and later consume from
     * @param routingKey the routing key used for the queue binding
     * @throws IOException if any of the broker operations fails
     */
    public void bind(Channel channel, String routingKey) throws IOException {
        this.channel = channel;
        this.routingKey = routingKey;
        String name = this.getExchangeName();
        this.channel.exchangeDeclare(name, "topic");
        this.channel.queueDeclare(name, true, false, false, null);
        this.channel.queueBind(name, name, this.routingKey);
        this.channel.basicQos(PREFETCH_COUNT);
    }

    /**
     * Returns the name used for both the exchange and the queue.
     *
     * @return the name reported by the current {@link ServiceContext}
     */
    public String getExchangeName() {
        return ServiceContext.getContext().getName();
    }

    /**
     * Consumes deliveries until {@link #stopListening()} is called, the thread is
     * interrupted, or the consumer fails.
     *
     * <p>Waiting for a delivery is done with a timeout so that the stop flag is
     * re-checked regularly even when the queue is idle. A failure while waiting
     * (for example because the channel was shut down) ends the loop instead of
     * being treated as a failed message.
     */
    @Override
    public void run() {
        logger.debug("Started listening to " + this.routingKey);
        QueueingConsumer consumer = new QueueingConsumer(this.channel);
        try {
            this.channel.basicConsume(this.getExchangeName(), AUTO_ACK, consumer);
            while (this.continueListening.get()) {
                // null means the wait timed out: loop around and re-check the stop flag.
                QueueingConsumer.Delivery delivery = consumer.nextDelivery(POLL_TIMEOUT_MS);
                if (delivery != null) {
                    this.processDelivery(delivery);
                }
            }
        }
        catch (InterruptedException e) {
            // Preserve the interrupt status for whoever owns this thread and stop.
            Thread.currentThread().interrupt();
            logger.debug("Interrupted while listening to " + this.routingKey);
        }
        catch (Exception e) {
            logger.fatal("Unable to consume messages from queue ", e);
        }
    }

    /**
     * Handles one delivery and settles it with the broker.
     *
     * <p>On success the delivery is acknowledged. On failure the error is logged,
     * {@link #rollback()} is invoked, {@link #errorCount} is incremented and the
     * delivery is rejected, so that the broker can send the next message.
     * {@link #cleanup()} runs and {@link #readCount} is incremented in every case.
     *
     * @param delivery the delivery to process
     * @throws Exception if the rollback or the rejection itself fails; the caller
     *                   treats this as fatal for the consumer
     */
    private void processDelivery(QueueingConsumer.Delivery delivery) throws Exception {
        long deliveryTag = delivery.getEnvelope().getDeliveryTag();
        try {
            this.currentMessage = new String(delivery.getBody(), MESSAGE_CHARSET);
            this.handleMessage();
            this.channel.basicAck(deliveryTag, false);
        }
        catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Keep the interrupt visible so the consume loop terminates.
                Thread.currentThread().interrupt();
            }
            logger.error("Unable to handle message, rolling back", e);
            this.rollback();
            this.errorCount.incrementAndGet();
            // Settle the delivery: with manual acks and a prefetch limit, an
            // unsettled message would block all further deliveries.
            this.channel.basicReject(deliveryTag, REQUEUE_ON_FAILURE);
        }
        finally {
            this.cleanup();
            this.readCount.incrementAndGet();
        }
    }

    /**
     * Asks the receiver to stop. The consume loop exits after the delivery in
     * progress, if any, has been processed, or once the current wait times out.
     */
    public void stopListening() {
        this.continueListening.set(false);
    }

    /**
     * Returns the number of deliveries processed so far.
     *
     * @return the current value of {@link #readCount}
     */
    public int getComputedCount() {
        return this.readCount.intValue();
    }

    /**
     * Processes the message held in {@link #currentMessage}.
     *
     * @throws Exception if the message cannot be processed; triggers {@link #rollback()}
     */
    public abstract void handleMessage() throws Exception;

    /**
     * Invoked when processing the current message failed.
     *
     * @throws Exception if the rollback fails; this terminates the consume loop
     */
    public abstract void rollback() throws Exception;

    /** Invoked once after every delivery, whether it was handled successfully or not. */
    public abstract void cleanup();
}

