package com.thebeastshop.dts.sdk;

import com.thebeastshop.dts.kafka.KafkaDriver;
import com.thebeastshop.dts.sdk.annotation.DTSTable;
import com.thebeastshop.dts.sdk.exception.DTSConnectionRefusedException;
import com.thebeastshop.dts.sdk.exception.EmptyTopicException;
import com.thebeastshop.dts.sdk.handler.DTSHandler;
import com.thebeastshop.dts.sdk.handler.RetryDTSHandler;
import com.thebeastshop.dts.sdk.utils.DTSMeta;
import com.thebeastshop.dts.vo.RegisterInfo;
import com.thebeastshop.dts.vo.RegisterItem;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

/* loaded from: input_file:com/thebeastshop/dts/sdk/DTSKafkaManager.class */
/**
 * Holds the Kafka resources created by the DTS SDK: one listener container per
 * subscribed table handler, a single retry listener container, and a single
 * {@link DTSRecycleSender}.
 *
 * <p>A shared instance is obtained through {@link #load()}. Only {@link #load()}
 * is synchronized; the instance methods perform no locking of their own.
 */
public class DTSKafkaManager {
    private static final Logger log = LoggerFactory.getLogger(DTSKafkaManager.class);

    /** Lazily created shared instance; access goes through {@link #load()}. */
    private static DTSKafkaManager dtsKafkaManager;

    /** Listener containers started by {@link #initSubscriberContainer}. */
    private final List<ConcurrentMessageListenerContainer<?, ?>> kafkaContainerList = new ArrayList<>();

    /** Listener container for the retry topic; {@code null} until first initialised. */
    private ConcurrentMessageListenerContainer<?, ?> retryKafkaContainer;

    /** Sender for the recycle topic; {@code null} until first initialised. */
    private DTSRecycleSender recycleSender;

    /**
     * Returns the shared manager, creating it on first use.
     *
     * <p>Synchronized so that concurrent first calls cannot each create their
     * own instance (which would split the tracked containers across managers).
     *
     * @return the shared {@code DTSKafkaManager}
     */
    public static synchronized DTSKafkaManager load() {
        if (dtsKafkaManager == null) {
            dtsKafkaManager = new DTSKafkaManager();
        }
        return dtsKafkaManager;
    }

    /**
     * Shortcut for {@code load().getRecycleSender()}.
     *
     * @return the recycle sender of the shared manager, or {@code null} if
     *         {@link #initRecycleSenderIfNeed(RegisterInfo)} has not run yet
     */
    public static DTSRecycleSender loadRecycleSender() {
        return load().getRecycleSender();
    }

    /**
     * Creates the recycle sender from the bootstrap servers and recycle topic in
     * {@code registerInfo}, unless one already exists.
     *
     * @param registerInfo registration data supplying the Kafka bootstrap servers
     *                     and the recycle topic
     */
    public void initRecycleSenderIfNeed(RegisterInfo registerInfo) {
        if (this.recycleSender != null) {
            return;
        }
        this.recycleSender = new DTSRecycleSender(registerInfo.getKafkaBootstrapServers(), registerInfo.getRecycleTopic());
        log.info("[BEAST-DTS]成功建立回收通道{}", registerInfo.getRecycleTopic());
    }

    /**
     * Starts a consumer for {@code dTSHandler} on the topic of the first register
     * item whose table name equals the handler's {@link DTSTable#value()}, and
     * records the resulting container.
     *
     * @param registerInfo registration data supplying the bootstrap servers and
     *                     the register items (table name / topic pairs)
     * @param dTSHandler   handler to subscribe; its class must carry {@link DTSTable}
     * @throws IllegalArgumentException      if the handler class has no {@link DTSTable} annotation
     * @throws EmptyTopicException           if the matching register item has a blank topic
     * @throws DTSConnectionRefusedException if no register item matches the handler's table
     */
    public void initSubscriberContainer(RegisterInfo registerInfo, DTSHandler dTSHandler) {
        String handlerName = dTSHandler.getClass().getSimpleName();
        DTSTable dTSTable = dTSHandler.getClass().getAnnotation(DTSTable.class);
        if (dTSTable == null) {
            // getAnnotation returns null when the annotation is absent; fail with a
            // message that names the handler instead of a bare NullPointerException.
            throw new IllegalArgumentException(MessageFormat.format("[BEAST-DTS]数据订阅处理器[{0}]缺少@DTSTable注解", handlerName));
        }
        String tableName = dTSTable.value();
        for (RegisterItem registerItem : registerInfo.getRegisterItems()) {
            // The annotation value is the receiver so an item with a null table
            // name is skipped rather than throwing.
            if (!tableName.equals(registerItem.getTableName())) {
                continue;
            }
            String topic = registerItem.getTopic();
            if (StringUtils.isBlank(topic)) {
                throw new EmptyTopicException(MessageFormat.format("[BEAST-DTS]数据订阅处理器[{0}]没有获取到相关TOPIC", handlerName));
            }
            this.kafkaContainerList.add(KafkaDriver.drive(registerInfo.getKafkaBootstrapServers()).startConsumer(topic, DTSMeta.load().getAppName(), dTSHandler, dTSTable.concurrent()));
            log.info("[BEAST-DTS]成功为{}建立数据订阅通道{}", handlerName, topic);
            return;
        }
        throw new DTSConnectionRefusedException(MessageFormat.format("[BEAST-DTS]数据订阅处理器[{0}]没有成功连接到数据通道", handlerName));
    }

    /**
     * Starts the consumer for the retry topic with a {@link RetryDTSHandler},
     * unless a retry container already exists.
     *
     * @param registerInfo registration data supplying the bootstrap servers and
     *                     the retry topic
     */
    public void initRetryContainerIfNeed(RegisterInfo registerInfo) {
        if (this.retryKafkaContainer != null) {
            return;
        }
        // NOTE(review): the trailing 1 is presumably the consumer concurrency,
        // mirroring DTSTable.concurrent() for subscriber containers; confirm in KafkaDriver.
        this.retryKafkaContainer = KafkaDriver.drive(registerInfo.getKafkaBootstrapServers()).startConsumer(registerInfo.getRetryTopic(), DTSMeta.load().getAppName(), new RetryDTSHandler(), 1);
        log.info("[BEAST-DTS]成功建立重试通道{}", registerInfo.getRetryTopic());
    }

    /**
     * Stops and forgets every subscriber container, then re-subscribes each
     * handler that {@link DTSHandlerManager} returns for a register item's table.
     * The retry container and the recycle sender are not touched.
     *
     * <p>Exceptions from {@link #initSubscriberContainer} propagate; containers
     * started before the failure remain tracked.
     *
     * @param registerInfo registration data describing the tables to subscribe to
     */
    public void initAllKafkaContainer(RegisterInfo registerInfo) {
        for (ConcurrentMessageListenerContainer<?, ?> container : this.kafkaContainerList) {
            container.stop();
        }
        this.kafkaContainerList.clear();
        for (RegisterItem registerItem : registerInfo.getRegisterItems()) {
            DTSHandler handler = DTSHandlerManager.getHandler(registerItem.getTableName());
            if (handler != null) {
                initSubscriberContainer(registerInfo, handler);
            }
        }
    }

    /**
     * @return the recycle sender, or {@code null} if it has not been initialised
     */
    public DTSRecycleSender getRecycleSender() {
        return this.recycleSender;
    }
}
