package voldemort.server.scheduler.slop;

import com.google.common.collect.AbstractIterator;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import org.apache.log4j.Logger;
import voldemort.client.protocol.admin.AdminClient;
import voldemort.cluster.Cluster;
import voldemort.cluster.failuredetector.FailureDetector;
import voldemort.server.StoreRepository;
import voldemort.server.VoldemortConfig;
import voldemort.server.storage.ScanPermitWrapper;
import voldemort.store.UnreachableStoreException;
import voldemort.store.metadata.MetadataStore;
import voldemort.store.slop.Slop;
import voldemort.store.slop.SlopStorageEngine;
import voldemort.utils.ByteArray;
import voldemort.utils.EventThrottler;
import voldemort.utils.Pair;
import voldemort.utils.Utils;
import voldemort.versioning.VectorClock;
import voldemort.versioning.Version;
import voldemort.versioning.Versioned;

/* loaded from: input_file:voldemort/server/scheduler/slop/StreamingSlopPusherJob.class */
public class StreamingSlopPusherJob implements Runnable {
    /** Name of this slop pusher flavour. NOTE(review): presumably matched against configuration elsewhere — confirm. */
    public static final String TYPE_NAME = "streaming";
    private final MetadataStore metadataStore;
    // Not referenced in the decompiled portion of this class.
    private final StoreRepository storeRepo;
    // Notified by SlopConsumer when pushing to a node fails with UnreachableStoreException.
    private final FailureDetector failureDetector;
    // Hand-off queues keyed by node id; (re)created in loadMetadata(). Each SlopConsumer
    // removes the entry for its own node id when it cleans up.
    private ConcurrentMap<Integer, SynchronousQueue<Versioned<Slop>>> slopQueues;
    // Built from VoldemortConfig.getSlopMaxReadBytesPerSec() in the constructor.
    private final EventThrottler readThrottler;
    // Snapshot of metadataStore.getCluster(), taken in loadMetadata().
    private Cluster cluster;
    private final VoldemortConfig voldemortConfig;
    // Per-node counters keyed by node id; (re)created in loadMetadata().
    // attemptedByNode is not updated in the decompiled portion of this class.
    private ConcurrentHashMap<Integer, Long> attemptedByNode;
    // Incremented by SlopConsumer with the number of slops it deleted from the slop store.
    private ConcurrentHashMap<Integer, Long> succeededByNode;
    // Never null (checked in the constructor); acquired in acquireRepairPermit().
    private final ScanPermitWrapper repairPermits;
    private static final Logger logger = Logger.getLogger(StreamingSlopPusherJob.class.getName());
    // Sentinel element: SlopIterator treats a dequeued item equal to END as "no more slops".
    private static final Versioned<Slop> END = Versioned.value(null);
    // Used by SlopConsumer to stream slops to a node; stopped and nulled by stopAdminClient().
    // Where it is created is not visible in the decompiled portion of this class.
    private AdminClient adminClient = null;
    // Not referenced in the decompiled portion of this class.
    private final List<Future> consumerResults = Lists.newArrayList();
    // Not referenced in the decompiled portion of this class.
    private final Map<Integer, Set<Integer>> zoneMapping = Maps.newHashMap();
    // Cached thread pool whose worker threads are all named "slop-pusher".
    private ExecutorService consumerExecutor = Executors.newCachedThreadPool(new ThreadFactory() { // from class: voldemort.server.scheduler.slop.StreamingSlopPusherJob.1
        @Override // java.util.concurrent.ThreadFactory
        public Thread newThread(Runnable runnable) {
            Thread thread = new Thread(runnable);
            thread.setName("slop-pusher");
            return thread;
        }
    });

    /* loaded from: input_file:voldemort/server/scheduler/slop/StreamingSlopPusherJob$SlopConsumer.class */
    private class SlopConsumer implements Runnable {
        private final int nodeId;
        private SynchronousQueue<Versioned<Slop>> slopQueue;
        private long startTime;
        private SlopStorageEngine slopStorageEngine;
        private List<Pair<ByteArray, Version>> previous = Lists.newArrayList();
        private List<Pair<ByteArray, Version>> current = Lists.newArrayList();

        public SlopConsumer(int i, SynchronousQueue<Versioned<Slop>> synchronousQueue, SlopStorageEngine slopStorageEngine) {
            this.nodeId = i;
            this.slopQueue = synchronousQueue;
            this.slopStorageEngine = slopStorageEngine;
        }

        /* JADX WARN: Multi-variable type inference failed */
        /* JADX WARN: Type inference failed for: r0v15, types: [java.util.Iterator, voldemort.server.scheduler.slop.StreamingSlopPusherJob$SlopIterator] */
        @Override // java.lang.Runnable
        public void run() {
            ?? slopIterator;
            do {
                try {
                    try {
                        if (!this.current.isEmpty()) {
                            if (!this.previous.isEmpty()) {
                                for (Pair<ByteArray, Version> pair : this.previous) {
                                    this.slopStorageEngine.delete(pair.getFirst(), pair.getSecond());
                                }
                                StreamingSlopPusherJob.this.succeededByNode.put(Integer.valueOf(this.nodeId), Long.valueOf(((Long) StreamingSlopPusherJob.this.succeededByNode.get(Integer.valueOf(this.nodeId))).longValue() + this.previous.size()));
                                this.previous.clear();
                            }
                            this.previous = null;
                            this.previous = this.current;
                            this.current = Lists.newArrayList();
                        }
                        this.startTime = System.currentTimeMillis();
                        slopIterator = new SlopIterator(this.slopQueue, this.current);
                        StreamingSlopPusherJob.this.adminClient.storeOps.updateSlopEntries(this.nodeId, slopIterator);
                    } catch (UnreachableStoreException e) {
                        StreamingSlopPusherJob.this.failureDetector.recordException(StreamingSlopPusherJob.this.metadataStore.getCluster().getNodeById(this.nodeId), System.currentTimeMillis() - this.startTime, e);
                        throw e;
                    }
                } finally {
                    this.slopQueue.clear();
                    StreamingSlopPusherJob.this.slopQueues.remove(Integer.valueOf(this.nodeId));
                }
            } while (!slopIterator.isComplete());
            if (!this.previous.isEmpty()) {
                for (Pair<ByteArray, Version> pair2 : this.previous) {
                    this.slopStorageEngine.delete(pair2.getFirst(), pair2.getSecond());
                }
                StreamingSlopPusherJob.this.succeededByNode.put(Integer.valueOf(this.nodeId), Long.valueOf(((Long) StreamingSlopPusherJob.this.succeededByNode.get(Integer.valueOf(this.nodeId))).longValue() + this.previous.size()));
                this.previous.clear();
            }
            if (!this.current.isEmpty()) {
                for (Pair<ByteArray, Version> pair3 : this.current) {
                    this.slopStorageEngine.delete(pair3.getFirst(), pair3.getSecond());
                }
                StreamingSlopPusherJob.this.succeededByNode.put(Integer.valueOf(this.nodeId), Long.valueOf(((Long) StreamingSlopPusherJob.this.succeededByNode.get(Integer.valueOf(this.nodeId))).longValue() + this.current.size()));
                this.current.clear();
            }
        }
    }

    /* loaded from: input_file:voldemort/server/scheduler/slop/StreamingSlopPusherJob$SlopIterator.class */
    private class SlopIterator extends AbstractIterator<Versioned<Slop>> {
        private final SynchronousQueue<Versioned<Slop>> slopQueue;
        private final List<Pair<ByteArray, Version>> deleteBatch;
        private final EventThrottler writeThrottler;
        private int writtenLast = 0;
        private long slopsDone = 0;
        private boolean shutDown = false;
        private boolean isComplete = false;

        public SlopIterator(SynchronousQueue<Versioned<Slop>> synchronousQueue, List<Pair<ByteArray, Version>> list) {
            this.slopQueue = synchronousQueue;
            this.deleteBatch = list;
            this.writeThrottler = new EventThrottler(StreamingSlopPusherJob.this.voldemortConfig.getSlopMaxWriteBytesPerSec());
        }

        public boolean isComplete() {
            return this.isComplete;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /* renamed from: computeNext, reason: merged with bridge method [inline-methods] */
        public Versioned<Slop> m2250computeNext() {
            try {
                if (!this.shutDown) {
                    Versioned<Slop> take = this.slopQueue.take();
                    if (!take.equals(StreamingSlopPusherJob.END)) {
                        this.slopsDone++;
                        if (this.slopsDone % StreamingSlopPusherJob.this.voldemortConfig.getSlopBatchSize() == 0) {
                            this.shutDown = true;
                        }
                        this.writeThrottler.maybeThrottle(this.writtenLast);
                        this.writtenLast = StreamingSlopPusherJob.this.slopSize(take);
                        this.deleteBatch.add(Pair.create(take.getValue().makeKey(), take.getVersion()));
                        return take;
                    }
                    this.shutDown = true;
                    this.isComplete = true;
                }
                return (Versioned) endOfData();
            } catch (Exception e) {
                StreamingSlopPusherJob.logger.error("Got an exception " + e);
                return (Versioned) endOfData();
            }
        }
    }

    public StreamingSlopPusherJob(StoreRepository storeRepository, MetadataStore metadataStore, FailureDetector failureDetector, VoldemortConfig voldemortConfig, ScanPermitWrapper scanPermitWrapper) {
        this.storeRepo = storeRepository;
        this.metadataStore = metadataStore;
        this.failureDetector = failureDetector;
        this.voldemortConfig = voldemortConfig;
        this.repairPermits = (ScanPermitWrapper) Utils.notNull(scanPermitWrapper);
        this.readThrottler = new EventThrottler(voldemortConfig.getSlopMaxReadBytesPerSec());
    }

    /*  JADX ERROR: JadxRuntimeException in pass: BlockProcessor
        jadx.core.utils.exceptions.JadxRuntimeException: Unreachable block: B:75:0x0447
        	at jadx.core.dex.visitors.blocks.BlockProcessor.checkForUnreachableBlocks(BlockProcessor.java:88)
        	at jadx.core.dex.visitors.blocks.BlockProcessor.processBlocksTree(BlockProcessor.java:52)
        	at jadx.core.dex.visitors.blocks.BlockProcessor.visit(BlockProcessor.java:44)
        */
    /**
     * Entry point of the job.
     *
     * <p>WARNING: the real body of this method could not be decompiled (see the JADX
     * error above). What is left is a placeholder that unconditionally throws, so
     * this source does not reproduce the behaviour of the compiled class.
     *
     * @throws UnsupportedOperationException always, in this decompiled form
     */
    @Override // java.lang.Runnable
    public void run() {
        /*
            Method dump skipped, instructions count: 1545
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: voldemort.server.scheduler.slop.StreamingSlopPusherJob.run():void");
    }

    /**
     * Refreshes the cluster snapshot from the metadata store and replaces the
     * per-node queue map and the attempted/succeeded counters with fresh, empty
     * maps whose initial capacity is the number of nodes in the cluster.
     */
    private void loadMetadata() {
        this.cluster = this.metadataStore.getCluster();
        final int nodeCount = this.cluster.getNumberOfNodes();
        // Diamond instead of the raw ConcurrentHashMap so the assignment is type-checked.
        this.slopQueues = new ConcurrentHashMap<>(nodeCount);
        this.attemptedByNode = new ConcurrentHashMap<>(nodeCount);
        this.succeededByNode = new ConcurrentHashMap<>(nodeCount);
    }

    /**
     * Stops the admin client if one is set and clears the field afterwards.
     * Does nothing when no admin client is set.
     */
    private void stopAdminClient() {
        if (this.adminClient == null) {
            return;
        }
        this.adminClient.stop();
        this.adminClient = null;
    }

    /**
     * Size accounted for one key/slop pair read from the slop store.
     *
     * @param pair the key and the versioned slop stored under it
     * @return length of the key plus {@link #slopSize(Versioned)} of the slop
     */
    private int nBytesRead(Pair<ByteArray, Versioned<Slop>> pair) {
        final int keyBytes = pair.getFirst().length();
        final int slopBytes = slopSize(pair.getSecond());
        return keyBytes + slopBytes;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Computes the size accounted for a versioned slop: the slop key length plus
     * the vector clock's size in bytes, plus the value length for a PUT. Any
     * operation other than PUT or DELETE is logged as an error and adds nothing.
     *
     * @param versioned the slop with its version, which is cast to {@link VectorClock}
     * @return the accounted size
     */
    public int slopSize(Versioned<Slop> versioned) {
        final Slop slop = versioned.getValue();
        int total = slop.getKey().length();
        total += ((VectorClock) versioned.getVersion()).sizeInBytes();
        switch (slop.getOperation()) {
            case DELETE:
                // A delete carries no value.
                break;
            case PUT:
                total += slop.getValue().length;
                break;
            default:
                logger.error("Unknown slop operation: " + slop.getOperation());
                break;
        }
        return total;
    }

    /**
     * Acquires the repair permit before the job starts, logging before and after.
     *
     * <p>If the thread is interrupted while acquiring, the admin client is stopped,
     * the thread's interrupt flag is restored, and the interruption is reported as
     * an {@link IllegalStateException}.
     *
     * @throws IllegalStateException if interrupted while acquiring the permit
     */
    private void acquireRepairPermit() {
        logger.info("Acquiring lock to perform streaming slop pusher job ");
        try {
            this.repairPermits.acquire(null);
            logger.info("Acquired lock to perform streaming slop pusher job ");
        } catch (InterruptedException e) {
            try {
                stopAdminClient();
            } finally {
                // Catching InterruptedException clears the flag; restore it so callers
                // further up the stack can still observe the interruption.
                Thread.currentThread().interrupt();
            }
            throw new IllegalStateException("Streaming slop pusher job interrupted while waiting for permit.", e);
        }
    }
}
