package com.couchbase.client.core;

import com.couchbase.client.core.config.ClusterConfig;
import com.couchbase.client.core.config.ConfigurationProvider;
import com.couchbase.client.core.config.DefaultConfigurationProvider;
import com.couchbase.client.core.env.CoreEnvironment;
import com.couchbase.client.core.env.DefaultCoreEnvironment;
import com.couchbase.client.core.env.Diagnostics;
import com.couchbase.client.core.hooks.CouchbaseCoreSendHook;
import com.couchbase.client.core.lang.Tuple2;
import com.couchbase.client.core.logging.CouchbaseLogger;
import com.couchbase.client.core.logging.CouchbaseLoggerFactory;
import com.couchbase.client.core.logging.RedactableArgument;
import com.couchbase.client.core.message.CouchbaseRequest;
import com.couchbase.client.core.message.CouchbaseResponse;
import com.couchbase.client.core.message.ResponseStatus;
import com.couchbase.client.core.message.cluster.CloseBucketRequest;
import com.couchbase.client.core.message.cluster.CloseBucketResponse;
import com.couchbase.client.core.message.cluster.ClusterRequest;
import com.couchbase.client.core.message.cluster.DisconnectRequest;
import com.couchbase.client.core.message.cluster.DisconnectResponse;
import com.couchbase.client.core.message.cluster.GetClusterConfigRequest;
import com.couchbase.client.core.message.cluster.GetClusterConfigResponse;
import com.couchbase.client.core.message.cluster.OpenBucketRequest;
import com.couchbase.client.core.message.cluster.OpenBucketResponse;
import com.couchbase.client.core.message.cluster.SeedNodesRequest;
import com.couchbase.client.core.message.cluster.SeedNodesResponse;
import com.couchbase.client.core.message.internal.AddNodeRequest;
import com.couchbase.client.core.message.internal.AddNodeResponse;
import com.couchbase.client.core.message.internal.AddServiceRequest;
import com.couchbase.client.core.message.internal.AddServiceResponse;
import com.couchbase.client.core.message.internal.DiagnosticsRequest;
import com.couchbase.client.core.message.internal.GetConfigProviderRequest;
import com.couchbase.client.core.message.internal.GetConfigProviderResponse;
import com.couchbase.client.core.message.internal.InternalRequest;
import com.couchbase.client.core.message.internal.RemoveNodeRequest;
import com.couchbase.client.core.message.internal.RemoveNodeResponse;
import com.couchbase.client.core.message.internal.RemoveServiceRequest;
import com.couchbase.client.core.message.internal.RemoveServiceResponse;
import com.couchbase.client.core.service.Service;
import com.couchbase.client.core.state.LifecycleState;
import com.couchbase.client.core.tracing.RingBufferMonitor;
import com.couchbase.client.deps.com.lmax.disruptor.EventTranslatorOneArg;
import com.couchbase.client.deps.com.lmax.disruptor.ExceptionHandler;
import com.couchbase.client.deps.com.lmax.disruptor.RingBuffer;
import com.couchbase.client.deps.com.lmax.disruptor.dsl.Disruptor;
import com.couchbase.client.deps.com.lmax.disruptor.dsl.ProducerType;
import com.couchbase.client.deps.io.netty.util.concurrent.DefaultThreadFactory;
import java.util.Random;
import rx.Observable;
import rx.functions.Func1;
import rx.subjects.Subject;

/* loaded from: input_file:core-io-1.2.3.jar:com/couchbase/client/core/CouchbaseCore.class */
public class CouchbaseCore implements ClusterFacade {
    /** Logger used by this class and by the exception handlers it installs on both disruptors. */
    private static final CouchbaseLogger LOGGER = CouchbaseLoggerFactory.getInstance((Class<?>) CouchbaseCore.class);
    /** Translator used when publishing to the request ring buffer: it stores the request on the event it is handed. */
    private static final EventTranslatorOneArg<RequestEvent, CouchbaseRequest> REQUEST_TRANSLATOR = new EventTranslatorOneArg<RequestEvent, CouchbaseRequest>() { // from class: com.couchbase.client.core.CouchbaseCore.1
        @Override // com.couchbase.client.deps.com.lmax.disruptor.EventTranslatorOneArg
        public void translateTo(RequestEvent requestEvent, long j, CouchbaseRequest couchbaseRequest) {
            // The sequence number (j) is not needed; only the payload is attached to the event.
            requestEvent.setRequest(couchbaseRequest);
        }
    };
    /** Shared exception instance; the static initializer of this class replaces its stack trace with an empty array. */
    public static final BackpressureException BACKPRESSURE_EXCEPTION = new BackpressureException();
    /** Ring buffer of the request disruptor; requests that are neither internal nor cluster requests are published here. */
    private final RingBuffer<RequestEvent> requestRingBuffer;
    /** Consumer of the request disruptor; also used directly for reconfiguration and node/service management. */
    private final RequestHandler requestHandler;
    /** Provider used to seed hosts, open and close buckets, and read the current cluster config. */
    private final ConfigurationProvider configProvider;
    /** Environment passed to the constructor; supplies buffer sizes, the wait strategy, the send hook and the scheduler. */
    private final CoreEnvironment environment;
    /** Disruptor carrying outgoing requests to the request handler. */
    private final Disruptor<RequestEvent> requestDisruptor;
    /** Disruptor carrying response events to the response handler. */
    private final Disruptor<ResponseEvent> responseDisruptor;
    /**
     * True when the environment was handed in by the caller, false when the no-arg constructor created it.
     * Disconnect handling only shuts the environment down when this flag is false.
     */
    private volatile boolean sharedEnvironment;
    /** Optional hook taken from the environment; when null, requests are published without interception. */
    private final CouchbaseCoreSendHook coreSendHook;
    /** Context built from the environment, the response ring buffer and the core id. */
    private final CoreContext ctx;
    /** Randomly generated identifier of this core instance, handed to the core context. */
    private final long coreId;

    /**
     * Creates a core that runs on its own, freshly created {@link DefaultCoreEnvironment}.
     *
     * <p>Because the environment is created here rather than supplied by the caller, it is
     * flagged as not shared, which lets disconnect handling shut it down.</p>
     */
    public CouchbaseCore() {
        this(DefaultCoreEnvironment.create());
        // The delegated constructor marks the environment as shared; this instance owns it, so reset the flag.
        sharedEnvironment = false;
    }

    /**
     * Creates a core on top of the given environment and starts the response and request disruptors.
     *
     * <p>The environment is flagged as shared, so handling a {@link DisconnectRequest} leaves it
     * running (the no-arg constructor resets that flag for the environment it creates itself).</p>
     *
     * @param coreEnvironment the environment supplying the send hook, buffer sizes and the request
     *                        wait strategy; it is dereferenced immediately, so it must not be null.
     */
    public CouchbaseCore(CoreEnvironment coreEnvironment) {
        this.sharedEnvironment = true;
        LOGGER.info(coreEnvironment.toString());
        LOGGER.debug(Diagnostics.collectAndFormat());
        this.environment = coreEnvironment;
        // Clear the sign bit instead of using Math.abs: Math.abs(Long.MIN_VALUE) is still negative,
        // which would let a negative id slip through.
        this.coreId = new Random().nextLong() & Long.MAX_VALUE;
        this.coreSendHook = coreEnvironment.couchbaseCoreSendHook();
        this.configProvider = new DefaultConfigurationProvider(this, coreEnvironment);

        // Both disruptors share one thread factory.
        DefaultThreadFactory threadFactory = new DefaultThreadFactory("cb-core", true);

        // The response side is wired and started first: the core context needs its ring buffer.
        this.responseDisruptor = new Disruptor<>(new ResponseEventFactory(), coreEnvironment.responseBufferSize(), threadFactory);
        this.responseDisruptor.setDefaultExceptionHandler(new ExceptionHandler<ResponseEvent>() {
            @Override
            public void handleEventException(Throwable th, long j, ResponseEvent responseEvent) {
                CouchbaseCore.LOGGER.warn("Exception while Handling Response Events {}", RedactableArgument.user(responseEvent), th);
            }

            @Override
            public void handleOnStartException(Throwable th) {
                CouchbaseCore.LOGGER.warn("Exception while Starting Response RingBuffer", th);
            }

            @Override
            public void handleOnShutdownException(Throwable th) {
                CouchbaseCore.LOGGER.info("Exception while shutting down Response RingBuffer", th);
            }
        });
        this.responseDisruptor.handleEventsWith(new ResponseHandler(coreEnvironment, this, this.configProvider));
        this.responseDisruptor.start();
        RingBuffer<ResponseEvent> responseRingBuffer = this.responseDisruptor.getRingBuffer();

        // The request side allows multiple producers and uses the wait strategy configured on the environment.
        this.requestDisruptor = new Disruptor<>(new RequestEventFactory(), coreEnvironment.requestBufferSize(), threadFactory, ProducerType.MULTI, coreEnvironment.requestBufferWaitStrategy().newWaitStrategy());
        this.ctx = new CoreContext(coreEnvironment, responseRingBuffer, this.coreId);
        this.requestHandler = new RequestHandler(this.ctx, this.configProvider.configs());
        this.requestDisruptor.setDefaultExceptionHandler(new ExceptionHandler<RequestEvent>() {
            @Override
            public void handleEventException(Throwable th, long j, RequestEvent requestEvent) {
                CouchbaseCore.LOGGER.warn("Exception while Handling Request Events {}", RedactableArgument.user(requestEvent), th);
            }

            @Override
            public void handleOnStartException(Throwable th) {
                CouchbaseCore.LOGGER.warn("Exception while Starting Request RingBuffer", th);
            }

            @Override
            public void handleOnShutdownException(Throwable th) {
                CouchbaseCore.LOGGER.info("Exception while shutting down Request RingBuffer", th);
            }
        });
        this.requestDisruptor.handleEventsWith(this.requestHandler);
        this.requestDisruptor.start();
        this.requestRingBuffer = this.requestDisruptor.getRingBuffer();
    }

    /**
     * Dispatches a request and returns the observable on which its response will be delivered.
     *
     * <ul>
     *   <li>{@link InternalRequest}s and {@link ClusterRequest}s are handled directly by this class;
     *       their responses are observed on the environment's scheduler.</li>
     *   <li>Every other request is registered with the {@link RingBufferMonitor} and published to
     *       the request ring buffer. If {@code tryPublishEvent} reports that the event was not
     *       published, the request's observable is failed with the exception created by the
     *       monitor.</li>
     *   <li>When a send hook is configured, the hook receives the request and its response subject
     *       first; the request it returns is the one published, and the observable it returns is
     *       the one handed back to the caller.</li>
     * </ul>
     *
     * @param couchbaseRequest the request to dispatch.
     * @param <R> the response type expected by the caller; this is not checked at runtime.
     * @return the observable carrying the response (or the error) for the request.
     */
    @Override // com.couchbase.client.core.ClusterFacade
    @SuppressWarnings("unchecked") // the response type R is chosen by the caller and cannot be verified here
    public <R extends CouchbaseResponse> Observable<R> send(CouchbaseRequest couchbaseRequest) {
        if (couchbaseRequest instanceof InternalRequest) {
            handleInternalRequest(couchbaseRequest);
            return (Observable<R>) couchbaseRequest.observable().observeOn(this.environment.scheduler());
        }
        if (couchbaseRequest instanceof ClusterRequest) {
            handleClusterRequest(couchbaseRequest);
            return (Observable<R>) couchbaseRequest.observable().observeOn(this.environment.scheduler());
        }

        RingBufferMonitor monitor = RingBufferMonitor.instance();
        monitor.addRequest(couchbaseRequest);

        if (this.coreSendHook == null) {
            // Publish the request itself; the translator attaches it to the ring buffer event.
            if (!this.requestRingBuffer.tryPublishEvent(REQUEST_TRANSLATOR, couchbaseRequest)) {
                couchbaseRequest.observable().onError(monitor.createException());
            }
            return (Observable<R>) couchbaseRequest.observable();
        }

        Subject<CouchbaseResponse, CouchbaseResponse> response = couchbaseRequest.observable();
        Tuple2<CouchbaseRequest, Observable<CouchbaseResponse>> hooked = this.coreSendHook.beforeSend(couchbaseRequest, response);
        // The hook may substitute the request; publish whatever it returned.
        if (!this.requestRingBuffer.tryPublishEvent(REQUEST_TRANSLATOR, hooked.value1())) {
            // Errors are signalled on the original subject, not on the observable returned by the hook.
            response.onError(monitor.createException());
        }
        return (Observable<R>) hooked.value2();
    }

    /**
     * Handles requests that target the cluster as a whole and feeds the outcome into the
     * request's own observable.
     *
     * <ul>
     *   <li>{@link SeedNodesRequest}: seeds the config provider and answers SUCCESS or FAILURE
     *       depending on what {@code seedHosts} returns.</li>
     *   <li>{@link OpenBucketRequest} / {@link CloseBucketRequest}: opens or closes the bucket,
     *       reconfigures the request handler with the resulting config and then checks that the
     *       bucket is present (open) or absent (close); otherwise a {@link CouchbaseException}
     *       is raised inside the stream.</li>
     *   <li>{@link DisconnectRequest}: closes all buckets, shuts down the config provider, shuts
     *       down the environment unless it is shared, and finally shuts down both disruptors.</li>
     *   <li>{@link GetClusterConfigRequest}: answers with the provider's current config.</li>
     *   <li>Anything else fails the request's observable with an {@link IllegalArgumentException},
     *       matching how unknown internal requests are treated, instead of leaving the observable
     *       without any signal.</li>
     * </ul>
     *
     * @param couchbaseRequest the cluster request to handle.
     */
    private void handleClusterRequest(final CouchbaseRequest couchbaseRequest) {
        if (couchbaseRequest instanceof SeedNodesRequest) {
            boolean seeded = this.configProvider.seedHosts(((SeedNodesRequest) couchbaseRequest).nodes(), true);
            couchbaseRequest.observable().onNext(new SeedNodesResponse(seeded ? ResponseStatus.SUCCESS : ResponseStatus.FAILURE));
            couchbaseRequest.observable().onCompleted();
        } else if (couchbaseRequest instanceof OpenBucketRequest) {
            this.configProvider.openBucket(couchbaseRequest.bucket(), couchbaseRequest.username(), couchbaseRequest.password()).flatMap(new Func1<ClusterConfig, Observable<ClusterConfig>>() {
                @Override
                public Observable<ClusterConfig> call(ClusterConfig clusterConfig) {
                    return CouchbaseCore.this.requestHandler.reconfigure(clusterConfig);
                }
            }).map(new Func1<ClusterConfig, OpenBucketResponse>() {
                @Override
                public OpenBucketResponse call(ClusterConfig clusterConfig) {
                    // After reconfiguration the bucket must be part of the config.
                    if (clusterConfig.hasBucket(couchbaseRequest.bucket())) {
                        return new OpenBucketResponse(ResponseStatus.SUCCESS);
                    }
                    throw new CouchbaseException("Could not open bucket.");
                }
            }).subscribe(couchbaseRequest.observable());
        } else if (couchbaseRequest instanceof CloseBucketRequest) {
            this.configProvider.closeBucket(couchbaseRequest.bucket()).flatMap(new Func1<ClusterConfig, Observable<ClusterConfig>>() {
                @Override
                public Observable<ClusterConfig> call(ClusterConfig clusterConfig) {
                    return CouchbaseCore.this.requestHandler.reconfigure(clusterConfig);
                }
            }).map(new Func1<ClusterConfig, CloseBucketResponse>() {
                @Override
                public CloseBucketResponse call(ClusterConfig clusterConfig) {
                    // After reconfiguration the bucket must be gone from the config.
                    if (clusterConfig.hasBucket(couchbaseRequest.bucket())) {
                        throw new CouchbaseException("Could not close bucket.");
                    }
                    return new CloseBucketResponse(ResponseStatus.SUCCESS);
                }
            }).subscribe(couchbaseRequest.observable());
        } else if (couchbaseRequest instanceof DisconnectRequest) {
            this.configProvider.closeBuckets().flatMap(new Func1<Boolean, Observable<Boolean>>() {
                @Override
                public Observable<Boolean> call(Boolean bool) {
                    return CouchbaseCore.this.configProvider.shutdown();
                }
            }).flatMap(new Func1<Boolean, Observable<Boolean>>() {
                @Override
                public Observable<Boolean> call(Boolean bool) {
                    // A shared environment belongs to the caller and is left running.
                    return CouchbaseCore.this.sharedEnvironment ? Observable.just(true) : CouchbaseCore.this.environment.shutdownAsync();
                }
            }).map(new Func1<Boolean, Boolean>() {
                @Override
                public Boolean call(Boolean bool) {
                    CouchbaseCore.this.requestDisruptor.shutdown();
                    CouchbaseCore.this.responseDisruptor.shutdown();
                    return bool;
                }
            }).map(new Func1<Boolean, DisconnectResponse>() {
                @Override
                public DisconnectResponse call(Boolean bool) {
                    return new DisconnectResponse(ResponseStatus.SUCCESS);
                }
            }).subscribe(couchbaseRequest.observable());
        } else if (couchbaseRequest instanceof GetClusterConfigRequest) {
            couchbaseRequest.observable().onNext(new GetClusterConfigResponse(this.configProvider.config(), ResponseStatus.SUCCESS));
            couchbaseRequest.observable().onCompleted();
        } else {
            // Without this branch an unrecognised cluster request would never receive any signal.
            couchbaseRequest.observable().onError(new IllegalArgumentException("Unknown request " + couchbaseRequest));
        }
    }

    /**
     * Handles requests used internally by the client and routes the result into the request's
     * own observable.
     *
     * <p>Supported requests: fetching the configuration provider, adding or removing a node,
     * adding or removing a service, and collecting diagnostics. Any other request fails its
     * observable with an {@link IllegalArgumentException}.</p>
     *
     * @param couchbaseRequest the internal request to handle.
     */
    private void handleInternalRequest(final CouchbaseRequest couchbaseRequest) {
        if (couchbaseRequest instanceof GetConfigProviderRequest) {
            couchbaseRequest.observable().onNext(new GetConfigProviderResponse(this.configProvider));
            couchbaseRequest.observable().onCompleted();
        } else if (couchbaseRequest instanceof AddNodeRequest) {
            final AddNodeRequest addNode = (AddNodeRequest) couchbaseRequest;
            this.requestHandler.addNode(addNode.hostname(), null).map(new Func1<LifecycleState, AddNodeResponse>() {
                @Override
                public AddNodeResponse call(LifecycleState lifecycleState) {
                    // The resulting state is not inspected; the response echoes the requested hostname.
                    return new AddNodeResponse(ResponseStatus.SUCCESS, addNode.hostname());
                }
            }).subscribe(couchbaseRequest.observable());
        } else if (couchbaseRequest instanceof RemoveNodeRequest) {
            final RemoveNodeRequest removeNode = (RemoveNodeRequest) couchbaseRequest;
            this.requestHandler.removeNode(removeNode.hostname()).map(new Func1<LifecycleState, RemoveNodeResponse>() {
                @Override
                public RemoveNodeResponse call(LifecycleState lifecycleState) {
                    return new RemoveNodeResponse(ResponseStatus.SUCCESS);
                }
            }).subscribe(couchbaseRequest.observable());
        } else if (couchbaseRequest instanceof AddServiceRequest) {
            final AddServiceRequest addService = (AddServiceRequest) couchbaseRequest;
            this.requestHandler.addService(addService).map(new Func1<Service, AddServiceResponse>() {
                @Override
                public AddServiceResponse call(Service service) {
                    return new AddServiceResponse(ResponseStatus.SUCCESS, addService.hostname());
                }
            }).subscribe(couchbaseRequest.observable());
        } else if (couchbaseRequest instanceof RemoveServiceRequest) {
            final RemoveServiceRequest removeService = (RemoveServiceRequest) couchbaseRequest;
            this.requestHandler.removeService(removeService).map(new Func1<Service, RemoveServiceResponse>() {
                @Override
                public RemoveServiceResponse call(Service service) {
                    return new RemoveServiceResponse(ResponseStatus.SUCCESS);
                }
            }).subscribe(couchbaseRequest.observable());
        } else if (couchbaseRequest instanceof DiagnosticsRequest) {
            final DiagnosticsRequest diagnostics = (DiagnosticsRequest) couchbaseRequest;
            this.requestHandler.diagnostics(diagnostics.id()).subscribe(couchbaseRequest.observable());
        } else {
            couchbaseRequest.observable().onError(new IllegalArgumentException("Unknown request " + couchbaseRequest));
        }
    }

    /**
     * Returns the identifier of this core, as reported by its {@link CoreContext}.
     *
     * @return the core id held by the context returned from {@link #ctx()}.
     */
    @Override // com.couchbase.client.core.ClusterFacade
    public long id() {
        CoreContext context = ctx();
        return context.coreId();
    }

    /**
     * Returns the context created for this core in the constructor.
     *
     * @return the core context of this instance.
     */
    @Override // com.couchbase.client.core.ClusterFacade
    public CoreContext ctx() {
        return ctx;
    }

    // Replace the stack trace of the shared backpressure exception with an empty one.
    static {
        StackTraceElement[] noFrames = new StackTraceElement[0];
        BACKPRESSURE_EXCEPTION.setStackTrace(noFrames);
    }
}
