package voldemort.server.protocol.admin;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Set;
import org.apache.log4j.Logger;
import voldemort.VoldemortException;
import voldemort.client.protocol.pb.ProtoUtils;
import voldemort.client.protocol.pb.VAdminProto;
import voldemort.server.StoreRepository;
import voldemort.server.VoldemortConfig;
import voldemort.server.protocol.StreamRequestHandler;
import voldemort.store.metadata.MetadataStore;
import voldemort.store.readonly.ReadOnlyStorageConfiguration;
import voldemort.store.readonly.ReadOnlyStorageEngine;
import voldemort.store.stats.StreamingStats;
import voldemort.utils.EventThrottler;
import voldemort.utils.Pair;
import voldemort.utils.RebalanceUtils;

/* loaded from: input_file:voldemort/server/protocol/admin/FetchPartitionFileStreamRequestHandler.class */
public class FetchPartitionFileStreamRequestHandler implements StreamRequestHandler {
    // The fetch request being served; supplies the store name and the replica-to-partition list.
    private final VAdminProto.FetchPartitionFilesRequest request;
    // Throttles outgoing bytes; built from the configured stream max read bytes per second.
    private final EventThrottler throttler;
    // Directory the chunk files are resolved against (the read-only engine's current dir path).
    private final File storeDir;
    private final Logger logger = Logger.getLogger(getClass());
    // Upper bound on the number of bytes handed to a single transferTo call.
    private final long blockSize;
    // Streaming statistics for the store; null when JMX is disabled in the config.
    private final StreamingStats streamStats;
    // Iterates over an unmodifiable view of replicaToPartitionTuples.
    private final Iterator<Pair<Integer, Integer>> partitionIterator;
    // Current state; selects which handler handleRequest dispatches to.
    private FetchStatus fetchStatus;
    // Chunk currently being streamed for the current tuple; reset to 0 for each new tuple.
    private int currentChunkId;
    // Number of chunks recorded in bucketToNumChunks for the current tuple.
    private int numChunks;
    // Tuple currently being streamed; first = replica, second = partition (per the error message below).
    private Pair<Integer, Integer> currentPair;
    // Index file of the current chunk; null until the chunk's files have been resolved.
    private File indexFile;
    // Data file of the current chunk; null until the chunk's files have been resolved.
    private File dataFile;
    // Writer for the file currently being streamed (data file first, then index file).
    private ChunkedFileWriter chunkedFileWriter;
    // Flattened (replica, partition) tuples decoded from the request.
    private final Set<Pair<Integer, Integer>> replicaToPartitionTuples;
    // Chunk counts from the engine's chunked file set; looked up with Pair(partition, replica) keys.
    private final HashMap<Object, Integer> bucketToNumChunks;
    // Whether the NIO connector is in use; ChunkedFileWriter.close() closes the output channel only then.
    private final boolean nioEnabled;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:voldemort/server/protocol/admin/FetchPartitionFileStreamRequestHandler$ChunkedFileWriter.class */
    /**
     * Streams one file to the client: a {@code FileEntry} header followed by the
     * file contents, sent in pieces of at most {@code blockSize} bytes per call
     * to {@link #streamFile()}.
     */
    public class ChunkedFileWriter {
        private final File fileToWrite;
        private final DataOutputStream outStream;
        private final FileChannel dataChannel;
        private final WritableByteChannel outChannel;
        /** Offset in the file of the next byte to send. */
        private long currentPos = 0L;

        /**
         * Opens the file for reading and wraps the output stream in a channel.
         *
         * @param file file whose contents will be streamed
         * @param dataOutputStream stream the header and contents are written to
         * @throws FileNotFoundException if the file cannot be opened for reading
         */
        public ChunkedFileWriter(File file, DataOutputStream dataOutputStream) throws FileNotFoundException {
            this.fileToWrite = file;
            this.outStream = dataOutputStream;
            FileInputStream source = new FileInputStream(file);
            this.dataChannel = source.getChannel();
            this.outChannel = Channels.newChannel(dataOutputStream);
        }

        /**
         * Closes the file channel and, only when the NIO connector is enabled,
         * the output channel as well.
         *
         * @throws IOException if closing either channel fails
         */
        public void close() throws IOException {
            dataChannel.close();
            if (!nioEnabled) {
                return;
            }
            outChannel.close();
        }

        /**
         * Writes the {@code FileEntry} header (file name and size in bytes) and
         * reports the header's serialized size to the throttler.
         *
         * @throws IOException if the size cannot be read or the write fails
         */
        public void writeHeader() throws IOException {
            VAdminProto.FileEntry header = VAdminProto.FileEntry.newBuilder()
                    .setFileName(fileToWrite.getName())
                    .setFileSizeBytes(dataChannel.size())
                    .build();
            ProtoUtils.writeMessage(outStream, header);
            throttler.maybeThrottle(header.getSerializedSize());
        }

        /**
         * Transfers the next piece of the file (at most {@code blockSize} bytes)
         * to the output channel and reports the transferred count to the throttler.
         *
         * @return number of bytes of the file still to be sent after this call
         * @throws IOException if reading the file or writing the output fails
         */
        public long streamFile() throws IOException {
            final long pending = dataChannel.size() - currentPos;
            if (pending > 0) {
                final long limit = Math.min(pending, blockSize);
                final long written = dataChannel.transferTo(currentPos, limit, outChannel);
                currentPos += written;
                logger.debug(written + " bytes written");
                throttler.maybeThrottle((int) written);
            }
            // The size is re-read so the result reflects the file as it is now.
            return dataChannel.size() - currentPos;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:voldemort/server/protocol/admin/FetchPartitionFileStreamRequestHandler$FetchStatus.class */
    /** States of the file-streaming state machine; handleRequest dispatches on the current value. */
    public enum FetchStatus {
        // Pick the next (replica, partition) tuple, or finish when none are left.
        NEXT_PARTITION,
        // Stream the current chunk's .data file.
        SEND_DATA_FILE,
        // Stream the current chunk's .index file.
        SEND_INDEX_FILE
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Sets up streaming of the chunk files for the tuples named in the request.
     *
     * @param fetchPartitionFilesRequest request naming the store and the replica-to-partition list
     * @param metadataStore used to look up the store definition and the storage engine
     * @param voldemortConfig source of the block size, throttle rate, JMX and NIO settings
     * @param storeRepository used to look up the storage engine and its streaming stats
     * @throws VoldemortException if the store's type is not the read-only storage type
     */
    public FetchPartitionFileStreamRequestHandler(VAdminProto.FetchPartitionFilesRequest fetchPartitionFilesRequest,
                                                  MetadataStore metadataStore,
                                                  VoldemortConfig voldemortConfig,
                                                  StoreRepository storeRepository) {
        this.request = fetchPartitionFilesRequest;

        // Only read-only stores can have their partition files fetched.
        if (metadataStore.getStoreDef(fetchPartitionFilesRequest.getStore())
                         .getType()
                         .compareTo(ReadOnlyStorageConfiguration.TYPE_NAME) != 0) {
            throw new VoldemortException("Should be fetching partition files only for read-only stores");
        }

        this.replicaToPartitionTuples = RebalanceUtils.flattenPartitionTuples(
                ProtoUtils.decodePartitionTuple(fetchPartitionFilesRequest.getReplicaToPartitionList()));

        ReadOnlyStorageEngine engine = AdminServiceRequestHandler.getReadOnlyStorageEngine(
                metadataStore, storeRepository, fetchPartitionFilesRequest.getStore());
        this.bucketToNumChunks = engine.getChunkedFileSet().getChunkIdToNumChunks();

        // The block size defaults to the admin socket buffer size unless the property is set.
        this.blockSize = voldemortConfig.getAllProps()
                                        .getLong("partition.buffer.size.bytes",
                                                 voldemortConfig.getAdminSocketBufferSize());
        this.storeDir = new File(engine.getCurrentDirPath());
        this.throttler = new EventThrottler(voldemortConfig.getStreamMaxReadBytesPerSec());

        // Streaming stats are only looked up when JMX is enabled.
        this.streamStats = voldemortConfig.isJmxEnabled()
                ? storeRepository.getStreamingStats(engine.getName())
                : null;

        this.partitionIterator = Collections.unmodifiableSet(this.replicaToPartitionTuples).iterator();

        // Initial state: nothing open yet, start by picking the first tuple.
        this.fetchStatus = FetchStatus.NEXT_PARTITION;
        this.currentChunkId = 0;
        this.indexFile = null;
        this.dataFile = null;
        this.chunkedFileWriter = null;
        this.numChunks = 0;
        this.nioEnabled = voldemortConfig.getUseNioConnector();
    }

    /**
     * Reports the direction of this handler.
     *
     * @return always {@code StreamRequestDirection.WRITING}
     */
    @Override
    public StreamRequestHandler.StreamRequestDirection getDirection() {
        final StreamRequestHandler.StreamRequestDirection direction =
                StreamRequestHandler.StreamRequestDirection.WRITING;
        return direction;
    }

    /**
     * Ends the stream: writes the end-of-stream marker and then releases the file
     * writer that is still referenced, if any.
     *
     * <p>A writer is closed by the send handlers only once its file has been
     * streamed completely. If the request ends while a file is still in flight
     * (for example after an exception in {@code handleRequest}), the writer and
     * its file channel would otherwise stay open. Closing a writer that was
     * already closed is expected to be harmless, since channel {@code close()}
     * has no effect on an already-closed channel.
     *
     * @param dataOutputStream stream the end-of-stream marker is written to
     * @throws IOException if writing the marker or closing the writer fails
     */
    @Override // voldemort.server.protocol.StreamRequestHandler
    public final void close(DataOutputStream dataOutputStream) throws IOException {
        try {
            ProtoUtils.writeEndOfStream(dataOutputStream);
        } finally {
            // Detach first so a failing close() is not retried on a later call.
            ChunkedFileWriter openWriter = this.chunkedFileWriter;
            this.chunkedFileWriter = null;
            if (openWriter != null) {
                openWriter.close();
            }
        }
    }

    /**
     * Logs the failure together with the request that caused it. Nothing is
     * written to the output stream.
     *
     * @param dataOutputStream unused
     * @param voldemortException the failure to log
     * @throws IOException declared by the interface; not thrown by this implementation's own code
     */
    @Override
    public final void handleError(DataOutputStream dataOutputStream, VoldemortException voldemortException) throws IOException {
        final String message = "handleFetchPartitionFilesEntries failed for request("
                               + this.request.toString()
                               + ")";
        this.logger.error(message, voldemortException);
    }

    /**
     * Performs one step of the streaming state machine, chosen by the current
     * fetch status.
     *
     * @param dataInputStream unused
     * @param dataOutputStream stream the file headers and contents are written to
     * @return the state returned by {@code handleNextPartition()} when picking a
     *         tuple, otherwise {@code WRITING}
     * @throws IOException if streaming a file fails
     * @throws VoldemortException if the fetch status is not one of the known states
     */
    @Override
    public StreamRequestHandler.StreamRequestHandlerState handleRequest(DataInputStream dataInputStream, DataOutputStream dataOutputStream) throws IOException {
        switch (this.fetchStatus) {
            case NEXT_PARTITION:
                return handleNextPartition();
            case SEND_DATA_FILE:
                handleSendDataFile(dataOutputStream);
                return StreamRequestHandler.StreamRequestHandlerState.WRITING;
            case SEND_INDEX_FILE:
                handleSendIndexFile();
                return StreamRequestHandler.StreamRequestHandlerState.WRITING;
            default:
                throw new VoldemortException("Invalid fetch status " + this.fetchStatus);
        }
    }

    /**
     * Sends the next piece of the current index file. Once the file is fully
     * sent, closes the writer, advances to the next chunk and selects the next
     * state: another data file, or the next tuple when all chunks are done.
     *
     * @throws IOException if streaming or closing the file fails
     */
    private void handleSendIndexFile() throws IOException {
        final long bytesLeft = this.chunkedFileWriter.streamFile();
        if (bytesLeft != 0) {
            // More of the index file remains; stay in this state.
            this.fetchStatus = FetchStatus.SEND_INDEX_FILE;
            return;
        }

        this.logger.info("Completed streaming " + this.indexFile.getAbsolutePath());
        this.chunkedFileWriter.close();

        // Move on to the next chunk; its files are resolved lazily by the data-file step.
        this.currentChunkId += 1;
        this.indexFile = null;
        this.dataFile = null;

        if (this.streamStats != null) {
            this.streamStats.reportStreamingFetch(StreamingStats.Operation.FETCH_FILE);
        }

        final boolean allChunksSent = this.currentChunkId >= this.numChunks;
        this.fetchStatus = allChunksSent ? FetchStatus.NEXT_PARTITION : FetchStatus.SEND_DATA_FILE;
    }

    /**
     * Sends the next piece of the current chunk's data file. On the first call
     * for a chunk, resolves the {@code .data} and {@code .index} files (named
     * {@code <partition>_<replica>_<chunk>}) and writes the data file header.
     * Once the data file is fully sent, switches the writer to the index file,
     * writes its header and moves to the index-file state.
     *
     * @param dataOutputStream stream the headers and contents are written to
     * @throws IOException if a file cannot be opened or streaming fails
     */
    private void handleSendDataFile(DataOutputStream dataOutputStream) throws IOException {
        final boolean chunkNotStarted = this.dataFile == null && this.indexFile == null;
        if (chunkNotStarted) {
            final int partitionId = this.currentPair.getSecond().intValue();
            final int replicaType = this.currentPair.getFirst().intValue();
            final String baseName = partitionId + "_" + replicaType + "_" + this.currentChunkId;

            this.dataFile = new File(this.storeDir, baseName + ".data");
            this.indexFile = new File(this.storeDir, baseName + ".index");

            this.chunkedFileWriter = new ChunkedFileWriter(this.dataFile, dataOutputStream);
            this.logger.info("Streaming " + this.dataFile.getAbsolutePath());
            this.chunkedFileWriter.writeHeader();
        }

        final long bytesLeft = this.chunkedFileWriter.streamFile();
        if (bytesLeft != 0) {
            // More of the data file remains; stay in this state.
            this.fetchStatus = FetchStatus.SEND_DATA_FILE;
            return;
        }

        this.logger.info("Completed streaming " + this.dataFile.getAbsolutePath());
        this.chunkedFileWriter.close();

        // The data file is done: open the matching index file and announce it.
        this.chunkedFileWriter = new ChunkedFileWriter(this.indexFile, dataOutputStream);
        this.logger.info("Streaming " + this.indexFile.getAbsolutePath());
        this.chunkedFileWriter.writeHeader();
        this.fetchStatus = FetchStatus.SEND_INDEX_FILE;
    }

    /**
     * Advances to the next (replica, partition) tuple and prepares to stream
     * its first chunk, or reports completion when no tuples remain.
     *
     * @return {@code COMPLETE} when every tuple has been streamed, otherwise {@code WRITING}
     * @throws VoldemortException if no chunk count is recorded for the tuple's bucket
     */
    private StreamRequestHandler.StreamRequestHandlerState handleNextPartition() {
        if (!this.partitionIterator.hasNext()) {
            this.logger.info("Finished streaming files for partitions tuples " + this.replicaToPartitionTuples);
            return StreamRequestHandler.StreamRequestHandlerState.COMPLETE;
        }

        this.currentPair = this.partitionIterator.next();
        this.currentChunkId = 0;

        // Buckets are keyed as (partition, replica), the reverse of the tuple order.
        final Pair<Integer, Integer> bucketKey = Pair.create(this.currentPair.getSecond(), this.currentPair.getFirst());
        if (!this.bucketToNumChunks.containsKey(bucketKey)) {
            throw new VoldemortException("Bucket [ partition = " + this.currentPair.getSecond() + ", replica = " + this.currentPair.getFirst() + " ] does not exist for store " + this.request.getStore());
        }
        this.numChunks = this.bucketToNumChunks.get(bucketKey).intValue();

        // Clearing both files makes the data-file step resolve the first chunk's files.
        this.indexFile = null;
        this.dataFile = null;
        this.fetchStatus = FetchStatus.SEND_DATA_FILE;
        return StreamRequestHandler.StreamRequestHandlerState.WRITING;
    }
}
