package io.reactivex.netty.protocol.http.server;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufHolder;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.DefaultHttpContent;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.util.ReferenceCountUtil;
import io.reactivex.netty.metrics.Clock;
import io.reactivex.netty.metrics.MetricEventsSubject;
import io.reactivex.netty.protocol.http.server.HttpServerMetricsEvent;
import io.reactivex.netty.server.ServerMetricsEvent;
import rx.subjects.PublishSubject;

/* loaded from: input_file:io/reactivex/netty/protocol/http/server/ServerRequestResponseConverter.class */
public class ServerRequestResponseConverter extends ChannelDuplexHandler {
    private final MetricEventsSubject<ServerMetricsEvent<?>> eventsSubject;

    public ServerRequestResponseConverter(MetricEventsSubject<ServerMetricsEvent<?>> metricEventsSubject) {
        this.eventsSubject = metricEventsSubject;
    }

    public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        Class<?> cls = obj.getClass();
        PublishSubject create = PublishSubject.create();
        if (HttpRequest.class.isAssignableFrom(cls)) {
            this.eventsSubject.onEvent(HttpServerMetricsEvent.REQUEST_HEADERS_RECEIVED);
            super.channelRead(channelHandlerContext, new HttpServerRequest((HttpRequest) obj, create));
        }
        if (!HttpContent.class.isAssignableFrom(cls)) {
            invokeContentOnNext(obj, create);
            return;
        }
        ByteBuf content = ((ByteBufHolder) obj).content();
        this.eventsSubject.onEvent(HttpServerMetricsEvent.REQUEST_CONTENT_RECEIVED);
        invokeContentOnNext(content, create);
        if (LastHttpContent.class.isAssignableFrom(cls)) {
            create.onCompleted();
        }
    }

    public void write(ChannelHandlerContext channelHandlerContext, Object obj, ChannelPromise channelPromise) throws Exception {
        Class<?> cls = obj.getClass();
        long newStartTimeMillis = Clock.newStartTimeMillis();
        if (HttpServerResponse.class.isAssignableFrom(cls)) {
            this.eventsSubject.onEvent(HttpServerMetricsEvent.RESPONSE_HEADERS_WRITE_START);
            addWriteCompleteEvents(channelPromise, newStartTimeMillis, HttpServerMetricsEvent.RESPONSE_HEADERS_WRITE_SUCCESS, HttpServerMetricsEvent.RESPONSE_HEADERS_WRITE_FAILED);
            super.write(channelHandlerContext, ((HttpServerResponse) obj).getNettyResponse(), channelPromise);
        } else {
            if (!ByteBuf.class.isAssignableFrom(cls)) {
                super.write(channelHandlerContext, obj, channelPromise);
                return;
            }
            this.eventsSubject.onEvent(HttpServerMetricsEvent.RESPONSE_CONTENT_WRITE_START);
            addWriteCompleteEvents(channelPromise, newStartTimeMillis, HttpServerMetricsEvent.RESPONSE_CONTENT_WRITE_SUCCESS, HttpServerMetricsEvent.RESPONSE_CONTENT_WRITE_FAILED);
            super.write(channelHandlerContext, new DefaultHttpContent((ByteBuf) obj), channelPromise);
        }
    }

    public void channelReadComplete(ChannelHandlerContext channelHandlerContext) throws Exception {
        super.channelReadComplete(channelHandlerContext);
        channelHandlerContext.pipeline().flush();
    }

    private void addWriteCompleteEvents(ChannelPromise channelPromise, final long j, final HttpServerMetricsEvent<HttpServerMetricsEvent.EventType> httpServerMetricsEvent, final HttpServerMetricsEvent<HttpServerMetricsEvent.EventType> httpServerMetricsEvent2) {
        channelPromise.addListener(new ChannelFutureListener() { // from class: io.reactivex.netty.protocol.http.server.ServerRequestResponseConverter.1
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                if (channelFuture.isSuccess()) {
                    ServerRequestResponseConverter.this.eventsSubject.onEvent((MetricEventsSubject) httpServerMetricsEvent, Clock.onEndMillis(j));
                } else {
                    ServerRequestResponseConverter.this.eventsSubject.onEvent((MetricEventsSubject) httpServerMetricsEvent2, Clock.onEndMillis(j), channelFuture.cause());
                }
            }
        });
    }

    private static void invokeContentOnNext(Object obj, PublishSubject publishSubject) {
        try {
            try {
                publishSubject.onNext(obj);
                ReferenceCountUtil.release(obj);
            } catch (ClassCastException e) {
                publishSubject.onError(e);
                ReferenceCountUtil.release(obj);
            }
        } catch (Throwable th) {
            ReferenceCountUtil.release(obj);
            throw th;
        }
    }
}
