package voldemort.scheduled;

import com.google.common.collect.Lists;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Properties;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import voldemort.ServerTestUtils;
import voldemort.TestUtils;
import voldemort.cluster.Cluster;
import voldemort.cluster.failuredetector.BannagePeriodFailureDetector;
import voldemort.cluster.failuredetector.FailureDetectorConfig;
import voldemort.cluster.failuredetector.ServerStoreVerifier;
import voldemort.server.VoldemortConfig;
import voldemort.server.VoldemortServer;
import voldemort.server.scheduler.slop.StreamingSlopPusherJob;
import voldemort.server.storage.ScanPermitWrapper;
import voldemort.store.StorageEngine;
import voldemort.store.metadata.MetadataStore;
import voldemort.store.slop.Slop;
import voldemort.store.slop.SlopStorageEngine;
import voldemort.store.socket.SocketStoreFactory;
import voldemort.store.socket.clientrequest.ClientRequestExecutorPool;
import voldemort.utils.ByteArray;
import voldemort.utils.ByteUtils;
import voldemort.versioning.ObsoleteVersionException;
import voldemort.versioning.VectorClock;
import voldemort.versioning.Version;
import voldemort.versioning.Versioned;
import voldemort.xml.StoreDefinitionsMapper;

/* loaded from: input_file:voldemort/scheduled/StreamingSlopPusherTest.class */
public class StreamingSlopPusherTest {
    private VoldemortServer[] servers;
    private Cluster cluster;
    private static final int NUM_SERVERS = 3;
    private static String storesXmlfile = "test/common/voldemort/config/stores.xml";
    private SocketStoreFactory socketStoreFactory = new ClientRequestExecutorPool(2, 100000, 100000, 65536);
    private MetadataStore metadataStore;
    private Properties props;
    private VoldemortConfig[] configs;

    @Before
    public void setUp() throws Exception {
        this.cluster = ServerTestUtils.getLocalCluster(NUM_SERVERS);
        this.servers = new VoldemortServer[NUM_SERVERS];
        this.props = new Properties();
        this.metadataStore = ServerTestUtils.createMetadataStore(this.cluster, new StoreDefinitionsMapper().readStoreList(new File(storesXmlfile)));
        this.props.put("pusher.type", "streaming");
        this.props.put("slop.frequency.ms", "1000000");
        this.configs = new VoldemortConfig[NUM_SERVERS];
        for (int i = 0; i < NUM_SERVERS; i++) {
            this.configs[i] = ServerTestUtils.createServerConfig(true, i, TestUtils.createTempDir().getAbsolutePath(), null, storesXmlfile, this.props);
        }
    }

    private void startServers(int... iArr) throws IOException {
        for (int i : iArr) {
            if (i < NUM_SERVERS) {
                this.servers[i] = ServerTestUtils.startVoldemortServer(this.socketStoreFactory, this.configs[i], this.cluster);
            }
        }
    }

    private void stopServers(int... iArr) throws IOException {
        for (int i : iArr) {
            if (i < NUM_SERVERS) {
                ServerTestUtils.stopVoldemortServer(this.servers[i]);
            }
        }
    }

    private VoldemortServer getVoldemortServer(int i) {
        return this.servers[i];
    }

    @After
    public void tearDown() {
        this.socketStoreFactory.close();
    }

    @Test
    public void testFailedServer() throws IOException, InterruptedException {
        startServers(0, 2);
        StorageEngine<ByteArray, Slop, byte[]> asSlopStore = getVoldemortServer(0).getStoreRepository().getSlopStore().asSlopStore();
        List<Versioned<Slop>> createRandomSlops = ServerTestUtils.createRandomSlops(1, 50, "test-replication-memory", "users", "test-replication-persistent", "test-readrepair-memory", "test-consistent", "test-consistent-with-pref-list");
        List<Versioned<Slop>> createRandomSlops2 = ServerTestUtils.createRandomSlops(2, 50, "test-replication-memory", "users", "test-replication-persistent", "test-readrepair-memory", "test-consistent", "test-consistent-with-pref-list");
        populateSlops(0, asSlopStore, createRandomSlops, createRandomSlops2);
        new StreamingSlopPusherJob(getVoldemortServer(0).getStoreRepository(), getVoldemortServer(0).getMetadataStore(), new BannagePeriodFailureDetector(new FailureDetectorConfig().setCluster(this.cluster).setStoreVerifier(new ServerStoreVerifier(this.socketStoreFactory, this.metadataStore, this.configs[0]))), this.configs[0], new ScanPermitWrapper(1)).run();
        Thread.sleep(2000L);
        ListIterator<Versioned<Slop>> listIterator = createRandomSlops2.listIterator();
        while (listIterator.hasNext()) {
            Slop slop = (Slop) listIterator.next().getValue();
            StorageEngine storageEngine = getVoldemortServer(2).getStoreRepository().getStorageEngine(slop.getStoreName());
            if (slop.getOperation().equals(Slop.Operation.PUT)) {
                Assert.assertNotSame("entry should be present at store", 0, Integer.valueOf(storageEngine.get(slop.getKey(), (Object) null).size()));
                Assert.assertEquals("entry value should match", new String(slop.getValue()), new String((byte[]) ((Versioned) storageEngine.get(slop.getKey(), (Object) null).get(0)).getValue()));
            } else if (slop.getOperation().equals(Slop.Operation.DELETE)) {
                Assert.assertEquals("entry value should match", 0L, storageEngine.get(slop.getKey(), (Object) null).size());
            }
            Assert.assertEquals("slop should have gone", 0L, asSlopStore.get(slop.makeKey(), (Object) null).size());
        }
        ListIterator<Versioned<Slop>> listIterator2 = createRandomSlops.listIterator();
        while (listIterator2.hasNext()) {
            Assert.assertNotSame("slop should be there", 0, Integer.valueOf(asSlopStore.get(((Slop) listIterator2.next().getValue()).makeKey(), (Object) null).size()));
        }
        SlopStorageEngine slopStore = getVoldemortServer(0).getStoreRepository().getSlopStore();
        Assert.assertEquals(slopStore.getOutstandingTotal(), 50L);
        Assert.assertEquals(slopStore.getOutstandingByNode().get(1), new Long(50L));
        Assert.assertEquals(slopStore.getOutstandingByNode().get(2), new Long(0L));
        stopServers(0, 2);
    }

    @Test
    @Ignore
    public void testOutOfOrder() throws InterruptedException, IOException {
        startServers(0, 1);
        StorageEngine<ByteArray, Slop, byte[]> asSlopStore = getVoldemortServer(0).getStoreRepository().getSlopStore().asSlopStore();
        long random = (long) (Math.random() * 1.0E9d);
        long random2 = (long) (Math.random() * 1.0E7d);
        ByteArray byteArray = new ByteArray(ByteUtils.getBytes("" + random, "UTF-8"));
        ByteArray byteArray2 = new ByteArray(ByteUtils.getBytes("" + random2, "UTF-8"));
        byte[] bytes = ByteUtils.getBytes("value-" + new String(byteArray.get()), "UTF-8");
        byte[] bytes2 = ByteUtils.getBytes("value-" + new String(byteArray2.get()), "UTF-8");
        Version vectorClock = new VectorClock();
        Version vectorClock2 = new VectorClock();
        ArrayList newArrayList = Lists.newArrayList();
        Iterator it = Lists.newArrayList(new String[]{"test-replication-memory", "users", "test-replication-persistent"}).iterator();
        while (it.hasNext()) {
            String str = (String) it.next();
            newArrayList.add(Versioned.value(new Slop(str, Slop.Operation.PUT, byteArray, bytes, (byte[]) null, 1, new Date()), vectorClock));
            VectorClock incremented = vectorClock.incremented(0, System.currentTimeMillis());
            newArrayList.add(Versioned.value(new Slop(str, Slop.Operation.DELETE, byteArray, (byte[]) null, (byte[]) null, 1, new Date()), incremented));
            vectorClock = incremented.incremented(0, System.currentTimeMillis());
            newArrayList.add(Versioned.value(new Slop(str, Slop.Operation.PUT, byteArray, bytes2, (byte[]) null, 1, new Date()), vectorClock));
            VectorClock incremented2 = vectorClock2.incremented(0, System.currentTimeMillis());
            newArrayList.add(Versioned.value(new Slop(str, Slop.Operation.PUT, byteArray2, bytes2, (byte[]) null, 1, new Date()), incremented2));
            vectorClock2 = incremented2.incremented(0, System.currentTimeMillis());
            newArrayList.add(Versioned.value(new Slop(str, Slop.Operation.DELETE, byteArray2, (byte[]) null, (byte[]) null, 1, new Date()), vectorClock2));
        }
        Collections.shuffle(newArrayList);
        List<Versioned<Slop>> subList = newArrayList.subList(0, 7);
        List<Versioned<Slop>> subList2 = newArrayList.subList(7, 15);
        populateSlops(0, asSlopStore, subList);
        StreamingSlopPusherJob streamingSlopPusherJob = new StreamingSlopPusherJob(getVoldemortServer(0).getStoreRepository(), getVoldemortServer(0).getMetadataStore(), new BannagePeriodFailureDetector(new FailureDetectorConfig().setCluster(this.cluster).setStoreVerifier(new ServerStoreVerifier(this.socketStoreFactory, this.metadataStore, this.configs[0]))), this.configs[0], new ScanPermitWrapper(1));
        streamingSlopPusherJob.run();
        populateSlops(0, asSlopStore, subList2);
        streamingSlopPusherJob.run();
        Thread.sleep(2000L);
        Iterator it2 = Lists.newArrayList(new String[]{"test-replication-memory", "users", "test-replication-persistent"}).iterator();
        while (it2.hasNext()) {
            StorageEngine storageEngine = getVoldemortServer(1).getStoreRepository().getStorageEngine((String) it2.next());
            Assert.assertEquals(storageEngine.get(byteArray, (Object) null).size(), 1L);
            Assert.assertEquals(ByteUtils.compare((byte[]) ((Versioned) storageEngine.get(byteArray, (Object) null).get(0)).getValue(), bytes2), 0L);
            Assert.assertEquals(storageEngine.get(byteArray2, (Object) null).size(), 0L);
        }
        stopServers(0, 1);
    }

    @Test
    public void testNormalPush() throws InterruptedException, IOException {
        startServers(0, 1);
        StorageEngine<ByteArray, Slop, byte[]> asSlopStore = getVoldemortServer(0).getStoreRepository().getSlopStore().asSlopStore();
        List<Versioned<Slop>> createRandomSlops = ServerTestUtils.createRandomSlops(1, 100, "test-replication-memory", "users", "test-replication-persistent", "test-readrepair-memory", "test-consistent", "test-consistent-with-pref-list");
        populateSlops(0, asSlopStore, createRandomSlops);
        new StreamingSlopPusherJob(getVoldemortServer(0).getStoreRepository(), getVoldemortServer(0).getMetadataStore(), new BannagePeriodFailureDetector(new FailureDetectorConfig().setCluster(this.cluster).setStoreVerifier(new ServerStoreVerifier(this.socketStoreFactory, this.metadataStore, this.configs[0]))), this.configs[0], new ScanPermitWrapper(1)).run();
        Thread.sleep(2000L);
        ListIterator<Versioned<Slop>> listIterator = createRandomSlops.listIterator();
        while (listIterator.hasNext()) {
            Slop slop = (Slop) listIterator.next().getValue();
            StorageEngine storageEngine = getVoldemortServer(1).getStoreRepository().getStorageEngine(slop.getStoreName());
            if (slop.getOperation().equals(Slop.Operation.PUT)) {
                Assert.assertNotSame("entry should be present at store", 0, Integer.valueOf(storageEngine.get(slop.getKey(), (Object) null).size()));
                Assert.assertEquals("entry value should match", new String(slop.getValue()), new String((byte[]) ((Versioned) storageEngine.get(slop.getKey(), (Object) null).get(0)).getValue()));
            } else if (slop.getOperation().equals(Slop.Operation.DELETE)) {
                Assert.assertEquals("entry value should match", 0L, storageEngine.get(slop.getKey(), (Object) null).size());
            }
            Assert.assertEquals("slop should have gone", 0L, asSlopStore.get(slop.makeKey(), (Object) null).size());
        }
        SlopStorageEngine slopStore = getVoldemortServer(0).getStoreRepository().getSlopStore();
        Assert.assertEquals(slopStore.getOutstandingTotal(), 0L);
        Assert.assertEquals(slopStore.getOutstandingByNode().get(1), new Long(0L));
        Assert.assertEquals(slopStore.getOutstandingByNode().get(2), new Long(0L));
        stopServers(0, 1);
    }

    private void populateSlops(int i, StorageEngine<ByteArray, Slop, byte[]> storageEngine, List<Versioned<Slop>>... listArr) {
        int size = listArr[0].size();
        Iterator[] itArr = new Iterator[listArr.length];
        for (int i2 = 0; i2 < listArr.length; i2++) {
            itArr[i2] = listArr[i2].iterator();
        }
        for (int i3 = 0; i3 < size; i3++) {
            for (int i4 = 0; i4 < listArr.length; i4++) {
                Versioned versioned = (Versioned) itArr[i4].next();
                if (((Slop) versioned.getValue()).getOperation() == Slop.Operation.DELETE) {
                    try {
                        getVoldemortServer(i).getStoreRepository().getStorageEngine(((Slop) versioned.getValue()).getStoreName()).put(((Slop) versioned.getValue()).getKey(), Versioned.value(((Slop) versioned.getValue()).getValue(), versioned.getVersion()), (Object) null);
                    } catch (ObsoleteVersionException e) {
                    }
                }
                try {
                    storageEngine.put(((Slop) versioned.getValue()).makeKey(), versioned, (Object) null);
                } catch (ObsoleteVersionException e2) {
                }
            }
        }
    }

    @Test
    public void testNormalPushBothWays() throws InterruptedException, IOException {
        startServers(0, 1);
        StorageEngine<ByteArray, Slop, byte[]> asSlopStore = getVoldemortServer(0).getStoreRepository().getSlopStore().asSlopStore();
        StorageEngine<ByteArray, Slop, byte[]> asSlopStore2 = getVoldemortServer(1).getStoreRepository().getSlopStore().asSlopStore();
        List<Versioned<Slop>> createRandomSlops = ServerTestUtils.createRandomSlops(1, 100, "test-readrepair-memory", "test-consistent", "test-consistent-with-pref-list");
        List<Versioned<Slop>> createRandomSlops2 = ServerTestUtils.createRandomSlops(0, 100, "test-replication-memory", "users", "test-replication-persistent");
        populateSlops(0, asSlopStore, createRandomSlops);
        populateSlops(1, asSlopStore2, createRandomSlops2);
        StreamingSlopPusherJob streamingSlopPusherJob = new StreamingSlopPusherJob(getVoldemortServer(0).getStoreRepository(), getVoldemortServer(0).getMetadataStore(), new BannagePeriodFailureDetector(new FailureDetectorConfig().setCluster(this.cluster).setStoreVerifier(new ServerStoreVerifier(this.socketStoreFactory, this.metadataStore, this.configs[0]))), this.configs[0], new ScanPermitWrapper(1));
        StreamingSlopPusherJob streamingSlopPusherJob2 = new StreamingSlopPusherJob(getVoldemortServer(1).getStoreRepository(), getVoldemortServer(1).getMetadataStore(), new BannagePeriodFailureDetector(new FailureDetectorConfig().setCluster(this.cluster).setStoreVerifier(new ServerStoreVerifier(this.socketStoreFactory, this.metadataStore, this.configs[1]))), this.configs[1], new ScanPermitWrapper(1));
        streamingSlopPusherJob.run();
        streamingSlopPusherJob2.run();
        Thread.sleep(2000L);
        ListIterator<Versioned<Slop>> listIterator = createRandomSlops.listIterator();
        while (listIterator.hasNext()) {
            Slop slop = (Slop) listIterator.next().getValue();
            StorageEngine storageEngine = getVoldemortServer(1).getStoreRepository().getStorageEngine(slop.getStoreName());
            if (slop.getOperation().equals(Slop.Operation.PUT)) {
                Assert.assertNotSame("entry should be present at store", 0, Integer.valueOf(storageEngine.get(slop.getKey(), (Object) null).size()));
                Assert.assertEquals("entry value should match", new String(slop.getValue()), new String((byte[]) ((Versioned) storageEngine.get(slop.getKey(), (Object) null).get(0)).getValue()));
            } else if (slop.getOperation().equals(Slop.Operation.DELETE)) {
                Assert.assertEquals("entry value should match", 0L, storageEngine.get(slop.getKey(), (Object) null).size());
            }
            Assert.assertEquals("slop should have gone", 0L, asSlopStore.get(slop.makeKey(), (Object) null).size());
        }
        ListIterator<Versioned<Slop>> listIterator2 = createRandomSlops2.listIterator();
        while (listIterator2.hasNext()) {
            Slop slop2 = (Slop) listIterator2.next().getValue();
            StorageEngine storageEngine2 = getVoldemortServer(0).getStoreRepository().getStorageEngine(slop2.getStoreName());
            if (slop2.getOperation().equals(Slop.Operation.PUT)) {
                Assert.assertNotSame("entry should be present at store", 0, Integer.valueOf(storageEngine2.get(slop2.getKey(), (Object) null).size()));
                Assert.assertEquals("entry value should match", new String(slop2.getValue()), new String((byte[]) ((Versioned) storageEngine2.get(slop2.getKey(), (Object) null).get(0)).getValue()));
            } else if (slop2.getOperation().equals(Slop.Operation.DELETE)) {
                Assert.assertEquals("entry value should match", 0L, storageEngine2.get(slop2.getKey(), (Object) null).size());
            }
            Assert.assertEquals("slop should have gone", 0L, asSlopStore2.get(slop2.makeKey(), (Object) null).size());
        }
        SlopStorageEngine slopStore = getVoldemortServer(0).getStoreRepository().getSlopStore();
        Assert.assertEquals(slopStore.getOutstandingTotal(), 0L);
        Assert.assertEquals(slopStore.getOutstandingByNode().get(1), new Long(0L));
        SlopStorageEngine slopStore2 = getVoldemortServer(1).getStoreRepository().getSlopStore();
        Assert.assertEquals(slopStore2.getOutstandingTotal(), 0L);
        Assert.assertEquals(slopStore2.getOutstandingByNode().get(0), new Long(0L));
        stopServers(0, 1);
    }

    @Test
    public void testServerReplacementWithoutBounce() throws IOException, InterruptedException {
        startServers(0, 2);
        StorageEngine<ByteArray, Slop, byte[]> asSlopStore = getVoldemortServer(0).getStoreRepository().getSlopStore().asSlopStore();
        List<Versioned<Slop>> createRandomSlops = ServerTestUtils.createRandomSlops(1, 50, "test-replication-memory", "users", "test-replication-persistent", "test-readrepair-memory", "test-consistent", "test-consistent-with-pref-list");
        List<Versioned<Slop>> createRandomSlops2 = ServerTestUtils.createRandomSlops(2, 50, "test-replication-memory", "users", "test-replication-persistent", "test-readrepair-memory", "test-consistent", "test-consistent-with-pref-list");
        populateSlops(0, asSlopStore, createRandomSlops, createRandomSlops2);
        StreamingSlopPusherJob streamingSlopPusherJob = new StreamingSlopPusherJob(getVoldemortServer(0).getStoreRepository(), getVoldemortServer(0).getMetadataStore(), new BannagePeriodFailureDetector(new FailureDetectorConfig().setCluster(this.cluster).setStoreVerifier(new ServerStoreVerifier(this.socketStoreFactory, this.metadataStore, this.configs[0]))), this.configs[0], new ScanPermitWrapper(1));
        streamingSlopPusherJob.run();
        Thread.sleep(10000L);
        ListIterator<Versioned<Slop>> listIterator = createRandomSlops2.listIterator();
        while (listIterator.hasNext()) {
            Slop slop = (Slop) listIterator.next().getValue();
            StorageEngine storageEngine = getVoldemortServer(2).getStoreRepository().getStorageEngine(slop.getStoreName());
            if (slop.getOperation().equals(Slop.Operation.PUT)) {
                Assert.assertNotSame("entry should be present at store", 0, Integer.valueOf(storageEngine.get(slop.getKey(), (Object) null).size()));
                Assert.assertEquals("entry value should match", new String(slop.getValue()), new String((byte[]) ((Versioned) storageEngine.get(slop.getKey(), (Object) null).get(0)).getValue()));
            } else if (slop.getOperation().equals(Slop.Operation.DELETE)) {
                Assert.assertEquals("entry value should match", 0L, storageEngine.get(slop.getKey(), (Object) null).size());
            }
            Assert.assertEquals("slop should have gone", 0L, asSlopStore.get(slop.makeKey(), (Object) null).size());
        }
        ListIterator<Versioned<Slop>> listIterator2 = createRandomSlops.listIterator();
        while (listIterator2.hasNext()) {
            Assert.assertNotSame("slop should be there", 0, Integer.valueOf(asSlopStore.get(((Slop) listIterator2.next().getValue()).makeKey(), (Object) null).size()));
        }
        SlopStorageEngine slopStore = getVoldemortServer(0).getStoreRepository().getSlopStore();
        Assert.assertEquals(slopStore.getOutstandingTotal(), 50L);
        Assert.assertEquals(slopStore.getOutstandingByNode().get(1), new Long(50L));
        Assert.assertEquals(slopStore.getOutstandingByNode().get(2), new Long(0L));
        this.cluster = ServerTestUtils.updateClusterWithNewHost(this.cluster, 1);
        startServers(1);
        this.servers[0].getMetadataStore().put("cluster.xml", this.cluster);
        this.servers[2].getMetadataStore().put("cluster.xml", this.cluster);
        Thread.sleep(35000L);
        streamingSlopPusherJob.run();
        Thread.sleep(10000L);
        ListIterator<Versioned<Slop>> listIterator3 = createRandomSlops.listIterator();
        while (listIterator3.hasNext()) {
            Slop slop2 = (Slop) listIterator3.next().getValue();
            StorageEngine storageEngine2 = getVoldemortServer(1).getStoreRepository().getStorageEngine(slop2.getStoreName());
            if (slop2.getOperation().equals(Slop.Operation.PUT)) {
                Assert.assertNotSame("entry should be present at store", 0, Integer.valueOf(storageEngine2.get(slop2.getKey(), (Object) null).size()));
                Assert.assertEquals("entry value should match", new String(slop2.getValue()), new String((byte[]) ((Versioned) storageEngine2.get(slop2.getKey(), (Object) null).get(0)).getValue()));
            } else if (slop2.getOperation().equals(Slop.Operation.DELETE)) {
                Assert.assertEquals("entry value should match", 0L, storageEngine2.get(slop2.getKey(), (Object) null).size());
            }
            Assert.assertEquals("slop should have gone", 0L, asSlopStore.get(slop2.makeKey(), (Object) null).size());
        }
    }
}
