package com.beast.clog.agent.works.handler;

import com.beast.clog.agent.config.LogConfig;
import com.beast.clog.agent.works.events.KafkaEvent;
import com.beast.clog.common.Constants;
import com.beast.clog.common.utils.NetAddressList;
import com.lmax.disruptor.EventHandler;
import java.util.Properties;
import java.util.UUID;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;

/* loaded from: input_file:com/beast/clog/agent/works/handler/KafkaEventHandler.class */
public class KafkaEventHandler implements EventHandler<KafkaEvent> {
    private static Logger logger = Logger.getLogger(KafkaEventHandler.class.getName());
    private static final int CLEAR_INTERVAL = 500;
    private Producer<String, byte[]> producer;

    public KafkaEventHandler() {
        String brokerList = LogConfig.getInstance().getBrokerList();
        try {
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", new NetAddressList(brokerList).toString());
            this.producer = new KafkaProducer(properties, new StringSerializer(), new ByteArraySerializer());
            logger.log(Level.INFO, "Kafka client has been initialized successfully: " + brokerList);
        } catch (Exception e) {
            logger.log(Level.SEVERE, "Kafka client has been initialized failed: " + brokerList);
            e.printStackTrace();
        }
    }

    public void onEvent(KafkaEvent kafkaEvent, long j, boolean z) throws Exception {
        if (kafkaEvent.getMessage().length > 0) {
            if (kafkaEvent.getType().equals(Constants.MSG_TOPIC)) {
                this.producer.send(new ProducerRecord(kafkaEvent.getType(), UUID.randomUUID().toString(), kafkaEvent.getMessage()));
                logger.info("日志消息发送kafka");
            } else if (kafkaEvent.getType().equals("beast.clog.env")) {
                this.producer.send(new ProducerRecord("beast.clog.env", UUID.randomUUID().toString(), kafkaEvent.getMessage()));
                logger.info("环境消息发送kafka");
            }
        }
    }

    public void shutdown() {
        try {
            Thread.sleep(500L);
        } catch (Exception e) {
        }
        this.producer.close();
    }
}
