/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.core.dcp;

import com.couchbase.client.core.ClusterFacade;
import com.couchbase.client.core.annotations.InterfaceAudience;
import com.couchbase.client.core.annotations.InterfaceStability;
import com.couchbase.client.core.config.CouchbaseBucketConfig;
import com.couchbase.client.core.dcp.BucketStreamAggregatorState;
import com.couchbase.client.core.dcp.BucketStreamState;
import com.couchbase.client.core.endpoint.dcp.DCPConnection;
import com.couchbase.client.core.message.cluster.GetClusterConfigRequest;
import com.couchbase.client.core.message.cluster.GetClusterConfigResponse;
import com.couchbase.client.core.message.dcp.DCPRequest;
import com.couchbase.client.core.message.dcp.FailoverLogEntry;
import com.couchbase.client.core.message.dcp.GetFailoverLogRequest;
import com.couchbase.client.core.message.dcp.GetFailoverLogResponse;
import com.couchbase.client.core.message.dcp.OpenConnectionRequest;
import com.couchbase.client.core.message.dcp.OpenConnectionResponse;
import com.couchbase.client.core.message.dcp.StreamRequestRequest;
import com.couchbase.client.core.message.dcp.StreamRequestResponse;
import com.couchbase.client.core.message.kv.GetAllMutationTokensRequest;
import com.couchbase.client.core.message.kv.GetAllMutationTokensResponse;
import com.couchbase.client.core.message.kv.MutationToken;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import rx.Observable;
import rx.functions.Action2;
import rx.functions.Func0;
import rx.functions.Func1;

@InterfaceStability.Experimental
@InterfaceAudience.Public
public class BucketStreamAggregator {
    public static String DEFAULT_CONNECTION_NAME = "jvmCore";
    private final ClusterFacade core;
    private final String bucket;
    private final String name;
    private final AtomicReference<DCPConnection> connection = new AtomicReference();

    public BucketStreamAggregator(ClusterFacade core, String bucket) {
        this(DEFAULT_CONNECTION_NAME, core, bucket);
    }

    public BucketStreamAggregator(String name, ClusterFacade core, String bucket) {
        this.core = core;
        this.bucket = bucket;
        this.name = name;
    }

    public String name() {
        return this.name;
    }

    public Observable<DCPRequest> feed() {
        BucketStreamAggregatorState state = new BucketStreamAggregatorState();
        short s = this.partitionSize().toBlocking().first().intValue();
        for (short partition = 0; partition < s; partition = (short)(partition + 1)) {
            state.put(new BucketStreamState(partition, 0L, 0L, -1L, 0L, -1L));
        }
        return this.feed(state);
    }

    public Observable<DCPRequest> feed(final BucketStreamAggregatorState aggregatorState) {
        return this.open().flatMap(new Func1<DCPConnection, Observable<DCPRequest>>(){

            @Override
            public Observable<DCPRequest> call(DCPConnection response) {
                return Observable.from(aggregatorState).flatMap(new Func1<BucketStreamState, Observable<StreamRequestResponse>>(){

                    @Override
                    public Observable<StreamRequestResponse> call(final BucketStreamState feed) {
                        Observable<StreamRequestResponse> res = BucketStreamAggregator.this.core.send(new StreamRequestRequest(feed.partition(), feed.vbucketUUID(), feed.startSequenceNumber(), feed.endSequenceNumber(), feed.snapshotStartSequenceNumber(), feed.snapshotEndSequenceNumber(), BucketStreamAggregator.this.bucket));
                        return res.flatMap(new Func1<StreamRequestResponse, Observable<StreamRequestResponse>>(){

                            @Override
                            public Observable<StreamRequestResponse> call(StreamRequestResponse response) {
                                long rollbackSequenceNumber;
                                switch (response.status()) {
                                    case RANGE_ERROR: {
                                        rollbackSequenceNumber = 0L;
                                        break;
                                    }
                                    case ROLLBACK: {
                                        rollbackSequenceNumber = response.rollbackToSequenceNumber();
                                        break;
                                    }
                                    default: {
                                        return Observable.just(response);
                                    }
                                }
                                return BucketStreamAggregator.this.core.send(new StreamRequestRequest(feed.partition(), feed.vbucketUUID(), rollbackSequenceNumber, feed.endSequenceNumber(), rollbackSequenceNumber, feed.snapshotEndSequenceNumber(), BucketStreamAggregator.this.bucket));
                            }
                        });
                    }
                }).toList().flatMap(new Func1<List<StreamRequestResponse>, Observable<DCPRequest>>(){

                    @Override
                    public Observable<DCPRequest> call(List<StreamRequestResponse> streamRequestResponses) {
                        return ((DCPConnection)BucketStreamAggregator.this.connection.get()).subject();
                    }
                });
            }
        });
    }

    public Observable<BucketStreamAggregatorState> getCurrentState() {
        return this.open().flatMap(new Func1<DCPConnection, Observable<GetAllMutationTokensResponse>>(){

            @Override
            public Observable<GetAllMutationTokensResponse> call(DCPConnection response) {
                return BucketStreamAggregator.this.core.send(new GetAllMutationTokensRequest(BucketStreamAggregator.this.bucket));
            }
        }).flatMap(new Func1<GetAllMutationTokensResponse, Observable<BucketStreamAggregatorState>>(){

            @Override
            public Observable<BucketStreamAggregatorState> call(GetAllMutationTokensResponse response) {
                BucketStreamAggregatorState state = new BucketStreamAggregatorState();
                for (MutationToken token : response.mutationTokens()) {
                    state.put(new BucketStreamState((short)token.vbucketID(), token.vbucketUUID(), token.sequenceNumber()));
                }
                return Observable.just(state);
            }
        }).flatMap(new Func1<BucketStreamAggregatorState, Observable<BucketStreamAggregatorState>>(){

            @Override
            public Observable<BucketStreamAggregatorState> call(final BucketStreamAggregatorState aggregatorState) {
                return Observable.from(aggregatorState).flatMap(new Func1<BucketStreamState, Observable<GetFailoverLogResponse>>(){

                    @Override
                    public Observable<GetFailoverLogResponse> call(BucketStreamState streamState) {
                        return BucketStreamAggregator.this.core.send(new GetFailoverLogRequest(streamState.partition(), BucketStreamAggregator.this.bucket));
                    }
                }).collect(new Func0<BucketStreamAggregatorState>(){

                    @Override
                    public BucketStreamAggregatorState call() {
                        return aggregatorState;
                    }
                }, new Action2<BucketStreamAggregatorState, GetFailoverLogResponse>(){

                    @Override
                    public void call(BucketStreamAggregatorState state, GetFailoverLogResponse response) {
                        FailoverLogEntry entry = response.failoverLog().get(0);
                        state.put(new BucketStreamState(response.partition(), entry.vbucketUUID(), state.get(response.partition()).startSequenceNumber()));
                    }
                });
            }
        });
    }

    private Observable<DCPConnection> open() {
        if (this.connection.get() == null) {
            return this.core.send(new OpenConnectionRequest(this.name, this.bucket)).flatMap(new Func1<OpenConnectionResponse, Observable<DCPConnection>>(){

                @Override
                public Observable<DCPConnection> call(OpenConnectionResponse response) {
                    BucketStreamAggregator.this.connection.compareAndSet(null, response.connection());
                    return Observable.just(BucketStreamAggregator.this.connection.get());
                }
            });
        }
        return Observable.just(this.connection.get());
    }

    private Observable<Integer> partitionSize() {
        return this.core.send(new GetClusterConfigRequest()).map(new Func1<GetClusterConfigResponse, Integer>(){

            @Override
            public Integer call(GetClusterConfigResponse response) {
                CouchbaseBucketConfig config = (CouchbaseBucketConfig)response.config().bucketConfig(BucketStreamAggregator.this.bucket);
                return config.numberOfPartitions();
            }
        });
    }
}

