/*
 * Decompiled with CFR 0.152.
 */
package com.thebeastshop.kit.kafka.utils;

import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;
import com.thebeastshop.kit.prop.PropConstants;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.KafkaFuture;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.listener.config.ContainerProperties;

public class KafkaDriver {
    private static KafkaDriver kafkaDriver;
    private String bootstrapServers;
    private KafkaTemplate kafkaTemplate;
    private AdminClient adminClient;
    private String customTopic;

    public static KafkaDriver drive(String bootstrapServers) {
        if (kafkaDriver == null) {
            kafkaDriver = new KafkaDriver(bootstrapServers, true);
        }
        return kafkaDriver;
    }

    public static KafkaDriver reDrive(String bootstrapServers) {
        kafkaDriver = new KafkaDriver(bootstrapServers, true);
        return kafkaDriver;
    }

    public static KafkaDriver newDriverWithTopic(String bootstrapServers, String topic) {
        KafkaDriver kafkaDriver = new KafkaDriver(bootstrapServers, false);
        kafkaDriver.setCustomTopic(topic);
        return kafkaDriver;
    }

    public static KafkaDriver newDriver(String bootstrapServers) {
        return new KafkaDriver(bootstrapServers, false);
    }

    public void setDefaultTopic(String defaultTopic) {
        if (this.kafkaTemplate == null) {
            throw new RuntimeException("please init kafka driver first");
        }
        this.kafkaTemplate.setDefaultTopic(defaultTopic);
    }

    public KafkaDriver(String bootstrapServers) {
        this(bootstrapServers, false);
    }

    public KafkaDriver(String bootstrapServers, boolean isInitAdminClient) {
        this.bootstrapServers = bootstrapServers;
        Map<String, Object> kafkaTemplateProperties = this.initKafkaTemplateProperties(bootstrapServers);
        DefaultKafkaProducerFactory producerFactory = new DefaultKafkaProducerFactory(kafkaTemplateProperties);
        this.kafkaTemplate = new KafkaTemplate((ProducerFactory)producerFactory, true);
        if (isInitAdminClient) {
            Properties properties = new Properties();
            properties.put("bootstrap.servers", bootstrapServers);
            properties.put("client.id", PropConstants.getAppId());
            this.adminClient = AdminClient.create((Properties)properties);
        }
    }

    public ConcurrentMessageListenerContainer startConsumer(String topicName, String groupId, MessageListener listener, int concurrency, AbstractMessageListenerContainer.AckMode ackMode) {
        Map<String, Object> propertiesMap = this.initKafkaConsumerProperties(groupId);
        DefaultKafkaConsumerFactory consumerFactory = new DefaultKafkaConsumerFactory(propertiesMap);
        ContainerProperties containerProperties = new ContainerProperties(new String[]{topicName});
        containerProperties.setMessageListener((Object)listener);
        containerProperties.setAckMode(ackMode == null ? AbstractMessageListenerContainer.AckMode.BATCH : ackMode);
        ConcurrentMessageListenerContainer container = new ConcurrentMessageListenerContainer((ConsumerFactory)consumerFactory, containerProperties);
        container.setConcurrency(concurrency);
        container.start();
        return container;
    }

    public void createTopic(String topicName, int numPartitions) throws Exception {
        NewTopic newTopic = new NewTopic(topicName, numPartitions, 1);
        CreateTopicsResult result = this.adminClient.createTopics((Collection)Lists.newArrayList((Object[])new NewTopic[]{newTopic}));
        KafkaFuture kafkaFuture = result.all();
        kafkaFuture.get();
    }

    public void removeTopic(String topicName) {
        this.adminClient.deleteTopics((Collection)Lists.newArrayList((Object[])new String[]{topicName}));
    }

    public void send(Object data, String topic) {
        this.kafkaTemplate.send(topic, null, (Object)JSON.toJSONString((Object)data));
    }

    public void send(Object data) {
        this.kafkaTemplate.sendDefault(data);
    }

    public void sendWithTopic(Object data) {
        this.kafkaTemplate.send(this.getCustomTopic(), null, (Object)JSON.toJSONString((Object)data));
    }

    private Map<String, Object> initKafkaConsumerProperties(String groupId) {
        HashMap<String, Object> kafkaConsumerProperties = new HashMap<String, Object>();
        kafkaConsumerProperties.put("bootstrap.servers", this.bootstrapServers);
        kafkaConsumerProperties.put("group.id", groupId);
        kafkaConsumerProperties.put("enable.auto.commit", "false");
        kafkaConsumerProperties.put("session.timeout.ms", "60000");
        kafkaConsumerProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaConsumerProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaConsumerProperties.put("max.poll.interval.ms", "1200000");
        kafkaConsumerProperties.put("client.id", PropConstants.getAppId() + "_" + new Random().nextInt(1000));
        return kafkaConsumerProperties;
    }

    private Map<String, Object> initKafkaTemplateProperties(String kafkaBootstrapServers) {
        HashMap<String, Object> kafkaTemplateProperties = new HashMap<String, Object>();
        kafkaTemplateProperties.put("bootstrap.servers", kafkaBootstrapServers);
        kafkaTemplateProperties.put("retries", 3);
        kafkaTemplateProperties.put("batch.size", 16384);
        kafkaTemplateProperties.put("linger.ms", 1);
        kafkaTemplateProperties.put("buffer.memory", 0x2000000);
        kafkaTemplateProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        kafkaTemplateProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        kafkaTemplateProperties.put("client.id", PropConstants.getAppId() + "_" + new Random().nextInt(1000));
        return kafkaTemplateProperties;
    }

    public AdminClient getAdminClient() {
        return this.adminClient;
    }

    public void setAdminClient(AdminClient adminClient) {
        this.adminClient = adminClient;
    }

    public KafkaTemplate getKafkaTemplate() {
        return this.kafkaTemplate;
    }

    public void setKafkaTemplate(KafkaTemplate kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public String getCustomTopic() {
        return this.customTopic;
    }

    public void setCustomTopic(String customTopic) {
        this.customTopic = customTopic;
    }
}

