package com.urbanairship.octobot;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.surftools.BeanstalkClient.BeanstalkException;
import com.surftools.BeanstalkClient.Job;
import com.surftools.BeanstalkClientImpl.ClientImpl;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import org.apache.hadoop.util.ProcfsBasedProcessTree;
import org.apache.log4j.Logger;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;

/* loaded from: input_file:WEB-INF/lib/octobot.jar:com/urbanairship/octobot/QueueConsumer.class */
public class QueueConsumer implements Runnable {
    Queue queue;
    Channel channel = null;
    Connection connection = null;
    QueueingConsumer consumer = null;
    private final Logger logger = Logger.getLogger("Queue Consumer");
    private boolean enableEmailErrors = Settings.getAsBoolean("Octobot", "email_enabled");

    public QueueConsumer(Queue queue) {
        this.queue = null;
        this.queue = queue;
    }

    @Override // java.lang.Runnable
    public void run() {
        if (this.queue.queueType.equals("amqp")) {
            this.channel = getAMQPChannel(this.queue);
            consumeFromAMQP();
        } else if (this.queue.queueType.equals("beanstalk")) {
            consumeFromBeanstalk();
        } else if (this.queue.queueType.equals("redis")) {
            consumeFromRedis();
        } else {
            this.logger.error("Invalid queue type specified: " + this.queue.queueType);
        }
    }

    private void consumeFromAMQP() {
        while (true) {
            try {
                QueueingConsumer.Delivery nextDelivery = this.consumer.nextDelivery();
                if (nextDelivery != null && nextDelivery.getBody() != null) {
                    invokeTask(new String(nextDelivery.getBody()));
                    try {
                        this.channel.basicAck(nextDelivery.getEnvelope().getDeliveryTag(), false);
                    } catch (IOException e) {
                        this.logger.error("Error ack'ing message.", e);
                    }
                }
            } catch (Exception e2) {
                this.logger.error("Error in AMQP connection; reconnecting.", e2);
                this.channel = getAMQPChannel(this.queue);
            }
        }
    }

    private void consumeFromBeanstalk() {
        ClientImpl clientImpl = new ClientImpl(this.queue.host, this.queue.port.intValue());
        clientImpl.watch(this.queue.queueName);
        clientImpl.useTube(this.queue.queueName);
        this.logger.info("Connected to Beanstalk; waiting for jobs.");
        while (true) {
            try {
                Job reserve = clientImpl.reserve(1);
                if (reserve != null) {
                    try {
                        invokeTask(new String(reserve.getData()));
                    } catch (Exception e) {
                        this.logger.error("Error handling message.", e);
                    }
                    try {
                        clientImpl.delete(reserve.getJobId());
                    } catch (BeanstalkException e2) {
                        this.logger.error("Error sending message receipt.", e2);
                        clientImpl = Beanstalk.getBeanstalkChannel(this.queue.host, this.queue.port, this.queue.queueName);
                    }
                }
            } catch (BeanstalkException e3) {
                this.logger.error("Beanstalk connection error.", e3);
                clientImpl = Beanstalk.getBeanstalkChannel(this.queue.host, this.queue.port, this.queue.queueName);
            }
        }
    }

    private void consumeFromRedis() {
        this.logger.info("Connecting to Redis...");
        Jedis jedis = new Jedis(this.queue.host, this.queue.port.intValue());
        try {
            jedis.connect();
        } catch (IOException e) {
            this.logger.error("Unable to connect to Redis.", e);
        }
        this.logger.info("Connected to Redis.");
        jedis.subscribe(new JedisPubSub() { // from class: com.urbanairship.octobot.QueueConsumer.1
            @Override // redis.clients.jedis.JedisPubSub
            public void onMessage(String str, String str2) {
                QueueConsumer.this.invokeTask(str2);
            }

            @Override // redis.clients.jedis.JedisPubSub
            public void onPMessage(String str, String str2, String str3) {
                QueueConsumer.this.logger.info("onPMessage Triggered - Not implemented.");
            }

            @Override // redis.clients.jedis.JedisPubSub
            public void onSubscribe(String str, int i) {
                QueueConsumer.this.logger.info("onSubscribe called - Not implemented.");
            }

            @Override // redis.clients.jedis.JedisPubSub
            public void onUnsubscribe(String str, int i) {
                QueueConsumer.this.logger.info("onUnsubscribe Called - Not implemented.");
            }

            @Override // redis.clients.jedis.JedisPubSub
            public void onPUnsubscribe(String str, int i) {
                QueueConsumer.this.logger.info("onPUnsubscribe called - Not implemented.");
            }

            @Override // redis.clients.jedis.JedisPubSub
            public void onPSubscribe(String str, int i) {
                QueueConsumer.this.logger.info("onPSubscribe Triggered - Not implemented.");
            }
        }, this.queue.queueName);
    }

    public boolean invokeTask(String str) {
        String str2 = "";
        int i = 0;
        long j = 0;
        long nanoTime = System.nanoTime();
        Throwable th = null;
        boolean z = false;
        while (i < j + 1) {
            if (i > 0) {
                this.logger.info("Retrying task. Attempt " + i + " of " + j);
            }
            try {
                JSONObject jSONObject = (JSONObject) JSONValue.parse(str);
                str2 = (String) jSONObject.get("task");
                if (jSONObject.containsKey("retries")) {
                    j = ((Long) jSONObject.get("retries")).longValue();
                }
                try {
                    TaskExecutor.execute(str2, jSONObject);
                    z = true;
                } catch (ClassNotFoundException e) {
                    th = e;
                    this.logger.error("Error: Task requested not found: " + str2);
                } catch (NoClassDefFoundError e2) {
                    th = e2;
                    this.logger.error("Error: Task requested not found: " + str2, e2);
                } catch (NoSuchMethodException e3) {
                    th = e3;
                    this.logger.error("Error: Task requested does not have a static run method.");
                } catch (Throwable th2) {
                    th = th2;
                    this.logger.error("An error occurred while running the task.", th2);
                }
                if (z) {
                    break;
                }
                i++;
            } catch (Exception e4) {
                this.logger.error("Error: Invalid message received: " + str, e4);
                return z;
            }
        }
        if (this.enableEmailErrors && !z) {
            try {
                MailQueue.put("Error running task: " + str2 + ".\n\nAttempted executing " + i + " times as specified.\n\nThe original input was: \n\n" + str + "\n\nHere's the error that resulted while running the task:\n\n" + stackToString(th));
            } catch (InterruptedException e5) {
            }
        }
        Metrics.update(str2, System.nanoTime() - nanoTime, z, i);
        return z;
    }

    private Channel getAMQPChannel(Queue queue) {
        int i = 0;
        this.logger.info("Opening connection to AMQP " + queue.vhost + " " + queue.queueName + "...");
        while (true) {
            i++;
            this.logger.debug("Attempt #" + i);
            try {
                this.connection = new RabbitMQ(queue).getConnection();
                this.channel = this.connection.createChannel();
                this.consumer = new QueueingConsumer(this.channel);
                this.channel.exchangeDeclare(queue.queueName, "direct", true);
                this.channel.queueDeclare(queue.queueName, true, false, false, null);
                this.channel.queueBind(queue.queueName, queue.queueName, queue.queueName);
                this.channel.basicConsume(queue.queueName, false, this.consumer);
                this.logger.info("Connected to RabbitMQ");
                return this.channel;
            } catch (Exception e) {
                this.logger.error("Cannot connect to AMQP. Retrying in 5 sec.", e);
                try {
                    Thread.sleep(ProcfsBasedProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL);
                } catch (InterruptedException e2) {
                }
            }
        }
    }

    public String stackToString(Throwable th) {
        if (th == null) {
            return "(Null)";
        }
        StringWriter stringWriter = new StringWriter();
        th.printStackTrace(new PrintWriter(stringWriter));
        return stringWriter.toString();
    }
}
