/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.core.endpoint.dcp;

import com.couchbase.client.core.ClusterFacade;
import com.couchbase.client.core.annotations.InterfaceAudience;
import com.couchbase.client.core.annotations.InterfaceStability;
import com.couchbase.client.core.config.CouchbaseBucketConfig;
import com.couchbase.client.core.env.CoreEnvironment;
import com.couchbase.client.core.message.ResponseStatus;
import com.couchbase.client.core.message.cluster.GetClusterConfigRequest;
import com.couchbase.client.core.message.cluster.GetClusterConfigResponse;
import com.couchbase.client.core.message.dcp.DCPMessage;
import com.couchbase.client.core.message.dcp.DCPRequest;
import com.couchbase.client.core.message.dcp.FailoverLogEntry;
import com.couchbase.client.core.message.dcp.GetFailoverLogRequest;
import com.couchbase.client.core.message.dcp.GetFailoverLogResponse;
import com.couchbase.client.core.message.dcp.GetLastCheckpointRequest;
import com.couchbase.client.core.message.dcp.GetLastCheckpointResponse;
import com.couchbase.client.core.message.dcp.StreamCloseRequest;
import com.couchbase.client.core.message.dcp.StreamCloseResponse;
import com.couchbase.client.core.message.dcp.StreamEndMessage;
import com.couchbase.client.core.message.dcp.StreamRequestRequest;
import com.couchbase.client.core.message.dcp.StreamRequestResponse;
import com.couchbase.client.core.message.kv.MutationToken;
import com.couchbase.client.core.utils.UnicastAutoReleaseSubject;
import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
import com.couchbase.client.deps.io.netty.channel.ChannelHandlerContext;
import com.couchbase.client.deps.io.netty.handler.codec.memcache.binary.BinaryMemcacheRequest;
import com.couchbase.client.deps.io.netty.handler.codec.memcache.binary.DefaultBinaryMemcacheRequest;
import com.couchbase.client.deps.io.netty.util.Attribute;
import com.couchbase.client.deps.io.netty.util.AttributeKey;
import com.couchbase.client.deps.io.netty.util.internal.ConcurrentSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import rx.Observable;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.subjects.SerializedSubject;
import rx.subjects.Subject;

@InterfaceStability.Experimental
@InterfaceAudience.Public
public class DCPConnection {
    private static final AttributeKey<Integer> CONSUMED_BYTES = AttributeKey.newInstance("CONSUMED_BYTES");
    private static final int MINIMUM_HEADER_SIZE = 24;
    private final SerializedSubject<DCPRequest, DCPRequest> subject;
    private final Set<Short> streams;
    private final ClusterFacade core;
    private final String bucket;
    private final String password;
    private final CoreEnvironment env;
    private final ConcurrentMap<Short, ChannelHandlerContext> contexts;

    public DCPConnection(CoreEnvironment env, ClusterFacade core, String bucket, String password) {
        this(env, core, bucket, password, UnicastAutoReleaseSubject.create(env.autoreleaseAfter(), TimeUnit.MILLISECONDS, env.scheduler()).toSerialized());
    }

    public DCPConnection(CoreEnvironment env, ClusterFacade core, String bucket, String password, SerializedSubject<DCPRequest, DCPRequest> subject) {
        this.env = env;
        this.core = core;
        this.subject = subject;
        this.bucket = bucket;
        this.password = password;
        this.streams = new ConcurrentSet<Short>();
        this.contexts = new ConcurrentHashMap<Short, ChannelHandlerContext>();
    }

    public String bucket() {
        return this.bucket;
    }

    public Subject<DCPRequest, DCPRequest> subject() {
        return this.subject;
    }

    public Observable<ResponseStatus> addStream(short partition) {
        return this.addStream(partition, 0L, 0L, -1L, 0L, 0L);
    }

    public Observable<ResponseStatus> addStream(final short partition, final long vbucketUUID, final long startSequenceNumber, final long endSequenceNumber, final long snapshotStartSequenceNumber, final long snapshotEndSequenceNumber) {
        if (this.streams.contains(partition)) {
            return Observable.just(ResponseStatus.EXISTS);
        }
        final DCPConnection connection = this;
        return Observable.defer(new Func0<Observable<StreamRequestResponse>>(){

            @Override
            public Observable<StreamRequestResponse> call() {
                return DCPConnection.this.core.send(new StreamRequestRequest(partition, vbucketUUID, startSequenceNumber, endSequenceNumber, snapshotStartSequenceNumber, snapshotEndSequenceNumber, DCPConnection.this.bucket, DCPConnection.this.password, connection));
            }
        }).flatMap(new Func1<StreamRequestResponse, Observable<StreamRequestResponse>>(){

            @Override
            public Observable<StreamRequestResponse> call(StreamRequestResponse response) {
                long rollbackSequenceNumber;
                switch (response.status()) {
                    case RANGE_ERROR: {
                        rollbackSequenceNumber = 0L;
                        break;
                    }
                    case ROLLBACK: {
                        rollbackSequenceNumber = response.rollbackToSequenceNumber();
                        break;
                    }
                    default: {
                        return Observable.just(response);
                    }
                }
                return DCPConnection.this.core.send(new StreamRequestRequest(partition, vbucketUUID, rollbackSequenceNumber, endSequenceNumber, rollbackSequenceNumber, snapshotEndSequenceNumber, DCPConnection.this.bucket, DCPConnection.this.password, connection));
            }
        }).map(new Func1<StreamRequestResponse, ResponseStatus>(){

            @Override
            public ResponseStatus call(StreamRequestResponse response) {
                if (response.status() == ResponseStatus.SUCCESS) {
                    DCPConnection.this.streams.add(partition);
                }
                return response.status();
            }
        });
    }

    public Observable<ResponseStatus> removeStream(final short partition) {
        if (!this.streams.contains(partition)) {
            return Observable.just(ResponseStatus.NOT_EXISTS);
        }
        return Observable.defer(new Func0<Observable<StreamCloseResponse>>(){

            @Override
            public Observable<StreamCloseResponse> call() {
                return DCPConnection.this.core.send(new StreamCloseRequest(partition, DCPConnection.this.bucket, DCPConnection.this.password));
            }
        }).map(new Func1<StreamCloseResponse, ResponseStatus>(){

            @Override
            public ResponseStatus call(StreamCloseResponse response) {
                if (response.status() == ResponseStatus.SUCCESS) {
                    DCPConnection.this.streams.remove(partition);
                }
                return response.status();
            }
        });
    }

    public Observable<MutationToken> getCurrentState() {
        return this.partitionSize().flatMap(new Func1<Integer, Observable<MutationToken>>(){

            @Override
            public Observable<MutationToken> call(Integer numPartitions) {
                return Observable.range(0, numPartitions).flatMap(new Func1<Integer, Observable<GetFailoverLogResponse>>(){

                    @Override
                    public Observable<GetFailoverLogResponse> call(Integer partition) {
                        return DCPConnection.this.core.send(new GetFailoverLogRequest(partition.shortValue(), DCPConnection.this.bucket));
                    }
                }).flatMap(new Func1<GetFailoverLogResponse, Observable<MutationToken>>(){

                    @Override
                    public Observable<MutationToken> call(final GetFailoverLogResponse failoverLogsResponse) {
                        final FailoverLogEntry entry = failoverLogsResponse.failoverLog().get(0);
                        return DCPConnection.this.core.send(new GetLastCheckpointRequest(failoverLogsResponse.partition(), DCPConnection.this.bucket)).map(new Func1<GetLastCheckpointResponse, MutationToken>(){

                            @Override
                            public MutationToken call(GetLastCheckpointResponse lastCheckpointResponse) {
                                return new MutationToken(failoverLogsResponse.partition(), entry.vbucketUUID(), lastCheckpointResponse.sequenceNumber(), DCPConnection.this.bucket);
                            }
                        });
                    }
                });
            }
        });
    }

    public void consumed(DCPMessage event) {
        this.consumed(event.partition(), event.totalBodyLength());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void consumed(short partition, int delta) {
        if (this.env.dcpConnectionBufferSize() > 0) {
            ChannelHandlerContext ctx = (ChannelHandlerContext)this.contexts.get(partition);
            if (ctx == null) {
                return;
            }
            ChannelHandlerContext channelHandlerContext = ctx;
            synchronized (channelHandlerContext) {
                Attribute<Integer> attr = ctx.attr(CONSUMED_BYTES);
                Integer consumedBytes = attr.get();
                if (consumedBytes == null) {
                    consumedBytes = 0;
                }
                if ((double)(consumedBytes = Integer.valueOf(consumedBytes + (24 + delta))).intValue() >= (double)this.env.dcpConnectionBufferSize() * this.env.dcpConnectionBufferAckThreshold()) {
                    ctx.writeAndFlush(this.createBufferAcknowledgmentRequest(ctx, consumedBytes));
                    consumedBytes = 0;
                }
                attr.set(consumedBytes);
            }
        }
    }

    void streamClosed(short partition, StreamEndMessage.Reason reason) {
        this.streams.remove(partition);
    }

    private Observable<Integer> partitionSize() {
        return this.core.send(new GetClusterConfigRequest()).map(new Func1<GetClusterConfigResponse, Integer>(){

            @Override
            public Integer call(GetClusterConfigResponse response) {
                CouchbaseBucketConfig config = (CouchbaseBucketConfig)response.config().bucketConfig(DCPConnection.this.bucket);
                return config.numberOfPartitions();
            }
        });
    }

    void registerContext(short partition, ChannelHandlerContext ctx) {
        this.contexts.put(partition, ctx);
    }

    private BinaryMemcacheRequest createBufferAcknowledgmentRequest(ChannelHandlerContext ctx, int bufferBytes) {
        ByteBuf extras = ctx.alloc().buffer(4).writeInt(bufferBytes);
        DefaultBinaryMemcacheRequest request = new DefaultBinaryMemcacheRequest(new byte[0], extras);
        request.setOpcode((byte)93);
        request.setExtrasLength((byte)extras.readableBytes());
        request.setTotalBodyLength(extras.readableBytes());
        return request;
    }
}

