/*
 * Decompiled with CFR 0.152.
 */
package com.beast.clog.agent.works.handler;

import com.beast.clog.agent.config.LogConfig;
import com.beast.clog.agent.works.ChunkBuilder;
import com.beast.clog.agent.works.ClogThreadFactory;
import com.beast.clog.agent.works.events.MetricsEvent;
import com.beast.clog.agent.works.producers.KafkaMessageProducer;
import com.lmax.disruptor.EventHandler;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.thrift.TBase;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;

public class MetricsEventHandler
implements EventHandler<MetricsEvent> {
    private static Logger logger = Logger.getLogger(MetricsEventHandler.class.getName());
    private static final int CLEAR_INTERVAL = LogConfig.getInstance().getChunkInterval();
    private static final int MAX_CHUNK_SIZE = LogConfig.getInstance().getChunkSize();
    private ChunkBuilder chunkBuilder = new ChunkBuilder();
    private AtomicBoolean running = new AtomicBoolean(true);
    private AtomicLong start = new AtomicLong(System.currentTimeMillis());
    private final KafkaMessageProducer kafkaMessageProducer;
    private final Thread clearThread = ClogThreadFactory.getInstance().newThread(new Runnable(){

        @Override
        public void run() {
            while (MetricsEventHandler.this.running.get()) {
                try {
                    if (System.currentTimeMillis() - MetricsEventHandler.this.start.get() < (long)CLEAR_INTERVAL || MetricsEventHandler.this.chunkBuilder.getChunkSize() <= 0) continue;
                    MetricsEventHandler.this.clear();
                }
                catch (Exception e) {}
                continue;
                finally {
                    try {
                        Thread.sleep(CLEAR_INTERVAL);
                    }
                    catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    });
    private final ThreadLocal<TSerializer> serializer;

    public MetricsEventHandler(KafkaMessageProducer kafkaMessageProducer) {
        this.clearThread.start();
        this.serializer = new ThreadLocal<TSerializer>(){

            @Override
            protected TSerializer initialValue() {
                return new TSerializer();
            }
        };
        this.kafkaMessageProducer = kafkaMessageProducer;
    }

    public void onEvent(MetricsEvent event, long sequence, boolean endOfBatch) throws Exception {
        try {
            this.chunkBuilder.putMsg(event.getMetrics());
            if (this.chunkBuilder.getChunkSize() >= MAX_CHUNK_SIZE) {
                this.clear();
            }
        }
        catch (Exception e) {
            logger.log(Level.SEVERE, "metricsEventHandler onEvent error:" + e.getMessage());
        }
    }

    private synchronized void clear() {
        try {
            byte[] bytes = this.serializer.get().serialize((TBase)this.chunkBuilder.getChunk());
            this.kafkaMessageProducer.setData("beast.metrics", bytes);
        }
        catch (TException e) {
            logger.log(Level.SEVERE, "Serialize metrics chunk to bytes failure", e);
        }
        finally {
            this.chunkBuilder.clear();
            this.start = new AtomicLong(System.currentTimeMillis());
        }
    }

    public void shutdown() {
        this.running.set(false);
        try {
            Thread.sleep(CLEAR_INTERVAL);
        }
        catch (Exception exception) {
            // empty catch block
        }
        this.clearThread.interrupt();
        if (this.chunkBuilder.getChunkSize() > 0) {
            this.clear();
        }
        this.serializer.remove();
    }
}

