/*
 * Decompiled with CFR 0.152.
 */
package com.beast.clog.agent.works.handler;

import com.beast.clog.agent.config.LogConfig;
import com.beast.clog.agent.works.events.KafkaEvent;
import com.beast.clog.common.utils.NetAddressList;
import com.lmax.disruptor.EventHandler;
import java.util.Properties;
import java.util.UUID;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class KafkaEventHandler
implements EventHandler<KafkaEvent> {
    private static Logger logger = Logger.getLogger(KafkaEventHandler.class.getName());
    private static final int CLEAR_INTERVAL = LogConfig.getInstance().getChunkInterval();
    private Producer<String, byte[]> producer;

    public KafkaEventHandler() {
        String brokerList = LogConfig.getInstance().getBrokerList();
        try {
            Properties props = new Properties();
            NetAddressList addressList = new NetAddressList(brokerList);
            props.setProperty("bootstrap.servers", addressList.toString());
            this.producer = new KafkaProducer(props, (Serializer)new StringSerializer(), (Serializer)new ByteArraySerializer());
            logger.log(Level.INFO, "Kafka client has been initialized successfully: " + brokerList);
        }
        catch (Exception e) {
            logger.log(Level.SEVERE, "Kafka client has been initialized failed: " + brokerList);
            e.printStackTrace();
        }
    }

    public void onEvent(KafkaEvent event, long sequence, boolean endOfBatch) throws Exception {
        if (event.getMessage().length > 0) {
            if (event.getTopic().equals("beast.clog")) {
                this.producer.send(new ProducerRecord("beast.clog", (Object)UUID.randomUUID().toString(), (Object)event.getMessage()));
            } else if (event.getTopic().equals("beast.clog.env")) {
                this.producer.send(new ProducerRecord("beast.clog.env", (Object)UUID.randomUUID().toString(), (Object)event.getMessage()));
            } else if (event.getTopic().equals("beast.metrics")) {
                this.producer.send(new ProducerRecord("beast.metrics", (Object)UUID.randomUUID().toString(), (Object)event.getMessage()));
            }
        }
    }

    public void shutdown() {
        try {
            Thread.sleep(CLEAR_INTERVAL);
        }
        catch (Exception exception) {
            // empty catch block
        }
        this.producer.close();
    }
}

