package voldemort.server.rebalance.async;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import voldemort.client.protocol.admin.AdminClient;
import voldemort.client.rebalance.RebalancePartitionsInfo;
import voldemort.server.VoldemortConfig;
import voldemort.server.rebalance.Rebalancer;
import voldemort.server.rebalance.VoldemortRebalancingException;
import voldemort.store.metadata.MetadataStore;
import voldemort.store.readonly.ReadOnlyStorageConfiguration;
import voldemort.utils.RebalanceUtils;

/* loaded from: input_file:voldemort/server/rebalance/async/StealerBasedRebalanceAsyncOperation.class */
public class StealerBasedRebalanceAsyncOperation extends RebalanceAsyncOperation {
    private List<Integer> rebalanceStatusList;
    private final RebalancePartitionsInfo stealInfo;

    public StealerBasedRebalanceAsyncOperation(Rebalancer rebalancer, VoldemortConfig voldemortConfig, MetadataStore metadataStore, int i, RebalancePartitionsInfo rebalancePartitionsInfo) {
        super(rebalancer, voldemortConfig, metadataStore, i, "Stealer based rebalance : " + rebalancePartitionsInfo);
        this.rebalancer = rebalancer;
        this.stealInfo = rebalancePartitionsInfo;
        this.rebalanceStatusList = new ArrayList();
    }

    @Override // voldemort.server.protocol.admin.AsyncOperation
    public void operate() throws Exception {
        this.adminClient = RebalanceUtils.createTempAdminClient(this.voldemortConfig, this.metadataStore.getCluster(), this.voldemortConfig.getMaxParallelStoresRebalancing());
        final ArrayList arrayList = new ArrayList();
        final ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        final AtomicInteger atomicInteger = new AtomicInteger(0);
        final int size = this.stealInfo.getUnbalancedStoreList().size();
        try {
            Iterator it = ImmutableList.copyOf(this.stealInfo.getUnbalancedStoreList()).iterator();
            while (it.hasNext()) {
                final String str = (String) it.next();
                this.executors.submit(new Runnable() { // from class: voldemort.server.rebalance.async.StealerBasedRebalanceAsyncOperation.1
                    @Override // java.lang.Runnable
                    public void run() {
                        try {
                            boolean z = StealerBasedRebalanceAsyncOperation.this.metadataStore.getStoreDef(str).getType().compareTo(ReadOnlyStorageConfiguration.TYPE_NAME) == 0;
                            concurrentLinkedQueue.add(str);
                            StealerBasedRebalanceAsyncOperation.this.updateStatus(StealerBasedRebalanceAsyncOperation.this.getHeader(StealerBasedRebalanceAsyncOperation.this.stealInfo) + "Completed working on " + atomicInteger.get() + " out of " + size + " stores. Still rebalancing " + concurrentLinkedQueue);
                            StealerBasedRebalanceAsyncOperation.this.rebalanceStore(str, StealerBasedRebalanceAsyncOperation.this.adminClient, StealerBasedRebalanceAsyncOperation.this.stealInfo, z);
                            StealerBasedRebalanceAsyncOperation.this.stealInfo.removeStore(str);
                            concurrentLinkedQueue.remove(str);
                            atomicInteger.getAndIncrement();
                            StealerBasedRebalanceAsyncOperation.this.updateStatus(StealerBasedRebalanceAsyncOperation.this.getHeader(StealerBasedRebalanceAsyncOperation.this.stealInfo) + "Completed working on " + atomicInteger.get() + " out of " + size + " stores. Still rebalancing " + concurrentLinkedQueue);
                        } catch (Exception e) {
                            RebalanceAsyncOperation.logger.error(StealerBasedRebalanceAsyncOperation.this.getHeader(StealerBasedRebalanceAsyncOperation.this.stealInfo) + "Error while rebalancing for store " + str + " - " + e.getMessage(), e);
                            arrayList.add(e);
                        }
                    }
                });
            }
            waitForShutdown();
            if (!Lists.newArrayList(this.stealInfo.getUnbalancedStoreList()).isEmpty()) {
                throw new VoldemortRebalancingException(getHeader(this.stealInfo) + "Failed to rebalance task " + this.stealInfo + ". Could only complete " + atomicInteger.get() + " out of " + size + " stores", arrayList);
            }
            logger.info(getHeader(this.stealInfo) + "Rebalance of " + this.stealInfo + " completed successfully for all " + size + " stores");
            updateStatus(getHeader(this.stealInfo) + "Rebalance of " + this.stealInfo + " completed successfully for all " + size + " stores");
            this.metadataStore.deleteRebalancingState(this.stealInfo);
        } finally {
            logger.info(getHeader(this.stealInfo) + "Releasing permit for donor node " + this.stealInfo.getDonorId());
            this.rebalancer.releaseRebalancingPermit(this.stealInfo.getDonorId());
            this.adminClient.stop();
            this.adminClient = null;
        }
    }

    @Override // voldemort.server.protocol.admin.AsyncOperation
    public void stop() {
        updateStatus(getHeader(this.stealInfo) + "Stop called on rebalance operation");
        if (null != this.adminClient) {
            Iterator<Integer> it = this.rebalanceStatusList.iterator();
            while (it.hasNext()) {
                this.adminClient.stopAsyncRequest(this.metadataStore.getNodeId(), it.next().intValue());
            }
        }
        this.executors.shutdownNow();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public String getHeader(RebalancePartitionsInfo rebalancePartitionsInfo) {
        return "Stealer " + rebalancePartitionsInfo.getStealerId() + ", Donor " + rebalancePartitionsInfo.getDonorId() + "] ";
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void rebalanceStore(String str, AdminClient adminClient, RebalancePartitionsInfo rebalancePartitionsInfo, boolean z) {
        if (rebalancePartitionsInfo.getReplicaToAddPartitionList(str) != null && rebalancePartitionsInfo.getReplicaToAddPartitionList(str).size() > 0) {
            logger.info(getHeader(rebalancePartitionsInfo) + "Starting partitions migration for store " + str + " from donor node " + rebalancePartitionsInfo.getDonorId());
            int migratePartitions = adminClient.migratePartitions(rebalancePartitionsInfo.getDonorId(), this.metadataStore.getNodeId(), str, rebalancePartitionsInfo.getReplicaToAddPartitionList(str), null, rebalancePartitionsInfo.getInitialCluster(), true);
            this.rebalanceStatusList.add(Integer.valueOf(migratePartitions));
            if (logger.isDebugEnabled()) {
                logger.debug(getHeader(rebalancePartitionsInfo) + "Waiting for completion for " + str + " with async id " + migratePartitions);
            }
            adminClient.waitForCompletion(this.metadataStore.getNodeId(), migratePartitions, this.voldemortConfig.getRebalancingTimeoutSec(), TimeUnit.SECONDS, getStatus());
            this.rebalanceStatusList.remove(Integer.valueOf(migratePartitions));
            logger.info(getHeader(rebalancePartitionsInfo) + "Completed partition migration for store " + str + " from donor node " + rebalancePartitionsInfo.getDonorId());
        }
        if (rebalancePartitionsInfo.getReplicaToDeletePartitionList(str) != null && rebalancePartitionsInfo.getReplicaToDeletePartitionList(str).size() > 0 && !z) {
            logger.info(getHeader(rebalancePartitionsInfo) + "Deleting partitions for store " + str + " on donor node " + rebalancePartitionsInfo.getDonorId());
            adminClient.deletePartitions(rebalancePartitionsInfo.getDonorId(), str, rebalancePartitionsInfo.getReplicaToDeletePartitionList(str), rebalancePartitionsInfo.getInitialCluster(), null);
            logger.info(getHeader(rebalancePartitionsInfo) + "Deleted partitions for store " + str + " on donor node " + rebalancePartitionsInfo.getDonorId());
        }
        logger.info(getHeader(rebalancePartitionsInfo) + "Finished all migration for store " + str);
    }
}
