package eu.dnetlib.message;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:eu/dnetlib/message/MessageManager.class */
/**
 * Small helper around the RabbitMQ Java client that publishes {@link Message}s to named queues
 * and registers consumers that push received messages into a caller-supplied queue.
 *
 * <p>Publishing uses one lazily created connection and one cached channel per queue name.
 * Every call to {@link #startConsumingMessage(String, boolean, boolean)} opens its own
 * connection; those connections are tracked so that {@link #close()} can release them.
 *
 * <p>The class performs no synchronization of its own (the channel cache is a plain
 * {@link HashMap}); NOTE(review): callers sharing one instance across threads should
 * synchronize externally.
 */
public class MessageManager {
    private static final Logger log = LoggerFactory.getLogger(MessageManager.class);

    /** Value of the {@code x-message-ttl} argument set on every queue declared by this class. */
    private static final int MESSAGE_TTL_MS = 10000;

    private final String messageHost;
    private final String username;
    private final String password;

    /** Connection used for publishing; created on first use and reset by {@link #close()}. */
    private Connection connection;

    /** Publishing channels, keyed by queue name. */
    private final Map<String, Channel> channels = new HashMap<>();

    /** Connections opened by {@link #startConsumingMessage}, kept so they can be closed. */
    private final List<Connection> consumerConnections = new ArrayList<>();

    /** Default "durable" flag used by {@link #sendMessage(Message, String)}. */
    private boolean durable;

    /** Default "auto-delete" flag used by {@link #sendMessage(Message, String)}. */
    private boolean autodelete;

    /** Destination handed to every {@link MessageConsumer} created by this manager. */
    private final LinkedBlockingQueue<Message> queueMessages;

    /**
     * Creates a manager whose default queue flags (durable, auto-delete) are both {@code false}.
     *
     * @param messageHost   broker host name
     * @param username      broker user name
     * @param password      broker password
     * @param queueMessages queue passed to the consumers registered by this manager
     */
    public MessageManager(String messageHost, String username, String password,
            LinkedBlockingQueue<Message> queueMessages) {
        this.queueMessages = queueMessages;
        this.messageHost = messageHost;
        this.username = username;
        this.password = password;
    }

    /**
     * Creates a manager with explicit default queue flags.
     *
     * @param messageHost   broker host name
     * @param username      broker user name
     * @param password      broker password
     * @param durable       default "durable" flag for queues declared by
     *                      {@link #sendMessage(Message, String)}
     * @param autodelete    default "auto-delete" flag for queues declared by
     *                      {@link #sendMessage(Message, String)}
     * @param queueMessages queue passed to the consumers registered by this manager
     */
    public MessageManager(String messageHost, String username, String password, boolean durable,
            boolean autodelete, LinkedBlockingQueue<Message> queueMessages) {
        this.queueMessages = queueMessages;
        this.messageHost = messageHost;
        this.username = username;
        this.password = password;
        this.durable = durable;
        this.autodelete = autodelete;
    }

    /** Opens a new broker connection using the configured host and credentials. */
    private Connection createConnection() throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(this.messageHost);
        connectionFactory.setUsername(this.username);
        connectionFactory.setPassword(this.password);
        return connectionFactory.newConnection();
    }

    /**
     * Opens a channel on {@code connection} and declares {@code queueName} on it as a
     * non-exclusive queue with the given flags and the fixed message TTL.
     */
    private Channel createChannel(Connection connection, String queueName, boolean durable,
            boolean autoDelete) throws Exception {
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-message-ttl", MESSAGE_TTL_MS);
        Channel channel = connection.createChannel();
        // Honour the caller-supplied auto-delete flag; it used to be ignored in favour of the
        // instance default, so explicit values passed by callers never reached the broker.
        channel.queueDeclare(queueName, durable, false, autoDelete, arguments);
        return channel;
    }

    /**
     * Returns the cached publishing channel for {@code queueName}, creating the connection and
     * the channel on first use. The flags only take effect when the channel is first created;
     * for an already cached queue they are ignored.
     */
    private Channel getOrCreateChannel(String queueName, boolean durable, boolean autoDelete)
            throws Exception {
        log.debug("Get channel for queue: " + queueName);
        Channel cached = this.channels.get(queueName);
        if (cached != null) {
            log.debug("already initialized channel" + queueName);
            return cached;
        }
        if (this.connection == null) {
            this.connection = createConnection();
        }
        Channel channel = createChannel(this.connection, queueName, durable, autoDelete);
        this.channels.put(queueName, channel);
        channel.basicQos(1);
        return channel;
    }

    /**
     * Releases every resource held by this manager: the cached publishing channels, the
     * connections opened for consumers, and the publishing connection.
     *
     * <p>Failures while closing channels or consumer connections are logged and do not stop the
     * shutdown. Calling this method when nothing was ever opened is a no-op. Afterwards the
     * manager can be used again; connections and channels are re-created on demand.
     *
     * @throws IOException if closing the publishing connection fails
     */
    public void close() throws IOException {
        for (Channel channel : this.channels.values()) {
            try {
                channel.close();
            } catch (Exception e) {
                log.info("Error on closing channel", e);
            }
        }
        // Drop the closed channels so a later send re-creates them instead of failing.
        this.channels.clear();

        for (Connection consumerConnection : this.consumerConnections) {
            try {
                consumerConnection.close();
            } catch (Exception e) {
                log.info("Error on closing consumer connection", e);
            }
        }
        this.consumerConnections.clear();

        Connection publisherConnection = this.connection;
        this.connection = null;
        if (publisherConnection != null) {
            publisherConnection.close();
        }
    }

    /**
     * Publishes {@code message} to {@code queueName} through the default exchange, declaring the
     * queue with this manager's default durable/auto-delete flags if it has no channel yet.
     *
     * <p>NOTE(review): the payload is {@code message.toString().getBytes()}, i.e. encoded with
     * the platform default charset; confirm what the consuming side decodes with before pinning
     * an explicit charset.
     *
     * @param message   message to publish
     * @param queueName target queue, used as routing key
     * @return always {@code true}
     * @throws RuntimeException wrapping any failure raised while publishing
     */
    public boolean sendMessage(Message message, String queueName) throws Exception {
        try {
            log.info("Sending messages: {}", message);
            getOrCreateChannel(queueName, this.durable, this.autodelete)
                    .basicPublish("", queueName, (AMQP.BasicProperties) null, message.toString().getBytes());
            return true;
        } catch (Throwable th) {
            log.error("Error on sending messages: ", th);
            throw new RuntimeException(th);
        }
    }

    /**
     * Publishes {@code message} to {@code queueName} through the default exchange, declaring the
     * queue with the given flags if it has no channel yet.
     *
     * @param message    message to publish
     * @param queueName  target queue, used as routing key
     * @param durable    "durable" flag used when the queue is first declared
     * @param autoDelete "auto-delete" flag used when the queue is first declared
     * @return always {@code true}
     * @throws RuntimeException wrapping any failure raised while publishing
     */
    public boolean sendMessage(Message message, String queueName, boolean durable, boolean autoDelete)
            throws Exception {
        try {
            getOrCreateChannel(queueName, durable, autoDelete)
                    .basicPublish("", queueName, (AMQP.BasicProperties) null, message.toString().getBytes());
            return true;
        } catch (Throwable th) {
            log.error("Error on sending messages: ", th);
            throw new RuntimeException(th);
        }
    }

    /**
     * Declares {@code queueName} on a dedicated connection and registers an auto-acknowledging
     * {@link MessageConsumer} on it. The connection stays open until {@link #close()} is called.
     *
     * @param queueName  queue to consume from
     * @param durable    "durable" flag used when declaring the queue
     * @param autoDelete "auto-delete" flag used when declaring the queue
     * @throws Exception if the connection, the queue declaration or the consumer registration
     *                   fails; in that case the connection opened here is closed again
     */
    public void startConsumingMessage(String queueName, boolean durable, boolean autoDelete)
            throws Exception {
        Connection consumerConnection = createConnection();
        try {
            Channel channel = createChannel(consumerConnection, queueName, durable, autoDelete);
            channel.basicConsume(queueName, true, new MessageConsumer(channel, this.queueMessages));
        } catch (Exception e) {
            // Do not leak the freshly opened connection when setup fails.
            try {
                consumerConnection.close();
            } catch (Exception closeError) {
                e.addSuppressed(closeError);
            }
            throw e;
        }
        this.consumerConnections.add(consumerConnection);
    }
}
