/*
 * Decompiled with CFR 0.152.
 */
package com.thebeastshop.datahub.client.kafka;

import com.thebeastshop.datahub.client.exception.DatahubKafkaException;
import com.thebeastshop.datahub.client.kafka.KafkaDriver;
import com.thebeastshop.datahub.client.utils.DatahubCallback;
import com.thebeastshop.datahub.client.utils.MessageUtil;
import com.thebeastshop.datahub.common.vo.ResponseMessage;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.errors.TopicExistsException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;

public class ResponseMessageListener
implements MessageListener<String, ResponseMessage> {
    private static final Logger logger = LoggerFactory.getLogger(ResponseMessageListener.class);
    private final int RESPONSE_TOPIC_CREATE_MAX_RETRY_COUNT = 10;
    private final int RESPONSE_TOPIC_PARTITIONS = 1;
    private final KafkaDriver kafkaDriver;
    private ConcurrentMessageListenerContainer kafkaContainer;
    private final String responseTopicName = MessageUtil.getResponseTopic();
    private ConcurrentHashMap<String, DatahubCallback> cachedCallback = new ConcurrentHashMap();

    public String getResponseTopicName() {
        return this.responseTopicName;
    }

    public void putCallback(String transId, DatahubCallback callback) {
        if (StringUtils.isNotEmpty((CharSequence)transId) && callback != null) {
            this.cachedCallback.put(transId, callback);
        }
    }

    public void removeCachedCallback(String transId) {
        this.cachedCallback.remove(transId);
    }

    public ResponseMessageListener(KafkaDriver kafkaDriver) {
        this.kafkaDriver = kafkaDriver;
        this.checkAndCreateTopic(this.responseTopicName, 1);
        this.kafkaContainer = kafkaDriver.startConsumer(this.responseTopicName, this.responseTopicName, this, 1);
    }

    public void checkAndCreateTopic(String topicName, int partitions) {
        try {
            this.kafkaDriver.createTopic(topicName, partitions);
            logger.info("[Datahub] \u521b\u5efaTopic '" + topicName + "' \u6210\u529f");
        }
        catch (TopicExistsException e) {
            logger.info("[Datahub] Topic '" + topicName + "' \u5df2\u5b58\u5728");
        }
        catch (Throwable th) {
            Throwable cause = th.getCause();
            if (cause instanceof TopicExistsException) {
                logger.info("[Datahub] Topic '" + topicName + "' \u5df2\u5b58\u5728");
            }
            logger.error("[Datahub]: \u521b\u5efaTopic '" + topicName + "' \u5931\u8d25, " + th.getMessage());
            throw new DatahubKafkaException("\u521b\u5efaTopic '" + topicName + "' \u5931\u8d25", th);
        }
    }

    public void onMessage(ConsumerRecord<String, ResponseMessage> consumerRecord) {
        try {
            ResponseMessage message = (ResponseMessage)consumerRecord.value();
            DatahubCallback callback = this.cachedCallback.get(message.getTransId());
            if (callback != null) {
                this.cachedCallback.remove(message.getTransId());
                callback.onResponse(message);
            }
        }
        catch (Throwable th) {
            logger.error("[DATAHUB]Response\u9519\u8bef\uff1a " + th);
        }
    }
}

