package com.aliyun.drc.clusterclient;

import com.aliyun.drc.clusterclient.impl.DrcClientListener;
import com.aliyun.drc.clusterclient.message.ClusterMessage;
import com.aliyun.drc.clusterclient.partition.Partition;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/aliyun/drc/clusterclient/ClusterListener.class */
/**
 * Base class for consumers of cluster messages.
 *
 * <p>Batches handed to {@link #notifyMessages(Partition, List)} are placed on a bounded
 * queue and delivered, one batch at a time, to {@link #notify(List)} on this thread.
 * When more than one partition is registered, {@link #notifyMessages(Partition, List)}
 * also returns a back-off hint once the same partition has been delivering for longer
 * than {@code MAX_OCCUPY_TIME_IN_MS}.
 */
public abstract class ClusterListener extends Thread {
    private static final Logger logger = LoggerFactory.getLogger(ClusterListener.class);

    /** How long one partition may keep delivering before a back-off hint is returned. */
    private static final long MAX_OCCUPY_TIME_IN_MS = 2000L;

    /** Back-off hint, in milliseconds, returned once the occupy time is exceeded. */
    private static final long DEFAULT_WAIT_TIME_IN_MS = 10L;

    /** Capacity of the hand-off queue; producers block in {@code put} when it is full. */
    private static final int MESSAGE_QUEUE_CAPACITY = 4096;

    private Partition lastNotifiedPartition;
    private volatile boolean exited = false;
    private long beginTime = 0;
    private final AtomicInteger count = new AtomicInteger(0);
    private final List<Partition> listenedPartitions = new ArrayList<Partition>();
    private final List<DrcClientListener> listeners = new ArrayList<DrcClientListener>();
    private final BlockingQueue<List<ClusterMessage>> msgQueue =
            new LinkedBlockingQueue<List<ClusterMessage>>(MESSAGE_QUEUE_CAPACITY);

    /**
     * Consumes one batch of messages. Called from {@link #run()} for every queued batch
     * and directly from {@link #notifyWithoutHeartbeat(List)}.
     *
     * @param list the batch to consume
     * @throws Exception on any failure; the exception is forwarded to
     *     {@link #noException(Exception)} by the caller
     */
    public abstract void notify(List<ClusterMessage> list) throws Exception;

    /**
     * Receives every exception raised while taking a batch from the queue or while
     * delivering it through {@link #notify(List)}, including the
     * {@link InterruptedException} caused by {@link #shutdown()}.
     *
     * @param exc the exception that was caught
     */
    public abstract void noException(Exception exc);

    /**
     * Delivery loop: takes batches from the queue and passes them to
     * {@link #notify(List)} until {@link #shutdown()} sets the exit flag.
     */
    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        setName("Notify-Message-Thread");
        while (!this.exited) {
            try {
                notify(this.msgQueue.take());
            } catch (InterruptedException e) {
                // The interrupt flag is deliberately NOT restored here: if the exit flag
                // is not set the loop continues, and a pending interrupt would make the
                // next take() fail immediately, spinning the loop.
                logger.warn("notify messages thread interrupted...");
                noException(e);
            } catch (Exception e2) {
                logger.error("notify messages thread exception: ", e2);
                noException(e2);
            }
        }
    }

    /**
     * Queues a batch for delivery and returns a back-off hint for the caller.
     *
     * <p>The call blocks while the queue is full. If the calling thread is interrupted
     * while waiting, the batch is not queued, a warning is logged and the thread's
     * interrupt status is restored so the caller can observe it.
     *
     * @param partition the partition the batch came from
     * @param list the batch to queue
     * @return {@code DEFAULT_WAIT_TIME_IN_MS} (10) when more than one partition is
     *     registered and {@code partition} has been the most recent notifier for at
     *     least {@code MAX_OCCUPY_TIME_IN_MS}; {@code 0} otherwise
     */
    public synchronized long notifyMessages(Partition partition, List<ClusterMessage> list) {
        try {
            this.msgQueue.put(list);
        } catch (InterruptedException e) {
            logger.warn("put messages into msgQueue interrupted...");
            // Catching InterruptedException clears the flag; re-assert it so the
            // interruption is not silently lost for the calling thread.
            Thread.currentThread().interrupt();
        } catch (Exception e2) {
            logger.error("put messages into msgQueue exception: ", e2);
        }
        if (this.count.get() <= 1) {
            // With at most one registered partition there is nobody to yield to.
            return 0L;
        }
        if (this.beginTime <= 0 || this.lastNotifiedPartition == null || this.lastNotifiedPartition != partition) {
            // A different partition (compared by identity) took over: restart the clock.
            this.lastNotifiedPartition = partition;
            this.beginTime = System.currentTimeMillis();
            return 0L;
        }
        if (System.currentTimeMillis() - this.beginTime < MAX_OCCUPY_TIME_IN_MS) {
            return 0L;
        }
        // Occupy time exceeded: reset the clock and return the back-off hint.
        this.beginTime = 0L;
        return DEFAULT_WAIT_TIME_IN_MS;
    }

    /**
     * Delivers a batch synchronously on the calling thread, bypassing the queue.
     * Starts the occupy clock if it is currently {@code 0}. Exceptions thrown by
     * {@link #notify(List)} are forwarded to {@link #noException(Exception)}.
     *
     * @param list the batch to deliver
     * @throws Exception only if {@link #noException(Exception)} itself throws
     */
    @Deprecated
    public synchronized void notifyWithoutHeartbeat(List<ClusterMessage> list) throws Exception {
        if (this.beginTime == 0) {
            this.beginTime = System.currentTimeMillis();
        }
        try {
            notify(list);
        } catch (Exception e) {
            noException(e);
        }
    }

    /**
     * Records which partition is about to notify and returns a back-off hint.
     *
     * <p>Unlike {@link #notifyMessages(Partition, List)} this method only resets the
     * occupy clock ({@code -1} when at most one partition is registered, {@code 0} when
     * the partition changes); the clock is started by
     * {@link #notifyWithoutHeartbeat(List)} when it finds it at {@code 0}.
     *
     * <p>NOTE(review): this method is not {@code synchronized} although it touches the
     * same fields as the synchronized methods — confirm callers invoke it from a single
     * thread.
     *
     * @param partition the partition about to notify
     * @return {@code DEFAULT_WAIT_TIME_IN_MS} (10) when more than one partition is
     *     registered and {@code partition} has exceeded its occupy time; {@code 0}
     *     otherwise
     */
    @Deprecated
    public long setNotifiedPartition(Partition partition) {
        if (this.count.get() <= 1) {
            this.beginTime = -1L;
            return 0L;
        }
        if (this.beginTime <= 0 || this.lastNotifiedPartition == null || this.lastNotifiedPartition != partition) {
            this.lastNotifiedPartition = partition;
            this.beginTime = 0L;
            return 0L;
        }
        if (System.currentTimeMillis() - this.beginTime < MAX_OCCUPY_TIME_IN_MS) {
            return 0L;
        }
        this.beginTime = 0L;
        return DEFAULT_WAIT_TIME_IN_MS;
    }

    /**
     * Registers a partition together with its client listener and increments the
     * registered-partition count.
     *
     * @param drcClientListener the client listener serving the partition
     * @param partition the partition being listened to
     */
    public synchronized void addListenedPartition(DrcClientListener drcClientListener, Partition partition) {
        this.listeners.add(drcClientListener);
        this.listenedPartitions.add(partition);
        this.count.incrementAndGet();
    }

    /**
     * Unregisters a partition and its client listener and decrements the
     * registered-partition count.
     *
     * <p>NOTE(review): the count is decremented even when neither argument was
     * registered — callers are presumably expected to pair this with a prior
     * {@link #addListenedPartition(DrcClientListener, Partition)}; verify.
     *
     * @param drcClientListener the client listener to remove
     * @param partition the partition to remove
     */
    public synchronized void removeListenedPartition(DrcClientListener drcClientListener, Partition partition) {
        this.listeners.remove(drcClientListener);
        this.listenedPartitions.remove(partition);
        this.count.decrementAndGet();
    }

    /**
     * Stops the delivery loop: sets the exit flag, interrupts this thread and waits for
     * it to terminate.
     *
     * <p>When invoked from the listener thread itself (for example from inside
     * {@link #notify(List)}) the wait is skipped, because a thread joining itself would
     * never return; the loop then ends as soon as the current delivery returns.
     *
     * @throws Exception if the calling thread is interrupted while waiting
     */
    public void shutdown() throws Exception {
        this.exited = true;
        interrupt();
        if (Thread.currentThread() != this) {
            join();
        }
        logger.info("cluster listener has been shutdown...");
    }

    /**
     * Returns the number of batches currently waiting in the queue.
     *
     * @return the current queue size
     */
    public int getMessageQueueSize() {
        return this.msgQueue.size();
    }
}
