package com.aliyun.drc.clusterclient.impl;

import com.alibaba.fastjson.JSONObject;
import com.aliyun.drc.client.DRCClient;
import com.aliyun.drc.client.DRCClientFactory;
import com.aliyun.drc.client.DataFilter;
import com.aliyun.drc.clusterclient.ClusterListener;
import com.aliyun.drc.clusterclient.partition.Checkpoint;
import com.aliyun.drc.clusterclient.partition.PartitionImpl;
import com.aliyun.drc.clusterclient.partition.PartitionPool;
import com.aliyun.drc.regionmanager.RegionRouterInfo;
import com.aliyun.drc.util.AbnormalThreadHook;
import com.aliyun.drc.util.ThreadMonitor;
import java.lang.Thread;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/aliyun/drc/clusterclient/impl/ClientCluster.class */
public class ClientCluster implements AbnormalThreadHook {
    private static Logger logger = LoggerFactory.getLogger(ClientCluster.class);
    private String guid;
    private volatile boolean suspend;
    private List<ClusterListener> listeners;
    private PartitionPool partitions;
    private RegionRouterInfo regionRouterInfo;
    private int listenerIndex = 0;
    private Map<String, DrcClientListener> clientListeners = new ConcurrentHashMap();
    private Map<String, DRCClient> clientPools = new ConcurrentHashMap();
    private Map<Thread, String> clientThreadPools = new HashMap();
    private final ThreadMonitor threadMonitor = new ThreadMonitor();

    public final String getGuid() {
        return this.guid;
    }

    public void setGuid(String str) {
        this.guid = str;
    }

    public void setListeners(List<ClusterListener> list) {
        this.listeners = list;
    }

    public void shutdown() throws Exception {
        Iterator<Map.Entry<String, DRCClient>> it = this.clientPools.entrySet().iterator();
        while (it.hasNext()) {
            it.next().getValue().stopService();
        }
        logger.info("cluster has been shutdown");
    }

    public void startClientThreadMonitor() {
        this.threadMonitor.start();
    }

    public void doStart(JSONObject jSONObject) throws Exception {
        PartitionImpl partitionImpl = new PartitionImpl(jSONObject.getJSONObject("partition").getString("name"));
        partitionImpl.setTopic(jSONObject.getString("topic"));
        ClusterListener nextListener = getNextListener();
        DrcClientListener drcClientListener = new DrcClientListener(nextListener, this, this.regionRouterInfo.getDataType());
        drcClientListener.setPartition(partitionImpl);
        Properties properties = new Properties();
        properties.put("manager.host", this.regionRouterInfo.getClusterUrl());
        properties.put("guid", getGuid());
        DRCClient create = DRCClientFactory.create(DRCClientFactory.Type.MYSQL, properties);
        StringBuffer stringBuffer = new StringBuffer();
        for (String str : ((String) jSONObject.getJSONArray("tables").get(0)).split("\\|")) {
            stringBuffer.append(str).append(".*|");
        }
        create.addDataFilter(new DataFilter(stringBuffer.toString()));
        Checkpoint checkpoint = new Checkpoint(jSONObject.getString("offset"));
        com.aliyun.drc.client.impl.Checkpoint checkpoint2 = new com.aliyun.drc.client.impl.Checkpoint();
        if (checkpoint.getInstance() != null) {
            checkpoint2.setServerId(checkpoint.getInstance());
        }
        if (checkpoint.getFilePosition() != null) {
            checkpoint2.setPosition(checkpoint.getFilePosition());
        }
        if (checkpoint.getTimestamp() != null) {
            checkpoint2.setTimestamp(checkpoint.getTimestamp());
        }
        if (checkpoint.getId() != null) {
            checkpoint2.setRecordId(checkpoint.getId());
        }
        if (this.regionRouterInfo.isUsePublicIp()) {
            create.usePublicIp();
        }
        create.initService(this.regionRouterInfo.getUsername(), jSONObject.getString("topic"), this.regionRouterInfo.getPassword(), checkpoint2, (String) null);
        create.addListener(drcClientListener);
        Thread startService = create.startService();
        startService.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { // from class: com.aliyun.drc.clusterclient.impl.ClientCluster.1
            @Override // java.lang.Thread.UncaughtExceptionHandler
            public void uncaughtException(Thread thread, Throwable th) {
                ClientCluster.logger.error(th.toString());
            }
        });
        if (this.suspend) {
            create.suspend();
        }
        this.clientPools.put(jSONObject.getString("partition"), create);
        this.clientThreadPools.put(startService, jSONObject.getString("partition"));
        this.threadMonitor.addMonitoredThread(startService, this);
        this.partitions.addPartition(partitionImpl);
        nextListener.addListenedPartition(drcClientListener, partitionImpl);
        this.clientListeners.put(jSONObject.getString("partition"), drcClientListener);
    }

    public void doStop(String str) {
        this.partitions.remove(str);
        DrcClientListener drcClientListener = this.clientListeners.get(str);
        drcClientListener.getListener().removeListenedPartition(drcClientListener, drcClientListener.getPartition());
        Iterator<Map.Entry<Thread, String>> it = this.clientThreadPools.entrySet().iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            } else if (it.next().getValue().equalsIgnoreCase(str)) {
                it.remove();
                break;
            }
        }
        DRCClient dRCClient = this.clientPools.get(str);
        if (dRCClient == null) {
            logger.warn(String.valueOf(str) + " find null client");
            return;
        }
        try {
            dRCClient.stopService();
        } catch (Exception e) {
            logger.error("stop " + str + " interrupted " + e.getMessage());
        }
    }

    @Override // com.aliyun.drc.util.AbnormalThreadHook
    public void notifyThreadFailed(Thread thread) {
        String str = this.clientThreadPools.get(thread);
        if (str != null) {
            this.clientThreadPools.remove(str);
            DRCClient dRCClient = this.clientPools.get(str);
            if (dRCClient != null) {
                try {
                    dRCClient.stopService();
                } catch (Exception e) {
                    logger.error("client error", (Throwable) e);
                }
                this.clientPools.remove(str);
            }
        }
    }

    private ClusterListener getNextListener() {
        if (this.listenerIndex >= this.listeners.size()) {
            this.listenerIndex = 0;
        }
        List<ClusterListener> list = this.listeners;
        int i = this.listenerIndex;
        this.listenerIndex = i + 1;
        return list.get(i);
    }

    public void setPartitions(PartitionPool partitionPool) {
        this.partitions = partitionPool;
    }

    public void suspendClient() {
        this.suspend = true;
        for (Map.Entry<String, DRCClient> entry : this.clientPools.entrySet()) {
            entry.getValue().suspend();
            logger.info("partition:" + entry.getKey() + ",suspend");
        }
    }

    public void resumeClient() {
        this.suspend = false;
        for (Map.Entry<String, DRCClient> entry : this.clientPools.entrySet()) {
            entry.getValue().resume();
            logger.info("partition:" + entry.getKey() + ",resume");
        }
    }

    public void setRegionRouterInfo(RegionRouterInfo regionRouterInfo) {
        this.regionRouterInfo = regionRouterInfo;
    }
}
