package com.thebeastshop.dts.sender;

import com.thebeastshop.common.prop.annotation.DynamicPropValue;
import com.thebeastshop.common.validation.Validation;
import com.thebeastshop.dts.data.DataRule;
import com.thebeastshop.dts.domain.AppDomain;
import com.thebeastshop.dts.domain.DataRuleDomain;
import com.thebeastshop.dts.kafka.KafkaDriver;
import com.thebeastshop.dts.record.DTSRecord;
import com.thebeastshop.dts.record.DTSRetryRecord;
import com.thebeastshop.dts.vo.DTSApp;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
/* loaded from: input_file:com/thebeastshop/dts/sender/SenderManagement.class */
public class SenderManagement {
    private Map<String, DataSender> sendersMap = new ConcurrentHashMap();

    @DynamicPropValue("kafka.address")
    private String kafkaAddress;

    @DynamicPropValue("kafka.topic.major.partitions")
    private Integer majorTopicPartitions;

    @DynamicPropValue("kafka.topic.retry.partitions")
    private Integer retryTopicPartitions;
    private KafkaDriver kafkaDriver;

    @Autowired
    private DataRuleDomain dataRuleDomain;

    @Autowired
    private AppDomain appDomain;

    public void initialize() throws Exception {
        this.kafkaDriver = KafkaDriver.drive(this.kafkaAddress);
        reload();
    }

    public void reload() throws Exception {
        List<DataRule> findAllEnableDataRule = this.dataRuleDomain.findAllEnableDataRule();
        this.sendersMap.clear();
        reloadMajorSenders(findAllEnableDataRule);
        reloadRetrySenders(this.appDomain.findAllApps());
        refreshAllTopics();
    }

    private synchronized void reloadMajorSenders(List<DataRule> list) throws Exception {
        Iterator<DataRule> it = list.iterator();
        while (it.hasNext()) {
            addMajorSender(it.next());
        }
    }

    private synchronized void reloadRetrySenders(List<DTSApp> list) throws Exception {
        Iterator<DTSApp> it = list.iterator();
        while (it.hasNext()) {
            addRetrySender(it.next());
        }
    }

    private void refreshAllTopics() throws Exception {
        for (DataSender dataSender : this.sendersMap.values()) {
            if (dataSender instanceof KafkaSender) {
                ((KafkaSender) dataSender).checkAndCreateTopic();
            }
        }
    }

    public void send(DataRule dataRule, DTSRecord dTSRecord) {
        Validation.assertNotNull(dataRule, "[DTS ERROR]: 发送数据的规则为空！");
        Validation.assertNotNull(dTSRecord, "[DTS ERROR]: 发送的数据记录为空！");
        getMajorSender(dataRule).send(dTSRecord);
    }

    public void send(DataRule dataRule, List<DTSRecord> list) {
        Validation.assertNotNull(dataRule, "[DTS ERROR]: 发送数据的规则为空！");
        Validation.assertNotEmpty(list, "[DTS ERROR]: 发送的数据记录列表UUID为空！");
        Iterator<DTSRecord> it = list.iterator();
        while (it.hasNext()) {
            send(dataRule, it.next());
        }
    }

    public void retry(DTSRetryRecord dTSRetryRecord) {
        Validation.assertNotNull(dTSRetryRecord, "[DTS ERROR]: 重试发送的数据记录为空！");
        Validation.assertNotNull(dTSRetryRecord.getAppId(), "[DTS ERROR]: 重试发送数据的APP为空！");
        getRetrySender(this.appDomain.findAppById(dTSRetryRecord.getEnv(), dTSRetryRecord.getAppId())).send((DTSRecord) dTSRetryRecord);
    }

    private String getMajorTopicName(DataRule dataRule) {
        return dataRule.getMajorTopicName();
    }

    private String getRetryTopicName(DTSApp dTSApp) {
        return dTSApp.getRetryTopicName();
    }

    private DataSender getMajorSender(DataRule dataRule) {
        String majorTopicName = getMajorTopicName(dataRule);
        DataSender dataSender = this.sendersMap.get(majorTopicName);
        if (dataSender == null) {
            synchronized (this) {
                if (!this.sendersMap.containsKey(majorTopicName)) {
                    addMajorSender(dataRule);
                }
                dataSender = this.sendersMap.get(majorTopicName);
            }
        }
        return dataSender;
    }

    private DataSender getRetrySender(DTSApp dTSApp) {
        String retryTopicName = getRetryTopicName(dTSApp);
        DataSender dataSender = this.sendersMap.get(retryTopicName);
        if (dataSender == null) {
            synchronized (this) {
                if (!this.sendersMap.containsKey(retryTopicName)) {
                    addRetrySender(dTSApp);
                }
                dataSender = this.sendersMap.get(retryTopicName);
            }
        }
        return dataSender;
    }

    private synchronized void addMajorSender(DataRule dataRule) {
        String majorTopicName = getMajorTopicName(dataRule);
        if (this.sendersMap.get(majorTopicName) == null) {
            this.sendersMap.put(majorTopicName, new KafkaSender(majorTopicName, this.kafkaDriver, this.majorTopicPartitions.intValue()));
        }
    }

    private synchronized void addRetrySender(DTSApp dTSApp) {
        String retryTopicName = getRetryTopicName(dTSApp);
        if (this.sendersMap.get(retryTopicName) == null) {
            this.sendersMap.put(retryTopicName, new KafkaSender(retryTopicName, this.kafkaDriver, this.retryTopicPartitions.intValue()));
        }
    }
}
