package voldemort.store.routed;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.log4j.Logger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import voldemort.ServerTestUtils;
import voldemort.TestUtils;
import voldemort.VoldemortTestConstants;
import voldemort.client.RoutingTier;
import voldemort.client.TimeoutConfig;
import voldemort.cluster.Cluster;
import voldemort.cluster.Node;
import voldemort.cluster.failuredetector.BannagePeriodFailureDetector;
import voldemort.cluster.failuredetector.FailureDetector;
import voldemort.cluster.failuredetector.FailureDetectorConfig;
import voldemort.cluster.failuredetector.FailureDetectorListener;
import voldemort.cluster.failuredetector.FailureDetectorUtils;
import voldemort.cluster.failuredetector.MutableStoreVerifier;
import voldemort.performance.benchmark.Benchmark;
import voldemort.routing.RoutingStrategy;
import voldemort.routing.RoutingStrategyFactory;
import voldemort.serialization.SerializerDefinition;
import voldemort.server.StoreRepository;
import voldemort.server.scheduler.slop.StreamingSlopPusherJob;
import voldemort.server.storage.ScanPermitWrapper;
import voldemort.store.ForceFailStore;
import voldemort.store.Store;
import voldemort.store.StoreDefinition;
import voldemort.store.StoreDefinitionBuilder;
import voldemort.store.UnreachableStoreException;
import voldemort.store.logging.LoggingStore;
import voldemort.store.memory.InMemoryStorageEngine;
import voldemort.store.slop.Slop;
import voldemort.store.slop.SlopStorageEngine;
import voldemort.store.slop.strategy.HintedHandoffStrategyType;
import voldemort.utils.ByteArray;
import voldemort.utils.ByteUtils;
import voldemort.versioning.Version;
import voldemort.versioning.Versioned;

@RunWith(Parameterized.class)
/* loaded from: input_file:voldemort/store/routed/HintedHandoffTest.class */
public class HintedHandoffTest {
    private static final Logger logger = Logger.getLogger(HintedHandoffTest.class);
    private static final String STORE_NAME = "test";
    private static final String SLOP_STORE_NAME = "slop";
    private static final int NUM_THREADS = 9;
    private static final int NUM_NODES_TOTAL = 9;
    private static final int NUM_NODES_FAILED = 4;
    private static final int REPLICATION_FACTOR = 3;
    private static final int P_READS = 1;
    private static final int R_READS = 1;
    private static final int P_WRITES = 2;
    private static final int R_WRITES = 1;
    private static final int KEY_LENGTH = 32;
    private static final int VALUE_LENGTH = 32;
    private final HintedHandoffStrategyType hintRoutingStrategy;
    private Cluster cluster;
    private FailureDetector failureDetector;
    private StoreDefinition storeDef;
    private ExecutorService routedStoreThreadPool;
    private RoutedStoreFactory routedStoreFactory;
    private RoutingStrategy strategy;
    private RoutedStore store;
    private final Class<? extends FailureDetector> failureDetectorCls = BannagePeriodFailureDetector.class;
    private final Map<Integer, Store<ByteArray, byte[], byte[]>> subStores = new ConcurrentHashMap();
    private final Map<Integer, Store<ByteArray, Slop, byte[]>> slopStores = new ConcurrentHashMap();
    private final List<StreamingSlopPusherJob> slopPusherJobs = Lists.newLinkedList();
    private final Multimap<ByteArray, Integer> keysToNodes = HashMultimap.create();
    private final Map<ByteArray, ByteArray> keyValues = Maps.newHashMap();

    public HintedHandoffTest(HintedHandoffStrategyType hintedHandoffStrategyType) {
        this.hintRoutingStrategy = hintedHandoffStrategyType;
    }

    @Parameterized.Parameters
    public static Collection<Object[]> configs() {
        return Arrays.asList(new Object[]{HintedHandoffStrategyType.ANY_STRATEGY}, new Object[]{HintedHandoffStrategyType.PROXIMITY_STRATEGY});
    }

    private StoreDefinition getStoreDef(String str, int i, int i2, int i3, int i4, int i5, String str2) {
        SerializerDefinition serializerDefinition = new SerializerDefinition(Benchmark.STRING_KEY_TYPE);
        return new StoreDefinitionBuilder().setName(str).setType("memory").setKeySerializer(serializerDefinition).setValueSerializer(serializerDefinition).setRoutingPolicy(RoutingTier.SERVER).setRoutingStrategyType(str2).setReplicationFactor(i).setPreferredReads(Integer.valueOf(i2)).setRequiredReads(i3).setPreferredWrites(Integer.valueOf(i4)).setRequiredWrites(i5).setHintedHandoffStrategy(this.hintRoutingStrategy).build();
    }

    @Before
    public void setUp() throws Exception {
        this.cluster = VoldemortTestConstants.getNineNodeCluster();
        this.storeDef = getStoreDef(STORE_NAME, REPLICATION_FACTOR, 1, 1, 2, 1, "consistent-routing");
        Iterator it = this.cluster.getNodes().iterator();
        while (it.hasNext()) {
            this.subStores.put(Integer.valueOf(((Node) it.next()).getId()), new ForceFailStore(new LoggingStore(new InMemoryStorageEngine(STORE_NAME)), new UnreachableStoreException("Node down")));
        }
        setFailureDetector(this.subStores);
        this.routedStoreThreadPool = Executors.newFixedThreadPool(9);
        this.routedStoreFactory = new RoutedStoreFactory(true, this.routedStoreThreadPool, new TimeoutConfig(1500L, false));
        this.strategy = new RoutingStrategyFactory().updateRoutingStrategy(this.storeDef, this.cluster);
        HashMap newHashMap = Maps.newHashMap();
        Iterator it2 = this.cluster.getNodes().iterator();
        while (it2.hasNext()) {
            int id = ((Node) it2.next()).getId();
            StoreRepository storeRepository = new StoreRepository();
            storeRepository.addLocalStore(this.subStores.get(Integer.valueOf(id)));
            for (int i = 0; i < 9; i++) {
                storeRepository.addNodeStore(i, this.subStores.get(Integer.valueOf(i)));
            }
            SlopStorageEngine slopStorageEngine = new SlopStorageEngine(new InMemoryStorageEngine(SLOP_STORE_NAME), this.cluster);
            Store<ByteArray, Slop, byte[]> asSlopStore = slopStorageEngine.asSlopStore();
            storeRepository.setSlopStore(slopStorageEngine);
            newHashMap.put(Integer.valueOf(id), this.routedStoreFactory.toNonblockingStore(slopStorageEngine));
            this.slopStores.put(Integer.valueOf(id), asSlopStore);
            this.slopPusherJobs.add(new StreamingSlopPusherJob(storeRepository, ServerTestUtils.createMetadataStore(this.cluster, Lists.newArrayList(new StoreDefinition[]{this.storeDef})), this.failureDetector, ServerTestUtils.createServerConfigWithDefs(false, id, TestUtils.createTempDir().getAbsolutePath(), this.cluster, Lists.newArrayList(new StoreDefinition[]{this.storeDef}), new Properties()), new ScanPermitWrapper(1)));
        }
        HashMap newHashMap2 = Maps.newHashMap();
        for (Map.Entry<Integer, Store<ByteArray, byte[], byte[]>> entry : this.subStores.entrySet()) {
            newHashMap2.put(entry.getKey(), this.routedStoreFactory.toNonblockingStore(entry.getValue()));
        }
        this.store = this.routedStoreFactory.create(this.cluster, this.storeDef, this.subStores, newHashMap2, this.slopStores, newHashMap, false, 0, this.failureDetector);
        generateData();
    }

    @After
    public void tearDown() throws Exception {
        if (this.failureDetector != null) {
            this.failureDetector.destroy();
        }
        if (this.routedStoreThreadPool != null) {
            this.routedStoreThreadPool.shutdown();
        }
    }

    @Test
    public void testHintedHandoff() throws Exception {
        Multimap<Integer, ByteArray> populateStore = populateStore(getFailedNodes());
        Thread.sleep(5000L);
        HashMap newHashMap = Maps.newHashMap();
        Set<ByteArray> makeSlopKeys = makeSlopKeys(populateStore, Slop.Operation.PUT);
        Iterator<Store<ByteArray, Slop, byte[]>> it = this.slopStores.values().iterator();
        while (it.hasNext()) {
            Iterator it2 = it.next().getAll(makeSlopKeys, (Map) null).entrySet().iterator();
            while (it2.hasNext()) {
                Slop slop = (Slop) ((Versioned) ((List) ((Map.Entry) it2.next()).getValue()).get(0)).getValue();
                newHashMap.put(slop.getKey(), slop.getValue());
                if (logger.isTraceEnabled()) {
                    logger.trace(slop);
                }
            }
        }
        for (Map.Entry entry : populateStore.entries()) {
            byte[] bArr = this.keyValues.get(entry.getValue()).get();
            Assert.assertNotNull("data should be stored in the slop for key = " + entry.getValue(), (byte[]) newHashMap.get(entry.getValue()));
            Assert.assertEquals("correct should be stored in slop", 0L, ByteUtils.compare(r0, bArr));
        }
    }

    /* JADX WARN: Type inference failed for: r2v4, types: [byte[], byte[][]] */
    private Set<ByteArray> makeSlopKeys(Multimap<Integer, ByteArray> multimap, Slop.Operation operation) {
        HashSet newHashSet = Sets.newHashSet();
        for (Map.Entry entry : multimap.entries()) {
            byte[] bArr = {operation.getOpCode()};
            byte[] bArr2 = {0};
            byte[] bytes = ByteUtils.getBytes(STORE_NAME, "UTF-8");
            byte[] bArr3 = new byte[NUM_NODES_FAILED];
            ByteUtils.writeInt(bArr3, ((Integer) entry.getKey()).intValue(), 0);
            newHashSet.add(new ByteArray(ByteUtils.cat((byte[][]) new byte[]{bArr, bArr2, bytes, bArr2, bArr3, bArr2, ((ByteArray) entry.getValue()).get()})));
        }
        return newHashSet;
    }

    @Test
    @Ignore
    public void testSlopPushers() throws Exception {
        Multimap<Integer, ByteArray> populateStore = populateStore(getFailedNodes());
        Thread.sleep(5000L);
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(this.slopPusherJobs.size());
        final CountDownLatch countDownLatch = new CountDownLatch(this.slopPusherJobs.size());
        for (final StreamingSlopPusherJob streamingSlopPusherJob : this.slopPusherJobs) {
            newFixedThreadPool.submit(new Runnable() { // from class: voldemort.store.routed.HintedHandoffTest.1
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        if (HintedHandoffTest.logger.isTraceEnabled()) {
                            HintedHandoffTest.logger.trace("Started slop pusher job " + streamingSlopPusherJob);
                        }
                        streamingSlopPusherJob.run();
                        if (HintedHandoffTest.logger.isTraceEnabled()) {
                            HintedHandoffTest.logger.trace("Finished slop pusher job " + streamingSlopPusherJob);
                        }
                    } finally {
                        countDownLatch.countDown();
                    }
                }
            });
        }
        countDownLatch.await();
        Thread.sleep(5000L);
        for (Map.Entry entry : populateStore.entries()) {
            List list = this.store.get(entry.getValue(), (Object) null);
            Assert.assertTrue("slop entry should be pushed for " + entry.getValue() + ", preflist " + this.keysToNodes.get(entry.getValue()), list.size() > 0);
            Assert.assertEquals("slop entry should be correct for " + entry.getValue(), this.keyValues.get(entry.getValue()), new ByteArray((byte[]) ((Versioned) list.get(0)).getValue()));
        }
    }

    @Test
    @Ignore
    public void testDeleteHandoff() throws Exception {
        populateStore(Sets.newHashSet());
        HashMap newHashMap = Maps.newHashMap();
        for (ByteArray byteArray : this.keyValues.keySet()) {
            newHashMap.put(byteArray, ((Versioned) this.store.get(byteArray, (Object) null).get(0)).getVersion());
        }
        Set<Integer> failedNodes = getFailedNodes();
        ArrayListMultimap create = ArrayListMultimap.create();
        for (ByteArray byteArray2 : this.keysToNodes.keySet()) {
            Iterator it = this.strategy.routeRequest(byteArray2.get()).iterator();
            while (true) {
                if (it.hasNext()) {
                    Node node = (Node) it.next();
                    if (failedNodes.contains(Integer.valueOf(node.getId()))) {
                        create.put(Integer.valueOf(node.getId()), byteArray2);
                        break;
                    }
                }
            }
        }
        for (Map.Entry entry : create.entries()) {
            try {
                this.store.delete(entry.getValue(), (Version) newHashMap.get(entry.getValue()));
            } catch (Exception e) {
                if (logger.isTraceEnabled()) {
                    logger.trace(e, e);
                }
            }
        }
        Set<ByteArray> makeSlopKeys = makeSlopKeys(create, Slop.Operation.DELETE);
        HashSet newHashSet = Sets.newHashSet();
        Thread.sleep(5000L);
        Iterator<Store<ByteArray, Slop, byte[]>> it2 = this.slopStores.values().iterator();
        while (it2.hasNext()) {
            Iterator it3 = it2.next().getAll(makeSlopKeys, (Map) null).entrySet().iterator();
            while (it3.hasNext()) {
                Slop slop = (Slop) ((Versioned) ((List) ((Map.Entry) it3.next()).getValue()).get(0)).getValue();
                newHashSet.add(slop.getKey());
                if (logger.isTraceEnabled()) {
                    logger.trace(slop);
                }
            }
        }
        for (Map.Entry entry2 : create.entries()) {
            Assert.assertTrue("delete operation for " + entry2.getValue() + " should be handed off", newHashSet.contains(entry2.getValue()));
        }
    }

    private Set<Integer> getFailedNodes() {
        CopyOnWriteArraySet copyOnWriteArraySet = new CopyOnWriteArraySet();
        Random random = new Random();
        for (int i = 0; i < NUM_NODES_FAILED; i++) {
            copyOnWriteArraySet.add(Integer.valueOf(random.nextInt(9)));
        }
        Iterator it = copyOnWriteArraySet.iterator();
        while (it.hasNext()) {
            getForceFailStore(((Integer) it.next()).intValue()).setFail(true);
        }
        if (logger.isTraceEnabled()) {
            logger.trace("Failing requests to " + copyOnWriteArraySet);
        }
        return copyOnWriteArraySet;
    }

    private Multimap<Integer, ByteArray> populateStore(Set<Integer> set) {
        ArrayListMultimap create = ArrayListMultimap.create();
        for (ByteArray byteArray : this.keysToNodes.keySet()) {
            for (Node node : this.strategy.routeRequest(byteArray.get())) {
                if (set.contains(Integer.valueOf(node.getId()))) {
                    create.put(Integer.valueOf(node.getId()), byteArray);
                    break;
                }
            }
            try {
                this.store.put(byteArray, new Versioned(this.keyValues.get(byteArray).get()), (Object) null);
            } catch (Exception e) {
                if (logger.isTraceEnabled()) {
                    logger.trace(e, e);
                }
            }
        }
        return create;
    }

    private void generateData() {
        for (int i = 0; i < 2; i++) {
            HashSet newHashSet = Sets.newHashSet();
            while (newHashSet.size() < 9) {
                ByteArray byteArray = new ByteArray(TestUtils.randomBytes(32));
                byte[] randomBytes = TestUtils.randomBytes(32);
                if (byteArray.length() > 0 && randomBytes.length > 0) {
                    for (Node node : this.strategy.routeRequest(byteArray.get())) {
                        this.keysToNodes.put(byteArray, Integer.valueOf(node.getId()));
                        newHashSet.add(Integer.valueOf(node.getId()));
                    }
                    this.keyValues.put(byteArray, new ByteArray(randomBytes));
                }
            }
        }
    }

    private void setFailureDetector(Map<Integer, Store<ByteArray, byte[], byte[]>> map) throws Exception {
        if (this.failureDetector != null) {
            this.failureDetector.destroy();
        }
        FailureDetectorConfig failureDetectorConfig = new FailureDetectorConfig();
        failureDetectorConfig.setImplementationClassName(this.failureDetectorCls.getName());
        failureDetectorConfig.setBannagePeriod(500L);
        failureDetectorConfig.setCluster(this.cluster);
        failureDetectorConfig.setStoreVerifier(MutableStoreVerifier.create(map));
        this.failureDetector = FailureDetectorUtils.create(failureDetectorConfig, false, new FailureDetectorListener[0]);
    }

    public ForceFailStore<ByteArray, byte[], byte[]> getForceFailStore(int i) {
        return this.subStores.get(Integer.valueOf(i));
    }
}
