package com.couchbase.client.core.service;

import com.couchbase.client.core.CoreContext;
import com.couchbase.client.core.CouchbaseException;
import com.couchbase.client.core.endpoint.Endpoint;
import com.couchbase.client.core.logging.CouchbaseLogger;
import com.couchbase.client.core.logging.CouchbaseLoggerFactory;
import com.couchbase.client.core.message.CouchbaseRequest;
import com.couchbase.client.core.message.internal.SignalFlush;
import com.couchbase.client.core.retry.RetryHelper;
import com.couchbase.client.core.service.Service;
import com.couchbase.client.core.state.LifecycleState;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Action1;
import rx.functions.Func1;

/* loaded from: input_file:WEB-INF/lib/core-io-1.7.11.jar:com/couchbase/client/core/service/AbstractOnDemandService.class */
/**
 * Base class for services that create a brand-new {@link Endpoint} for every
 * request handed to {@link #dispatch(CouchbaseRequest)}.
 *
 * <p>Each endpoint is registered with {@code endpointStates()} and remembered in an
 * internal list when it is created; both registrations are undone by a callback that
 * {@code dispatch} installs for {@link LifecycleState#DISCONNECTED}. Once
 * {@link #disconnect()} has been called, further requests are handed to
 * {@link RetryHelper#retryOrCancel} instead of being sent.
 */
public abstract class AbstractOnDemandService extends AbstractDynamicService {

    private static final CouchbaseLogger LOGGER = CouchbaseLoggerFactory.getInstance(Service.class);

    /** Endpoints created by {@code dispatch}; entries are removed by its DISCONNECTED callback. */
    private final List<Endpoint> onDemandEndpoints;

    /** Set to {@code true} by {@link #disconnect()} and never reset by this class. */
    private volatile boolean disconnect;

    /** Supplies the environment and response ring buffer passed to {@link RetryHelper}. */
    private final CoreContext ctx;

    /** Hostname used when building log prefixes via {@code logIdent}. */
    private final String hostname;

    /**
     * Creates the service. All arguments are forwarded to the superclass constructor,
     * together with a literal {@code 0} in the seventh position.
     * NOTE(review): the 0 is presumably the number of endpoints to keep open up front -
     * confirm against AbstractDynamicService.
     * NOTE(review): the decompiler reported that this constructor was originally
     * {@code protected}.
     *
     * @param str             forwarded to the superclass and kept as the hostname for log output
     * @param str2            forwarded to the superclass unchanged
     * @param str3            forwarded to the superclass unchanged
     * @param str4            forwarded to the superclass unchanged
     * @param i               forwarded to the superclass unchanged
     * @param coreContext     forwarded to the superclass and kept for retry handling
     * @param endpointFactory forwarded to the superclass unchanged
     */
    public AbstractOnDemandService(String str, String str2, String str3, String str4, int i, CoreContext coreContext, Service.EndpointFactory endpointFactory) {
        super(str, str2, str3, str4, i, coreContext, 0, endpointFactory);
        this.onDemandEndpoints = new CopyOnWriteArrayList<Endpoint>();
        this.disconnect = false;
        this.ctx = coreContext;
        this.hostname = str;
    }

    /**
     * Creates a new endpoint for the request, starts connecting it and installs the
     * callbacks that send the request once connected and drop the endpoint once
     * disconnected. If the service is already flagged as disconnecting, the request
     * goes straight to {@link RetryHelper#retryOrCancel} and no endpoint is created.
     *
     * @param couchbaseRequest the request to send over the new endpoint
     */
    @Override
    protected void dispatch(final CouchbaseRequest couchbaseRequest) {
        if (disconnect) {
            retryOrCancelRequest(couchbaseRequest);
            return;
        }

        final Endpoint endpoint = createEndpoint();
        endpointStates().register(endpoint, endpoint);
        onDemandEndpoints.add(endpoint);

        // The order of the three subscriptions below is kept exactly as is on purpose.
        endpoint.connect().subscribe(failRequestOnConnectProblem(couchbaseRequest));
        whenState(endpoint, LifecycleState.CONNECTED, sendOrBackOut(endpoint, couchbaseRequest));
        whenState(endpoint, LifecycleState.DISCONNECTED, untrackEndpoint(endpoint));
    }

    /**
     * Flags the service as disconnecting (immediately, at call time) and builds an
     * observable that calls {@code disconnect()} on every endpoint currently in the
     * internal list.
     *
     * @return an observable that takes the last state emitted by the endpoints (or
     *         {@link LifecycleState#IDLE} when nothing was emitted), terminates
     *         {@code endpointStates()} and then emits this service's {@code state()}
     */
    @Override
    public Observable<LifecycleState> disconnect() {
        disconnect = true;
        LOGGER.debug(logIdent(hostname, this) + "Got instructed to disconnect.");

        Observable<LifecycleState> perEndpoint = Observable.from(onDemandEndpoints)
            .flatMap(new Func1<Endpoint, Observable<LifecycleState>>() {
                @Override
                public Observable<LifecycleState> call(Endpoint tracked) {
                    logEndpointDisconnect();
                    return tracked.disconnect();
                }
            });

        Observable<LifecycleState> finalEndpointState = perEndpoint.lastOrDefault(LifecycleState.IDLE);

        return finalEndpointState.map(new Func1<LifecycleState, LifecycleState>() {
            @Override
            public LifecycleState call(LifecycleState ignored) {
                endpointStates().terminate();
                return state();
            }
        });
    }

    /** Hands the request to {@link RetryHelper#retryOrCancel} using this service's context. */
    private void retryOrCancelRequest(CouchbaseRequest request) {
        RetryHelper.retryOrCancel(ctx.environment(), request, ctx.responseRingBuffer());
    }

    /** Writes the debug line emitted right before an endpoint is asked to disconnect. */
    private void logEndpointDisconnect() {
        LOGGER.debug(logIdent(hostname, this) + "Initializing disconnect on Endpoint.");
    }

    /**
     * Builds the subscriber for the endpoint's connect observable: an error, or an
     * emitted {@link LifecycleState#DISCONNECTED}, is propagated as an error on the
     * request's observable.
     */
    private Subscriber<LifecycleState> failRequestOnConnectProblem(final CouchbaseRequest request) {
        return new Subscriber<LifecycleState>() {
            @Override
            public void onCompleted() {
                // Nothing to do on completion.
            }

            @Override
            public void onError(Throwable cause) {
                request.observable().onError(cause);
            }

            @Override
            public void onNext(LifecycleState reached) {
                if (reached != LifecycleState.DISCONNECTED) {
                    return;
                }
                request.observable().onError(new CouchbaseException("Could not connect endpoint."));
            }
        };
    }

    /**
     * Builds the CONNECTED callback: sends the request followed by a flush signal, or -
     * if the service was flagged as disconnecting in the meantime - retries/cancels the
     * request and disconnects the endpoint again.
     */
    private Action1<LifecycleState> sendOrBackOut(final Endpoint endpoint, final CouchbaseRequest request) {
        return new Action1<LifecycleState>() {
            @Override
            public void call(LifecycleState ignored) {
                if (AbstractOnDemandService.this.disconnect) {
                    retryOrCancelRequest(request);
                    logEndpointDisconnect();
                    endpoint.disconnect().subscribe(warnOnDisconnectFailure());
                    return;
                }
                endpoint.send(request);
                endpoint.send(SignalFlush.INSTANCE);
            }
        };
    }

    /** Builds the DISCONNECTED callback that deregisters the endpoint and removes it from the list. */
    private Action1<LifecycleState> untrackEndpoint(final Endpoint endpoint) {
        return new Action1<LifecycleState>() {
            @Override
            public void call(LifecycleState ignored) {
                endpointStates().deregister(endpoint);
                onDemandEndpoints.remove(endpoint);
            }
        };
    }

    /** Builds a subscriber that ignores emitted states and only logs a warning on error. */
    private static Subscriber<LifecycleState> warnOnDisconnectFailure() {
        return new Subscriber<LifecycleState>() {
            @Override
            public void onCompleted() {
                // Nothing to do on completion.
            }

            @Override
            public void onError(Throwable cause) {
                LOGGER.warn("Error while disconnecting endpoint.", cause);
            }

            @Override
            public void onNext(LifecycleState ignored) {
                // Intermediate states are not of interest here.
            }
        };
    }
}
