/*
 * Decompiled with CFR 0.152.
 */
package org.gcube.common.queueManager.impl;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.gcube.common.core.utils.logging.GCUBELog;
import org.gcube.common.queueManager.QueueItemHandler;
import org.gcube.common.queueManager.QueueType;
import org.gcube.common.queueManager.model.QueueItem;
import org.gcube.common.queueManager.utils.Common;
import org.gcube.common.queueManager.utils.QueueXStream;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
public class MultiSyncConsumer {
    public static final long DEFAULT_WAIT_FOR_MESSAGE = 3000L;
    private static final GCUBELog logger = new GCUBELog(MultiSyncConsumer.class);
    private Connection connection;
    private String serviceClass;
    private String serviceName;
    private ConcurrentHashMap<String, QueueItemHandler<? extends QueueItem>> topicCallbacks = new ConcurrentHashMap();
    private AtomicInteger roundRobinIndex = new AtomicInteger(0);
    private long waitForMessage = 3000L;
    private QueueType type;

    MultiSyncConsumer(Connection connection, String serviceClass, String serviceName, QueueType type) throws JMSException {
        this.connection = connection;
        this.serviceClass = serviceClass;
        this.serviceName = serviceName;
        this.type = type;
        this.connection.setClientID(this.toString());
        this.connection.start();
    }

    public Set<String> getTopics() {
        return this.topicCallbacks.keySet();
    }

    public Collection<QueueItemHandler<? extends QueueItem>> getCallbacks() {
        return this.topicCallbacks.values();
    }

    public void attachTopic(String topic, QueueItemHandler<? extends QueueItem> callback) {
        this.topicCallbacks.put(topic, callback);
    }

    public int consumeMsg(QueueSelectionPolicy policy) throws Exception {
        String topic = this.selectTopic(policy);
        return this.consumeMsg(topic, this.topicCallbacks.get(topic));
    }

    public int consumeMsg(String topic) throws Exception {
        if (!this.topicCallbacks.contains(topic)) {
            throw new Exception("Topic " + topic + " not defined");
        }
        return this.consumeMsg(topic, this.topicCallbacks.get(topic));
    }

    public void removeTopic(String topic) {
        this.topicCallbacks.remove(topic);
    }

    private String selectTopic(QueueSelectionPolicy policy) throws Exception {
        logger.trace((Object)("Selecting topic, policy :" + (Object)((Object)policy)));
        if (this.topicCallbacks.size() == 0) {
            throw new Exception("No topic defined");
        }
        switch (policy) {
            case ROUND_ROBIN: {
                ArrayList topics = new ArrayList(this.topicCallbacks.keySet());
                return (String)topics.get(this.roundRobinIndex.getAndIncrement() % topics.size());
            }
            case RANDOM: {
                ArrayList topics = new ArrayList(this.topicCallbacks.keySet());
                return (String)topics.get((int)Math.round((double)topics.size() * Math.random()));
            }
        }
        throw new Exception("Policy not yet implemented");
    }

    private <T extends QueueItem> int consumeMsg(String topic, QueueItemHandler<T> handler) throws Exception {
        Session session = null;
        int executed = 0;
        String toUseTopic = Common.formTopic(this.serviceClass, this.serviceName, this.type, topic);
        try {
            session = this.connection.createSession(false, 2);
            MessageConsumer consumer = session.createConsumer((Destination)session.createQueue(toUseTopic));
            logger.trace((Object)("Requesting message to queue " + toUseTopic + ", max wait time : " + this.waitForMessage));
            System.out.println("Requesting message to queue " + toUseTopic + ", max wait time : " + this.waitForMessage);
            Message msg = consumer.receive(this.waitForMessage);
            if (msg != null) {
                logger.trace((Object)("Received msg :" + msg));
                QueueItem item = null;
                try {
                    item = (QueueItem)QueueXStream.get().fromXML(((TextMessage)msg).getText());
                    handler.handleQueueItem(item);
                    msg.acknowledge();
                    ++executed;
                }
                catch (JMSException e) {
                    logger.error((Object)("Unable to get item from message " + msg), (Throwable)e);
                    throw e;
                }
                catch (ClassCastException e) {
                    logger.error((Object)("Unexpected type of item " + item), (Throwable)e);
                    throw e;
                }
                catch (Exception e) {
                    logger.error((Object)("Unable to handle item from message " + item), (Throwable)e);
                    throw e;
                }
            }
            int n = executed;
            return n;
        }
        catch (Exception e) {
            logger.error((Object)"Unexpected Exception ", (Throwable)e);
            throw e;
        }
        finally {
            if (session != null) {
                session.close();
            }
        }
    }

    public void setWaitForMessage(long waitForMessage) {
        this.waitForMessage = waitForMessage;
    }

    public long getWaitForMessage() {
        return this.waitForMessage;
    }

    public void close() throws JMSException {
        for (QueueItemHandler<? extends QueueItem> handler : this.topicCallbacks.values()) {
            handler.close();
        }
        this.connection.stop();
        this.connection.close();
    }

    /*
     * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
     */
    public static enum QueueSelectionPolicy {
        RANDOM,
        ROUND_ROBIN,
        MOST_UNLOAD,
        MOST_LOAD;

    }
}

