package com.thebeastshop.dts.sender;

import com.thebeastshop.common.validation.Validation;
import com.thebeastshop.dts.data.DataRule;
import com.thebeastshop.dts.domain.AppDomain;
import com.thebeastshop.dts.domain.DataRuleDomain;
import com.thebeastshop.dts.domain.RecordDomain;
import com.thebeastshop.dts.kafka.KafkaDriver;
import com.thebeastshop.dts.record.DTSRecord;
import com.thebeastshop.dts.record.DTSRetryRecord;
import com.thebeastshop.dts.vo.DTSApp;
import com.thebeastshop.kit.prop.annotation.DynamicPropValue;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Registry of {@link DataSender}s keyed by Kafka topic name, plus the entry points used to
 * publish DTS records and retry records through them.
 *
 * <p>Two registries are kept: one for the primary Kafka cluster and one for an optional
 * "local debug" cluster. A record sent through {@link #send(DataRule, DTSRecord)} is always
 * published to the primary cluster and, when local debugging is active, to the local cluster
 * as well.
 *
 * <p>Both registries are {@link ConcurrentHashMap}s and every insertion happens while holding
 * this instance's monitor, so a sender for a given topic is created at most once per registry
 * between two reloads.
 */
@Component
public class SenderManagement {
    private static final Logger logger = LoggerFactory.getLogger(SenderManagement.class);

    /** Senders bound to the primary Kafka driver, keyed by topic name. */
    private final Map<String, DataSender> sendersMap = new ConcurrentHashMap<>();

    /** Senders bound to the local debug Kafka driver, keyed by topic name. */
    private final Map<String, DataSender> localSendersMap = new ConcurrentHashMap<>();

    @DynamicPropValue("kafka.address")
    private String kafkaAddress;

    @DynamicPropValue("local.kafka.address")
    private String localKafkaAddress;

    @DynamicPropValue("local.debug.enable")
    private Boolean localDebugEnable;

    @DynamicPropValue("kafka.topic.major.partitions")
    private Integer majorTopicPartitions;

    @DynamicPropValue("kafka.topic.retry.partitions")
    private Integer retryTopicPartitions;

    /** Driver for the primary cluster; created in {@link #initialize()}. */
    private KafkaDriver kafkaDriver;

    /** Driver for the local debug cluster; stays {@code null} unless local debugging is enabled. */
    private KafkaDriver localKafkaDriver;

    @Autowired
    private DataRuleDomain dataRuleDomain;

    @Autowired
    private AppDomain appDomain;

    @Autowired
    private RecordDomain recordDomain;

    /**
     * Creates the Kafka driver(s) and performs the first {@link #reload()}.
     *
     * <p>A missing {@code local.debug.enable} value is treated as {@code false}. The local
     * driver is only created when local debugging is enabled and a non-blank local address is
     * configured.
     *
     * @throws Exception if a driver cannot be created or the initial reload fails
     */
    public void initialize() throws Exception {
        this.kafkaDriver = KafkaDriver.drive(this.kafkaAddress);
        if (this.localDebugEnable == null) {
            this.localDebugEnable = false;
        }
        if (this.localDebugEnable.booleanValue() && StringUtils.isNotBlank(this.localKafkaAddress)) {
            logger.info(" [DTS] 初始化本地调试kafka: " + this.localKafkaAddress);
            this.localKafkaDriver = KafkaDriver.drive(this.localKafkaAddress);
        }
        reload();
    }

    /**
     * Rebuilds both sender registries from the currently enabled data rules and the known apps,
     * then checks the topics of the primary senders.
     *
     * @throws Exception if loading the rules/apps or checking a topic fails
     */
    public void reload() throws Exception {
        // Load everything first so that a failing lookup leaves the current senders untouched.
        List<DataRule> enabledRules = this.dataRuleDomain.findAllEnableDataRule();
        List<DTSApp> apps = this.appDomain.findAllApps();

        // Clear BOTH registries: previously only the primary one was cleared, so local debug
        // senders were never rebuilt on reload and drifted out of sync with the primary ones.
        this.sendersMap.clear();
        this.localSendersMap.clear();

        reloadMajorSenders(enabledRules);
        reloadRetrySenders(apps);
        refreshAllTopics();
    }

    /**
     * Tells whether records must additionally be published to the local debug cluster.
     *
     * @return {@code true} only when local debugging is enabled, a local driver exists and the
     *         local address differs from the primary address
     */
    private boolean needLocalSender() {
        return Boolean.TRUE.equals(this.localDebugEnable)
                && this.localKafkaDriver != null
                // Null-safe comparison: the address fields are injected and may be absent.
                && !StringUtils.equals(this.localKafkaAddress, this.kafkaAddress);
    }

    /**
     * Registers a major-topic sender (and a local one when required) for every given rule.
     *
     * @param list the data rules to register senders for
     */
    private synchronized void reloadMajorSenders(List<DataRule> list) throws Exception {
        boolean withLocal = needLocalSender();
        for (DataRule rule : list) {
            addMajorSender(rule, false);
            if (withLocal) {
                addMajorSender(rule, true);
            }
        }
    }

    /**
     * Registers a retry-topic sender (and a local one when required) for every given app.
     *
     * @param list the apps to register retry senders for
     */
    private synchronized void reloadRetrySenders(List<DTSApp> list) throws Exception {
        boolean withLocal = needLocalSender();
        for (DTSApp app : list) {
            addRetrySender(app, false);
            if (withLocal) {
                addRetrySender(app, true);
            }
        }
    }

    /**
     * Invokes {@code checkAndCreateTopic()} on every {@link KafkaSender} in the primary registry.
     *
     * <p>NOTE(review): senders in the local debug registry are not checked here, and neither
     * are senders created lazily after a reload — confirm whether that is intended.
     */
    private void refreshAllTopics() throws Exception {
        for (DataSender sender : this.sendersMap.values()) {
            if (sender instanceof KafkaSender) {
                ((KafkaSender) sender).checkAndCreateTopic();
            }
        }
    }

    /**
     * Publishes one record to the major topic of the given rule, mirrors it to the local debug
     * cluster when that is active, and finally stores it as a history record.
     *
     * @param dataRule  rule that determines the target topic; must not be {@code null}
     * @param dTSRecord record to publish; must not be {@code null}
     */
    public void send(DataRule dataRule, DTSRecord dTSRecord) {
        Validation.assertNotNull(dataRule, "[DTS ERROR]: 发送数据的规则为空！");
        Validation.assertNotNull(dTSRecord, "[DTS ERROR]: 发送的数据记录为空！");

        DataSender primarySender = getMajorSender(dataRule, false);
        logger.info(" [DTS] sender topic: " + primarySender.getTopicName() + " : Address: " + primarySender.getAddress());
        primarySender.send(dTSRecord);

        if (needLocalSender()) {
            DataSender localSender = getMajorSender(dataRule, true);
            logger.info(" [DTS] LOCAL DEBUG : sender topic: " + localSender.getTopicName() + " : Address: " + localSender.getAddress());
            localSender.send(dTSRecord);
        }

        this.recordDomain.saveHistoryRecord(dataRule, dTSRecord);
    }

    /**
     * Publishes every record of the list, in order, via {@link #send(DataRule, DTSRecord)}.
     *
     * @param dataRule rule that determines the target topic; must not be {@code null}
     * @param list     records to publish; must not be empty
     */
    public void send(DataRule dataRule, List<DTSRecord> list) {
        Validation.assertNotNull(dataRule, "[DTS ERROR]: 发送数据的规则为空！");
        Validation.assertNotEmpty(list, "[DTS ERROR]: 发送的数据记录列表UUID为空！");
        for (DTSRecord record : list) {
            send(dataRule, record);
        }
    }

    /**
     * Publishes a retry record to the retry topic of the app it belongs to. Retry records are
     * only sent through the primary cluster.
     *
     * @param dTSRetryRecord record to re-send; it and its app id must not be {@code null}, and
     *                       the app must be resolvable through {@link AppDomain}
     */
    public void retry(DTSRetryRecord dTSRetryRecord) {
        Validation.assertNotNull(dTSRetryRecord, "[DTS ERROR]: 重试发送的数据记录为空！");
        Validation.assertNotNull(dTSRetryRecord.getAppId(), "[DTS ERROR]: 重试发送数据的APP为空！");
        DTSApp app = this.appDomain.findAppById(dTSRetryRecord.getEnv(), dTSRetryRecord.getAppId());
        // Fail with an explicit message instead of a bare NullPointerException when the app
        // lookup yields nothing.
        Validation.assertNotNull(app, "[DTS ERROR]: 重试发送数据的APP不存在！");
        getRetrySender(app, false).send(dTSRetryRecord);
    }

    /** Returns the major topic name declared by the rule. */
    private String getMajorTopicName(DataRule dataRule) {
        return dataRule.getMajorTopicName();
    }

    /** Returns the retry topic name declared by the app. */
    private String getRetryTopicName(DTSApp dTSApp) {
        return dTSApp.getRetryTopicName();
    }

    /**
     * Selects the registry to work on.
     *
     * @param z {@code true} for the local debug registry, {@code false} for the primary one
     */
    private Map<String, DataSender> getSendersMap(boolean z) {
        return z ? this.localSendersMap : this.sendersMap;
    }

    /**
     * Looks up the major-topic sender for a rule, creating it on demand when it is missing.
     *
     * @param dataRule rule whose major topic is wanted
     * @param z        {@code true} to use the local debug registry
     * @return the registered sender for the rule's major topic
     */
    private DataSender getMajorSender(DataRule dataRule, boolean z) {
        String topic = getMajorTopicName(dataRule);
        Map<String, DataSender> registry = getSendersMap(z);
        DataSender sender = registry.get(topic);
        if (sender != null) {
            return sender;
        }
        // Slow path: re-check under the instance lock before creating the sender.
        synchronized (this) {
            if (!registry.containsKey(topic)) {
                addMajorSender(dataRule, z);
            }
            return registry.get(topic);
        }
    }

    /**
     * Looks up the retry-topic sender for an app, creating it on demand when it is missing.
     *
     * @param dTSApp app whose retry topic is wanted
     * @param z      {@code true} to use the local debug registry
     * @return the registered sender for the app's retry topic
     */
    private DataSender getRetrySender(DTSApp dTSApp, boolean z) {
        String topic = getRetryTopicName(dTSApp);
        Map<String, DataSender> registry = getSendersMap(z);
        DataSender sender = registry.get(topic);
        if (sender != null) {
            return sender;
        }
        // Slow path: re-check under the instance lock before creating the sender.
        synchronized (this) {
            if (!registry.containsKey(topic)) {
                addRetrySender(dTSApp, z);
            }
            return registry.get(topic);
        }
    }

    /**
     * Registers a sender for the rule's major topic unless one is already present.
     *
     * @param dataRule rule whose major topic needs a sender
     * @param z        {@code true} to register in the local debug registry
     */
    private synchronized void addMajorSender(DataRule dataRule, boolean z) {
        registerSender(getMajorTopicName(dataRule), this.majorTopicPartitions, z);
    }

    /**
     * Registers a sender for the app's retry topic unless one is already present.
     *
     * @param dTSApp app whose retry topic needs a sender
     * @param z      {@code true} to register in the local debug registry
     */
    private synchronized void addRetrySender(DTSApp dTSApp, boolean z) {
        registerSender(getRetryTopicName(dTSApp), this.retryTopicPartitions, z);
    }

    /**
     * Creates and stores a {@link KafkaSender} for the topic if the chosen registry has none.
     *
     * @param topic      topic name used both as registry key and as the sender's topic
     * @param partitions configured partition count; only unboxed when a sender is created
     * @param local      {@code true} to use the local debug registry and driver
     */
    private synchronized void registerSender(String topic, Integer partitions, boolean local) {
        Map<String, DataSender> registry = getSendersMap(local);
        if (registry.get(topic) != null) {
            return;
        }
        KafkaDriver driver = local ? this.localKafkaDriver : this.kafkaDriver;
        registry.put(topic, new KafkaSender(topic, driver, partitions.intValue()));
    }
}
