package com.netflix.astyanax.recipes.queue;

import com.google.common.collect.Lists;
import com.netflix.astyanax.ColumnListMutation;
import com.netflix.astyanax.Execution;
import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import com.netflix.astyanax.connectionpool.exceptions.NotFoundException;
import com.netflix.astyanax.model.Column;
import com.netflix.astyanax.model.ColumnList;
import com.netflix.astyanax.model.Equality;
import com.netflix.astyanax.model.RangeEndpoint;
import com.netflix.astyanax.recipes.locks.BusyLockException;
import com.netflix.astyanax.recipes.queue.triggers.Trigger;
import com.netflix.astyanax.util.RangeBuilder;
import com.netflix.astyanax.util.TimeUUIDUtils;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/netflix/astyanax/recipes/queue/MessageConsumerImpl.class */
class MessageConsumerImpl implements MessageConsumer {
    private static final Logger LOG = LoggerFactory.getLogger(MessageConsumerImpl.class);
    private final ShardedDistributedMessageQueue queue;

    public MessageConsumerImpl(ShardedDistributedMessageQueue shardedDistributedMessageQueue) {
        this.queue = shardedDistributedMessageQueue;
    }

    @Override // com.netflix.astyanax.recipes.queue.MessageConsumer
    public List<MessageContext> readMessages(int i) throws MessageQueueException, BusyLockException, InterruptedException {
        return readMessages(i, 0L, null);
    }

    @Override // com.netflix.astyanax.recipes.queue.MessageConsumer
    public List<MessageContext> readMessages(int i, long j, TimeUnit timeUnit) throws MessageQueueException, BusyLockException, InterruptedException {
        long currentTimeMillis = j == 0 ? 0L : System.currentTimeMillis() + TimeUnit.MILLISECONDS.convert(j, timeUnit);
        List<MessageContext> list = null;
        while (true) {
            MessageQueueShard nextShard = this.queue.shardReaderPolicy.nextShard();
            if (nextShard != null) {
                try {
                    list = readAndReturnShard(nextShard, i);
                    if (list != null && !list.isEmpty()) {
                        this.queue.shardReaderPolicy.releaseShard(nextShard, list == null ? 0 : list.size());
                        return list;
                    }
                    this.queue.shardReaderPolicy.releaseShard(nextShard, list == null ? 0 : list.size());
                } catch (Throwable th) {
                    this.queue.shardReaderPolicy.releaseShard(nextShard, list == null ? 0 : list.size());
                    throw th;
                }
            }
            if (currentTimeMillis != 0 && System.currentTimeMillis() > currentTimeMillis) {
                return Lists.newLinkedList();
            }
            Thread.sleep(this.queue.shardReaderPolicy.getPollInterval());
        }
    }

    @Override // com.netflix.astyanax.recipes.queue.MessageConsumer
    public List<Message> peekMessages(int i) throws MessageQueueException {
        return this.queue.peekMessages(i);
    }

    private List<MessageContext> readAndReturnShard(MessageQueueShard messageQueueShard, int i) throws MessageQueueException, BusyLockException, InterruptedException {
        List<MessageContext> list = null;
        try {
            list = readMessagesFromShard(messageQueueShard.getName(), i);
            if (list == null || list.isEmpty()) {
                this.queue.stats.incEmptyPartitionCount();
            }
            return list;
        } catch (Throwable th) {
            if (list == null || list.isEmpty()) {
                this.queue.stats.incEmptyPartitionCount();
            }
            throw th;
        }
    }

    @Override // com.netflix.astyanax.recipes.queue.MessageConsumer
    public List<MessageContext> readMessagesFromShard(String str, int i) throws MessageQueueException, BusyLockException {
        return this.queue.lockManager != null ? readMessagesFromShardUsingLockManager(str, i) : readMessagesFromShardUsingDefaultLock(str, i);
    }

    List<MessageContext> readMessagesFromShardUsingLockManager(String str, int i) throws MessageQueueException, BusyLockException {
        ShardLock shardLock = null;
        try {
            try {
                shardLock = this.queue.lockManager.acquireLock(str);
                MutationBatch consistencyLevel = this.queue.keyspace.prepareMutationBatch().setConsistencyLevel(this.queue.consistencyLevel);
                List<MessageContext> readMessagesInternal = readMessagesInternal(str, i, 0, null, consistencyLevel.withRow(this.queue.queueColumnFamily, str), consistencyLevel, TimeUUIDUtils.getMicrosTimeFromUUID(TimeUUIDUtils.getUniqueTimeUUIDinMicros()));
                this.queue.lockManager.releaseLock(shardLock);
                return readMessagesInternal;
            } catch (BusyLockException e) {
                this.queue.stats.incLockContentionCount();
                throw e;
            } catch (Exception e2) {
                LOG.error("Error reading shard " + str, e2);
                throw new MessageQueueException("Error", e2);
            }
        } catch (Throwable th) {
            this.queue.lockManager.releaseLock(shardLock);
            throw th;
        }
    }

    List<MessageContext> readMessagesFromShardUsingDefaultLock(String str, int i) throws MessageQueueException, BusyLockException {
        Execution execution = null;
        try {
            try {
                try {
                    MessageQueueEntry newLockEntry = MessageQueueEntry.newLockEntry(MessageQueueEntryState.None);
                    long timeFromUUID = TimeUUIDUtils.getTimeFromUUID(newLockEntry.getTimestamp());
                    MutationBatch consistencyLevel = this.queue.keyspace.prepareMutationBatch().setConsistencyLevel(this.queue.consistencyLevel);
                    consistencyLevel.withRow(this.queue.queueColumnFamily, str).putColumn((ColumnListMutation) newLockEntry, timeFromUUID + this.queue.lockTimeout, Integer.valueOf(this.queue.lockTtl));
                    consistencyLevel.execute();
                    ColumnList<Column> columnList = (ColumnList) this.queue.keyspace.prepareQuery(this.queue.queueColumnFamily).setConsistencyLevel(this.queue.consistencyLevel).getKey(str).withColumnRange(ShardedDistributedMessageQueue.entrySerializer.buildRange().greaterThanEquals(Byte.valueOf((byte) MessageQueueEntryType.Lock.ordinal())).lessThanEquals(Byte.valueOf((byte) MessageQueueEntryType.Lock.ordinal())).build()).execute().getResult();
                    MutationBatch consistencyLevel2 = this.queue.keyspace.prepareMutationBatch().setConsistencyLevel(this.queue.consistencyLevel);
                    ColumnListMutation withRow = consistencyLevel2.withRow(this.queue.queueColumnFamily, str);
                    withRow.deleteColumn(newLockEntry);
                    int i2 = 0;
                    boolean z = false;
                    int size = columnList.size();
                    for (Column column : columnList) {
                        MessageQueueEntry messageQueueEntry = (MessageQueueEntry) column.getName();
                        if (messageQueueEntry.getType() == MessageQueueEntryType.Lock) {
                            size++;
                            if (column.getLongValue() < timeFromUUID) {
                                this.queue.stats.incExpiredLockCount();
                                withRow.deleteColumn(messageQueueEntry);
                            } else {
                                if (messageQueueEntry.getState() == MessageQueueEntryState.Acquired) {
                                    throw new BusyLockException("Not first lock");
                                }
                                i2++;
                                if (i2 == 1 && messageQueueEntry.getTimestamp().equals(newLockEntry.getTimestamp())) {
                                    z = true;
                                }
                            }
                            if (!z) {
                                throw new BusyLockException("Not first lock");
                            }
                            newLockEntry = MessageQueueEntry.newLockEntry(newLockEntry.getTimestamp(), MessageQueueEntryState.Acquired);
                            withRow.putColumn((ColumnListMutation) newLockEntry, timeFromUUID + this.queue.lockTimeout, Integer.valueOf(this.queue.lockTtl));
                        }
                    }
                    try {
                        consistencyLevel2.execute();
                        long microsTimeFromUUID = TimeUUIDUtils.getMicrosTimeFromUUID(newLockEntry.getTimestamp());
                        MutationBatch consistencyLevel3 = this.queue.keyspace.prepareMutationBatch().setConsistencyLevel(this.queue.consistencyLevel);
                        ColumnListMutation<MessageQueueEntry> withRow2 = consistencyLevel3.withRow(this.queue.queueColumnFamily, str);
                        withRow2.deleteColumn(newLockEntry);
                        return readMessagesInternal(str, i, size, newLockEntry, withRow2, consistencyLevel3, microsTimeFromUUID);
                    } catch (Exception e) {
                        throw new MessageQueueException("Error committing lock", e);
                    }
                } catch (ConnectionException e2) {
                    LOG.error("Error reading shard " + str, e2);
                    throw new MessageQueueException("Error", e2);
                }
            } catch (BusyLockException e3) {
                this.queue.stats.incLockContentionCount();
                throw e3;
            }
        } catch (Throwable th) {
            try {
                execution.execute();
                throw th;
            } catch (Exception e4) {
                throw new MessageQueueException("Error committing lock", e4);
            }
        }
    }

    @Override // com.netflix.astyanax.recipes.queue.MessageConsumer
    public void ackMessage(MessageContext messageContext) throws MessageQueueException {
        MutationBatch consistencyLevel = this.queue.keyspace.prepareMutationBatch().setConsistencyLevel(this.queue.consistencyLevel);
        fillAckMutation(messageContext, consistencyLevel);
        try {
            consistencyLevel.execute();
        } catch (ConnectionException e) {
            throw new MessageQueueException("Failed to ack message", e);
        }
    }

    @Override // com.netflix.astyanax.recipes.queue.MessageConsumer
    public void ackMessages(Collection<MessageContext> collection) throws MessageQueueException {
        MutationBatch consistencyLevel = this.queue.keyspace.prepareMutationBatch().setConsistencyLevel(this.queue.consistencyLevel);
        Iterator<MessageContext> it = collection.iterator();
        while (it.hasNext()) {
            fillAckMutation(it.next(), consistencyLevel);
        }
        try {
            consistencyLevel.execute();
        } catch (ConnectionException e) {
            throw new MessageQueueException("Failed to ack messages", e);
        }
    }

    private void fillAckMutation(MessageContext messageContext, MutationBatch mutationBatch) {
        this.queue.stats.incAckMessageCount();
        Message message = messageContext.getMessage();
        if (message.getToken() != null) {
            MessageQueueEntry newBusyEntry = MessageQueueEntry.newBusyEntry(message);
            mutationBatch.withRow(this.queue.queueColumnFamily, this.queue.getShardKey(message)).deleteColumn(newBusyEntry);
            if (message.hasKey()) {
                mutationBatch.withRow(this.queue.keyIndexColumnFamily, this.queue.getCompositeKey(this.queue.getName(), message.getKey())).putEmptyColumn(MessageMetadataEntry.newMessageId(this.queue.getCompositeKey(this.queue.getShardKey(message), newBusyEntry.getMessageId())), Integer.valueOf(this.queue.metadataDeleteTTL));
                if (message.isKeepHistory()) {
                    MessageHistory history = messageContext.getHistory();
                    if (history.getStatus() == MessageStatus.RUNNING) {
                        history.setStatus(MessageStatus.DONE);
                    }
                    history.setEndTime(TimeUnit.MICROSECONDS.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS));
                    try {
                        mutationBatch.withRow(this.queue.historyColumnFamily, message.getKey()).putColumn((ColumnListMutation) history.getToken(), this.queue.serializeToString(messageContext.getHistory()), this.queue.metadata.getHistoryTtl());
                    } catch (Exception e) {
                        LOG.warn("Error serializing message history for " + message.getKey(), e);
                    }
                }
            }
            Iterator<MessageQueueHooks> it = this.queue.hooks.iterator();
            while (it.hasNext()) {
                it.next().beforeAckMessage(message, mutationBatch);
            }
        }
        if (messageContext.getNextMessage() != null) {
            try {
                this.queue.fillMessageMutation(mutationBatch, messageContext.getNextMessage());
            } catch (MessageQueueException e2) {
                LOG.warn("Error filling nextMessage for " + message.getKey(), e2);
            }
        }
    }

    @Override // com.netflix.astyanax.recipes.queue.MessageConsumer
    public void ackPoisonMessage(MessageContext messageContext) throws MessageQueueException {
        MutationBatch consistencyLevel = this.queue.keyspace.prepareMutationBatch().setConsistencyLevel(this.queue.consistencyLevel);
        fillAckMutation(messageContext, consistencyLevel);
        try {
            consistencyLevel.execute();
        } catch (ConnectionException e) {
            this.queue.stats.incPersistError();
            throw new MessageQueueException("Failed to ack messages", e);
        }
    }

    private List<MessageContext> readMessagesInternal(String str, int i, int i2, MessageQueueEntry messageQueueEntry, ColumnListMutation<MessageQueueEntry> columnListMutation, MutationBatch mutationBatch, long j) throws BusyLockException, MessageQueueException {
        MessageMetadataEntry messageMetadataEntry;
        try {
            try {
                ArrayList newArrayList = Lists.newArrayList();
                RangeEndpoint append = ShardedDistributedMessageQueue.entrySerializer.makeEndpoint(Byte.valueOf((byte) MessageQueueEntryType.Message.ordinal()), Equality.EQUAL).append((byte) 0, Equality.EQUAL);
                if (messageQueueEntry != null) {
                    append.append(messageQueueEntry.getTimestamp(), Equality.LESS_THAN_EQUALS);
                } else {
                    append.append(TimeUUIDUtils.getMicrosTimeUUID(j), Equality.LESS_THAN_EQUALS);
                }
                try {
                    for (Column<MessageQueueEntry> column : (ColumnList) this.queue.keyspace.prepareQuery(this.queue.queueColumnFamily).setConsistencyLevel(this.queue.consistencyLevel).getKey(str).withColumnRange(new RangeBuilder().setLimit(i + (messageQueueEntry == null ? 0 : i2 + 1)).setEnd(append.toBytes()).build()).execute().getResult()) {
                        if (i == 0) {
                            mutationBatch.execute();
                            return newArrayList;
                        }
                        MessageQueueEntry name = column.getName();
                        switch (name.getType()) {
                            case Lock:
                                if (messageQueueEntry != null && name.getState() == MessageQueueEntryState.Acquired && !name.getTimestamp().equals(messageQueueEntry.getTimestamp())) {
                                    throw new BusyLockException("Someone else snuck in");
                                }
                                break;
                            case Message:
                                try {
                                    i--;
                                    String compositeKey = this.queue.getCompositeKey(str, name.getMessageId());
                                    columnListMutation.deleteColumn(name);
                                    Message extractMessageFromColumn = this.queue.extractMessageFromColumn(column);
                                    if (extractMessageFromColumn != null) {
                                        MessageContext messageContext = new MessageContext();
                                        messageContext.setMessage(extractMessageFromColumn);
                                        if (extractMessageFromColumn.hasTrigger()) {
                                            String compositeKey2 = this.queue.getCompositeKey(this.queue.getName(), extractMessageFromColumn.getKey());
                                            try {
                                                messageMetadataEntry = null;
                                                long j2 = 0;
                                                for (Column column2 : (ColumnList) this.queue.keyspace.prepareQuery(this.queue.keyIndexColumnFamily).getRow(compositeKey2).withColumnRange(ShardedDistributedMessageQueue.metadataSerializer.buildRange().greaterThanEquals(Byte.valueOf((byte) MessageMetadataEntryType.MessageId.ordinal())).lessThanEquals(Byte.valueOf((byte) MessageMetadataEntryType.MessageId.ordinal())).build()).execute().getResult()) {
                                                    MessageQueueEntry fromMetadata = MessageQueueEntry.fromMetadata((MessageMetadataEntry) column2.getName());
                                                    if (column2.getTtl() == 0) {
                                                        long timestamp = fromMetadata.getTimestamp(TimeUnit.MICROSECONDS);
                                                        if (messageMetadataEntry == null) {
                                                            messageMetadataEntry = (MessageMetadataEntry) column2.getName();
                                                            j2 = timestamp;
                                                        } else if (timestamp > j2) {
                                                            LOG.warn("Need to discard : " + name.getMessageId() + " => " + messageMetadataEntry.getName());
                                                            mutationBatch.withRow(this.queue.keyIndexColumnFamily, this.queue.getCompositeKey(this.queue.getName(), extractMessageFromColumn.getKey())).putEmptyColumn(messageMetadataEntry, Integer.valueOf(this.queue.metadataDeleteTTL));
                                                            j2 = timestamp;
                                                            messageMetadataEntry = (MessageMetadataEntry) column2.getName();
                                                        } else {
                                                            LOG.warn("Need to discard : " + name.getMessageId() + " => " + column2.getName());
                                                            mutationBatch.withRow(this.queue.keyIndexColumnFamily, this.queue.getCompositeKey(this.queue.getName(), extractMessageFromColumn.getKey())).putEmptyColumn(column2.getName(), Integer.valueOf(this.queue.metadataDeleteTTL));
                                                        }
                                                    }
                                                }
                                            } catch (NotFoundException e) {
                                            } catch (ConnectionException e2) {
                                                throw new MessageQueueException("Error fetching row " + compositeKey2, e2);
                                            }
                                            if (messageMetadataEntry != null && !messageMetadataEntry.getName().endsWith(name.getMessageId())) {
                                                throw new DuplicateMessageException("Duplicate trigger for " + compositeKey);
                                                break;
                                            } else {
                                                Trigger nextTrigger = extractMessageFromColumn.getTrigger().nextTrigger();
                                                if (nextTrigger != null) {
                                                    Message m826clone = extractMessageFromColumn.m826clone();
                                                    m826clone.setTrigger(nextTrigger);
                                                    messageContext.setNextMessage(m826clone);
                                                    if (extractMessageFromColumn.isAutoCommitTrigger()) {
                                                        this.queue.fillMessageMutation(mutationBatch, m826clone);
                                                    }
                                                }
                                            }
                                        }
                                        if (extractMessageFromColumn.hasKey()) {
                                            mutationBatch.withRow(this.queue.keyIndexColumnFamily, this.queue.getCompositeKey(this.queue.getName(), extractMessageFromColumn.getKey())).putEmptyColumn(MessageMetadataEntry.newMessageId(compositeKey), Integer.valueOf(this.queue.metadataDeleteTTL));
                                            LOG.debug("Removing from key  :  " + this.queue.getCompositeKey(this.queue.getName(), extractMessageFromColumn.getKey()) + " : " + compositeKey);
                                            if (extractMessageFromColumn.isKeepHistory()) {
                                                MessageHistory history = messageContext.getHistory();
                                                history.setToken(name.getTimestamp());
                                                history.setStartTime(j);
                                                history.setTriggerTime(extractMessageFromColumn.getTrigger().getTriggerTime());
                                                history.setStatus(MessageStatus.RUNNING);
                                                try {
                                                    mutationBatch.withRow(this.queue.historyColumnFamily, extractMessageFromColumn.getKey()).putColumn((ColumnListMutation) name.getTimestamp(), this.queue.serializeToString(history), this.queue.metadata.getHistoryTtl());
                                                } catch (Exception e3) {
                                                    LOG.warn("Error serializing history for key '" + extractMessageFromColumn.getKey() + "'", e3);
                                                }
                                            }
                                        }
                                        if (extractMessageFromColumn.getTimeout() > 0) {
                                            MessageQueueEntry newMessageEntry = MessageQueueEntry.newMessageEntry((byte) 0, TimeUUIDUtils.getMicrosTimeUUID(j + TimeUnit.MICROSECONDS.convert(extractMessageFromColumn.getTimeout(), TimeUnit.SECONDS) + (this.queue.counter.incrementAndGet() % 1000)), MessageQueueEntryState.Busy);
                                            extractMessageFromColumn.setToken(newMessageEntry.getTimestamp());
                                            extractMessageFromColumn.setRandom(newMessageEntry.getRandom());
                                            mutationBatch.withRow(this.queue.queueColumnFamily, this.queue.getShardKey(extractMessageFromColumn)).putColumn((ColumnListMutation) newMessageEntry, column.getStringValue(), this.queue.metadata.getRetentionTimeout());
                                            MessageMetadataEntry newMessageId = MessageMetadataEntry.newMessageId(this.queue.getCompositeKey(this.queue.getShardKey(extractMessageFromColumn), newMessageEntry.getMessageId()));
                                            if (extractMessageFromColumn.hasKey()) {
                                                mutationBatch.withRow(this.queue.keyIndexColumnFamily, this.queue.getCompositeKey(this.queue.getName(), extractMessageFromColumn.getKey())).putEmptyColumn(newMessageId, this.queue.metadata.getRetentionTimeout());
                                            }
                                            messageContext.setAckMessageId(newMessageId.getName());
                                        } else {
                                            extractMessageFromColumn.setToken(null);
                                        }
                                        switch (name.getState()) {
                                            case Waiting:
                                                this.queue.stats.incProcessCount();
                                                break;
                                            case Busy:
                                                this.queue.stats.incReprocessCount();
                                                break;
                                            default:
                                                LOG.warn("Unknown message state: " + name.getState());
                                                break;
                                        }
                                        newArrayList.add(messageContext);
                                    } else {
                                        this.queue.stats.incInvalidMessageCount();
                                    }
                                    break;
                                } catch (DuplicateMessageException e4) {
                                    break;
                                }
                                break;
                        }
                    }
                    mutationBatch.execute();
                    return newArrayList;
                } catch (Exception e5) {
                    throw new MessageQueueException("Error processing queue shard : " + str, e5);
                }
            } catch (Throwable th) {
                try {
                    mutationBatch.execute();
                    throw th;
                } catch (Exception e6) {
                    throw new MessageQueueException("Error processing queue shard : " + str, e6);
                }
            }
        } catch (BusyLockException e7) {
            this.queue.stats.incLockContentionCount();
            throw e7;
        } catch (Exception e8) {
            throw new MessageQueueException("Error processing queue shard : " + str, e8);
        }
    }
}
