package com.couchbase.client.core;

import com.couchbase.client.core.config.BucketConfig;
import com.couchbase.client.core.config.ClusterConfig;
import com.couchbase.client.core.config.ConfigurationProvider;
import com.couchbase.client.core.config.ProposedBucketConfigContext;
import com.couchbase.client.core.env.CoreEnvironment;
import com.couchbase.client.core.logging.CouchbaseLogger;
import com.couchbase.client.core.logging.CouchbaseLoggerFactory;
import com.couchbase.client.core.message.CouchbaseMessage;
import com.couchbase.client.core.message.CouchbaseRequest;
import com.couchbase.client.core.message.CouchbaseResponse;
import com.couchbase.client.core.message.ResponseStatus;
import com.couchbase.client.core.message.internal.SignalConfigReload;
import com.couchbase.client.core.message.kv.BinaryResponse;
import com.couchbase.client.core.time.Delay;
import com.couchbase.client.deps.com.lmax.disruptor.EventHandler;
import com.couchbase.client.deps.com.lmax.disruptor.EventTranslatorTwoArg;
import com.couchbase.client.deps.io.netty.util.CharsetUtil;
import java.util.concurrent.TimeUnit;
import rx.Scheduler;
import rx.functions.Action0;
import rx.subjects.Subject;

/* loaded from: input_file:WEB-INF/lib/core-io-1.7.11.jar:com/couchbase/client/core/ResponseHandler.class */
/**
 * Disruptor event handler that consumes {@link ResponseEvent}s.
 *
 * <p>Depending on the message carried by the event it either signals an outdated configuration,
 * completes the subject attached to the event with the response, or schedules the originating
 * request to be sent through the {@link ClusterFacade} again after a delay.</p>
 */
public class ResponseHandler implements EventHandler<ResponseEvent> {

    private static final CouchbaseLogger LOGGER = CouchbaseLoggerFactory.getInstance(ResponseHandler.class);

    /**
     * Translator that stores the given message and its response subject in a {@link ResponseEvent}.
     */
    public static final EventTranslatorTwoArg<ResponseEvent, CouchbaseMessage, Subject<CouchbaseResponse, CouchbaseResponse>> RESPONSE_TRANSLATOR =
        new EventTranslatorTwoArg<ResponseEvent, CouchbaseMessage, Subject<CouchbaseResponse, CouchbaseResponse>>() {
            @Override
            public void translateTo(ResponseEvent event, long sequence, CouchbaseMessage message,
                Subject<CouchbaseResponse, CouchbaseResponse> observable) {
                event.setMessage(message);
                event.setObservable(observable);
            }
        };

    private final ClusterFacade cluster;
    private final ConfigurationProvider configurationProvider;
    private final CoreEnvironment environment;

    /**
     * Delay in milliseconds used when a request is retried because of a response with
     * {@link ResponseStatus#RETRY}; read from the system property {@code com.couchbase.nmvbRetryDelay}
     * (default {@code 100}).
     */
    private final int nmvbRetryDelay = Integer.parseInt(System.getProperty("com.couchbase.nmvbRetryDelay", "100"));

    /**
     * Snapshot of the logger's trace level taken at construction time; later changes to the log
     * level are not picked up by this instance.
     */
    private final boolean traceLoggingEnabled = LOGGER.isTraceEnabled();

    /**
     * Creates a new handler.
     *
     * @param environment the environment supplying the scheduler and the default retry delay.
     * @param cluster the facade through which retried requests are sent again.
     * @param provider the configuration provider that receives outdated signals and proposed configs.
     */
    public ResponseHandler(CoreEnvironment environment, ClusterFacade cluster, ConfigurationProvider provider) {
        this.cluster = cluster;
        this.configurationProvider = provider;
        this.environment = environment;
    }

    /**
     * Handles one event.
     *
     * <ul>
     *   <li>{@link SignalConfigReload}: signals the configuration provider that the config is outdated.</li>
     *   <li>{@link CouchbaseResponse} with status {@link ResponseStatus#RETRY}: retries the request.</li>
     *   <li>Any other {@link CouchbaseResponse}: emits it, followed by completion, to the event's
     *       subject on a worker of the environment's scheduler.</li>
     *   <li>{@link CouchbaseRequest}: retries the request.</li>
     * </ul>
     *
     * <p>The event's message and subject are always cleared afterwards.</p>
     *
     * @throws IllegalStateException if the message is none of the types above.
     */
    @Override
    public void onEvent(ResponseEvent event, long sequence, boolean endOfBatch) throws Exception {
        try {
            CouchbaseMessage message = event.getMessage();
            if (message instanceof SignalConfigReload) {
                configurationProvider.signalOutdated();
            } else if (message instanceof CouchbaseResponse) {
                final CouchbaseResponse response = (CouchbaseResponse) message;
                if (response.status() == ResponseStatus.RETRY) {
                    retry(event, true);
                } else {
                    // Capture worker and subject now: the event is cleared in the finally block
                    // below before the scheduled action gets a chance to run.
                    final Scheduler.Worker worker = environment.scheduler().createWorker();
                    final Subject<CouchbaseResponse, CouchbaseResponse> observable = event.getObservable();
                    worker.schedule(new Action0() {
                        @Override
                        public void call() {
                            try {
                                observable.onNext(response);
                                observable.onCompleted();
                            } catch (Exception e) {
                                observable.onError(e);
                            } finally {
                                worker.unsubscribe();
                            }
                        }
                    });
                }
            } else if (message instanceof CouchbaseRequest) {
                retry(event, false);
            } else {
                throw new IllegalStateException("Got message type I do not understand: " + message);
            }
        } finally {
            event.setMessage(null);
            event.setObservable(null);
        }
    }

    /**
     * Retries the request behind the event's message.
     *
     * <p>If the message is a request it is rescheduled directly. If it is a response, the request
     * attached to it is rescheduled; when no request is attached the event's subject is failed.
     * For a {@link BinaryResponse} with readable content, content that looks like JSON (starts
     * with <code>{</code>) is additionally proposed as a bucket config, and the content buffer is
     * released.</p>
     *
     * @param event the event whose message should be retried.
     * @param fromRetryResponse true if the retry was triggered by a response with status
     *        {@link ResponseStatus#RETRY}, false if the message itself is a request.
     */
    private void retry(ResponseEvent event, boolean fromRetryResponse) {
        CouchbaseMessage message = event.getMessage();
        if (message instanceof CouchbaseRequest) {
            scheduleForRetry((CouchbaseRequest) message, fromRetryResponse);
            return;
        }

        CouchbaseRequest request = ((CouchbaseResponse) message).request();
        if (request != null) {
            scheduleForRetry(request, fromRetryResponse);
        } else {
            event.getObservable().onError(new CouchbaseException("Operation failed because it does not support cloning."));
        }

        if (message instanceof BinaryResponse) {
            BinaryResponse response = (BinaryResponse) message;
            // NOTE(review): a non-null content buffer without readable bytes is not released
            // here - confirm such buffers never need an explicit release.
            if (response.content() != null && response.content().readableBytes() > 0) {
                try {
                    String config = response.content().toString(CharsetUtil.UTF_8).trim();
                    if (config.startsWith("{")) {
                        String origin = request != null ? request.dispatchHostname() : null;
                        configurationProvider.proposeBucketConfig(
                            new ProposedBucketConfigContext(response.bucket(), config, origin));
                    }
                } finally {
                    response.content().release();
                }
            }
        }
    }

    /**
     * Sends the request through the cluster facade again after a delay.
     *
     * <p>The delay is chosen as follows:</p>
     * <ul>
     *   <li>If the request carries its own retry delay: {@code retryAfter()} while the retry count is
     *       still 0, otherwise the value calculated by that delay for the incremented retry count.
     *       If the request has a non-zero max retry duration and the delay would go past it, the
     *       request's observable is failed with a {@link RequestCancelledException} instead.</li>
     *   <li>Else, if triggered by a retry response: no delay when {@code incrementRetryCount()}
     *       returns 0 and the bucket has a fast forward map, otherwise {@link #nmvbRetryDelay} ms.</li>
     *   <li>Else: the environment's retry delay for the incremented retry count.</li>
     * </ul>
     *
     * @param request the request to send again.
     * @param fromRetryResponse true if triggered by a response with status {@link ResponseStatus#RETRY}.
     */
    private void scheduleForRetry(final CouchbaseRequest request, boolean fromRetryResponse) {
        long delayTime;
        TimeUnit delayUnit;

        Delay requestDelay = request.retryDelay();
        if (requestDelay != null) {
            if (request.retryCount() == 0) {
                delayTime = request.retryAfter();
                request.incrementRetryCount();
            } else {
                delayTime = requestDelay.calculate(request.incrementRetryCount());
            }
            delayUnit = requestDelay.unit();

            // NOTE(review): maxRetryDuration() is compared against the wall clock, so it is treated
            // as an absolute deadline in epoch milliseconds - confirm against where it is set.
            long maxRetryDuration = request.maxRetryDuration();
            // Convert the delay to milliseconds before comparing (it is expressed in delayUnit) and
            // compare against the remaining time so that a very large delay cannot overflow the sum.
            if (maxRetryDuration != 0
                && delayUnit.toMillis(delayTime) > maxRetryDuration - System.currentTimeMillis()) {
                request.observable().onError(new RequestCancelledException("Could not dispatch request, cancelling instead of retrying as the maximum retry duration specified by Server has been exceeded"));
                return;
            }
        } else if (fromRetryResponse) {
            boolean retryImmediately = request.incrementRetryCount() == 0
                && bucketHasFastForwardMap(request.bucket(), configurationProvider.config());
            delayTime = retryImmediately ? 0L : nmvbRetryDelay;
            delayUnit = TimeUnit.MILLISECONDS;
        } else {
            Delay environmentDelay = environment.retryDelay();
            delayTime = environmentDelay.calculate(request.incrementRetryCount());
            delayUnit = environmentDelay.unit();
        }

        if (traceLoggingEnabled) {
            LOGGER.trace("Retrying {} with a delay of {} {}", request, Long.valueOf(delayTime), delayUnit);
        }

        final Scheduler.Worker worker = environment.scheduler().createWorker();
        worker.schedule(new Action0() {
            @Override
            public void call() {
                try {
                    cluster.send(request);
                } finally {
                    worker.unsubscribe();
                }
            }
        }, delayTime, delayUnit);
    }

    /**
     * Checks whether the config of the named bucket has a fast forward map.
     *
     * @param bucketName the bucket name, may be null.
     * @param clusterConfig the cluster config to look the bucket up in.
     * @return true only if the name is non-null, the bucket config exists and it has a fast forward map.
     */
    private static boolean bucketHasFastForwardMap(String bucketName, ClusterConfig clusterConfig) {
        if (bucketName == null) {
            return false;
        }
        BucketConfig bucketConfig = clusterConfig.bucketConfig(bucketName);
        return bucketConfig != null && bucketConfig.hasFastForwardMap();
    }
}
