package org.xtreemfs.babudb.replication.service;

import com.glines.socketio.server.transport.AbstractHttpTransport;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.net.InetAddress;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import org.xtreemfs.babudb.BabuDBException;
import org.xtreemfs.babudb.BabuDBRequest;
import org.xtreemfs.babudb.config.ReplicationConfig;
import org.xtreemfs.babudb.log.LogEntry;
import org.xtreemfs.babudb.log.SyncListener;
import org.xtreemfs.babudb.lsmdb.InsertRecordGroup;
import org.xtreemfs.babudb.lsmdb.LSN;
import org.xtreemfs.babudb.replication.BabuDBInterface;
import org.xtreemfs.babudb.replication.FleaseMessageReceiver;
import org.xtreemfs.babudb.replication.Layer;
import org.xtreemfs.babudb.replication.control.RoleChangeListener;
import org.xtreemfs.babudb.replication.service.accounting.ParticipantsOverview;
import org.xtreemfs.babudb.replication.service.accounting.ParticipantsStates;
import org.xtreemfs.babudb.replication.service.accounting.ReplicateResponse;
import org.xtreemfs.babudb.replication.service.clients.ClientInterface;
import org.xtreemfs.babudb.replication.service.clients.ConditionClient;
import org.xtreemfs.babudb.replication.service.clients.MasterClient;
import org.xtreemfs.babudb.replication.service.clients.SlaveClient;
import org.xtreemfs.babudb.replication.service.operations.ChunkOperation;
import org.xtreemfs.babudb.replication.service.operations.FleaseOperation;
import org.xtreemfs.babudb.replication.service.operations.HeartbeatOperation;
import org.xtreemfs.babudb.replication.service.operations.LoadOperation;
import org.xtreemfs.babudb.replication.service.operations.LocalTimeOperation;
import org.xtreemfs.babudb.replication.service.operations.ReplicaOperation;
import org.xtreemfs.babudb.replication.service.operations.ReplicateOperation;
import org.xtreemfs.babudb.replication.service.operations.StateOperation;
import org.xtreemfs.babudb.replication.transmission.FileIOInterface;
import org.xtreemfs.babudb.replication.transmission.TransmissionToServiceInterface;
import org.xtreemfs.babudb.replication.transmission.dispatcher.Operation;
import org.xtreemfs.babudb.snapshots.SnapshotConfig;
import org.xtreemfs.foundation.LifeCycleListener;
import org.xtreemfs.foundation.TimeSync;
import org.xtreemfs.foundation.buffer.ReusableBuffer;
import org.xtreemfs.foundation.logging.Logging;
import org.xtreemfs.foundation.oncrpc.client.RPCResponse;
import org.xtreemfs.foundation.oncrpc.client.RPCResponseAvailableListener;

/* loaded from: input_file:WEB-INF/lib/BabuDB-0.4.5.jar:org/xtreemfs/babudb/replication/service/ServiceLayer.class */
/**
 * Service layer of the replication stack.
 *
 * <p>Wires together the participants accounting ({@link ParticipantsStates}), the
 * {@link HeartbeatThread} and the {@link ReplicationStage}. It offers the operations the control
 * layer needs ({@link ServiceToControlInterface}) and the operations a slave needs to apply
 * replicated log entries ({@link SlaveView}).
 */
public class ServiceLayer extends Layer implements ServiceToControlInterface, SlaveView {
    /** Accounting of all known participants and their availability. */
    private final ParticipantsStates participantsStates;

    /** Thread started with the local state in {@link #start()}. */
    private final HeartbeatThread heartbeatThread;

    /** Stage that processes replicate/load requests on this participant. */
    private final ReplicationStage replicationStage;

    /** Access to the local BabuDB instance. */
    private final BabuDBInterface babuDBInterface;

    /** Access to the transmission layer below this one. */
    private final TransmissionToServiceInterface transmissionInterface;

    /** LSN set in {@link #start()} and after {@link #synchronize()}; shared with the operations. */
    private final AtomicReference<LSN> lastOnView = new AtomicReference<>();

    /** Listener installed by {@link #coin}; {@code null} until the layer has been coined. */
    private volatile RoleChangeListener roleChangeListener = null;

    /**
     * Creates the layer and its internal components. Nothing is started here; see {@link #start()}.
     *
     * @param replicationConfig source of syncN, the participants and the maximum queue length
     * @param babuDBInterface access to the local BabuDB instance
     * @param transmissionToServiceInterface the transmission layer to build upon
     * @throws IOException declared by the original constructor (presumably raised by a component
     *         constructor; not visible from this file)
     */
    public ServiceLayer(ReplicationConfig replicationConfig, BabuDBInterface babuDBInterface, TransmissionToServiceInterface transmissionToServiceInterface) throws IOException {
        this.transmissionInterface = transmissionToServiceInterface;
        this.babuDBInterface = babuDBInterface;

        // A positive syncN is reduced by one before it is handed to the accounting
        // (presumably because the local participant counts towards syncN - TODO confirm).
        int configuredSyncN = replicationConfig.getSyncN();
        int remoteSyncN = configuredSyncN > 0 ? configuredSyncN - 1 : configuredSyncN;

        this.participantsStates = new ParticipantsStates(remoteSyncN, replicationConfig.getParticipants(), transmissionToServiceInterface);
        this.heartbeatThread = new HeartbeatThread(this.participantsStates);
        this.replicationStage = new ReplicationStage(replicationConfig.getMaxQueueLength(), this.heartbeatThread, this, transmissionToServiceInterface.getFileIOInterface(), this.babuDBInterface, this.lastOnView);
    }

    /**
     * Sends the given log entry to every currently available slave.
     *
     * <p>For each slave a listener is registered on the RPC response: on success the request is
     * reported as finished to the accounting, on failure the slave is marked as dead and the
     * permitted failures of the returned response are decremented.
     *
     * @param logEntry the entry to replicate
     * @param reusableBuffer serialized payload; a view buffer is created for every slave
     * @return the response handle for this replication; if anything fails while dispatching, a
     *         response carrying that exception
     */
    @Override // org.xtreemfs.babudb.replication.service.ServiceToControlInterface
    public ReplicateResponse replicate(LogEntry logEntry, ReusableBuffer reusableBuffer) {
        try {
            List<SlaveClient> slaves = participantsStates.getAvailableParticipants();
            final ReplicateResponse result = new ReplicateResponse(logEntry, slaves.size() - participantsStates.getSyncN());

            if (slaves.isEmpty()) {
                Logging.logMessage(7, this, "There are no slaves available anymore! BabuDB runs if it would be in non-replicated mode.");
                return result;
            }

            for (final SlaveClient slave : slaves) {
                slave.replicate(logEntry.getLSN(), reusableBuffer.createViewBuffer()).registerListener(new RPCResponseAvailableListener<Object>() {
                    @Override // org.xtreemfs.foundation.oncrpc.client.RPCResponseAvailableListener
                    public void responseAvailable(RPCResponse<Object> rPCResponse) {
                        try {
                            rPCResponse.get();
                            participantsStates.requestFinished(slave);
                        } catch (Exception e) {
                            participantsStates.markAsDead(slave);
                            result.decrementPermittedFailures();
                            Logging.logMessage(6, this, "'%s' was marked as dead, because %s", slave.getDefaultServerAddress().toString(), e.getMessage());
                            if (e.getMessage() == null) {
                                // Without a message the line above is useless; log the exception itself.
                                Logging.logError(7, this, e);
                            }
                        } finally {
                            // Single release point: a failure while freeing must not be
                            // mistaken for a failed replication of this slave.
                            if (rPCResponse != null) {
                                rPCResponse.freeBuffers();
                            }
                        }
                    }
                });
            }
            return result;
        } catch (Exception e) {
            return new ReplicateResponse(logEntry, e);
        }
    }

    /**
     * Brings the local database up to the latest state reported by the other participants,
     * records that state as last-on-view and takes a checkpoint.
     *
     * <p>If the reported latest LSN is newer than the local one, a manual load is requested from
     * the replication stage and awaited. The load is triggered exactly once, no matter how many
     * participants report that latest LSN. Afterwards the method sleeps for whatever remains of
     * a fixed delay measured from method entry.
     *
     * @throws BabuDBException if fewer participants answered than syncN requires, or if the
     *         manual load fails
     * @throws InterruptedException if waiting for the load or the final sleep is interrupted
     * @throws IOException declared by the interface
     */
    @Override // org.xtreemfs.babudb.replication.service.ServiceToControlInterface
    public void synchronize() throws BabuDBException, InterruptedException, IOException {
        List<ConditionClient> clients = participantsStates.getConditionClients();
        long startedAt = TimeSync.getLocalSystemTime();

        LSN latest = null;
        if (clients.size() > 0) {
            Map<ClientInterface, LSN> states = getStates(clients);
            // "+ 1" adds one to the number of answers (presumably the local participant).
            if (states.size() + 1 < participantsStates.getSyncN()) {
                throw new BabuDBException(BabuDBException.ErrorCode.REPLICATION_FAILURE, "Not enough slaves available to synchronize with!");
            }
            if (!states.isEmpty()) {
                latest = Collections.max(states.values());
            }
        }

        LSN localState = babuDBInterface.getState();
        if (latest != null && latest.compareTo(localState) > 0) {
            Logging.logMessage(6, this, "Starting synchronization from '%s' to '%s'.", localState.toString(), latest.toString());
            BabuDBRequest<Boolean> ready = new BabuDBRequest<>();
            replicationStage.manualLoad(ready, localState, latest);
            ready.get();
            assert latest.equals(babuDBInterface.getState())
                    : "Synchronization failed: (expected=" + latest.toString() + ") != (acknowledged=" + babuDBInterface.getState() + ")";
        }

        lastOnView.set(babuDBInterface.getState());
        Logging.logMessage(7, this, "taking a checkpoint");
        babuDBInterface.checkpoint();

        // NOTE(review): the delay constant comes from an unrelated HTTP transport class; this
        // looks like a decompiler substituting a same-valued constant - confirm the intended one.
        long elapsed = TimeSync.getLocalSystemTime() - startedAt;
        Thread.sleep(elapsed < AbstractHttpTransport.HTTP_REQUEST_TIMEOUT ? AbstractHttpTransport.HTTP_REQUEST_TIMEOUT - elapsed : 0L);
        Logging.logMessage(6, this, "Running in master-mode (%s)", babuDBInterface.getState().toString());
    }

    /**
     * Installs the role-change listener and registers the RPC operations at the transmission
     * layer. Only the first call has an effect; later calls are ignored.
     *
     * @param roleChangeListener listener to install; must not be {@code null}
     * @param fleaseMessageReceiver receiver handed to the Flease operation; must not be {@code null}
     */
    @Override // org.xtreemfs.babudb.replication.Coinable
    public void coin(RoleChangeListener roleChangeListener, FleaseMessageReceiver fleaseMessageReceiver) {
        assert fleaseMessageReceiver != null;
        assert roleChangeListener != null;

        synchronized (this) {
            if (this.roleChangeListener == null) {
                this.roleChangeListener = roleChangeListener;
                replicationStage.setRoleChangeListener(roleChangeListener);
                transmissionInterface.coin(initializeOperations(transmissionInterface.getFileIOInterface(), fleaseMessageReceiver), participantsStates);
            }
        }
    }

    /**
     * Applies a replicated log entry to the local database and appends it to the disk log.
     *
     * <p>Payload types handled: 0 insert record group, 1 create snapshot, 2 create database,
     * 3 copy database, 4 delete database, 5 delete snapshot.
     *
     * @param logEntry the entry to apply
     * @param syncListener listener attached to the entry before it is appended
     * @throws BabuDBException if the entry cannot be applied or carries an unknown payload type;
     *         in that case nothing is appended to the disk log
     * @throws InterruptedException declared by the interface
     */
    @Override // org.xtreemfs.babudb.replication.service.SlaveView
    public void handleLogEntry(LogEntry logEntry, SyncListener syncListener) throws BabuDBException, InterruptedException {
        switch (logEntry.getPayloadType()) {
            case 0:
                babuDBInterface.insertRecordGroup(InsertRecordGroup.deserialize(logEntry.getPayload()));
                break;

            case 1: {
                // NOTE(review): Java-native deserialization of data received from another
                // participant; only safe as long as the replication channel is trusted.
                ObjectInputStream in = null;
                try {
                    in = new ObjectInputStream(new ByteArrayInputStream(logEntry.getPayload().array()));
                    babuDBInterface.createSnapshot(in.readInt(), (SnapshotConfig) in.readObject());
                } catch (Exception e) {
                    throw new BabuDBException(BabuDBException.ErrorCode.REPLICATION_FAILURE, "Could not deserialize operation of type " + ((int) logEntry.getPayloadType()) + ", because: " + e.getMessage(), e);
                } finally {
                    if (in != null) {
                        try {
                            in.close();
                        } catch (IOException ignored) {
                            // A failing close is deliberately ignored, as before.
                        }
                    }
                }
                break;
            }

            case 2: {
                int dbId = logEntry.getPayload().getInt();
                String dbName = logEntry.getPayload().getString();
                int numIndices = logEntry.getPayload().getInt();
                if (!babuDBInterface.dbExists(dbId)) {
                    babuDBInterface.createDB(dbName, numIndices);
                }
                break;
            }

            case 3: {
                // The first int of the payload is read and discarded; only the second id is checked.
                logEntry.getPayload().getInt();
                int checkedDbId = logEntry.getPayload().getInt();
                String firstName = logEntry.getPayload().getString();
                String secondName = logEntry.getPayload().getString();
                if (!babuDBInterface.dbExists(checkedDbId)) {
                    babuDBInterface.copyDB(firstName, secondName);
                }
                break;
            }

            case 4: {
                int dbId = logEntry.getPayload().getInt();
                String dbName = logEntry.getPayload().getString();
                if (babuDBInterface.dbExists(dbId)) {
                    babuDBInterface.deleteDB(dbName);
                }
                break;
            }

            case 5: {
                // Layout: [length of first string][first string][second string].
                byte[] raw = logEntry.getPayload().array();
                byte firstLength = raw[0];
                babuDBInterface.deleteSnapshot(new String(raw, 1, (int) firstLength), new String(raw, firstLength + 1, (raw.length - firstLength) - 1));
                break;
            }

            default:
                // Previously the exception was created but never thrown, so entries of unknown
                // type were silently written to the disk log.
                throw new BabuDBException(BabuDBException.ErrorCode.INTERNAL_ERROR, "unknown payload-type");
        }

        logEntry.getPayload().flip();
        logEntry.setListener(syncListener);
        babuDBInterface.appendToDisklogger(logEntry);
    }

    /**
     * Chooses a participant to synchronize with.
     *
     * <p>Every condition client except the master is asked for its state; the first one whose
     * state is at least {@code lsn} is returned. If none qualifies, the master is returned.
     *
     * @param lsn the minimum state the partner has to provide
     * @return a participant with a state of at least {@code lsn}, otherwise the master
     */
    @Override // org.xtreemfs.babudb.replication.service.SlaveView
    public MasterClient getSynchronizationPartner(LSN lsn) {
        MasterClient master = participantsStates.getMaster();
        List<ConditionClient> candidates = participantsStates.getConditionClients();
        candidates.remove(master);

        if (!candidates.isEmpty()) {
            for (Map.Entry<ClientInterface, LSN> candidate : getStates(candidates).entrySet()) {
                if (candidate.getValue().compareTo(lsn) >= 0) {
                    return (MasterClient) candidate.getKey();
                }
            }
        }
        return master;
    }

    /**
     * @return the participants accounting as a read-only overview type
     */
    @Override // org.xtreemfs.babudb.replication.service.ServiceToControlInterface
    public ParticipantsOverview getParticipantOverview() {
        return participantsStates;
    }

    /**
     * Registers a new master. Before that, the replication stage's last inserted LSN is reset to
     * the current local state.
     *
     * @param inetAddress address of the new master
     */
    @Override // org.xtreemfs.babudb.replication.service.ServiceToControlInterface
    public void changeMaster(InetAddress inetAddress) {
        replicationStage.lastInserted = babuDBInterface.getState();
        participantsStates.setMaster(inetAddress);
    }

    /**
     * Forwards the life cycle listener to the heartbeat thread and the replication stage.
     *
     * @param lifeCycleListener the listener to install
     */
    @Override // org.xtreemfs.babudb.replication.Layer
    public void _setLifeCycleListener(LifeCycleListener lifeCycleListener) {
        heartbeatThread.setLifeCycleListener(lifeCycleListener);
        replicationStage.setLifeCycleListener(lifeCycleListener);
    }

    /**
     * Initializes last-on-view from the local state, then starts the heartbeat thread and the
     * replication stage and waits until both are up.
     *
     * <p>If the layer has not been coined yet, or starting fails, the failure is reported via
     * {@code listener.crashPerformed}; nothing is thrown.
     */
    @Override // org.xtreemfs.babudb.replication.Layer
    public void start() {
        LSN state = babuDBInterface.getState();
        // A state with sequence number 0 is replaced by LSN(0, 0).
        LSN initialOnView = state.getSequenceNo() == 0 ? new LSN(0, 0L) : state;
        Logging.logMessage(7, this, "Setting last on view LSN to '%s', initial was '%s'.", initialOnView, state);
        lastOnView.set(initialOnView);

        if (roleChangeListener == null) {
            listener.crashPerformed(new Exception("The service layer has not been coined yet!"));
            return;
        }

        try {
            heartbeatThread.start(state);
            replicationStage.lastInserted = state;
            replicationStage.start();
            heartbeatThread.waitForStartup();
            replicationStage.waitForStartup();
        } catch (Exception e) {
            listener.crashPerformed(e);
        }
    }

    /**
     * Requests shutdown of the heartbeat thread and the replication stage without waiting.
     */
    @Override // org.xtreemfs.babudb.replication.Layer
    public void asyncShutdown() {
        heartbeatThread.shutdown();
        replicationStage.shutdown();
    }

    /**
     * Requests shutdown of the heartbeat thread and the replication stage and waits for both.
     * A failure while waiting is reported via {@code listener.crashPerformed}.
     */
    @Override // org.xtreemfs.babudb.replication.Layer
    public void shutdown() {
        heartbeatThread.shutdown();
        replicationStage.shutdown();
        try {
            heartbeatThread.waitForShutdown();
            replicationStage.waitForShutdown();
        } catch (Exception e) {
            listener.crashPerformed(e);
        }
    }

    /**
     * Subscribes the given response as a listener at the participants accounting.
     *
     * @param replicateResponse the response to subscribe
     */
    @Override // org.xtreemfs.babudb.replication.service.ServiceToControlInterface
    public void subscribeListener(ReplicateResponse replicateResponse) {
        participantsStates.subscribeListener(replicateResponse);
    }

    /**
     * Removes all listeners from the participants accounting.
     */
    @Override // org.xtreemfs.babudb.replication.service.ServiceToControlInterface
    public void reset() {
        participantsStates.clearListeners();
    }

    /**
     * Creates all RPC operations offered by this layer, keyed by their procedure id.
     *
     * @param fileIOInterface file access handed to the replica and load operations
     * @param fleaseMessageReceiver receiver handed to the Flease operation
     * @return procedure id to operation mapping
     */
    private Map<Integer, Operation> initializeOperations(FileIOInterface fileIOInterface, FleaseMessageReceiver fleaseMessageReceiver) {
        Map<Integer, Operation> operations = new HashMap<>();

        LocalTimeOperation localTime = new LocalTimeOperation();
        operations.put(Integer.valueOf(localTime.getProcedureId()), localTime);

        FleaseOperation flease = new FleaseOperation(fleaseMessageReceiver);
        operations.put(Integer.valueOf(flease.getProcedureId()), flease);

        StateOperation state = new StateOperation(babuDBInterface);
        operations.put(Integer.valueOf(state.getProcedureId()), state);

        HeartbeatOperation heartbeat = new HeartbeatOperation(participantsStates);
        operations.put(Integer.valueOf(heartbeat.getProcedureId()), heartbeat);

        ReplicateOperation replicate = new ReplicateOperation(replicationStage, participantsStates);
        operations.put(Integer.valueOf(replicate.getProcedureId()), replicate);

        ReplicaOperation replica = new ReplicaOperation(lastOnView, babuDBInterface, fileIOInterface);
        operations.put(Integer.valueOf(replica.getProcedureId()), replica);

        LoadOperation load = new LoadOperation(lastOnView, babuDBInterface, fileIOInterface);
        operations.put(Integer.valueOf(load.getProcedureId()), load);

        ChunkOperation chunk = new ChunkOperation();
        operations.put(Integer.valueOf(chunk.getProcedureId()), chunk);

        return operations;
    }

    /**
     * Requests the state of every given client and collects the answers.
     *
     * <p>All requests are issued first and only then awaited. Clients whose request fails are
     * logged and left out of the result.
     *
     * @param list the clients to ask
     * @return the LSN reported by each client that answered
     */
    private Map<ClientInterface, LSN> getStates(List<ConditionClient> list) {
        int count = list.size();
        Map<ClientInterface, LSN> states = new HashMap<>();

        RPCResponse<?>[] pending = new RPCResponse<?>[count];
        for (int i = 0; i < count; i++) {
            pending[i] = list.get(i).state();
        }

        for (int i = 0; i < count; i++) {
            try {
                states.put(list.get(i), (LSN) pending[i].get());
            } catch (Exception e) {
                Logging.logMessage(6, this, "Could not receive state of '%s', because: %s.", list.get(i), e.getMessage());
                if (e.getMessage() == null) {
                    Logging.logError(6, this, e);
                }
            } finally {
                // Single release point, so a failure while freeing is never reported as a
                // missing state for a client that has already answered.
                if (pending[i] != null) {
                    pending[i].freeBuffers();
                }
            }
        }
        return states;
    }
}
