package io.reactivex.netty.protocol.http.client;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufHolder;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.DefaultLastHttpContent;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.util.AttributeKey;
import io.netty.util.ReferenceCountUtil;
import io.reactivex.netty.client.ClientMetricsEvent;
import io.reactivex.netty.client.ConnectionReuseEvent;
import io.reactivex.netty.metrics.Clock;
import io.reactivex.netty.metrics.MetricEventsSubject;
import io.reactivex.netty.protocol.http.client.HttpClientMetricsEvent;
import io.reactivex.netty.util.MultipleFutureListener;
import rx.Observable;
import rx.Subscriber;
import rx.subjects.PublishSubject;

/* loaded from: input_file:io/reactivex/netty/protocol/http/client/ClientRequestResponseConverter.class */
public class ClientRequestResponseConverter extends ChannelDuplexHandler {
    public static final AttributeKey<Long> KEEP_ALIVE_TIMEOUT_MILLIS_ATTR = AttributeKey.valueOf("rxnetty_http_conn_keep_alive_timeout_millis");
    public static final AttributeKey<Boolean> DISCARD_CONNECTION = AttributeKey.valueOf("rxnetty_http_discard_connection");
    private final MetricEventsSubject<ClientMetricsEvent<?>> eventsSubject;
    private PublishSubject contentSubject = PublishSubject.create();
    private long responseReceiveStartTimeMillis;

    public ClientRequestResponseConverter(MetricEventsSubject<ClientMetricsEvent<?>> metricEventsSubject) {
        this.eventsSubject = metricEventsSubject;
    }

    public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        Class<?> cls = obj.getClass();
        PublishSubject publishSubject = this.contentSubject;
        if (HttpResponse.class.isAssignableFrom(cls)) {
            this.responseReceiveStartTimeMillis = Clock.newStartTimeMillis();
            this.eventsSubject.onEvent(HttpClientMetricsEvent.RESPONSE_HEADER_RECEIVED);
            HttpClientResponse httpClientResponse = new HttpClientResponse((HttpResponse) obj, publishSubject);
            Long keepAliveTimeoutSeconds = httpClientResponse.getKeepAliveTimeoutSeconds();
            if (null != keepAliveTimeoutSeconds) {
                channelHandlerContext.channel().attr(KEEP_ALIVE_TIMEOUT_MILLIS_ATTR).set(Long.valueOf(keepAliveTimeoutSeconds.longValue() * 1000));
            }
            if (!httpClientResponse.getHeaders().isKeepAlive()) {
                channelHandlerContext.channel().attr(DISCARD_CONNECTION).set(true);
            }
            super.channelRead(channelHandlerContext, httpClientResponse);
        }
        if (!HttpContent.class.isAssignableFrom(cls)) {
            if (HttpResponse.class.isAssignableFrom(cls)) {
                return;
            }
            invokeContentOnNext(obj);
            return;
        }
        this.eventsSubject.onEvent(HttpClientMetricsEvent.RESPONSE_CONTENT_RECEIVED);
        ByteBuf content = ((ByteBufHolder) obj).content();
        if (content.isReadable()) {
            invokeContentOnNext(content);
        }
        if (LastHttpContent.class.isAssignableFrom(cls)) {
            this.eventsSubject.onEvent((MetricEventsSubject<ClientMetricsEvent<?>>) HttpClientMetricsEvent.RESPONSE_RECEIVE_COMPLETE, Clock.onEndMillis(this.responseReceiveStartTimeMillis));
            publishSubject.onCompleted();
        }
    }

    public void write(ChannelHandlerContext channelHandlerContext, Object obj, ChannelPromise channelPromise) throws Exception {
        if (!HttpClientRequest.class.isAssignableFrom(obj.getClass())) {
            channelHandlerContext.write(obj, channelPromise);
            return;
        }
        HttpClientRequest<?> httpClientRequest = (HttpClientRequest) obj;
        MultipleFutureListener multipleFutureListener = new MultipleFutureListener(channelPromise);
        Observable<ByteBuf> observable = null;
        switch (httpClientRequest.getContentSourceType()) {
            case Raw:
                if (!httpClientRequest.getHeaders().isContentLengthSet()) {
                    httpClientRequest.getHeaders().add("Transfer-Encoding", (Object) "chunked");
                }
                observable = httpClientRequest.getRawContentSource();
                break;
            case Typed:
                if (!httpClientRequest.getHeaders().isContentLengthSet()) {
                    httpClientRequest.getHeaders().add("Transfer-Encoding", (Object) "chunked");
                }
                observable = httpClientRequest.getContentSource();
                break;
            case Absent:
                if (!httpClientRequest.getHeaders().isContentLengthSet() && httpClientRequest.getMethod() != HttpMethod.GET) {
                    httpClientRequest.getHeaders().set("Content-Length", (Object) 0);
                    break;
                }
                break;
        }
        writeHttpHeaders(channelHandlerContext, httpClientRequest, multipleFutureListener);
        if (null == observable) {
            writeAContentChunk(channelHandlerContext, multipleFutureListener, new DefaultLastHttpContent());
            return;
        }
        if (!httpClientRequest.getHeaders().isContentLengthSet()) {
            httpClientRequest.getHeaders().add("Transfer-Encoding", (Object) "chunked");
        }
        writeContent(channelHandlerContext, multipleFutureListener, observable, channelPromise);
    }

    public void userEventTriggered(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        if (obj instanceof ConnectionReuseEvent) {
            this.contentSubject = PublishSubject.create();
        }
        super.userEventTriggered(channelHandlerContext, obj);
    }

    private void invokeContentOnNext(Object obj) {
        try {
            try {
                this.contentSubject.onNext(obj);
                ReferenceCountUtil.release(obj);
            } catch (ClassCastException e) {
                this.contentSubject.onError(e);
                ReferenceCountUtil.release(obj);
            }
        } catch (Throwable th) {
            ReferenceCountUtil.release(obj);
            throw th;
        }
    }

    private void writeHttpHeaders(ChannelHandlerContext channelHandlerContext, HttpClientRequest<?> httpClientRequest, MultipleFutureListener multipleFutureListener) {
        long newStartTimeMillis = Clock.newStartTimeMillis();
        this.eventsSubject.onEvent(HttpClientMetricsEvent.REQUEST_HEADERS_WRITE_START);
        ChannelFuture write = channelHandlerContext.write(httpClientRequest.getNettyRequest());
        addWriteCompleteEvents(write, newStartTimeMillis, HttpClientMetricsEvent.REQUEST_HEADERS_WRITE_SUCCESS, HttpClientMetricsEvent.REQUEST_HEADERS_WRITE_FAILED);
        multipleFutureListener.listen(write);
    }

    private void writeContent(final ChannelHandlerContext channelHandlerContext, final MultipleFutureListener multipleFutureListener, Observable<?> observable, final ChannelPromise channelPromise) {
        observable.subscribe(new Subscriber<Object>() { // from class: io.reactivex.netty.protocol.http.client.ClientRequestResponseConverter.1
            public void onCompleted() {
                ClientRequestResponseConverter.this.writeAContentChunk(channelHandlerContext, multipleFutureListener, new DefaultLastHttpContent());
            }

            public void onError(Throwable th) {
                multipleFutureListener.cancelPendingFutures(true);
                channelPromise.tryFailure(th);
            }

            public void onNext(Object obj) {
                ClientRequestResponseConverter.this.writeAContentChunk(channelHandlerContext, multipleFutureListener, obj);
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void writeAContentChunk(ChannelHandlerContext channelHandlerContext, MultipleFutureListener multipleFutureListener, Object obj) {
        this.eventsSubject.onEvent(HttpClientMetricsEvent.REQUEST_CONTENT_WRITE_START);
        long newStartTimeMillis = Clock.newStartTimeMillis();
        ChannelFuture write = channelHandlerContext.write(obj);
        addWriteCompleteEvents(write, newStartTimeMillis, HttpClientMetricsEvent.REQUEST_CONTENT_WRITE_SUCCESS, HttpClientMetricsEvent.REQUEST_CONTENT_WRITE_FAILED);
        multipleFutureListener.listen(write);
    }

    private void addWriteCompleteEvents(ChannelFuture channelFuture, final long j, final HttpClientMetricsEvent<HttpClientMetricsEvent.EventType> httpClientMetricsEvent, final HttpClientMetricsEvent<HttpClientMetricsEvent.EventType> httpClientMetricsEvent2) {
        channelFuture.addListener(new ChannelFutureListener() { // from class: io.reactivex.netty.protocol.http.client.ClientRequestResponseConverter.2
            public void operationComplete(ChannelFuture channelFuture2) throws Exception {
                if (channelFuture2.isSuccess()) {
                    ClientRequestResponseConverter.this.eventsSubject.onEvent((MetricEventsSubject) httpClientMetricsEvent, Clock.onEndMillis(j));
                } else {
                    ClientRequestResponseConverter.this.eventsSubject.onEvent((MetricEventsSubject) httpClientMetricsEvent2, Clock.onEndMillis(j), channelFuture2.cause());
                }
            }
        });
    }
}
