/*
 * Decompiled with CFR 0.152.
 */
package com.thebeastshop.datahub.client.kafka;

import com.thebeastshop.datahub.client.DatahubErrorHandler;
import com.thebeastshop.datahub.client.exception.DatahubKafkaException;
import com.thebeastshop.datahub.client.kafka.DatahubKafkaDriver;
import com.thebeastshop.datahub.client.utils.MessageUtil;
import com.thebeastshop.datahub.common.vo.ResponseMessage;
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.errors.TopicExistsException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.util.CollectionUtils;

public class ResponseMessageListener
implements MessageListener<String, ResponseMessage> {
    private static final Logger logger = LoggerFactory.getLogger(ResponseMessageListener.class);
    private final int RESPONSE_TOPIC_CREATE_MAX_RETRY_COUNT = 10;
    private final int RESPONSE_TOPIC_PARTITIONS = 1;
    private final DatahubKafkaDriver datahubKafkaDriver;
    private ConcurrentMessageListenerContainer kafkaContainer;
    private final String responseTopicName = MessageUtil.getResponseTopic();
    private final List<DatahubErrorHandler> errorHandlerList = new ArrayList<DatahubErrorHandler>();

    public String getResponseTopicName() {
        return this.responseTopicName;
    }

    public ResponseMessageListener(DatahubKafkaDriver datahubKafkaDriver) {
        this.datahubKafkaDriver = datahubKafkaDriver;
    }

    public void check() {
        this.checkAndCreateTopic(this.responseTopicName, 1);
    }

    public void startListen() {
        this.kafkaContainer = this.datahubKafkaDriver.startConsumer(this.responseTopicName, this.responseTopicName, this, 1);
    }

    public void addErrorHandler(DatahubErrorHandler errorHandler) {
        this.errorHandlerList.add(errorHandler);
    }

    public void checkAndCreateTopic(String topicName, int partitions) {
        try {
            this.datahubKafkaDriver.createTopic(topicName, partitions);
            logger.info("[Datahub] \u521b\u5efaTopic '" + topicName + "' \u6210\u529f");
        }
        catch (TopicExistsException e) {
            logger.info("[Datahub] \u521b\u5efaTopic '" + topicName + "' \uff08\u5df2\u5b58\u5728\uff09");
        }
        catch (Throwable th) {
            Throwable cause = th.getCause();
            if (cause instanceof TopicExistsException) {
                logger.info("[Datahub] \u521b\u5efaTopic '" + topicName + "' \uff08\u5df2\u5b58\u5728\uff09");
            }
            logger.error("[Datahub]: \u521b\u5efaTopic '" + topicName + "' \u5931\u8d25, " + th.getMessage());
            throw new DatahubKafkaException("\u521b\u5efaTopic '" + topicName + "' \u5931\u8d25", th);
        }
    }

    public void onMessage(ConsumerRecord<String, ResponseMessage> consumerRecord) {
        ResponseMessage message = (ResponseMessage)consumerRecord.value();
        logger.info("[DATAHUB] \u63a5\u6536\u5230 Response \u6d88\u606f, message id: {}", (Object)message.getMessageId());
        if (!CollectionUtils.isEmpty(this.errorHandlerList)) {
            for (DatahubErrorHandler errorHandler : this.errorHandlerList) {
                try {
                    errorHandler.processError(message);
                }
                catch (Throwable th) {
                    logger.error("[DATAHUB] Response\u9519\u8bef:", th);
                }
            }
        }
    }
}

