package com.ctrip.framework.apollo.biz.message;

import com.ctrip.framework.apollo.biz.entity.ReleaseMessage;
import com.ctrip.framework.apollo.biz.repository.ReleaseMessageRepository;
import com.ctrip.framework.apollo.core.utils.ApolloThreadFactory;
import com.ctrip.framework.apollo.tracer.Tracer;
import com.ctrip.framework.apollo.tracer.spi.Transaction;
import com.google.common.collect.Lists;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.util.CollectionUtils;

/* loaded from: input_file:com/ctrip/framework/apollo/biz/message/ReleaseMessageScanner.class */
public class ReleaseMessageScanner implements InitializingBean {
    private static final Logger logger = LoggerFactory.getLogger(ReleaseMessageScanner.class);
    private static final int DEFAULT_SCAN_INTERVAL_IN_MS = 1000;

    @Autowired
    private Environment env;

    @Autowired
    private ReleaseMessageRepository releaseMessageRepository;
    private int databaseScanInterval;
    private List<ReleaseMessageListener> listeners = Lists.newCopyOnWriteArrayList();
    private ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1, ApolloThreadFactory.create("ReleaseMessageScanner", true));
    private long maxIdScanned;

    public void afterPropertiesSet() throws Exception {
        populateDataBaseInterval();
        this.maxIdScanned = loadLargestMessageId();
        this.executorService.scheduleWithFixedDelay(() -> {
            Transaction newTransaction = Tracer.newTransaction("Apollo.ReleaseMessageScanner", "scanMessage");
            try {
                scanMessages();
                newTransaction.setStatus("0");
            } catch (Throwable th) {
                newTransaction.setStatus(th);
                logger.error("Scan and send message failed", th);
            } finally {
                newTransaction.complete();
            }
        }, getDatabaseScanIntervalMs(), getDatabaseScanIntervalMs(), TimeUnit.MILLISECONDS);
    }

    public void addMessageListener(ReleaseMessageListener releaseMessageListener) {
        if (this.listeners.contains(releaseMessageListener)) {
            return;
        }
        this.listeners.add(releaseMessageListener);
    }

    private void scanMessages() {
        boolean z = true;
        while (z && !Thread.currentThread().isInterrupted()) {
            z = scanAndSendMessages();
        }
    }

    private boolean scanAndSendMessages() {
        List<ReleaseMessage> findFirst500ByIdGreaterThanOrderByIdAsc = this.releaseMessageRepository.findFirst500ByIdGreaterThanOrderByIdAsc(Long.valueOf(this.maxIdScanned));
        if (CollectionUtils.isEmpty(findFirst500ByIdGreaterThanOrderByIdAsc)) {
            return false;
        }
        fireMessageScanned(findFirst500ByIdGreaterThanOrderByIdAsc);
        int size = findFirst500ByIdGreaterThanOrderByIdAsc.size();
        this.maxIdScanned = findFirst500ByIdGreaterThanOrderByIdAsc.get(size - 1).getId();
        return size == 500;
    }

    private long loadLargestMessageId() {
        ReleaseMessage findTopByOrderByIdDesc = this.releaseMessageRepository.findTopByOrderByIdDesc();
        if (findTopByOrderByIdDesc == null) {
            return 0L;
        }
        return findTopByOrderByIdDesc.getId();
    }

    private void fireMessageScanned(List<ReleaseMessage> list) {
        for (ReleaseMessage releaseMessage : list) {
            for (ReleaseMessageListener releaseMessageListener : this.listeners) {
                try {
                    releaseMessageListener.handleMessage(releaseMessage, Topics.APOLLO_RELEASE_TOPIC);
                } catch (Throwable th) {
                    Tracer.logError(th);
                    logger.error("Failed to invoke message listener {}", releaseMessageListener.getClass(), th);
                }
            }
        }
    }

    private void populateDataBaseInterval() {
        this.databaseScanInterval = DEFAULT_SCAN_INTERVAL_IN_MS;
        try {
            String property = this.env.getProperty("apollo.message-scan.interval");
            if (!Objects.isNull(property)) {
                this.databaseScanInterval = Integer.parseInt(property);
            }
        } catch (Throwable th) {
            Tracer.logError(th);
            logger.error("Load apollo message scan interval from system property failed", th);
        }
    }

    private int getDatabaseScanIntervalMs() {
        return this.databaseScanInterval;
    }
}
