package com.couchbase.client.core.config.refresher;

import com.couchbase.client.core.ClusterFacade;
import com.couchbase.client.core.config.BucketConfig;
import com.couchbase.client.core.config.ClusterConfig;
import com.couchbase.client.core.config.ConfigurationException;
import com.couchbase.client.core.config.NodeInfo;
import com.couchbase.client.core.config.ProposedBucketConfigContext;
import com.couchbase.client.core.config.loader.HttpLoader;
import com.couchbase.client.core.config.refresher.AbstractRefresher;
import com.couchbase.client.core.env.CoreEnvironment;
import com.couchbase.client.core.logging.CouchbaseLogger;
import com.couchbase.client.core.logging.CouchbaseLoggerFactory;
import com.couchbase.client.core.message.ResponseStatus;
import com.couchbase.client.core.message.config.BucketConfigRequest;
import com.couchbase.client.core.message.config.BucketConfigResponse;
import com.couchbase.client.core.message.config.BucketStreamingRequest;
import com.couchbase.client.core.message.config.BucketStreamingResponse;
import com.couchbase.client.core.service.ServiceType;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func0;
import rx.functions.Func1;

/* loaded from: input_file:core-io-1.7.11.jar:com/couchbase/client/core/config/refresher/HttpRefresher.class */
public class HttpRefresher extends AbstractRefresher {
    private static final CouchbaseLogger LOGGER = CouchbaseLoggerFactory.getInstance((Class<?>) HttpRefresher.class);
    private static final String TERSE_PATH = "/pools/default/bs/";
    private static final String VERBOSE_PATH = "/pools/default/bucketsStreaming/";
    private final CoreEnvironment environment;
    private final AtomicLong nodeOffset;
    private final Map<String, Long> lastPollTimestamps;
    private final long pollFloorNs;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:core-io-1.7.11.jar:com/couchbase/client/core/config/refresher/HttpRefresher$TerseConfigDoesNotExistException.class */
    public class TerseConfigDoesNotExistException extends ConfigurationException {
        TerseConfigDoesNotExistException() {
        }
    }

    public HttpRefresher(CoreEnvironment coreEnvironment, ClusterFacade clusterFacade) {
        super(coreEnvironment, clusterFacade);
        this.nodeOffset = new AtomicLong(0L);
        this.lastPollTimestamps = new ConcurrentHashMap();
        this.environment = coreEnvironment;
        this.pollFloorNs = TimeUnit.MILLISECONDS.toNanos(this.environment.configPollFloorInterval());
    }

    @Override // com.couchbase.client.core.config.refresher.AbstractRefresher, com.couchbase.client.core.config.refresher.Refresher
    public Observable<Boolean> registerBucket(String str, String str2) {
        return registerBucket(str, str, str2);
    }

    @Override // com.couchbase.client.core.config.refresher.AbstractRefresher, com.couchbase.client.core.config.refresher.Refresher
    public Observable<Boolean> registerBucket(final String str, final String str2, final String str3) {
        Observable<BucketStreamingResponse> onErrorResumeNext = super.registerBucket(str, str2, str3).flatMap(new Func1<Boolean, Observable<BucketStreamingResponse>>() { // from class: com.couchbase.client.core.config.refresher.HttpRefresher.2
            @Override // rx.functions.Func1
            public Observable<BucketStreamingResponse> call(Boolean bool) {
                return HttpRefresher.this.cluster().send(new BucketStreamingRequest(HttpRefresher.TERSE_PATH, str, str2, str3)).doOnNext(new Action1<BucketStreamingResponse>() { // from class: com.couchbase.client.core.config.refresher.HttpRefresher.2.1
                    @Override // rx.functions.Action1
                    public void call(BucketStreamingResponse bucketStreamingResponse) {
                        if (bucketStreamingResponse.status() == ResponseStatus.NOT_EXISTS) {
                            throw new TerseConfigDoesNotExistException();
                        }
                        if (!bucketStreamingResponse.status().isSuccess()) {
                            throw new ConfigurationException("Could not load terse config.");
                        }
                    }
                });
            }
        }).onErrorResumeNext(new Func1<Throwable, Observable<BucketStreamingResponse>>() { // from class: com.couchbase.client.core.config.refresher.HttpRefresher.1
            @Override // rx.functions.Func1
            public Observable<BucketStreamingResponse> call(Throwable th) {
                return th instanceof TerseConfigDoesNotExistException ? HttpRefresher.this.cluster().send(new BucketStreamingRequest(HttpRefresher.VERBOSE_PATH, str, str2, str3)).doOnNext(new Action1<BucketStreamingResponse>() { // from class: com.couchbase.client.core.config.refresher.HttpRefresher.1.1
                    @Override // rx.functions.Action1
                    public void call(BucketStreamingResponse bucketStreamingResponse) {
                        if (!bucketStreamingResponse.status().isSuccess()) {
                            throw new ConfigurationException("Could not load terse config.");
                        }
                    }
                }) : Observable.error(th);
            }
        });
        repeatConfigUntilUnsubscribed(str, onErrorResumeNext);
        return onErrorResumeNext.map(new Func1<BucketStreamingResponse, Boolean>() { // from class: com.couchbase.client.core.config.refresher.HttpRefresher.3
            @Override // rx.functions.Func1
            public Boolean call(BucketStreamingResponse bucketStreamingResponse) {
                return Boolean.valueOf(bucketStreamingResponse.status().isSuccess());
            }
        });
    }

    private void repeatConfigUntilUnsubscribed(final String str, Observable<BucketStreamingResponse> observable) {
        observable.flatMap(new Func1<BucketStreamingResponse, Observable<ProposedBucketConfigContext>>() { // from class: com.couchbase.client.core.config.refresher.HttpRefresher.6
            @Override // rx.functions.Func1
            public Observable<ProposedBucketConfigContext> call(final BucketStreamingResponse bucketStreamingResponse) {
                HttpRefresher.LOGGER.debug("Config stream started for {} on {}.", str, bucketStreamingResponse.host());
                return bucketStreamingResponse.configs().map(new Func1<String, ProposedBucketConfigContext>() { // from class: com.couchbase.client.core.config.refresher.HttpRefresher.6.2
                    @Override // rx.functions.Func1
                    public ProposedBucketConfigContext call(String str2) {
                        return new ProposedBucketConfigContext(str, str2.replace("$HOST", bucketStreamingResponse.host()), bucketStreamingResponse.host());
                    }
                }).doOnCompleted(new Action0() { // from class: com.couchbase.client.core.config.refresher.HttpRefresher.6.1
                    @Override // rx.functions.Action0
                    public void call() {
                        HttpRefresher.LOGGER.debug("Config stream ended for {} on {}.", str, bucketStreamingResponse.host());
                    }
                });
            }
        }).repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() { // from class: com.couchbase.client.core.config.refresher.HttpRefresher.5
            @Override // rx.functions.Func1
            public Observable<?> call(Observable<? extends Void> observable2) {
                return observable2.flatMap(new Func1<Void, Observable<?>>() { // from class: com.couchbase.client.core.config.refresher.HttpRefresher.5.1
                    @Override // rx.functions.Func1
                    public Observable<?> call(Void r5) {
                        if (HttpRefresher.this.registrations().containsKey(str)) {
                            HttpRefresher.LOGGER.debug("Resubscribing config stream for bucket {}, still registered.", str);
                            return Observable.just(true);
                        }
                        HttpRefresher.LOGGER.debug("Not resubscribing config stream for bucket {}, not registered.", str);
                        return Observable.empty();
                    }
                });
            }
        }).subscribe((Subscriber) new Subscriber<ProposedBucketConfigContext>() { // from class: com.couchbase.client.core.config.refresher.HttpRefresher.4
            @Override // rx.Observer
            public void onCompleted() {
            }

            @Override // rx.Observer
            public void onError(Throwable th) {
                HttpRefresher.LOGGER.error("Error while subscribing to Http refresh stream!", th);
            }

            @Override // rx.Observer
            public void onNext(ProposedBucketConfigContext proposedBucketConfigContext) {
                HttpRefresher.this.pushConfig(proposedBucketConfigContext);
            }
        });
    }

    @Override // com.couchbase.client.core.config.refresher.Refresher
    public Observable<Boolean> shutdown() {
        return Observable.just(true);
    }

    @Override // com.couchbase.client.core.config.refresher.Refresher
    public void markTainted(BucketConfig bucketConfig) {
    }

    @Override // com.couchbase.client.core.config.refresher.Refresher
    public void markUntainted(BucketConfig bucketConfig) {
    }

    @Override // com.couchbase.client.core.config.refresher.Refresher
    public void refresh(ClusterConfig clusterConfig) {
        Observable.from(clusterConfig.bucketConfigs().values()).observeOn(this.environment.scheduler()).filter(new Func1<BucketConfig, Boolean>() { // from class: com.couchbase.client.core.config.refresher.HttpRefresher.9
            @Override // rx.functions.Func1
            public Boolean call(BucketConfig bucketConfig) {
                return Boolean.valueOf(HttpRefresher.this.registrations().containsKey(bucketConfig.name()));
            }
        }).filter(new Func1<BucketConfig, Boolean>() { // from class: com.couchbase.client.core.config.refresher.HttpRefresher.8
            @Override // rx.functions.Func1
            public Boolean call(BucketConfig bucketConfig) {
                String name = bucketConfig.name();
                boolean allowedToPoll = HttpRefresher.this.allowedToPoll(name);
                if (allowedToPoll) {
                    HttpRefresher.this.lastPollTimestamps.put(name, Long.valueOf(System.nanoTime()));
                } else {
                    HttpRefresher.LOGGER.trace("Ignoring refresh polling attempt because poll interval is too small.");
                }
                return Boolean.valueOf(allowedToPoll);
            }
        }).subscribe(new Action1<BucketConfig>() { // from class: com.couchbase.client.core.config.refresher.HttpRefresher.7
            @Override // rx.functions.Action1
            public void call(BucketConfig bucketConfig) {
                final String name = bucketConfig.name();
                ArrayList arrayList = new ArrayList(bucketConfig.nodes());
                if (arrayList.isEmpty()) {
                    HttpRefresher.LOGGER.debug("Cannot refresh bucket, because node list contains no nodes.");
                } else {
                    HttpRefresher.this.shiftNodeList(arrayList);
                    HttpRefresher.this.buildRefreshFallbackSequence(arrayList, name).subscribe((Subscriber) new Subscriber<ProposedBucketConfigContext>() { // from class: com.couchbase.client.core.config.refresher.HttpRefresher.7.1
                        @Override // rx.Observer
                        public void onCompleted() {
                            HttpRefresher.LOGGER.debug("Completed refreshing config for bucket \"{}\"", name);
                        }

                        @Override // rx.Observer
                        public void onError(Throwable th) {
                            HttpRefresher.LOGGER.debug("Error while refreshing bucket config, ignoring.", th);
                        }

                        @Override // rx.Observer
                        public void onNext(ProposedBucketConfigContext proposedBucketConfigContext) {
                            if (proposedBucketConfigContext.config().startsWith("{")) {
                                HttpRefresher.this.provider().proposeBucketConfig(proposedBucketConfigContext);
                            }
                        }
                    });
                }
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Observable<ProposedBucketConfigContext> buildRefreshFallbackSequence(List<NodeInfo> list, String str) {
        Observable<ProposedBucketConfigContext> observable = null;
        for (NodeInfo nodeInfo : list) {
            if (isValidConfigNode(this.environment.sslEnabled(), nodeInfo)) {
                observable = observable == null ? refreshAgainstNode(str, nodeInfo.hostname()) : observable.onErrorResumeNext(refreshAgainstNode(str, nodeInfo.hostname()));
            }
        }
        if (observable != null) {
            return observable;
        }
        LOGGER.debug("Could not build refresh sequence, node list is empty - ignoring attempt.");
        return Observable.empty();
    }

    private Observable<ProposedBucketConfigContext> refreshAgainstNode(final String str, final String str2) {
        final AbstractRefresher.Credential credential = registrations().get(str);
        if (credential != null) {
            return Observable.defer(new Func0<Observable<BucketConfigResponse>>() { // from class: com.couchbase.client.core.config.refresher.HttpRefresher.12
                @Override // rx.functions.Func0, java.util.concurrent.Callable
                public Observable<BucketConfigResponse> call() {
                    return HttpRefresher.this.cluster().send(new BucketConfigRequest(HttpLoader.TERSE_PATH, str2, str, credential.username(), credential.password())).flatMap(new Func1<BucketConfigResponse, Observable<BucketConfigResponse>>() { // from class: com.couchbase.client.core.config.refresher.HttpRefresher.12.1
                        @Override // rx.functions.Func1
                        public Observable<BucketConfigResponse> call(BucketConfigResponse bucketConfigResponse) {
                            if (bucketConfigResponse.status().isSuccess()) {
                                HttpRefresher.LOGGER.debug("Successfully got config refresh from terse bucket remote.");
                                return Observable.just(bucketConfigResponse);
                            }
                            HttpRefresher.LOGGER.debug("Terse bucket config refresh failed, falling back to verbose.");
                            return HttpRefresher.this.cluster().send(new BucketConfigRequest(HttpLoader.VERBOSE_PATH, str2, str, credential.username(), credential.password()));
                        }
                    });
                }
            }).map(new Func1<BucketConfigResponse, ProposedBucketConfigContext>() { // from class: com.couchbase.client.core.config.refresher.HttpRefresher.11
                @Override // rx.functions.Func1
                public ProposedBucketConfigContext call(BucketConfigResponse bucketConfigResponse) {
                    return new ProposedBucketConfigContext(str, bucketConfigResponse.config().replace("$HOST", str2), str2);
                }
            }).doOnError(new Action1<Throwable>() { // from class: com.couchbase.client.core.config.refresher.HttpRefresher.10
                @Override // rx.functions.Action1
                public void call(Throwable th) {
                    HttpRefresher.LOGGER.debug("Could not fetch config from bucket \"" + str + "\" against \"" + str2 + "\".", th);
                }
            });
        }
        LOGGER.debug("Ignoring refresh attempt since it seems the bucket registration is gone (closed).");
        return Observable.empty();
    }

    private static boolean isValidConfigNode(boolean z, NodeInfo nodeInfo) {
        return (z && nodeInfo.sslServices().containsKey(ServiceType.CONFIG)) || nodeInfo.services().containsKey(ServiceType.CONFIG);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public <T> void shiftNodeList(List<T> list) {
        int andIncrement = (int) (this.nodeOffset.getAndIncrement() % list.size());
        for (int i = 0; i < andIncrement; i++) {
            list.add(list.remove(0));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean allowedToPoll(String str) {
        Long l = this.lastPollTimestamps.get(str);
        return l == null || System.nanoTime() - l.longValue() >= this.pollFloorNs;
    }
}
