package voldemort.client.protocol.admin;

import com.google.common.collect.Lists;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Properties;
import java.util.concurrent.Callable;
import org.apache.commons.io.FileUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import voldemort.ServerTestUtils;
import voldemort.TestUtils;
import voldemort.client.RoutingTier;
import voldemort.client.protocol.VoldemortFilter;
import voldemort.cluster.Cluster;
import voldemort.cluster.Node;
import voldemort.performance.benchmark.Benchmark;
import voldemort.routing.RoutingStrategy;
import voldemort.routing.RoutingStrategyFactory;
import voldemort.serialization.DefaultSerializerFactory;
import voldemort.serialization.Serializer;
import voldemort.serialization.SerializerDefinition;
import voldemort.serialization.SerializerFactory;
import voldemort.server.VoldemortServer;
import voldemort.store.StoreDefinition;
import voldemort.store.StoreDefinitionBuilder;
import voldemort.store.compress.CompressionStrategy;
import voldemort.store.compress.CompressionStrategyFactory;
import voldemort.store.socket.SocketStoreFactory;
import voldemort.store.socket.clientrequest.ClientRequestExecutorPool;
import voldemort.utils.ByteArray;
import voldemort.utils.Props;
import voldemort.versioning.Versioned;
import voldemort.xml.StoreDefinitionsMapper;

/* loaded from: input_file:voldemort/client/protocol/admin/StreamingClientTest.class */
public class StreamingClientTest {
    private static long startTime;
    public static final String SERVER_LOCAL_URL = "tcp://localhost:";
    public static final String TEST_STORE_NAME = "test-store-streaming-1";
    public static final String STORES_XML_FILE = "test/common/voldemort/config/stores.xml";
    public static final int TOTAL_SERVERS = 2;
    private static AdminClient adminClient;
    private static StoreDefinition storeDef;
    private static int NUM_KEYS_1 = 4000;
    private static SocketStoreFactory socketStoreFactory = new ClientRequestExecutorPool(2, 10000, 100000, 32768);
    private static VoldemortServer[] servers = null;
    private static int[] serverPorts = null;
    private static Cluster cluster = ServerTestUtils.getLocalCluster(2, new int[]{new int[]{0, 1, 2, 3}, new int[]{4, 5, 6, 7}});
    private static SerializerFactory serializerFactory = new DefaultSerializerFactory();

    @BeforeClass
    public static void testSetup() {
        if (null == servers) {
            servers = new VoldemortServer[2];
            serverPorts = new int[2];
            storeDef = new StoreDefinitionBuilder().setName(TEST_STORE_NAME).setType("memory").setKeySerializer(new SerializerDefinition(Benchmark.STRING_KEY_TYPE)).setValueSerializer(new SerializerDefinition(Benchmark.STRING_KEY_TYPE)).setRoutingPolicy(RoutingTier.SERVER).setRoutingStrategyType("consistent-routing").setReplicationFactor(2).setPreferredReads(1).setRequiredReads(1).setPreferredWrites(2).setRequiredWrites(2).build();
            File file = new File(TestUtils.createTempDir(), "stores.xml");
            try {
                FileUtils.writeStringToFile(file, new StoreDefinitionsMapper().writeStoreList(Lists.newArrayList(new StoreDefinition[]{storeDef})));
            } catch (IOException e) {
                e.printStackTrace();
            }
            for (int i = 0; i < 2; i++) {
                try {
                    servers[i] = ServerTestUtils.startVoldemortServer(socketStoreFactory, ServerTestUtils.createServerConfig(true, i, TestUtils.createTempDir().getAbsolutePath(), null, file.getAbsolutePath(), new Properties()), cluster);
                } catch (IOException e2) {
                    e2.printStackTrace();
                }
                serverPorts[i] = servers[i].getIdentityNode().getSocketPort();
            }
            adminClient = ServerTestUtils.getAdminClient(cluster);
        }
        startTime = System.currentTimeMillis();
    }

    @AfterClass
    public static void testCleanup() {
    }

    @Test
    public void testStreaming() {
        Props props = new Props();
        props.put("streaming.platform.bootstrapURL", "tcp://localhost:" + serverPorts[0]);
        StreamingClient streamingClient = new StreamingClient(new StreamingClientConfig(props));
        streamingClient.initStreamingSession(TEST_STORE_NAME, new Callable<Object>() { // from class: voldemort.client.protocol.admin.StreamingClientTest.1
            @Override // java.util.concurrent.Callable
            public Object call() throws Exception {
                return null;
            }
        }, new Callable<Object>() { // from class: voldemort.client.protocol.admin.StreamingClientTest.2
            @Override // java.util.concurrent.Callable
            public Object call() throws Exception {
                return null;
            }
        }, true);
        for (int i = 0; i < NUM_KEYS_1; i++) {
            String str = i + "";
            streamingClient.streamingPut(new ByteArray(str.getBytes()), Versioned.value(str.getBytes()));
        }
        streamingClient.commitToVoldemort();
        streamingClient.closeStreamingSession();
        Assert.assertEquals(Boolean.valueOf(verifyKeysExist()), true);
    }

    public boolean verifyKeysExist() {
        byte[] inflate;
        RoutingStrategy updateRoutingStrategy = new RoutingStrategyFactory().updateRoutingStrategy(storeDef, adminClient.getAdminClientCluster());
        HashMap hashMap = new HashMap();
        Collection<Node> nodes = adminClient.getAdminClientCluster().getNodes();
        Iterator it = nodes.iterator();
        while (it.hasNext()) {
            hashMap.put(Integer.valueOf(((Node) it.next()).getId()), new ArrayList());
        }
        for (int i = 0; i < NUM_KEYS_1; i++) {
            String str = i + "";
            Iterator it2 = updateRoutingStrategy.routeRequest(str.getBytes()).iterator();
            while (it2.hasNext()) {
                ((ArrayList) hashMap.get(Integer.valueOf(((Node) it2.next()).getId()))).add(str);
            }
        }
        ArrayList arrayList = new ArrayList();
        for (Node node : nodes) {
            ArrayList newArrayList = Lists.newArrayList();
            newArrayList.addAll(node.getPartitionIds());
            Iterator fetchKeys = adminClient.bulkFetchOps.fetchKeys(node.getId(), TEST_STORE_NAME, newArrayList, (VoldemortFilter) null, false);
            SerializerDefinition keySerializer = storeDef.getKeySerializer();
            Serializer serializer = new DefaultSerializerFactory().getSerializer(keySerializer);
            CompressionStrategy compressionStrategy = (keySerializer == null || !keySerializer.hasCompression()) ? null : new CompressionStrategyFactory().get(keySerializer.getCompression());
            while (fetchKeys.hasNext()) {
                byte[] bArr = ((ByteArray) fetchKeys.next()).get();
                if (null == compressionStrategy) {
                    inflate = bArr;
                } else {
                    try {
                        inflate = compressionStrategy.inflate(bArr);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                arrayList.add((String) serializer.toObject(inflate));
            }
        }
        return arrayList.containsAll((ArrayList) hashMap.get(0));
    }
}
