package com.thebeastshop.dts.subscriber;

import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;
import com.thebeastshop.common.prop.annotation.DynamicPropValue;
import com.thebeastshop.common.utils.BeanUtil;
import com.thebeastshop.dts.config.ConfigManagement;
import com.thebeastshop.dts.config.DTSConfig;
import com.thebeastshop.dts.config.SubscriberConfig;
import com.thebeastshop.dts.data.DataProcessor;
import com.thebeastshop.dts.enums.DTSEnv;
import com.thebeastshop.dts.enums.DTSSubscriberStatus;
import com.thebeastshop.dts.enums.SubscriberConfigType;
import com.thebeastshop.dts.exception.DTSInitializeException;
import com.thebeastshop.dts.vo.HostDTO;
import com.thebeastshop.dts.zk.DTSChildrenListener;
import com.thebeastshop.dts.zk.SubscriberInstance;
import com.thebeastshop.dts.zk.ZkDriver;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component("subscriberManagement")
/* loaded from: input_file:com/thebeastshop/dts/subscriber/SubscriberManagement.class */
public class SubscriberManagement {
    private static Logger logger = LoggerFactory.getLogger(SubscriberManagement.class);

    @Autowired
    private ConfigManagement configManagement;

    @Resource(name = "dataAssembler")
    private DataProcessor dataAssembler;

    @DynamicPropValue("zk.address")
    private String zkAddress;

    @Value("${server.port}")
    private String port;
    private Map<String, Subscriber> subscribers = new LinkedHashMap();
    private final SubscriberStatusListener statusListener = new SubscriberStatusListener() { // from class: com.thebeastshop.dts.subscriber.SubscriberManagement.1
        @Override // com.thebeastshop.dts.subscriber.SubscriberStatusListener
        public void onStatusChanged(Subscriber subscriber) {
            try {
                SubscriberManagement.this.putSubscriberToZk(subscriber);
            } catch (UnknownHostException e) {
                SubscriberManagement.logger.error(" [DTS] 更新ZK的订阅通道状态失败: " + e);
            }
        }
    };

    private SubscriberFactory getFactory(SubscriberConfig subscriberConfig) throws DTSInitializeException {
        SubscriberConfigType type = subscriberConfig.getType();
        if (type == null) {
            throw new DTSInitializeException("Subscriber type can not be null!");
        }
        if (type == SubscriberConfigType.ALIYUN_DTS) {
            return new AliyunSubscriberFactory(this.statusListener);
        }
        throw new DTSInitializeException("Unsupported subscriber type '" + subscriberConfig.getType() + "'!");
    }

    private String zkSubscriberParentPath(SubscriberConfig subscriberConfig) {
        return "/dts/" + subscriberConfig.getEnv().name() + "-subscribers/" + subscriberConfig.getUid();
    }

    public List<SubscriberInstance> getClusterInstances(DTSEnv dTSEnv, String str) {
        Subscriber subscriber = getSubscriber(str);
        if (subscriber == null || subscriber.getConfig().getEnv() != dTSEnv) {
            return null;
        }
        String zkSubscriberParentPath = zkSubscriberParentPath(subscriber.getConfig());
        ZkDriver drive = ZkDriver.drive();
        List list = null;
        try {
            list = drive.getNodePathList(zkSubscriberParentPath);
        } catch (Exception e) {
            logger.error(" [DTS ERROR] 从ZK获取Subscriber集群信息失败：" + e);
        }
        if (list == null) {
            return Lists.newArrayList();
        }
        ArrayList arrayList = new ArrayList();
        Iterator it = list.iterator();
        while (it.hasNext()) {
            arrayList.add((SubscriberInstance) drive.getNodeData(zkSubscriberParentPath + "/" + ((String) it.next()), SubscriberInstance.class));
        }
        return arrayList;
    }

    public void initialize() throws Exception {
        Map<DTSEnv, DTSConfig> loadConfigs = this.configManagement.loadConfigs();
        Iterator<DTSEnv> it = loadConfigs.keySet().iterator();
        while (it.hasNext()) {
            initialize(loadConfigs.get(it.next()));
        }
    }

    public Subscriber getSubscriber(String str) {
        return this.subscribers.get(str);
    }

    private SubscriberInstance zkSubscriberInstance(Subscriber subscriber) throws UnknownHostException {
        SubscriberInstance subscriberInstance = (SubscriberInstance) BeanUtil.buildFrom(subscriber.getConfig(), SubscriberInstance.class);
        subscriberInstance.setStatus(subscriber.getStatus());
        subscriberInstance.setHost(HostDTO.fromAddress(InetAddress.getLocalHost(), this.port));
        subscriberInstance.setMaster(subscriber.isMaster());
        return subscriberInstance;
    }

    private void initialize(DTSConfig dTSConfig) throws Exception {
        synchronized (this.subscribers) {
            logger.info("[DTS] =====> 初始化环境 [" + dTSConfig.getEnv().name() + "]");
            ZkDriver drive = ZkDriver.drive(this.zkAddress);
            for (SubscriberConfig subscriberConfig : dTSConfig.getSubscriberConfigs()) {
                if (subscriberConfig.getEnable().booleanValue()) {
                    subscriberConfig.setProcessor(this.dataAssembler);
                    Subscriber subscriber = getFactory(subscriberConfig).getSubscriber(subscriberConfig);
                    subscriber.initialize();
                    logger.info("[DTS] =====> |-- 初始化订阅者 [" + subscriberConfig.getType().name() + " : " + subscriberConfig.getName() + "] 成功!");
                    this.subscribers.put(subscriber.getUID(), subscriber);
                    putSubscriberToZk(subscriber);
                    drive.createChildrenListener(zkSubscriberParentPath(subscriberConfig), new DTSChildrenListener<SubscriberInstance>() { // from class: com.thebeastshop.dts.subscriber.SubscriberManagement.2
                        public void onChildAdded(SubscriberInstance subscriberInstance) {
                            SubscriberManagement.logger.info(" [DTS] 监听到ZK的订阅通道有添加实例: " + subscriberInstance.getHost().getIp() + ":" + subscriberInstance.getHost().getPort());
                        }

                        public void onChildUpdated(SubscriberInstance subscriberInstance) {
                            SubscriberManagement.logger.info(" [DTS] 监听到ZK的订阅通道有更新实例: " + subscriberInstance.getHost().getIp() + ":" + subscriberInstance.getHost().getPort());
                            try {
                                SubscriberManagement.this.reloadAll();
                            } catch (Exception e) {
                                SubscriberManagement.logger.error(" [DTS] Reload订阅通道异常：" + e);
                            }
                        }

                        public void onChildRemoved(SubscriberInstance subscriberInstance) {
                            Subscriber subscriber2;
                            SubscriberManagement.logger.info(" [DTS] 监听到ZK的订阅通道有删除实例: " + subscriberInstance.getHost().getIp() + ":" + subscriberInstance.getHost().getPort());
                            try {
                                if ((InetAddress.getLocalHost().getAddress() + ":" + SubscriberManagement.this.port).equals(subscriberInstance.getHost().getIp() + ":" + subscriberInstance.getHost().getPort()) && (subscriber2 = SubscriberManagement.this.getSubscriber(subscriberInstance.getUid())) != null) {
                                    SubscriberManagement.this.putSubscriberToZk(subscriber2);
                                }
                            } catch (UnknownHostException e) {
                                SubscriberManagement.logger.error(" [DTS] 获取本机地址失败: " + e);
                            }
                        }
                    });
                }
            }
        }
    }

    private void reload(DTSEnv dTSEnv, Subscriber subscriber) throws Exception {
        reload(this.configManagement.loadConfigs().get(dTSEnv));
        refresh(subscriber);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void reloadAll() throws Exception {
        reloadAll(this.configManagement.loadConfigs());
    }

    public void reloadAll(Map<DTSEnv, DTSConfig> map) throws Exception {
        Iterator<DTSEnv> it = map.keySet().iterator();
        while (it.hasNext()) {
            reload(map.get(it.next()));
        }
        refreshAll();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public SubscriberInstance putSubscriberToZk(Subscriber subscriber) throws UnknownHostException {
        ZkDriver drive = ZkDriver.drive();
        SubscriberConfig config = subscriber.getConfig();
        SubscriberInstance zkSubscriberInstance = zkSubscriberInstance(subscriber);
        HostDTO host = zkSubscriberInstance.getHost();
        drive.putData2EphemeralNode(zkSubscriberParentPath(config) + "/" + host.getIp() + ":" + host.getPort(), JSON.toJSONString(zkSubscriberInstance));
        return zkSubscriberInstance;
    }

    private void reload(DTSConfig dTSConfig) throws Exception {
        synchronized (this.subscribers) {
            logger.info("[DTS] =====> 更新环境 [" + dTSConfig.getEnv().name() + "]");
            for (SubscriberConfig subscriberConfig : dTSConfig.getSubscriberConfigs()) {
                if (this.subscribers.containsKey(subscriberConfig.getUid())) {
                    Subscriber subscriber = this.subscribers.get(subscriberConfig.getUid());
                    subscriber.resetConfig(subscriberConfig);
                    if (!subscriber.isInited() && subscriber.isOpened()) {
                        subscriber.initialize();
                        logger.info("[DTS] =====> |-- 更新初始化订阅者 [" + subscriberConfig.getType().name() + " : " + subscriberConfig.getName() + "] 成功!");
                    }
                } else if (subscriberConfig.getEnable().booleanValue()) {
                    subscriberConfig.setProcessor(this.dataAssembler);
                    Subscriber subscriber2 = getFactory(subscriberConfig).getSubscriber(subscriberConfig);
                    if (subscriberConfig.getOpened().booleanValue()) {
                        subscriber2.initialize();
                        logger.info("[DTS] =====> |-- 更新订阅者 [" + subscriberConfig.getType().name() + " : " + subscriberConfig.getName() + "] 成功!");
                    }
                    this.subscribers.put(subscriber2.getUID(), subscriber2);
                }
            }
        }
    }

    public void openSubscriber(DTSEnv dTSEnv, Subscriber subscriber) throws Exception {
        this.configManagement.openSubscriber(dTSEnv, subscriber.getUID());
        reload(dTSEnv, subscriber);
        putSubscriberToZk(subscriber);
    }

    public void closeSubscriber(DTSEnv dTSEnv, Subscriber subscriber) throws Exception {
        this.configManagement.closeSubscriber(dTSEnv, subscriber.getUID());
        reload(dTSEnv, subscriber);
        putSubscriberToZk(subscriber);
    }

    public boolean canStart(Subscriber subscriber) {
        DTSSubscriberStatus status = subscriber.getStatus();
        return (status == DTSSubscriberStatus.RUNNING || status == DTSSubscriberStatus.STARTING || status == DTSSubscriberStatus.ABNORMAL) ? false : true;
    }

    public boolean shouldStop(Subscriber subscriber) {
        DTSSubscriberStatus status = subscriber.getStatus();
        return !subscriber.isOpened() && subscriber.isInited() && (status == DTSSubscriberStatus.RUNNING || status == DTSSubscriberStatus.STARTING || status == DTSSubscriberStatus.ABNORMAL);
    }

    public void refresh(Subscriber subscriber) {
        if (subscriber.isOpened() && subscriber.isInited() && canStart(subscriber)) {
            try {
                subscriber.start();
                logger.info("[DTS] =====> 启动订阅者 [" + subscriber.getName() + " : " + subscriber.getUID() + "] 成功!");
                return;
            } catch (Throwable th) {
                logger.error("[DTS] =====> 错误：启动订阅者 [" + subscriber.getName() + " : " + subscriber.getUID() + "] 失败!\n   由于：" + th.getMessage());
                return;
            }
        }
        if (shouldStop(subscriber)) {
            try {
                SubscriberConfig config = subscriber.getConfig();
                subscriber.stop();
                this.subscribers.remove(subscriber.getUID());
                subscriber = getFactory(config).getSubscriber(config);
                this.subscribers.put(subscriber.getUID(), subscriber);
                logger.info("[DTS] =====> 关闭订阅者 [" + subscriber.getName() + " : " + subscriber.getUID() + "] 成功!");
            } catch (Throwable th2) {
                logger.error("[DTS] =====> 错误：关闭订阅者 [" + subscriber.getName() + " : " + subscriber.getUID() + "] 失败!\n   由于：" + th2.getMessage());
            }
        }
    }

    public void refreshAll() {
        synchronized (this.subscribers) {
            logger.info("[DTS] =====> 刷新订阅者");
            Iterator<Subscriber> it = this.subscribers.values().iterator();
            while (it.hasNext()) {
                refresh(it.next());
            }
        }
    }
}
