package voldemort.server.rebalance.async;

import com.google.common.collect.HashMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import voldemort.VoldemortException;
import voldemort.client.protocol.admin.AdminClient;
import voldemort.client.rebalance.RebalancePartitionsInfo;
import voldemort.cluster.Cluster;
import voldemort.server.StoreRepository;
import voldemort.server.VoldemortConfig;
import voldemort.server.rebalance.Rebalancer;
import voldemort.store.StorageEngine;
import voldemort.store.StoreDefinition;
import voldemort.store.metadata.MetadataStore;
import voldemort.utils.ByteArray;
import voldemort.utils.ClosableIterator;
import voldemort.utils.Pair;
import voldemort.utils.RebalanceUtils;
import voldemort.versioning.Versioned;

/* loaded from: input_file:voldemort/server/rebalance/async/DonorBasedRebalanceAsyncOperation.class */
public class DonorBasedRebalanceAsyncOperation extends RebalanceAsyncOperation {
    public static final Pair<ByteArray, Versioned<byte[]>> END = Pair.create(new ByteArray("END".getBytes()), new Versioned("END".getBytes()));
    public static final Pair<ByteArray, Versioned<byte[]>> BREAK = Pair.create(new ByteArray("BREAK".getBytes()), new Versioned("BREAK".getBytes()));
    private static final int FETCHUPDATE_BATCH_SIZE = 1000;
    private static final int SCAN_PROGRESS_COUNT = 100000;
    private final List<RebalancePartitionsInfo> stealInfos;
    private final StoreRepository storeRepository;
    private final AtomicBoolean running;
    private final Cluster initialCluster;
    private final Cluster targetCluster;
    private final HashMultimap<String, Pair<Integer, HashMap<Integer, List<Integer>>>> storeToNodePartitionMapping;
    private Map<String, Pair<ExecutorService, List<DonorBasedRebalancePusherSlave>>> updatePushSlavePool;

    private HashMultimap<String, Pair<Integer, HashMap<Integer, List<Integer>>>> groupByStores(List<RebalancePartitionsInfo> list) {
        HashMultimap<String, Pair<Integer, HashMap<Integer, List<Integer>>>> create = HashMultimap.create();
        for (RebalancePartitionsInfo rebalancePartitionsInfo : list) {
            int stealerId = rebalancePartitionsInfo.getStealerId();
            for (Map.Entry<String, HashMap<Integer, List<Integer>>> entry : rebalancePartitionsInfo.getStoreToReplicaToAddPartitionList().entrySet()) {
                create.put(entry.getKey(), Pair.create(Integer.valueOf(stealerId), entry.getValue()));
            }
        }
        return create;
    }

    public DonorBasedRebalanceAsyncOperation(Rebalancer rebalancer, StoreRepository storeRepository, VoldemortConfig voldemortConfig, MetadataStore metadataStore, int i, List<RebalancePartitionsInfo> list) {
        super(rebalancer, voldemortConfig, metadataStore, i, "Donor based rebalance : " + list);
        this.running = new AtomicBoolean(true);
        this.storeRepository = storeRepository;
        this.stealInfos = list;
        this.targetCluster = metadataStore.getCluster();
        this.initialCluster = list.get(0).getInitialCluster();
        this.storeToNodePartitionMapping = groupByStores(list);
        this.updatePushSlavePool = Collections.synchronizedMap(new HashMap());
    }

    /*  JADX ERROR: NullPointerException in pass: RegionMakerVisitor
        java.lang.NullPointerException: Cannot invoke "java.util.List.isEmpty()" because "s" is null
        	at jadx.core.utils.BlockUtils.getNextBlock(BlockUtils.java:411)
        	at jadx.core.dex.visitors.regions.RegionMaker.traverse(RegionMaker.java:172)
        	at jadx.core.dex.visitors.regions.RegionMaker.makeRegion(RegionMaker.java:91)
        	at jadx.core.dex.visitors.regions.RegionMaker.processExcHandler(RegionMaker.java:1110)
        	at jadx.core.dex.visitors.regions.RegionMaker.processTryCatchBlocks(RegionMaker.java:1046)
        	at jadx.core.dex.visitors.regions.RegionMakerVisitor.visit(RegionMakerVisitor.java:55)
        */
    /* JADX WARN: Removed duplicated region for block: B:17:0x013f A[DONT_GENERATE, LOOP:1: B:15:0x0135->B:17:0x013f, LOOP_END] */
    @Override // voldemort.server.protocol.admin.AsyncOperation
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public void operate() throws java.lang.Exception {
        /*
            Method dump skipped, instructions count: 349
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: voldemort.server.rebalance.async.DonorBasedRebalanceAsyncOperation.operate():void");
    }

    /* JADX INFO: Access modifiers changed from: private */
    public String getHeader(List<RebalancePartitionsInfo> list) {
        ArrayList newArrayList = Lists.newArrayList();
        Iterator<RebalancePartitionsInfo> it = list.iterator();
        while (it.hasNext()) {
            newArrayList.add(Integer.valueOf(it.next().getStealerId()));
        }
        return " Donor " + list.get(0).getDonorId() + ", Stealer " + newArrayList + "] ";
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void rebalanceStore(String str, AdminClient adminClient, Set<Pair<Integer, HashMap<Integer, List<Integer>>>> set, boolean z) {
        StorageEngine<ByteArray, byte[], byte[]> storageEngine = this.storeRepository.getStorageEngine(str);
        StoreDefinition storeDef = this.metadataStore.getStoreDef(str);
        ArrayList newArrayList = Lists.newArrayList();
        ExecutorService newCachedThreadPool = Executors.newCachedThreadPool(new ThreadFactory() { // from class: voldemort.server.rebalance.async.DonorBasedRebalanceAsyncOperation.2
            @Override // java.util.concurrent.ThreadFactory
            public Thread newThread(Runnable runnable) {
                Thread thread = new Thread(runnable);
                thread.setName(runnable.getClass().getName());
                return thread;
            }
        });
        this.updatePushSlavePool.put(str, new Pair<>(newCachedThreadPool, newArrayList));
        if (z) {
            throw new VoldemortException("Donor-based rebalancing for read-only store is currently not supported!");
        }
        HashMap<Integer, SynchronousQueue<Pair<ByteArray, Versioned<byte[]>>>> newHashMap = Maps.newHashMap();
        HashSet newHashSet = Sets.newHashSet();
        if (!this.voldemortConfig.getRebalancingOptimization() || storageEngine.isPartitionAware()) {
            newHashSet.addAll(set);
        } else {
            for (Pair<Integer, HashMap<Integer, List<Integer>>> pair : set) {
                HashMap<Integer, List<Integer>> optimizedReplicaToPartitionList = RebalanceUtils.getOptimizedReplicaToPartitionList(pair.getFirst().intValue(), this.initialCluster, storeDef, pair.getSecond());
                if (optimizedReplicaToPartitionList.size() > 0) {
                    newHashSet.add(Pair.create(pair.getFirst(), optimizedReplicaToPartitionList));
                }
            }
        }
        if (newHashSet.size() <= 0) {
            return;
        }
        for (Pair<Integer, HashMap<Integer, List<Integer>>> pair2 : set) {
            SynchronousQueue<Pair<ByteArray, Versioned<byte[]>>> synchronousQueue = new SynchronousQueue<>();
            newHashMap.put(pair2.getFirst(), synchronousQueue);
            String str2 = "DonorBasedRebalancePusherSlave for store " + str + " on node " + pair2.getFirst();
            DonorBasedRebalancePusherSlave donorBasedRebalancePusherSlave = new DonorBasedRebalancePusherSlave(pair2.getFirst().intValue(), synchronousQueue, str, adminClient);
            newArrayList.add(donorBasedRebalancePusherSlave);
            newCachedThreadPool.execute(donorBasedRebalancePusherSlave);
            logger.info("Started a thread for " + str2);
        }
        fetchEntriesForStealers(storageEngine, newHashSet, storeDef, newHashMap, str);
    }

    private void fetchEntriesForStealers(StorageEngine<ByteArray, byte[], byte[]> storageEngine, Set<Pair<Integer, HashMap<Integer, List<Integer>>>> set, StoreDefinition storeDefinition, HashMap<Integer, SynchronousQueue<Pair<ByteArray, Versioned<byte[]>>>> hashMap, String str) {
        int i = 0;
        int[] iArr = new int[this.targetCluster.getNumberOfNodes()];
        long currentTimeMillis = System.currentTimeMillis();
        ClosableIterator<ByteArray> keys = storageEngine.keys();
        while (this.running.get() && keys.hasNext()) {
            try {
                ByteArray next = keys.next();
                i++;
                List<Integer> checkKeyBelongsToPartition = RebalanceUtils.checkKeyBelongsToPartition(next.get(), set, this.targetCluster, storeDefinition);
                if (checkKeyBelongsToPartition.size() > 0) {
                    putAll(checkKeyBelongsToPartition, next, storageEngine.get(next, null), hashMap, iArr);
                }
                if (0 == i % SCAN_PROGRESS_COUNT) {
                    printProgress(i, iArr, currentTimeMillis, str);
                }
            } catch (InterruptedException e) {
                logger.info("InterruptedException received while sending entries to remote nodes, the process is terminating...");
                terminateAllSlavesAsync(str);
                return;
            } finally {
                close(keys, str, i, iArr, currentTimeMillis);
            }
        }
        terminateAllSlaves(str);
    }

    private void putAll(List<Integer> list, ByteArray byteArray, List<Versioned<byte[]>> list2, HashMap<Integer, SynchronousQueue<Pair<ByteArray, Versioned<byte[]>>>> hashMap, int[] iArr) throws InterruptedException {
        for (Versioned<byte[]> versioned : list2) {
            Iterator<Integer> it = list.iterator();
            while (it.hasNext()) {
                int intValue = it.next().intValue();
                iArr[intValue] = iArr[intValue] + 1;
                hashMap.get(Integer.valueOf(intValue)).put(Pair.create(byteArray, versioned));
                if (0 == iArr[intValue] % FETCHUPDATE_BATCH_SIZE) {
                    hashMap.get(Integer.valueOf(intValue)).put(BREAK);
                }
            }
        }
    }

    private void printProgress(int i, int[] iArr, long j, String str) {
        logger.info("Successfully scanned " + i + " tuples in " + ((System.currentTimeMillis() - j) / 1000) + " s");
        for (int i2 = 0; i2 < iArr.length; i2++) {
            logger.info(iArr[i2] + " tuples fetched for store '" + str + " for node " + i2);
        }
    }

    private void close(ClosableIterator<ByteArray> closableIterator, String str, int i, int[] iArr, long j) {
        printProgress(i, iArr, j, str);
        if (null != closableIterator) {
            closableIterator.close();
        }
    }

    private void terminateAllSlaves(String str) {
        logger.info("Terminating DonorBasedRebalancePushSlaves...");
        ExecutorService first = this.updatePushSlavePool.get(str).getFirst();
        Iterator<DonorBasedRebalancePusherSlave> it = this.updatePushSlavePool.get(str).getSecond().iterator();
        while (it.hasNext()) {
            it.next().requestCompletion();
        }
        first.shutdown();
        try {
            if (first.awaitTermination(30L, TimeUnit.MINUTES)) {
                logger.info("All DonorBasedRebalancePushSlaves terminated successfully.");
            } else {
                logger.warn("Timed out while waiting for pusher slaves to shutdown!!!");
            }
        } catch (InterruptedException e) {
            logger.warn("Interrupted while waiting for pusher slaves to shutdown!!!");
        }
        logger.info("DonorBasedRebalancingOperation existed.");
    }

    private void terminateAllSlavesAsync(String str) {
        logger.info("Terminating DonorBasedRebalancePushSlaves asynchronously.");
        if (null == str) {
            for (Pair<ExecutorService, List<DonorBasedRebalancePusherSlave>> pair : this.updatePushSlavePool.values()) {
                ExecutorService first = pair.getFirst();
                Iterator<DonorBasedRebalancePusherSlave> it = pair.getSecond().iterator();
                while (it.hasNext()) {
                    it.next().requestCompletion();
                }
                first.shutdownNow();
            }
        } else {
            ExecutorService first2 = this.updatePushSlavePool.get(str).getFirst();
            Iterator<DonorBasedRebalancePusherSlave> it2 = this.updatePushSlavePool.get(str).getSecond().iterator();
            while (it2.hasNext()) {
                it2.next().requestCompletion();
            }
            first2.shutdownNow();
        }
        logger.info("DonorBasedRebalancingAsyncOperation existed.");
    }

    @Override // voldemort.server.protocol.admin.AsyncOperation
    public void stop() {
        this.running.set(false);
        updateStatus(getHeader(this.stealInfos) + "Stop called on donor-based rebalance operation");
        logger.info(getHeader(this.stealInfos) + "Stop called on donor-based rebalance operation");
        terminateAllSlavesAsync(null);
        this.executors.shutdownNow();
    }
}
