package com.beast.clog.agent.works.handler;

import com.beast.clog.agent.config.LogConfig;
import com.beast.clog.agent.works.ClogThreadFactory;
import com.beast.clog.agent.works.events.KafkaEvent;
import com.beast.clog.common.utils.NetAddressList;
import com.lmax.disruptor.EventHandler;
import java.util.Properties;
import java.util.UUID;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;

/* loaded from: input_file:com/beast/clog/agent/works/handler/KafkaEventHandler.class */
/**
 * Disruptor event handler that forwards {@link KafkaEvent} payloads to Kafka.
 *
 * <p>Sending is guarded by a simple on/off switch: when a send attempt throws, the switch is
 * turned off and events are dropped until a background thread turns it back on. That thread
 * ticks once per {@link #KAFKA_RESET_INTERVAL} milliseconds and re-enables sending on every
 * eleventh tick, regardless of when the failure happened.
 *
 * <p>Thread-safety: the switch is {@code volatile} because it is written by the reset thread
 * and read/written by the thread that invokes {@link #onEvent}.
 */
public class KafkaEventHandler implements EventHandler<KafkaEvent> {
    private static final Logger logger = Logger.getLogger(KafkaEventHandler.class.getName());

    /** Delay, in milliseconds, that {@link #shutdown()} waits before closing the producer. */
    private static final int CLEAR_INTERVAL = LogConfig.getInstance().getChunkInterval();

    /** Topics this handler forwards; events addressed to any other topic are dropped. */
    private static final String[] SUPPORTED_TOPICS = {"beast.clog", "beast.clog.env", "beast.metrics"};

    /** Pause between two ticks of the reset thread, in milliseconds. */
    private static final int KAFKA_RESET_INTERVAL = 1000;

    /** Kafka client, or {@code null} when initialization failed in the constructor. */
    private final Producer<String, byte[]> producer;

    /** Send switch; volatile so a re-enable by the reset thread is visible to the handler thread. */
    private volatile boolean kafkaSendEnable = true;

    private final Thread kafkaResetThread = ClogThreadFactory.getInstance().newThread(new Runnable() {
        @Override
        public void run() {
            // Only this thread touches the counter, so a local is sufficient.
            int elapsed = 0;
            while (!Thread.currentThread().isInterrupted()) {
                elapsed += KAFKA_RESET_INTERVAL;
                if (elapsed > KAFKA_RESET_INTERVAL * 10) {
                    KafkaEventHandler.this.kafkaSendEnable = true;
                    elapsed = 0;
                }
                try {
                    Thread.sleep(KAFKA_RESET_INTERVAL);
                } catch (InterruptedException e) {
                    // Interruption is the stop signal sent by shutdown(); restore the flag so
                    // the loop condition observes it and the thread terminates.
                    Thread.currentThread().interrupt();
                }
            }
        }
    });

    /**
     * Creates the Kafka producer from the broker list and application id in {@link LogConfig}
     * and then starts the reset thread.
     *
     * <p>A failure to create the producer is logged and leaves this handler in a state where
     * every event is silently dropped; the constructor itself does not throw for it.
     */
    public KafkaEventHandler() {
        String brokerList = LogConfig.getInstance().getBrokerList();
        Producer<String, byte[]> created = null;
        try {
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", new NetAddressList(brokerList).toString());
            properties.setProperty("client.id", String.valueOf(LogConfig.getInstance().getAppId()));
            created = new KafkaProducer<String, byte[]>(properties, new StringSerializer(), new ByteArraySerializer());
            logger.log(Level.INFO, "Kafka client has been initialized successfully: " + brokerList);
        } catch (Exception e) {
            // Pass the exception to the logger so the stack trace ends up in the log output.
            logger.log(Level.SEVERE, "Kafka client has been initialized failed: " + brokerList, e);
        }
        this.producer = created;
        // Start the thread last so it never runs against a partially constructed handler.
        this.kafkaResetThread.start();
    }

    /**
     * Sends the event's message to Kafka under a random UUID key when the event's topic is one
     * of {@link #SUPPORTED_TOPICS}.
     *
     * <p>Events are dropped without error when the producer is unavailable, sending is
     * currently disabled, the message is {@code null} or empty, or the topic is not supported.
     * If the send attempt throws, sending is disabled until the reset thread re-enables it.
     *
     * @param kafkaEvent the event to forward
     * @param j          not used by this handler (the Disruptor sequence of the event)
     * @param z          not used by this handler (the Disruptor end-of-batch flag)
     * @throws Exception declared by the handler interface; exceptions raised here are caught
     *                   and logged instead of being propagated
     */
    public void onEvent(KafkaEvent kafkaEvent, long j, boolean z) throws Exception {
        if (this.producer == null || !this.kafkaSendEnable) {
            return;
        }
        try {
            byte[] message = kafkaEvent.getMessage();
            if (message == null || message.length == 0) {
                // A missing payload is not a Kafka failure, so it must not trip the switch.
                return;
            }
            String topic = resolveTopic(kafkaEvent.getTopic());
            if (topic != null) {
                this.producer.send(new ProducerRecord<String, byte[]>(topic, UUID.randomUUID().toString(), message));
            }
        } catch (Exception e) {
            this.kafkaSendEnable = false;
            logger.log(Level.SEVERE, "kafkaEventHandler onEvent error:" + e.getMessage(), e);
        }
    }

    /**
     * Stops the reset thread and closes the producer after waiting {@link #CLEAR_INTERVAL}
     * milliseconds (presumably to let pending log chunks reach the producer - TODO confirm).
     *
     * <p>Safe to call when the producer was never created. If the calling thread is interrupted
     * during the wait, the producer is still closed and the interrupt status is restored
     * afterwards.
     */
    public void shutdown() {
        boolean interrupted = false;
        try {
            // Thread.sleep rejects negative values; skip the wait for a non-positive setting.
            if (CLEAR_INTERVAL > 0) {
                Thread.sleep(CLEAR_INTERVAL);
            }
        } catch (InterruptedException e) {
            interrupted = true;
        }
        try {
            this.kafkaResetThread.interrupt();
            if (this.producer != null) {
                this.producer.close();
            }
        } finally {
            // Re-assert the interrupt only after close(), so the close itself is not cut short.
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Returns the supported topic equal to {@code requested}, or {@code null} if there is none. */
    private static String resolveTopic(String requested) {
        for (String supported : SUPPORTED_TOPICS) {
            if (supported.equals(requested)) {
                return supported;
            }
        }
        return null;
    }
}
