package com.couchbase.client.core.endpoint.config;

import com.couchbase.client.core.ResponseEvent;
import com.couchbase.client.core.endpoint.AbstractEndpoint;
import com.couchbase.client.core.endpoint.AbstractGenericHandler;
import com.couchbase.client.core.endpoint.ResponseStatusConverter;
import com.couchbase.client.core.logging.CouchbaseLogger;
import com.couchbase.client.core.logging.CouchbaseLoggerFactory;
import com.couchbase.client.core.message.CouchbaseResponse;
import com.couchbase.client.core.message.ResponseStatus;
import com.couchbase.client.core.message.config.BucketConfigRequest;
import com.couchbase.client.core.message.config.BucketConfigResponse;
import com.couchbase.client.core.message.config.BucketStreamingRequest;
import com.couchbase.client.core.message.config.BucketStreamingResponse;
import com.couchbase.client.core.message.config.BucketsConfigRequest;
import com.couchbase.client.core.message.config.BucketsConfigResponse;
import com.couchbase.client.core.message.config.ClusterConfigRequest;
import com.couchbase.client.core.message.config.ClusterConfigResponse;
import com.couchbase.client.core.message.config.ConfigRequest;
import com.couchbase.client.core.message.config.FlushRequest;
import com.couchbase.client.core.message.config.FlushResponse;
import com.couchbase.client.core.message.config.GetDesignDocumentsRequest;
import com.couchbase.client.core.message.config.GetDesignDocumentsResponse;
import com.couchbase.client.core.message.config.GetUsersRequest;
import com.couchbase.client.core.message.config.GetUsersResponse;
import com.couchbase.client.core.message.config.InsertBucketRequest;
import com.couchbase.client.core.message.config.InsertBucketResponse;
import com.couchbase.client.core.message.config.RemoveBucketRequest;
import com.couchbase.client.core.message.config.RemoveBucketResponse;
import com.couchbase.client.core.message.config.RemoveUserRequest;
import com.couchbase.client.core.message.config.RemoveUserResponse;
import com.couchbase.client.core.message.config.RestApiRequest;
import com.couchbase.client.core.message.config.RestApiResponse;
import com.couchbase.client.core.message.config.UpdateBucketRequest;
import com.couchbase.client.core.message.config.UpdateBucketResponse;
import com.couchbase.client.core.message.config.UpsertUserRequest;
import com.couchbase.client.core.message.config.UpsertUserResponse;
import com.couchbase.client.core.service.ServiceType;
import com.couchbase.client.deps.com.lmax.disruptor.EventSink;
import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
import com.couchbase.client.deps.io.netty.buffer.Unpooled;
import com.couchbase.client.deps.io.netty.channel.ChannelHandlerContext;
import com.couchbase.client.deps.io.netty.handler.codec.http.DefaultFullHttpRequest;
import com.couchbase.client.deps.io.netty.handler.codec.http.HttpContent;
import com.couchbase.client.deps.io.netty.handler.codec.http.HttpHeaders;
import com.couchbase.client.deps.io.netty.handler.codec.http.HttpMethod;
import com.couchbase.client.deps.io.netty.handler.codec.http.HttpObject;
import com.couchbase.client.deps.io.netty.handler.codec.http.HttpRequest;
import com.couchbase.client.deps.io.netty.handler.codec.http.HttpResponse;
import com.couchbase.client.deps.io.netty.handler.codec.http.HttpVersion;
import com.couchbase.client.deps.io.netty.handler.codec.http.LastHttpContent;
import com.couchbase.client.deps.io.netty.util.CharsetUtil;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.RejectedExecutionException;
import rx.Observable;
import rx.subjects.BehaviorSubject;

/* loaded from: input_file:core-io-1.2.3.jar:com/couchbase/client/core/endpoint/config/ConfigHandler.class */
public class ConfigHandler extends AbstractGenericHandler<HttpObject, HttpRequest, ConfigRequest> {
    /** Logger of this handler; used in {@link #handlerRemoved} when the config stream cannot be completed. */
    private static final CouchbaseLogger LOGGER = CouchbaseLoggerFactory.getInstance((Class<?>) ConfigHandler.class);
    /** Header of the HTTP response currently being decoded; assigned in {@code decodeResponse} when an {@link HttpResponse} arrives. */
    private HttpResponse responseHeader;
    /** Buffer accumulating the response body across content chunks; allocated in {@code decodeResponse}, released and set to null by {@code releaseResponseContent}. */
    private ByteBuf responseContent;
    /** Subject that streamed configs are pushed into; created when a bucket streaming response reports success, completed and nulled when its last content chunk arrives. */
    private BehaviorSubject<String> streamingConfigObservable;

    /**
     * Creates a new handler by forwarding all arguments to the {@link AbstractGenericHandler} constructor.
     *
     * @param abstractEndpoint the endpoint passed on to the superclass
     * @param eventSink the sink for {@link ResponseEvent}s passed on to the superclass
     * @param z first boolean flag handed to the superclass (meaning is defined there, not visible in this file)
     * @param z2 second boolean flag handed to the superclass (meaning is defined there, not visible in this file)
     */
    public ConfigHandler(AbstractEndpoint abstractEndpoint, EventSink<ResponseEvent> eventSink, boolean z, boolean z2) {
        super(abstractEndpoint, eventSink, z, z2);
    }

    /**
     * Package-private variant that additionally forwards a request queue to the superclass.
     * NOTE(review): presumably intended for tests that want to supply their own queue -- confirm.
     *
     * @param abstractEndpoint the endpoint passed on to the superclass
     * @param eventSink the sink for {@link ResponseEvent}s passed on to the superclass
     * @param queue the queue of {@link ConfigRequest}s passed on to the superclass
     * @param z first boolean flag handed to the superclass (meaning is defined there, not visible in this file)
     * @param z2 second boolean flag handed to the superclass (meaning is defined there, not visible in this file)
     */
    ConfigHandler(AbstractEndpoint abstractEndpoint, EventSink<ResponseEvent> eventSink, Queue<ConfigRequest> queue, boolean z, boolean z2) {
        super(abstractEndpoint, eventSink, queue, z, z2);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Turns a config request into the HTTP request that is written to the channel.
     *
     * <p>{@link RestApiRequest}s are handed to {@code encodeRestApiRequest}. For every other request
     * the HTTP verb is picked from the concrete request type: flush, insert-bucket and update-bucket
     * use POST, upsert-user uses PUT, remove-bucket and remove-user use DELETE, everything else is a
     * GET. Only insert-bucket, update-bucket and upsert-user contribute a body (their payload encoded
     * as UTF-8); all other requests are sent with an empty body.
     *
     * @param channelHandlerContext context of the channel the request is encoded for
     * @param configRequest the request to encode
     * @return the populated HTTP request, with basic auth added from the request's credentials
     * @throws Exception declared by the overridden method
     */
    @Override
    public HttpRequest encodeRequest(ChannelHandlerContext channelHandlerContext, ConfigRequest configRequest) throws Exception {
        if (configRequest instanceof RestApiRequest) {
            return encodeRestApiRequest(channelHandlerContext, (RestApiRequest) configRequest);
        }

        final boolean isInsertBucket = configRequest instanceof InsertBucketRequest;
        final boolean isUpdateBucket = configRequest instanceof UpdateBucketRequest;
        final boolean isUpsertUser = configRequest instanceof UpsertUserRequest;

        // Pick the HTTP verb from the request type; GET is the fallback.
        final HttpMethod verb;
        if (configRequest instanceof FlushRequest || isInsertBucket || isUpdateBucket) {
            verb = HttpMethod.POST;
        } else if (isUpsertUser) {
            verb = HttpMethod.PUT;
        } else if (configRequest instanceof RemoveBucketRequest || configRequest instanceof RemoveUserRequest) {
            verb = HttpMethod.DELETE;
        } else {
            verb = HttpMethod.GET;
        }

        // Only the three payload-carrying request types produce a non-empty body.
        final ByteBuf body;
        if (isInsertBucket) {
            body = Unpooled.copiedBuffer(((InsertBucketRequest) configRequest).payload(), CharsetUtil.UTF_8);
        } else if (isUpdateBucket) {
            body = Unpooled.copiedBuffer(((UpdateBucketRequest) configRequest).payload(), CharsetUtil.UTF_8);
        } else if (isUpsertUser) {
            body = Unpooled.copiedBuffer(((UpsertUserRequest) configRequest).payload(), CharsetUtil.UTF_8);
        } else {
            body = Unpooled.EMPTY_BUFFER;
        }

        final DefaultFullHttpRequest encoded =
            new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, verb, configRequest.path(), body);
        final HttpHeaders headers = encoded.headers();

        headers.set("User-Agent", env().userAgent());
        if (isInsertBucket || isUpdateBucket || isUpsertUser) {
            // Payload-carrying requests are sent as URL-encoded forms.
            headers.set("Accept", "*/*");
            headers.set("Content-Type", HttpHeaders.Values.APPLICATION_X_WWW_FORM_URLENCODED);
        }
        headers.set("Content-Length", Integer.valueOf(body.readableBytes()));
        headers.set("Host", remoteHttpHost(channelHandlerContext));

        addHttpBasicAuth(channelHandlerContext, encoded, configRequest.username(), configRequest.password());
        return encoded;
    }

    /**
     * Builds the HTTP request for a generic {@link RestApiRequest}.
     *
     * <p>Verb, path (with parameters) and body are taken from the request itself. The headers supplied
     * by the request are applied after "User-Agent" and "Host", so they can replace those two, while
     * "Content-Length" is always set afterwards from the actual size of the encoded body.
     *
     * @param channelHandlerContext context of the channel the request is encoded for
     * @param restApiRequest the REST request to encode
     * @return the populated HTTP request, with basic auth added from the request's credentials
     */
    private HttpRequest encodeRestApiRequest(ChannelHandlerContext channelHandlerContext, RestApiRequest restApiRequest) {
        final HttpMethod verb = restApiRequest.method();
        final ByteBuf body = Unpooled.copiedBuffer(restApiRequest.body(), CharsetUtil.UTF_8);
        final DefaultFullHttpRequest encoded =
            new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, verb, restApiRequest.pathWithParameters(), body);

        final HttpHeaders headers = encoded.headers();
        headers.set("User-Agent", env().userAgent());
        headers.set("Host", remoteHttpHost(channelHandlerContext));
        for (Map.Entry<String, Object> custom : restApiRequest.headers().entrySet()) {
            headers.set(custom.getKey(), custom.getValue());
        }
        // Set last so a caller-provided header cannot disagree with the real body size.
        headers.set("Content-Length", Integer.valueOf(body.readableBytes()));

        addHttpBasicAuth(channelHandlerContext, encoded, restApiRequest.username(), restApiRequest.password());
        return encoded;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Decodes one piece of an HTTP response and, once enough has arrived, produces the matching
     * {@link CouchbaseResponse} for the current request.
     *
     * <p>The three checks below are deliberately independent because a single message can be a
     * header, a content chunk and the last chunk at the same time:
     * <ul>
     *   <li>on a header, the header is remembered, the content buffer is reset (or allocated) and,
     *       for a {@link BucketStreamingRequest}, the streaming response is created right away;</li>
     *   <li>on content, the bytes are appended to the buffer and, while a config stream is active,
     *       complete configs are pushed to it;</li>
     *   <li>on the last content, a streaming request completes its stream and yields {@code null},
     *       while any other request yields the response built from status and body.</li>
     * </ul>
     *
     * @param channelHandlerContext context of the channel the message arrived on
     * @param httpObject the HTTP message part to decode
     * @return the decoded response, or {@code null} if nothing is to be emitted for this message
     * @throws Exception declared by the overridden method
     */
    @Override
    public CouchbaseResponse decodeResponse(ChannelHandlerContext channelHandlerContext, HttpObject httpObject) throws Exception {
        final ConfigRequest request = currentRequest();
        final boolean streaming = request instanceof BucketStreamingRequest;
        CouchbaseResponse decoded = null;

        if (httpObject instanceof HttpResponse) {
            this.responseHeader = (HttpResponse) httpObject;
            if (streaming) {
                decoded = handleBucketStreamingResponse(channelHandlerContext, this.responseHeader);
            }
            if (this.responseContent == null) {
                this.responseContent = channelHandlerContext.alloc().buffer();
            } else {
                this.responseContent.clear();
            }
        }

        if (httpObject instanceof HttpContent) {
            this.responseContent.writeBytes(((HttpContent) httpObject).content());
            if (this.streamingConfigObservable != null) {
                maybePushConfigChunk();
            }
        }

        if (!(httpObject instanceof LastHttpContent)) {
            return decoded;
        }

        if (streaming) {
            // The stream ended: complete it and emit nothing further for this request.
            if (this.streamingConfigObservable != null) {
                this.streamingConfigObservable.onCompleted();
                this.streamingConfigObservable = null;
            }
            finishedDecoding();
            return null;
        }

        final ResponseStatus status = ResponseStatusConverter.fromHttp(this.responseHeader.getStatus().code());
        final String body;
        if (this.responseContent.readableBytes() > 0) {
            body = this.responseContent.toString(CHARSET);
        } else {
            // No body received: fall back to the reason phrase of the status line.
            body = this.responseHeader.getStatus().reasonPhrase();
        }

        decoded = assembleConfigResponse(request, status, body);
        finishedDecoding();
        return decoded;
    }

    /**
     * Creates the response object that corresponds to the type of the given request.
     *
     * @param request the request the response belongs to
     * @param status the response status converted from the HTTP status code
     * @param body the response body (or the HTTP reason phrase if the body was empty)
     * @return the matching response, or {@code null} if the request type is not handled here
     */
    private CouchbaseResponse assembleConfigResponse(ConfigRequest request, ResponseStatus status, String body) {
        if (request instanceof BucketConfigRequest) {
            return new BucketConfigResponse(body, status);
        }
        if (request instanceof ClusterConfigRequest) {
            return new ClusterConfigResponse(body, status);
        }
        if (request instanceof BucketsConfigRequest) {
            return new BucketsConfigResponse(body, status);
        }
        if (request instanceof GetDesignDocumentsRequest) {
            return new GetDesignDocumentsResponse(body, status, request);
        }
        if (request instanceof RemoveBucketRequest) {
            return new RemoveBucketResponse(status);
        }
        if (request instanceof InsertBucketRequest) {
            return new InsertBucketResponse(body, status);
        }
        if (request instanceof UpdateBucketRequest) {
            return new UpdateBucketResponse(body, status);
        }
        if (request instanceof FlushRequest) {
            // First argument is true for every HTTP status code other than 201.
            return new FlushResponse(this.responseHeader.getStatus().code() != 201, body, status);
        }
        if (request instanceof GetUsersRequest) {
            return new GetUsersResponse(body, status, request);
        }
        if (request instanceof UpsertUserRequest) {
            return new UpsertUserResponse(body, status);
        }
        if (request instanceof RemoveUserRequest) {
            return new RemoveUserResponse(status);
        }
        if (request instanceof RestApiRequest) {
            return new RestApiResponse((RestApiRequest) request, this.responseHeader.getStatus(), this.responseHeader.headers(), body);
        }
        return null;
    }

    /**
     * Creates the response for a bucket streaming request as soon as the HTTP header is known.
     *
     * <p>If the HTTP status maps to a successful {@link ResponseStatus}, a new config subject is
     * created and exposed through the response (buffered on backpressure and observed on the
     * environment's scheduler). Otherwise the response carries no observable.
     *
     * @param channelHandlerContext context used to look up the remote address of the channel
     * @param httpResponse the HTTP response header that was received
     * @return the streaming response carrying the observable (or {@code null}), the remote host,
     *         the status and the current request
     */
    private CouchbaseResponse handleBucketStreamingResponse(ChannelHandlerContext channelHandlerContext, HttpResponse httpResponse) {
        final SocketAddress remote = channelHandlerContext.channel().remoteAddress();
        final String host;
        if (remote instanceof InetSocketAddress) {
            host = ((InetSocketAddress) remote).getAddress().getHostAddress();
        } else {
            host = remote.toString();
        }

        final ResponseStatus status = ResponseStatusConverter.fromHttp(httpResponse.getStatus().code());

        Observable<String> configs = null;
        if (status.isSuccess()) {
            this.streamingConfigObservable = BehaviorSubject.create();
            configs = this.streamingConfigObservable.onBackpressureBuffer().observeOn(env().scheduler());
        }

        return new BucketStreamingResponse(configs, host, status, currentRequest());
    }

    /**
     * Pushes every complete config currently held in the content buffer to the config stream.
     *
     * <p>Configs are delimited by four consecutive newlines. All delimited configs found in the
     * buffer are emitted (trimmed) in order, not just the first one, so a config is never held back
     * until unrelated bytes arrive later. A delimiter at the very start of the buffer is discarded
     * instead of being left in place, where it would block every following config. Whatever follows
     * the last delimiter is written back to the buffer to be completed by later chunks.
     *
     * <p>If no delimiter is present the buffer is left untouched, so the raw bytes of a partially
     * received config are preserved exactly as they arrived.
     */
    private void maybePushConfigChunk() {
        final String separator = "\n\n\n\n";

        String pending = this.responseContent.toString(CHARSET);
        int separatorIndex = pending.indexOf(separator);
        if (separatorIndex < 0) {
            // No complete config yet; keep buffering without rewriting the bytes.
            return;
        }

        do {
            if (separatorIndex > 0) {
                this.streamingConfigObservable.onNext(pending.substring(0, separatorIndex).trim());
            }
            // Drop the emitted config (or the bare leading separator) and look for the next one.
            pending = pending.substring(separatorIndex + separator.length());
            separatorIndex = pending.indexOf(separator);
        } while (separatorIndex >= 0);

        this.responseContent.clear();
        this.responseContent.writeBytes(pending.getBytes(CHARSET));
    }

    /**
     * Releases the content buffer, if there is one, and forgets the reference to it.
     *
     * <p>The buffer is only released while its reference count is still positive; the field is
     * reset to {@code null} in either case.
     */
    private void releaseResponseContent() {
        final ByteBuf buffered = this.responseContent;
        if (buffered == null) {
            return;
        }
        if (buffered.refCnt() > 0) {
            buffered.release();
        }
        this.responseContent = null;
    }

    /**
     * Runs the superclass bookkeeping for a finished response and then releases the content
     * buffer that was accumulated for it.
     */
    @Override // com.couchbase.client.core.endpoint.AbstractGenericHandler
    protected void finishedDecoding() {
        super.finishedDecoding();
        // The buffered body is no longer needed once decoding has finished.
        releaseResponseContent();
    }

    /**
     * Cleans up when this handler is removed from its channel pipeline.
     *
     * <p>An active config stream is completed first; a {@link RejectedExecutionException} raised
     * while doing so is only logged. Afterwards the superclass clean-up runs. The content buffer is
     * released in a {@code finally} block so that it is not leaked if completing the stream or the
     * superclass clean-up throws.
     *
     * @param channelHandlerContext context of the channel this handler is removed from
     * @throws Exception if the superclass clean-up fails
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext channelHandlerContext) throws Exception {
        try {
            if (this.streamingConfigObservable != null) {
                try {
                    this.streamingConfigObservable.onCompleted();
                } catch (RejectedExecutionException ignored) {
                    // Deliberately not rethrown: the stream cannot be completed any more, so only log it.
                    LOGGER.info("{}Could not complete config stream, scheduler shut down already.", logIdent(channelHandlerContext, endpoint()));
                }
            }
            super.handlerRemoved(channelHandlerContext);
        } finally {
            releaseResponseContent();
        }
    }

    /**
     * Identifies the service this handler belongs to.
     *
     * @return always {@link ServiceType#CONFIG}
     */
    @Override // com.couchbase.client.core.endpoint.AbstractGenericHandler
    protected ServiceType serviceType() {
        return ServiceType.CONFIG;
    }
}
