package com.beast.clog.agent.works.handler;

import com.beast.clog.agent.Metrics;
import com.beast.clog.agent.config.LogConfig;
import com.beast.clog.agent.works.ChunkBuilder;
import com.beast.clog.agent.works.ClogThreadFactory;
import com.beast.clog.agent.works.events.TBaseEvent;
import com.beast.clog.agent.works.producers.KafkaMessageProducer;
import com.beast.clog.models.thrift.Chunk;
import com.lmax.disruptor.EventHandler;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;

/* loaded from: input_file:com/beast/clog/agent/works/handler/TBaseEventHandler.class */
public class TBaseEventHandler implements EventHandler<TBaseEvent> {
    private static Logger logger = Logger.getLogger(TBaseEventHandler.class.getName());
    private static final int CLEAR_INTERVAL = LogConfig.getInstance().getChunkInterval();
    private static final int MAX_CHUNK_SIZE = LogConfig.getInstance().getChunkSize();
    private final KafkaMessageProducer kafkaMessageProducer;
    private final ThreadLocal<TSerializer> serializer;
    private ChunkBuilder chunkBuilder = new ChunkBuilder();
    private AtomicBoolean running = new AtomicBoolean(true);
    private AtomicLong start = new AtomicLong(System.currentTimeMillis());
    private final Thread clearThread = ClogThreadFactory.getInstance().newThread(new Runnable() { // from class: com.beast.clog.agent.works.handler.TBaseEventHandler.1
        @Override // java.lang.Runnable
        public void run() {
            while (TBaseEventHandler.this.running.get()) {
                try {
                    if (System.currentTimeMillis() - TBaseEventHandler.this.start.get() >= TBaseEventHandler.CLEAR_INTERVAL && TBaseEventHandler.this.chunkBuilder.getChunkSize() > 0) {
                        TBaseEventHandler.this.clear();
                    }
                    try {
                        Thread.sleep(TBaseEventHandler.CLEAR_INTERVAL);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } catch (Exception e2) {
                    try {
                        Thread.sleep(TBaseEventHandler.CLEAR_INTERVAL);
                    } catch (InterruptedException e3) {
                        e3.printStackTrace();
                    }
                } catch (Throwable th) {
                    try {
                        Thread.sleep(TBaseEventHandler.CLEAR_INTERVAL);
                    } catch (InterruptedException e4) {
                        e4.printStackTrace();
                    }
                    throw th;
                }
            }
        }
    });

    public TBaseEventHandler(KafkaMessageProducer kafkaMessageProducer) {
        this.clearThread.start();
        this.serializer = new ThreadLocal<TSerializer>() { // from class: com.beast.clog.agent.works.handler.TBaseEventHandler.2
            /* JADX INFO: Access modifiers changed from: protected */
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.lang.ThreadLocal
            public TSerializer initialValue() {
                return new TSerializer();
            }
        };
        this.kafkaMessageProducer = kafkaMessageProducer;
    }

    public void onEvent(TBaseEvent tBaseEvent, long j, boolean z) throws Exception {
        Metrics.instance().getPollQueueCounter().incrementAndGet();
        this.chunkBuilder.putMsg(tBaseEvent.getBase());
        if (this.chunkBuilder.getChunkSize() >= MAX_CHUNK_SIZE) {
            clear();
        }
    }

    public byte[] serializer(Chunk chunk) {
        try {
            return this.serializer.get().serialize(chunk);
        } catch (TException e) {
            e.printStackTrace();
            return new byte[0];
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public synchronized void clear() {
        try {
            this.kafkaMessageProducer.setData("beast.clog", this.serializer.get().serialize(this.chunkBuilder.getChunk()));
        } catch (TException e) {
            logger.log(Level.SEVERE, "Serialize chunk to bytes failure", e);
        } finally {
            this.chunkBuilder.clear();
            this.start = new AtomicLong(System.currentTimeMillis());
        }
    }

    public void shutdown() {
        this.running.set(false);
        try {
            Thread.sleep(CLEAR_INTERVAL);
        } catch (Exception e) {
        }
        this.clearThread.interrupt();
        if (this.chunkBuilder.getChunkSize() > 0) {
            clear();
        }
        this.serializer.remove();
    }
}
