/*
 * Decompiled with CFR 0.152.
 */
package com.beast.clog.agent.works.handler;

import com.beast.clog.agent.Metrics;
import com.beast.clog.agent.config.LogConfig;
import com.beast.clog.agent.works.ChunkBuilder;
import com.beast.clog.agent.works.ClogThreadFactory;
import com.beast.clog.agent.works.events.TBaseEvent;
import com.beast.clog.agent.works.producers.KafkaMessageProducer;
import com.beast.clog.models.thrift.Chunk;
import com.beast.clog.models.thrift.Span;
import com.lmax.disruptor.EventHandler;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.thrift.TBase;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;

public class TBaseEventHandler
implements EventHandler<TBaseEvent> {
    private static Logger logger = Logger.getLogger(TBaseEventHandler.class.getName());
    private Lock eventLock = new ReentrantLock();
    private static final int CLEAR_INTERVAL = LogConfig.getInstance().getChunkInterval();
    private static final int MAX_CHUNK_SIZE = LogConfig.getInstance().getChunkSize();
    private ChunkBuilder chunkBuilder = new ChunkBuilder();
    private AtomicBoolean running = new AtomicBoolean(true);
    private AtomicLong start = new AtomicLong(System.currentTimeMillis());
    private final KafkaMessageProducer kafkaMessageProducer;
    private final Thread clearThread = ClogThreadFactory.getInstance().newThread(new Runnable(){

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            while (TBaseEventHandler.this.running.get()) {
                try {
                    if (System.currentTimeMillis() - TBaseEventHandler.this.start.get() >= (long)CLEAR_INTERVAL && TBaseEventHandler.this.chunkBuilder.getChunkSize() > 0 && TBaseEventHandler.this.eventLock.tryLock()) {
                        try {
                            TBaseEventHandler.this.clear();
                        }
                        finally {
                            TBaseEventHandler.this.eventLock.unlock();
                        }
                    }
                    Thread.sleep(CLEAR_INTERVAL);
                }
                catch (Exception exception) {}
            }
        }
    });
    private final ThreadLocal<TSerializer> serializer;

    public TBaseEventHandler(KafkaMessageProducer kafkaMessageProducer) {
        this.clearThread.start();
        this.serializer = new ThreadLocal<TSerializer>(){

            @Override
            protected TSerializer initialValue() {
                return new TSerializer();
            }
        };
        this.kafkaMessageProducer = kafkaMessageProducer;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void onEvent(TBaseEvent event, long sequence, boolean endOfBatch) throws Exception {
        Metrics.instance().getPollQueueCounter().incrementAndGet();
        this.chunkBuilder.putMsg(event.getBase());
        if (this.chunkBuilder.getChunkSize() >= MAX_CHUNK_SIZE && this.eventLock.tryLock() && this.chunkBuilder.getChunkSize() >= MAX_CHUNK_SIZE) {
            try {
                this.clear();
            }
            finally {
                this.eventLock.unlock();
            }
        }
    }

    public byte[] serializer(Chunk chunk) {
        try {
            return this.serializer.get().serialize((TBase)chunk);
        }
        catch (TException e) {
            e.printStackTrace();
            return new byte[0];
        }
    }

    private synchronized void clear() {
        try {
            Metrics.instance().getSendLogEventCounter().addAndGet(this.chunkBuilder.getChunk().getLogEventsSize());
            Metrics.instance().getSpanCounter().addAndGet(this.chunkBuilder.getChunk().getSpansSize());
            if (this.chunkBuilder.getChunk().getSpansSize() > 0) {
                for (Span span : this.chunkBuilder.getChunk().getSpans()) {
                    Metrics.instance().getSendSpanLogEventCounter().addAndGet(span.getLogEventsSize());
                }
            }
            byte[] bytes = this.serializer.get().serialize((TBase)this.chunkBuilder.getChunk());
            this.kafkaMessageProducer.setData("beast.clog", bytes);
            this.chunkBuilder.clear();
            this.start = new AtomicLong(System.currentTimeMillis());
        }
        catch (TException e) {
            logger.log(Level.SEVERE, "Serialize chunk to bytes failure", e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void shutdown() {
        this.running.set(false);
        try {
            Thread.sleep(CLEAR_INTERVAL);
        }
        catch (Exception exception) {
            // empty catch block
        }
        this.clearThread.interrupt();
        if (this.chunkBuilder.getChunkSize() > 0 && this.eventLock.tryLock() && this.chunkBuilder.getChunkSize() > 0) {
            try {
                this.clear();
            }
            finally {
                this.eventLock.unlock();
            }
        }
        this.serializer.remove();
    }
}

