package voldemort.client.protocol.admin;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
import voldemort.VoldemortException;
import voldemort.client.protocol.RequestFormatType;
import voldemort.client.protocol.pb.ProtoUtils;
import voldemort.client.protocol.pb.VAdminProto;
import voldemort.cluster.Node;
import voldemort.routing.RoutingStrategy;
import voldemort.routing.RoutingStrategyFactory;
import voldemort.store.StoreDefinition;
import voldemort.store.socket.SocketDestination;
import voldemort.utils.ByteArray;
import voldemort.utils.EventThrottler;
import voldemort.utils.Pair;
import voldemort.versioning.Versioned;

/* loaded from: input_file:voldemort/client/protocol/admin/StreamingClient.class */
/**
 * Streams key/value entries to the admin port of the nodes of a Voldemort
 * cluster using {@code UPDATE_PARTITION_ENTRIES} admin requests.
 *
 * <p>Typical usage: {@link #initStreamingSessions(List, Callable, Callable, boolean)},
 * any number of {@code streamingPut} calls, then {@link #closeStreamingSessions()}.
 * Entries are committed (end-of-stream written and the node's response read)
 * whenever the configured batch size is reached or {@code TIME_COMMIT_SIZE}
 * seconds have passed since the previous commit.
 *
 * <p>All public methods are synchronized on the client instance. Callbacks are
 * executed on an internal executor while the calling thread waits for them, so
 * a callback must not call back into a synchronized method of the same client.
 */
public class StreamingClient {

    /** Upper bound of stores per session; used to size the socket pool. */
    private static final int MAX_STORES_PER_SESSION = 100;
    private static final Logger logger = Logger.getLogger(StreamingClient.class);
    /** Maximum number of seconds between two commits while entries are streamed. */
    private static final int TIME_COMMIT_SIZE = 30;

    // Non-private members are kept with their original names and types because
    // other classes of the package (or subclasses) may reference them.
    List<StoreDefinition> remoteStoreDefs;
    protected RoutingStrategy routingStrategy;
    boolean newBatch;
    boolean isMultiSession;
    ExecutorService streamingresults;
    protected EventThrottler throttler;
    AdminClient adminClient;
    AdminClientConfig adminClientConfig;
    String bootstrapURL;
    boolean cleanedUp = false;
    // No longer consulted for commit timing (its value never changed after
    // construction); retained only for compatibility.
    Calendar calendar = Calendar.getInstance();

    // Per-client configuration and state. These used to be static, which made
    // every client share the settings of the most recently constructed one.
    private final int checkpointCommitSize;
    private int throttleQps;
    private boolean markedBad = false;
    private int entriesProcessed;
    /** {@link System#nanoTime()} reading taken at session start / last commit. */
    private long lastCommitNanos;

    private SocketPool streamingSocketPool;
    private HashMap<String, RoutingStrategy> storeToRoutingStrategy;
    private HashMap<Pair<String, Integer>, Boolean> nodeIdStoreInitialized;
    private HashMap<Pair<String, Integer>, SocketDestination> nodeIdStoreToSocketRequest;
    private HashMap<Pair<String, Integer>, DataOutputStream> nodeIdStoreToOutputStreamRequest;
    private HashMap<Pair<String, Integer>, DataInputStream> nodeIdStoreToInputStreamRequest;
    private HashMap<Pair<String, Integer>, SocketAndStreams> nodeIdStoreToSocketAndStreams;
    private List<String> storeNames;
    private List<Node> nodesToStream;
    private List<Integer> blackListedNodes;
    private Callable<?> checkpointCallback = null;
    private Callable<?> recoveryCallback = null;
    private boolean allowMerge = false;

    /**
     * Creates a client from the given configuration. No connection is opened
     * until a session is initialised.
     *
     * @param streamingClientConfig supplies the bootstrap URL, the batch size
     *        (entries per commit) and the throttle limit
     */
    public StreamingClient(StreamingClientConfig streamingClientConfig) {
        this.bootstrapURL = streamingClientConfig.getBootstrapURL();
        this.checkpointCommitSize = streamingClientConfig.getBatchSize();
        this.throttleQps = streamingClientConfig.getThrottleQPS();
    }

    /**
     * Replaces the throttler with one using the given limit.
     *
     * @param throttleQPS new limit handed to {@link EventThrottler}
     */
    public synchronized void updateThrottleLimit(int throttleQPS) {
        this.throttleQps = throttleQPS;
        this.throttler = new EventThrottler(throttleQPS);
    }

    /**
     * Initialises a streaming session for a single store.
     *
     * @see #initStreamingSessions(List, Callable, Callable, boolean)
     */
    public synchronized void initStreamingSession(String store,
                                                  Callable checkpointCallback,
                                                  Callable recoveryCallback,
                                                  boolean allowMerge) {
        List<String> stores = new ArrayList<String>();
        stores.add(store);
        initStreamingSessions(stores, checkpointCallback, recoveryCallback, allowMerge);
    }

    /**
     * Streams one entry to every store currently in the session.
     *
     * @throws VoldemortException if a recovery callback failed earlier or the
     *         session has been closed
     */
    public synchronized void streamingPut(ByteArray key, Versioned<byte[]> value) {
        if (markedBad) {
            logger.error("Cannot stream more entries since Recovery Callback Failed!");
            throw new VoldemortException("Cannot stream more entries since Recovery Callback Failed!");
        }
        ensureSessionOpen();
        // Iterate over a snapshot so the store list can never change under the loop.
        for (String store : new ArrayList<String>(this.storeNames)) {
            streamingPut(key, value, store);
        }
    }

    /** Same as {@link #closeStreamingSessions(Callable)}. */
    public synchronized void closeStreamingSession(Callable resetCheckpointCallback) {
        closeStreamingSessions(resetCheckpointCallback);
    }

    /** Same as {@link #closeStreamingSessions()}. */
    public synchronized void closeStreamingSession() {
        closeStreamingSessions();
    }

    /** Closes a socket, logging (not propagating) a failure to do so. */
    private void close(Socket socket) {
        try {
            socket.close();
        } catch (IOException e) {
            logger.warn("Failed to close socket", e);
        }
    }

    /**
     * Last-resort release of sockets and executor threads when the session was
     * never closed explicitly. Nothing is committed here.
     */
    @Override
    protected void finalize() {
        if (this.cleanedUp) {
            return;
        }
        cleanupSessions();
        shutdownExecutor();
    }

    /**
     * Initialises a streaming session for the given stores on all nodes of the
     * cluster.
     *
     * @see #initStreamingSessions(List, Callable, Callable, boolean, List)
     */
    public synchronized void initStreamingSessions(List<String> stores,
                                                   Callable checkpointCallback,
                                                   Callable recoveryCallback,
                                                   boolean allowMerge) {
        initStreamingSessions(stores, checkpointCallback, recoveryCallback, allowMerge, null);
    }

    /**
     * Initialises a streaming session: connects to the cluster behind the
     * bootstrap URL and opens one stream per (store, node) pair. A session that
     * is still open on this client is released first (without committing).
     *
     * @param stores names of the stores to stream to
     * @param checkpointCallback run after every successful commit; may be
     *        {@code null}
     * @param recoveryCallback run when a write or commit fails; may be
     *        {@code null}
     * @param allowMerge stored with the session
     * @param blackListedNodes ids of nodes that must not receive entries;
     *        {@code null} or empty to stream to every node
     * @throws VoldemortException if a stream cannot be set up or a store is
     *         not defined on the cluster
     */
    public synchronized void initStreamingSessions(List<String> stores,
                                                   Callable checkpointCallback,
                                                   Callable recoveryCallback,
                                                   boolean allowMerge,
                                                   List<Integer> blackListedNodes) {
        logger.info("Initializing a streaming session");
        releasePreviousSession();

        this.adminClientConfig = new AdminClientConfig();
        this.adminClient = new AdminClient(this.bootstrapURL, this.adminClientConfig);
        this.checkpointCallback = checkpointCallback;
        this.recoveryCallback = recoveryCallback;
        this.allowMerge = allowMerge;
        this.streamingresults = Executors.newFixedThreadPool(3);
        this.entriesProcessed = 0;
        this.lastCommitNanos = System.nanoTime();
        this.newBatch = true;
        this.isMultiSession = true;
        // A fresh session starts healthy and open.
        this.markedBad = false;
        this.cleanedUp = false;
        this.storeNames = new ArrayList<String>();
        this.throttler = new EventThrottler(this.throttleQps);

        // Always overwrite the blacklist so one from an earlier session cannot leak in.
        boolean hasBlackList = blackListedNodes != null && !blackListedNodes.isEmpty();
        this.blackListedNodes = hasBlackList ? new ArrayList<Integer>(blackListedNodes) : null;

        this.nodesToStream = new ArrayList<Node>();
        for (Node node : this.adminClient.getAdminClientCluster().getNodes()) {
            if (!isBlackListed(node)) {
                this.nodesToStream.add(node);
            }
        }

        TimeUnit seconds = TimeUnit.SECONDS;
        this.streamingSocketPool = new SocketPool(
                this.adminClient.getAdminClientCluster().getNumberOfNodes() * MAX_STORES_PER_SESSION,
                (int) seconds.toMillis(this.adminClientConfig.getAdminConnectionTimeoutSec()),
                (int) seconds.toMillis(this.adminClientConfig.getAdminSocketTimeoutSec()),
                this.adminClientConfig.getAdminSocketBufferSize(),
                this.adminClientConfig.getAdminSocketKeepAlive());

        this.nodeIdStoreToSocketRequest = new HashMap<Pair<String, Integer>, SocketDestination>();
        this.nodeIdStoreToOutputStreamRequest = new HashMap<Pair<String, Integer>, DataOutputStream>();
        this.nodeIdStoreToInputStreamRequest = new HashMap<Pair<String, Integer>, DataInputStream>();
        this.nodeIdStoreInitialized = new HashMap<Pair<String, Integer>, Boolean>();
        this.storeToRoutingStrategy = new HashMap<String, RoutingStrategy>();
        this.nodeIdStoreToSocketAndStreams = new HashMap<Pair<String, Integer>, SocketAndStreams>();

        for (String store : stores) {
            addStoreToSession(store);
        }
    }

    /**
     * Opens a stream to every target node for the store and resolves its
     * routing strategy. The store is registered only when all of that
     * succeeded; on failure every stream opened for it is released again.
     *
     * @throws VoldemortException if a stream cannot be set up or the store is
     *         not defined on the cluster
     */
    private void addStoreToSession(String store) {
        if (this.storeNames.contains(store)) {
            // Already streaming to this store; opening it again would leak its sockets.
            return;
        }
        boolean added = false;
        try {
            for (Node node : this.nodesToStream) {
                openStream(store, node);
            }
            this.storeToRoutingStrategy.put(store, resolveRoutingStrategy(store));
            this.storeNames.add(store);
            added = true;
        } finally {
            if (!added) {
                releaseStreams(store);
            }
        }
    }

    /**
     * Checks a socket out of the pool for the (store, node) pair, registers its
     * streams and refreshes {@link #remoteStoreDefs} from that node.
     */
    private void openStream(String store, Node node) {
        SocketDestination destination = new SocketDestination(node.getHost(),
                                                              node.getAdminPort(),
                                                              RequestFormatType.ADMIN_PROTOCOL_BUFFERS);
        SocketAndStreams socketAndStreams = this.streamingSocketPool.checkout(destination);
        Pair<String, Integer> key = streamKey(store, node);
        try {
            this.nodeIdStoreToSocketRequest.put(key, destination);
            this.nodeIdStoreToSocketAndStreams.put(key, socketAndStreams);
            this.nodeIdStoreToOutputStreamRequest.put(key, socketAndStreams.getOutputStream());
            this.nodeIdStoreToInputStreamRequest.put(key, socketAndStreams.getInputStream());
            this.nodeIdStoreInitialized.put(key, false);
            this.remoteStoreDefs = this.adminClient.metadataMgmtOps.getRemoteStoreDefList(node.getId()).getValue();
        } catch (Exception e) {
            releaseStream(key);
            throw new VoldemortException(e);
        }
    }

    /**
     * Builds the routing strategy for the store from the most recently fetched
     * store definitions.
     *
     * @throws VoldemortException if no definition with that name is known
     */
    private RoutingStrategy resolveRoutingStrategy(String store) {
        if (this.remoteStoreDefs != null) {
            for (StoreDefinition definition : this.remoteStoreDefs) {
                if (definition.getName().equals(store)) {
                    return new RoutingStrategyFactory().updateRoutingStrategy(definition,
                                                                              this.adminClient.getAdminClientCluster());
                }
            }
        }
        logger.error("Store Name not found on the cluster");
        throw new VoldemortException("Store Name not found on the cluster");
    }

    /**
     * Commits and closes the streams of the given stores while the rest of the
     * session stays open. A removed store is re-added automatically if an entry
     * is later streamed to it by name.
     */
    public synchronized void removeStoreFromSession(List<String> storeNameToRemove) {
        logger.info("closing the Streaming session for a few stores");
        commitToVoldemort(storeNameToRemove);
        cleanupSessions(storeNameToRemove);
    }

    /**
     * Streams one entry to the nodes that the store's routing strategy selects
     * for the key, skipping blacklisted nodes. The store is added to the
     * session first if it is not part of it yet. A failed write triggers the
     * recovery callback and streaming continues with the next node.
     *
     * @throws VoldemortException if a recovery callback failed (now or
     *         earlier), the session has been closed, or the store cannot be
     *         added to the session
     */
    public synchronized void streamingPut(ByteArray key, Versioned<byte[]> value, String storeName) {
        if (markedBad) {
            logger.error("Cannot stream more entries since Recovery Callback Failed!");
            throw new VoldemortException("Cannot stream more entries since Recovery Callback Failed! You Need to restart the session");
        }
        ensureSessionOpen();
        if (!this.storeNames.contains(storeName)) {
            addStoreToSession(storeName);
        }

        for (Node node : this.storeToRoutingStrategy.get(storeName).routeRequest(key.get())) {
            if (isBlackListed(node)) {
                continue;
            }
            Pair<String, Integer> streamKey = streamKey(storeName, node);
            VAdminProto.UpdatePartitionEntriesRequest.Builder update = VAdminProto.UpdatePartitionEntriesRequest.newBuilder()
                    .setStore(storeName)
                    .setPartitionEntry(VAdminProto.PartitionEntry.newBuilder()
                            .setKey(ProtoUtils.encodeBytes(key))
                            .setVersioned(ProtoUtils.encodeVersioned(value))
                            .build());
            DataOutputStream outputStream = this.nodeIdStoreToOutputStreamRequest.get(streamKey);
            try {
                if (Boolean.TRUE.equals(this.nodeIdStoreInitialized.get(streamKey))) {
                    ProtoUtils.writeMessage(outputStream, update.build());
                } else {
                    // First entry of a batch: wrap it in the admin request header.
                    ProtoUtils.writeMessage(outputStream,
                                            VAdminProto.VoldemortAdminRequest.newBuilder()
                                                    .setType(VAdminProto.AdminRequestType.UPDATE_PARTITION_ENTRIES)
                                                    .setUpdatePartitionEntries(update)
                                                    .build());
                    outputStream.flush();
                    this.nodeIdStoreInitialized.put(streamKey, true);
                }
                this.entriesProcessed++;
            } catch (IOException e) {
                logger.warn("Invoking the Recovery Callback", e);
                invokeRecoveryCallback();
            }
        }

        // entriesProcessed grows once per replica, so it can step over the batch
        // size; compare with >= instead of ==.
        boolean batchFull = this.checkpointCommitSize > 0
                            && this.entriesProcessed >= this.checkpointCommitSize;
        boolean commitOverdue = System.nanoTime() - this.lastCommitNanos >= TimeUnit.SECONDS.toNanos(TIME_COMMIT_SIZE);
        if (batchFull || commitOverdue) {
            commitToVoldemort();
        }
        this.throttler.maybeThrottle(1);
    }

    /**
     * Commits the pending entries of every store in the session and restarts
     * the batch counter and the commit timer.
     *
     * @throws VoldemortException if the recovery callback fails
     */
    public synchronized void commitToVoldemort() {
        this.entriesProcessed = 0;
        this.lastCommitNanos = System.nanoTime();
        commitToVoldemort(this.storeNames);
    }

    /**
     * Ends the current batch on every (store, node) stream that has received
     * entries: writes end-of-stream, reads the node's response and runs the
     * checkpoint callback on success, or the recovery callback on an error
     * response or I/O failure.
     *
     * @throws VoldemortException if the recovery callback fails
     */
    private void commitToVoldemort(List<String> storeNamesToCommit) {
        if (logger.isDebugEnabled()) {
            logger.debug("Trying to commit to Voldemort");
        }
        for (Node node : this.nodesToStream) {
            for (String store : storeNamesToCommit) {
                Pair<String, Integer> streamKey = streamKey(store, node);
                if (!Boolean.TRUE.equals(this.nodeIdStoreInitialized.get(streamKey))) {
                    // Nothing was streamed to this node for this store since the last commit.
                    continue;
                }
                this.nodeIdStoreInitialized.put(streamKey, false);
                DataOutputStream outputStream = this.nodeIdStoreToOutputStreamRequest.get(streamKey);
                try {
                    ProtoUtils.writeEndOfStream(outputStream);
                    outputStream.flush();
                    boolean rejected = ProtoUtils.readToBuilder(this.nodeIdStoreToInputStreamRequest.get(streamKey),
                                                                VAdminProto.UpdatePartitionEntriesResponse.newBuilder())
                                                 .hasError();
                    if (rejected) {
                        logger.warn("Invoking the Recovery Callback");
                        invokeRecoveryCallback();
                    } else {
                        if (logger.isDebugEnabled()) {
                            logger.debug("Commit successful");
                            logger.debug("calling checkpoint callback");
                        }
                        invokeCheckpointCallback();
                    }
                } catch (IOException e) {
                    logger.warn("Invoking the Recovery Callback", e);
                    invokeRecoveryCallback();
                }
            }
        }
    }

    /**
     * Commits pending entries, releases all streams, then runs the given
     * callback and shuts the callback executor down. The streams are released
     * even when the commit fails; the callback only runs after a successful
     * close, and its own failure is logged, not propagated.
     *
     * @param resetCheckpointCallback run once the session is closed; may be
     *        {@code null}
     * @throws VoldemortException if the recovery callback fails during the
     *         final commit
     */
    public synchronized void closeStreamingSessions(Callable resetCheckpointCallback) {
        closeAndRelease(resetCheckpointCallback);
    }

    /**
     * Commits pending entries, releases all streams and shuts the callback
     * executor down. The streams are released even when the commit fails.
     *
     * @throws VoldemortException if the recovery callback fails during the
     *         final commit
     */
    public synchronized void closeStreamingSessions() {
        closeAndRelease(null);
    }

    /** Shared implementation of both {@code closeStreamingSessions} variants. */
    private void closeAndRelease(Callable<?> closeCallback) {
        logger.info("closing the Streaming session");
        try {
            try {
                commitToVoldemort();
            } finally {
                cleanupSessions();
            }
            try {
                runCallback(closeCallback);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                logger.warn("Interrupted while waiting for the close callback", e);
            } catch (ExecutionException e) {
                logger.warn("Close callback failed", e);
            }
        } finally {
            // The pool threads are non-daemon; without this they outlive the session.
            shutdownExecutor();
        }
    }

    /** Releases the streams of every store in the session and marks it closed. */
    private void cleanupSessions() {
        if (this.storeNames != null) {
            // Copy: the per-store cleanup removes names from storeNames.
            cleanupSessions(new ArrayList<String>(this.storeNames));
        }
        this.cleanedUp = true;
    }

    /** Releases the streams of the given stores and forgets those stores. */
    private void cleanupSessions(List<String> storeNamesToCleanUp) {
        logger.info("Performing cleanup");
        for (String store : storeNamesToCleanUp) {
            releaseStreams(store);
            this.storeNames.remove(store);
            this.storeToRoutingStrategy.remove(store);
        }
    }

    /** Releases the stream of the store on every target node. */
    private void releaseStreams(String store) {
        for (Node node : this.nodesToStream) {
            releaseStream(streamKey(store, node));
        }
    }

    /**
     * Drops all bookkeeping for one (store, node) stream and, if a socket was
     * registered for it, closes the socket and hands it back to the pool.
     */
    private void releaseStream(Pair<String, Integer> streamKey) {
        SocketAndStreams socketAndStreams = this.nodeIdStoreToSocketAndStreams.remove(streamKey);
        SocketDestination destination = this.nodeIdStoreToSocketRequest.remove(streamKey);
        this.nodeIdStoreToOutputStreamRequest.remove(streamKey);
        this.nodeIdStoreToInputStreamRequest.remove(streamKey);
        this.nodeIdStoreInitialized.remove(streamKey);
        if (socketAndStreams != null) {
            close(socketAndStreams.getSocket());
            this.streamingSocketPool.checkin(destination, socketAndStreams);
        }
    }

    /** Releases whatever an earlier, still open session on this client holds. */
    private void releasePreviousSession() {
        if (this.storeNames != null && !this.cleanedUp) {
            cleanupSessions();
        }
        shutdownExecutor();
    }

    /** Stops the callback executor if one was created; safe to call repeatedly. */
    private void shutdownExecutor() {
        if (this.streamingresults != null) {
            this.streamingresults.shutdown();
        }
    }

    /** Fails fast when entries are streamed after the session was closed. */
    private void ensureSessionOpen() {
        if (this.cleanedUp) {
            throw new VoldemortException("Streaming session is closed; initialize a new session before streaming more entries");
        }
    }

    /** Tells whether the node is excluded from this session. */
    private boolean isBlackListed(Node node) {
        return this.blackListedNodes != null
               && this.blackListedNodes.contains(Integer.valueOf(node.getId()));
    }

    /** Key under which the per-stream maps store the (store, node) pair. */
    private Pair<String, Integer> streamKey(String store, Node node) {
        return new Pair<String, Integer>(store, Integer.valueOf(node.getId()));
    }

    /** Runs the callback on the executor and waits for it; {@code null} is a no-op. */
    private void runCallback(Callable<?> callback) throws InterruptedException, ExecutionException {
        if (callback == null) {
            return;
        }
        this.streamingresults.submit(callback).get();
    }

    /**
     * Runs the recovery callback. If it fails or the wait is interrupted, the
     * client is marked bad so further puts are rejected.
     *
     * @throws VoldemortException if the recovery callback did not complete
     */
    private void invokeRecoveryCallback() {
        try {
            runCallback(this.recoveryCallback);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw recoveryFailed(e);
        } catch (ExecutionException e) {
            throw recoveryFailed(e);
        }
    }

    /** Marks the client bad, logs the cause and builds the exception to throw. */
    private VoldemortException recoveryFailed(Exception cause) {
        this.markedBad = true;
        logger.error("Recovery Callback failed", cause);
        return new VoldemortException("Recovery Callback failed");
    }

    /** Runs the checkpoint callback; a failure is logged and otherwise ignored. */
    private void invokeCheckpointCallback() {
        try {
            runCallback(this.checkpointCallback);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.warn("Checkpoint callback failed!", e);
        } catch (ExecutionException e) {
            logger.warn("Checkpoint callback failed!", e);
        }
    }
}
