/*
 * Decompiled with CFR 0.152.
 */
package com.thebeastshop.datahub.client.kafka;

import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.KafkaFuture;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.listener.config.ContainerProperties;

public class KafkaDriver {
    private static Map<String, KafkaDriver> kafkaDriverMap = new ConcurrentHashMap<String, KafkaDriver>();
    private String bootstrapServers;
    private KafkaTemplate kafkaTemplate;
    private String deserializer;
    private AdminClient adminClient;

    public KafkaDriver(KafkaTemplate kafkaTemplate, String bootstrapServers, String deserializer) {
        this.bootstrapServers = bootstrapServers;
        this.deserializer = deserializer;
        if (kafkaTemplate == null) {
            Map<String, Object> kafkaTemplateProperties = this.initKafkaTemplateProperties(bootstrapServers);
            DefaultKafkaProducerFactory producerFactory = new DefaultKafkaProducerFactory(kafkaTemplateProperties);
            this.kafkaTemplate = new KafkaTemplate((ProducerFactory)producerFactory, true);
        } else {
            this.kafkaTemplate = kafkaTemplate;
        }
        Properties properties = new Properties();
        properties.put("bootstrap.servers", bootstrapServers);
        this.adminClient = AdminClient.create((Properties)properties);
    }

    public ConcurrentMessageListenerContainer startConsumer(String topicName, String groupId, MessageListener listener, int concurrency) {
        Map<String, Object> propertiesMap = this.initKafkaConsumerProperties(groupId);
        DefaultKafkaConsumerFactory consumerFactory = new DefaultKafkaConsumerFactory(propertiesMap);
        ContainerProperties containerProperties = new ContainerProperties(new String[]{topicName});
        containerProperties.setMessageListener((Object)listener);
        containerProperties.setAckMode(AbstractMessageListenerContainer.AckMode.BATCH);
        ConcurrentMessageListenerContainer container = new ConcurrentMessageListenerContainer((ConsumerFactory)consumerFactory, containerProperties);
        container.setConcurrency(concurrency);
        container.start();
        return container;
    }

    public void createTopic(String topicName, int numPartitions) throws Exception {
        NewTopic newTopic = new NewTopic(topicName, numPartitions, 1);
        CreateTopicsResult result = this.adminClient.createTopics((Collection)Lists.newArrayList((Object[])new NewTopic[]{newTopic}));
        KafkaFuture kafkaFuture = result.all();
        kafkaFuture.get();
    }

    public void removeTopic(String topicName) {
        this.adminClient.deleteTopics((Collection)Lists.newArrayList((Object[])new String[]{topicName}));
    }

    public void send(Object data, String topic) {
        this.kafkaTemplate.send(topic, null, (Object)JSON.toJSONString((Object)data));
    }

    public void send(Object key, Object data, String topic) {
        this.kafkaTemplate.send(topic, key, (Object)JSON.toJSONString((Object)data));
    }

    private Map<String, Object> initKafkaConsumerProperties(String groupId) {
        HashMap<String, Object> kafkaConsumerProperties = new HashMap<String, Object>();
        kafkaConsumerProperties.put("bootstrap.servers", this.bootstrapServers);
        kafkaConsumerProperties.put("group.id", groupId);
        kafkaConsumerProperties.put("enable.auto.commit", "false");
        kafkaConsumerProperties.put("session.timeout.ms", "60000");
        kafkaConsumerProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaConsumerProperties.put("value.deserializer", this.deserializer);
        kafkaConsumerProperties.put("max.poll.interval.ms", "1200000");
        return kafkaConsumerProperties;
    }

    private Map<String, Object> initKafkaTemplateProperties(String kafkaBootstrapServers) {
        HashMap<String, Object> kafkaTemplateProperties = new HashMap<String, Object>();
        kafkaTemplateProperties.put("bootstrap.servers", kafkaBootstrapServers);
        kafkaTemplateProperties.put("retries", 3);
        kafkaTemplateProperties.put("batch.size", 16384);
        kafkaTemplateProperties.put("linger.ms", 1);
        kafkaTemplateProperties.put("buffer.memory", 0x2000000);
        kafkaTemplateProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        kafkaTemplateProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return kafkaTemplateProperties;
    }

    public AdminClient getAdminClient() {
        return this.adminClient;
    }

    public void setAdminClient(AdminClient adminClient) {
        this.adminClient = adminClient;
    }

    public String getBootstrapServers() {
        return this.bootstrapServers;
    }
}

