package com.thebeastshop.datahub.client.kafka;

import com.thebeastshop.datahub.client.DatahubErrorHandler;
import com.thebeastshop.datahub.client.exception.DatahubKafkaException;
import com.thebeastshop.datahub.client.utils.MessageUtil;
import com.thebeastshop.datahub.common.vo.ResponseMessage;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.errors.TopicExistsException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.util.CollectionUtils;

/* loaded from: input_file:com/thebeastshop/datahub/client/kafka/ResponseMessageListener.class */
public class ResponseMessageListener implements MessageListener<String, ResponseMessage> {
    private static final Logger logger = LoggerFactory.getLogger(ResponseMessageListener.class);
    private final DatahubKafkaDriver datahubKafkaDriver;
    private ConcurrentMessageListenerContainer kafkaContainer;
    private final int RESPONSE_TOPIC_CREATE_MAX_RETRY_COUNT = 10;
    private final int RESPONSE_TOPIC_PARTITIONS = 1;
    private final String responseTopicName = MessageUtil.getResponseTopic();
    private final List<DatahubErrorHandler> errorHandlerList = new ArrayList();

    public String getResponseTopicName() {
        return this.responseTopicName;
    }

    public ResponseMessageListener(DatahubKafkaDriver datahubKafkaDriver) {
        this.datahubKafkaDriver = datahubKafkaDriver;
    }

    public void check() {
        checkAndCreateTopic(this.responseTopicName, 1);
    }

    public void startListen() {
        this.kafkaContainer = this.datahubKafkaDriver.startConsumer(this.responseTopicName, this.responseTopicName, this, 1);
    }

    public void addErrorHandler(DatahubErrorHandler datahubErrorHandler) {
        this.errorHandlerList.add(datahubErrorHandler);
    }

    public void checkAndCreateTopic(String str, int i) {
        try {
            this.datahubKafkaDriver.createTopic(str, i);
            logger.info("[Datahub] 创建Topic '" + str + "' 成功");
        } catch (TopicExistsException e) {
            logger.info("[Datahub] 创建Topic '" + str + "' （已存在）");
        } catch (Throwable th) {
            if (th.getCause() instanceof TopicExistsException) {
                logger.info("[Datahub] 创建Topic '" + str + "' （已存在）");
            } else {
                logger.error("[Datahub]: 创建Topic '" + str + "' 失败, " + th.getMessage());
                throw new DatahubKafkaException("创建Topic '" + str + "' 失败", th);
            }
        }
    }

    public void onMessage(ConsumerRecord<String, ResponseMessage> consumerRecord) {
        ResponseMessage responseMessage = (ResponseMessage) consumerRecord.value();
        logger.info("[DATAHUB] 接收到 Response 消息, message id: {}", responseMessage.getMessageId());
        if (CollectionUtils.isEmpty(this.errorHandlerList)) {
            return;
        }
        Iterator<DatahubErrorHandler> it = this.errorHandlerList.iterator();
        while (it.hasNext()) {
            try {
                it.next().processError(responseMessage);
            } catch (Throwable th) {
                logger.error("[DATAHUB] Response错误:", th);
            }
        }
    }
}
