package org.gcube.data.access.queueManager.impl;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.gcube.common.core.utils.logging.GCUBELog;
import org.gcube.data.access.queueManager.FactoryConfiguration;
import org.gcube.data.access.queueManager.QueueItemHandler;
import org.gcube.data.access.queueManager.QueueType;
import org.gcube.data.access.queueManager.model.QueueItem;
import org.gcube.data.access.queueManager.utils.Common;
import org.gcube.data.access.queueManager.utils.QueueXStream;

/* loaded from: input_file:org/gcube/data/access/queueManager/impl/MultiSyncConsumer.class */
public class MultiSyncConsumer {
    public static final long DEFAULT_WAIT_FOR_MESSAGE = 3000;
    private static final GCUBELog logger = new GCUBELog(MultiSyncConsumer.class);
    private Connection connection;
    private String serviceClass;
    private String serviceName;
    private ConcurrentHashMap<String, QueueItemHandler<? extends QueueItem>> topicCallbacks = new ConcurrentHashMap<>();
    private AtomicInteger roundRobinIndex = new AtomicInteger(0);
    private long waitForMessage = DEFAULT_WAIT_FOR_MESSAGE;
    private QueueType type;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.gcube.data.access.queueManager.impl.MultiSyncConsumer$1, reason: invalid class name */
    /* loaded from: input_file:org/gcube/data/access/queueManager/impl/MultiSyncConsumer$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$gcube$data$access$queueManager$impl$MultiSyncConsumer$QueueSelectionPolicy = new int[QueueSelectionPolicy.values().length];

        static {
            try {
                $SwitchMap$org$gcube$data$access$queueManager$impl$MultiSyncConsumer$QueueSelectionPolicy[QueueSelectionPolicy.ROUND_ROBIN.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$gcube$data$access$queueManager$impl$MultiSyncConsumer$QueueSelectionPolicy[QueueSelectionPolicy.RANDOM.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    /* loaded from: input_file:org/gcube/data/access/queueManager/impl/MultiSyncConsumer$QueueSelectionPolicy.class */
    public enum QueueSelectionPolicy {
        RANDOM,
        ROUND_ROBIN,
        MOST_UNLOAD,
        MOST_LOAD
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public MultiSyncConsumer(Connection connection, String str, String str2, QueueType queueType) throws JMSException {
        this.connection = connection;
        this.serviceClass = str;
        this.serviceName = str2;
        this.type = queueType;
        this.connection.setClientID(toString());
        this.connection.start();
    }

    public Set<String> getTopics() {
        return this.topicCallbacks.keySet();
    }

    public Collection<QueueItemHandler<? extends QueueItem>> getCallbacks() {
        return this.topicCallbacks.values();
    }

    public void attachTopic(String str, QueueItemHandler<? extends QueueItem> queueItemHandler) {
        this.topicCallbacks.put(str, queueItemHandler);
    }

    public int consumeMsg(QueueSelectionPolicy queueSelectionPolicy) throws Exception {
        String selectTopic = selectTopic(queueSelectionPolicy);
        return consumeMsg(selectTopic, this.topicCallbacks.get(selectTopic));
    }

    public int consumeMsg(String str) throws Exception {
        if (this.topicCallbacks.contains(str)) {
            return consumeMsg(str, this.topicCallbacks.get(str));
        }
        throw new Exception("Topic " + str + " not defined");
    }

    public void removeTopic(String str) {
        this.topicCallbacks.remove(str);
    }

    private String selectTopic(QueueSelectionPolicy queueSelectionPolicy) throws Exception {
        logger.trace("Selecting topic, policy :" + queueSelectionPolicy);
        if (this.topicCallbacks.size() == 0) {
            throw new Exception("No topic defined");
        }
        switch (AnonymousClass1.$SwitchMap$org$gcube$data$access$queueManager$impl$MultiSyncConsumer$QueueSelectionPolicy[queueSelectionPolicy.ordinal()]) {
            case FactoryConfiguration.DEFAULT_USE_EXPONENTIAL_BACKOFF /* 1 */:
                ArrayList arrayList = new ArrayList(this.topicCallbacks.keySet());
                return (String) arrayList.get(this.roundRobinIndex.getAndIncrement() % arrayList.size());
            case 2:
                return (String) new ArrayList(this.topicCallbacks.keySet()).get((int) Math.round(r0.size() * Math.random()));
            default:
                throw new Exception("Policy not yet implemented");
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v34, types: [org.gcube.data.access.queueManager.model.QueueItem] */
    private <T extends QueueItem> int consumeMsg(String str, QueueItemHandler<T> queueItemHandler) throws Exception {
        Session session = null;
        int i = 0;
        String formTopic = Common.formTopic(this.serviceClass, this.serviceName, this.type, str);
        try {
            try {
                Session createSession = this.connection.createSession(false, 2);
                MessageConsumer createConsumer = createSession.createConsumer(createSession.createQueue(formTopic));
                logger.trace("Requesting message to queue " + formTopic + ", max wait time : " + this.waitForMessage);
                System.out.println("Requesting message to queue " + formTopic + ", max wait time : " + this.waitForMessage);
                TextMessage receive = createConsumer.receive(this.waitForMessage);
                if (receive != null) {
                    logger.trace("Received msg :" + receive);
                    T t = null;
                    try {
                        t = (QueueItem) QueueXStream.get().fromXML(receive.getText());
                        queueItemHandler.handleQueueItem(t);
                        receive.acknowledge();
                        i = 0 + 1;
                    } catch (JMSException e) {
                        logger.error("Unable to get item from message " + receive, e);
                        throw e;
                    } catch (ClassCastException e2) {
                        logger.error("Unexpected type of item " + t, e2);
                        throw e2;
                    } catch (Exception e3) {
                        logger.error("Unable to handle item from message " + t, e3);
                        throw e3;
                    }
                }
                int i2 = i;
                if (createSession != null) {
                    createSession.close();
                }
                return i2;
            } catch (Exception e4) {
                logger.error("Unexpected Exception ", e4);
                throw e4;
            }
        } catch (Throwable th) {
            if (0 != 0) {
                session.close();
            }
            throw th;
        }
    }

    public void setWaitForMessage(long j) {
        this.waitForMessage = j;
    }

    public long getWaitForMessage() {
        return this.waitForMessage;
    }

    public void close() throws JMSException {
        Iterator<QueueItemHandler<? extends QueueItem>> it = this.topicCallbacks.values().iterator();
        while (it.hasNext()) {
            it.next().close();
        }
        this.connection.stop();
        this.connection.close();
    }
}
