package com.yiqiniu.easytrans.queue.impl.kafka;

import com.yiqiniu.easytrans.protocol.EasyTransRequest;
import com.yiqiniu.easytrans.queue.consumer.EasyTransConsumeAction;
import com.yiqiniu.easytrans.queue.consumer.EasyTransMsgConsumer;
import com.yiqiniu.easytrans.queue.consumer.EasyTransMsgListener;
import com.yiqiniu.easytrans.queue.impl.kafka.KafkaQueueProperties;
import com.yiqiniu.easytrans.serialization.ObjectSerializer;
import com.yiqiniu.easytrans.util.NamedThreadFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/yiqiniu/easytrans/queue/impl/kafka/KafkaEasyTransMsgConsumerImpl.class */
public class KafkaEasyTransMsgConsumerImpl implements EasyTransMsgConsumer {
    private static Logger logger = LoggerFactory.getLogger(KafkaEasyTransMsgConsumerImpl.class);
    private static final String RETRY_COUNT_KEY = "retryCount";
    private static final String ORGINAL_TOPIC = "orginalTopic";
    private KafkaConsumer<String, byte[]> consumer;
    private KafkaConsumer<String, byte[]> reconsumer;
    private Thread dispatchThread;
    private Thread reconsumeThread;
    private KafkaQueueProperties.ConsumerConfig cfg;
    private ExecutorService threadPool;
    private ObjectSerializer serializer;
    private KafkaEasyTransMsgPublisherImpl retryQueueMsgProducer;
    private List<Map<TopicPartition, LinkedList<ConsumerRecord<String, byte[]>>>> retryRecords;
    private Map<String, Integer> retryQueuePartitionCount;
    private Map<String, EasyTransMsgListener> subscribedKafkaTopics = new ConcurrentHashMap(8);
    private HashSet<String> subscribedReconsumeKafkaTopics = new HashSet<>(4);
    private Integer[] retryLevelThreshold = null;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/yiqiniu/easytrans/queue/impl/kafka/KafkaEasyTransMsgConsumerImpl$MessageHandler.class */
    public class MessageHandler implements Callable<ConsumerRecord<String, byte[]>> {
        ConsumerRecord<String, byte[]> consumeRecord;

        public MessageHandler(ConsumerRecord<String, byte[]> consumerRecord) {
            this.consumeRecord = consumerRecord;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public ConsumerRecord<String, byte[]> call() throws Exception {
            try {
                Iterator it = this.consumeRecord.headers().headers(KafkaEasyTransMsgConsumerImpl.ORGINAL_TOPIC).iterator();
                EasyTransMsgListener easyTransMsgListener = (EasyTransMsgListener) KafkaEasyTransMsgConsumerImpl.this.subscribedKafkaTopics.get(it.hasNext() ? (String) KafkaEasyTransMsgConsumerImpl.this.serializer.deserialize(((Header) it.next()).value()) : this.consumeRecord.topic());
                Headers<Header> headers = this.consumeRecord.headers();
                HashMap hashMap = new HashMap(8);
                for (Header header : headers) {
                    hashMap.put(header.key(), KafkaEasyTransMsgConsumerImpl.this.serializer.deserialize(header.value()));
                }
                if (EasyTransConsumeAction.CommitMessage != easyTransMsgListener.consume(hashMap, (EasyTransRequest) KafkaEasyTransMsgConsumerImpl.this.serializer.deserialize((byte[]) this.consumeRecord.value()))) {
                    KafkaEasyTransMsgConsumerImpl.this.reconsumeLater(this.consumeRecord);
                }
                return null;
            } catch (Exception e) {
                KafkaEasyTransMsgConsumerImpl.logger.error("error handling message for topic:" + this.consumeRecord.topic() + ",sent to retry queue for later proccessing", e);
                try {
                    KafkaEasyTransMsgConsumerImpl.this.reconsumeLater(this.consumeRecord);
                    return null;
                } catch (Exception e2) {
                    KafkaEasyTransMsgConsumerImpl.logger.error("publish to reconsume queue failed", e);
                    return this.consumeRecord;
                }
            }
        }
    }

    public KafkaEasyTransMsgConsumerImpl(KafkaQueueProperties.ConsumerConfig consumerConfig, ObjectSerializer objectSerializer, KafkaEasyTransMsgPublisherImpl kafkaEasyTransMsgPublisherImpl) {
        this.serializer = objectSerializer;
        this.cfg = consumerConfig;
        this.consumer = new KafkaConsumer<>(consumerConfig.getNativeCfg());
        this.reconsumer = new KafkaConsumer<>(consumerConfig.getNativeCfg());
        this.retryQueueMsgProducer = kafkaEasyTransMsgPublisherImpl;
        this.threadPool = Executors.newFixedThreadPool(consumerConfig.getConsumerThread(), new NamedThreadFactory("KafkaMsgHandler"));
        List<List<Integer>> reconsume = consumerConfig.getReconsume();
        initRetryThreshold(reconsume);
        initRetryRecordsMap();
        initRetryQueueSubscribe(reconsume);
        initRetryQueuePartitionCountMap();
    }

    private void initRetryQueueSubscribe(List<List<Integer>> list) {
        for (int i = 0; i < list.size(); i++) {
            this.subscribedReconsumeKafkaTopics.add(contactRetryTopicName(Integer.valueOf(i)));
        }
        this.reconsumer.subscribe(this.subscribedReconsumeKafkaTopics);
    }

    private void initRetryQueuePartitionCountMap() {
        this.retryQueuePartitionCount = new HashMap(4);
        Iterator<String> it = this.subscribedReconsumeKafkaTopics.iterator();
        while (it.hasNext()) {
            String next = it.next();
            this.retryQueuePartitionCount.put(next, Integer.valueOf(this.reconsumer.partitionsFor(next).size()));
        }
    }

    private void initRetryThreshold(List<List<Integer>> list) {
        this.retryLevelThreshold = new Integer[list.size() - 1];
        this.retryLevelThreshold[0] = list.get(0).get(1);
        for (int i = 1; i < list.size() - 1; i++) {
            this.retryLevelThreshold[i] = Integer.valueOf(this.retryLevelThreshold[i - 1].intValue() + list.get(i).get(1).intValue());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void initRetryRecordsMap() {
        this.retryRecords = new LinkedList();
        for (int i = 0; i < this.cfg.getReconsume().size(); i++) {
            this.retryRecords.add(new HashMap());
        }
    }

    public synchronized void subscribe(String str, Collection<String> collection, EasyTransMsgListener easyTransMsgListener) {
        Iterator<String> it = collection.iterator();
        while (it.hasNext()) {
            this.subscribedKafkaTopics.put(QueueKafkaHelper.getKafkaTopic(str, it.next()), easyTransMsgListener);
        }
        this.consumer.subscribe(this.subscribedKafkaTopics.keySet());
    }

    public synchronized void start() {
        if (this.dispatchThread == null) {
            this.dispatchThread = new Thread("KafkaMessagePollThread") { // from class: com.yiqiniu.easytrans.queue.impl.kafka.KafkaEasyTransMsgConsumerImpl.1
                @Override // java.lang.Thread, java.lang.Runnable
                public void run() {
                    while (true) {
                        try {
                            KafkaEasyTransMsgConsumerImpl.this.pollAndDispatchMessage();
                        } catch (InterruptedException e) {
                            KafkaEasyTransMsgConsumerImpl.logger.warn("interuppted,exit KafkaMessagePollThread", e);
                            return;
                        } catch (Exception e2) {
                            KafkaEasyTransMsgConsumerImpl.logger.error("exception occourd in KafkaMessagePollThread!Poll Message 5 seconds later", e2);
                            try {
                                Thread.sleep(5000L);
                            } catch (InterruptedException e3) {
                                KafkaEasyTransMsgConsumerImpl.logger.warn("interuppted,exit KafkaMessagePollThread", e2);
                                return;
                            }
                        }
                    }
                }
            };
            this.dispatchThread.setDaemon(true);
            this.dispatchThread.start();
        }
        if (this.reconsumeThread == null) {
            this.reconsumeThread = new Thread("KafkaReconsumePollThread") { // from class: com.yiqiniu.easytrans.queue.impl.kafka.KafkaEasyTransMsgConsumerImpl.2
                @Override // java.lang.Thread, java.lang.Runnable
                public void run() {
                    while (true) {
                        try {
                            KafkaEasyTransMsgConsumerImpl.this.reconsumeRecords();
                        } catch (Exception e) {
                            KafkaEasyTransMsgConsumerImpl.logger.error("KafkaReconsumePollThread Error Occur", e);
                            KafkaEasyTransMsgConsumerImpl.this.reconsumer.resume(KafkaEasyTransMsgConsumerImpl.this.reconsumer.paused());
                            KafkaEasyTransMsgConsumerImpl.this.initRetryRecordsMap();
                            try {
                                Thread.sleep(5000L);
                            } catch (InterruptedException e2) {
                                KafkaEasyTransMsgConsumerImpl.logger.info("KafkaReconsumePollThread interrupted,exit thread");
                                return;
                            }
                        }
                    }
                }
            };
            this.reconsumeThread.setDaemon(true);
            this.reconsumeThread.start();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void reconsumeRecords() throws InterruptedException {
        ConsumerRecord<String, byte[]> consumerRecord;
        ConsumerRecords poll = this.reconsumer.poll(1000L);
        for (TopicPartition topicPartition : poll.partitions()) {
            Map<TopicPartition, LinkedList<ConsumerRecord<String, byte[]>>> map = this.retryRecords.get(getRetryTimeLevelFromTopicName(topicPartition.topic()));
            LinkedList<ConsumerRecord<String, byte[]>> linkedList = map.get(topicPartition);
            if (linkedList == null) {
                linkedList = new LinkedList<>();
                map.put(topicPartition, linkedList);
            }
            linkedList.addAll(poll.records(topicPartition));
        }
        int i = 0;
        List<List<Integer>> reconsume = this.cfg.getReconsume();
        Iterator<Map<TopicPartition, LinkedList<ConsumerRecord<String, byte[]>>>> it = this.retryRecords.iterator();
        while (it.hasNext()) {
            for (Map.Entry<TopicPartition, LinkedList<ConsumerRecord<String, byte[]>>> entry : it.next().entrySet()) {
                LinkedList<ConsumerRecord<String, byte[]>> value = entry.getValue();
                if (value.size() != 0) {
                    Iterator<ConsumerRecord<String, byte[]>> it2 = value.iterator();
                    ArrayList arrayList = new ArrayList(4);
                    ConsumerRecord<String, byte[]> consumerRecord2 = null;
                    while (true) {
                        consumerRecord = consumerRecord2;
                        if (!it2.hasNext()) {
                            break;
                        }
                        ConsumerRecord<String, byte[]> next = it2.next();
                        if (System.currentTimeMillis() - next.timestamp() <= reconsume.get(i).get(0).intValue()) {
                            break;
                        }
                        arrayList.add(new MessageHandler(next));
                        it2.remove();
                        consumerRecord2 = next;
                    }
                    executeJobs(arrayList);
                    if (value.size() == 0) {
                        this.reconsumer.commitSync(Collections.singletonMap(entry.getKey(), new OffsetAndMetadata(consumerRecord.offset() + 1)));
                        this.reconsumer.resume(Arrays.asList(entry.getKey()));
                    } else {
                        this.reconsumer.pause(Arrays.asList(entry.getKey()));
                    }
                }
            }
            i++;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public synchronized void pollAndDispatchMessage() throws InterruptedException {
        Iterator it = this.consumer.poll(10000L).iterator();
        LinkedList linkedList = new LinkedList();
        while (it.hasNext()) {
            linkedList.add(new MessageHandler((ConsumerRecord) it.next()));
        }
        executeJobs(linkedList);
        this.consumer.commitAsync();
    }

    private void executeJobs(List<MessageHandler> list) throws InterruptedException {
        for (Future future : this.threadPool.invokeAll(list)) {
            boolean z = false;
            while (!z) {
                try {
                    z = future.get() == null;
                    if (!z) {
                        future = this.threadPool.submit(new MessageHandler((ConsumerRecord) future.get()));
                        Thread.sleep(1000L);
                    }
                } catch (ExecutionException e) {
                    throw new RuntimeException("Unexpected,it should not throw Exception", e);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void reconsumeLater(ConsumerRecord<String, byte[]> consumerRecord) throws InterruptedException, ExecutionException {
        Headers<Header> headers = consumerRecord.headers();
        ArrayList arrayList = new ArrayList(8);
        Integer num = -1;
        boolean z = false;
        for (Header header : headers) {
            if (header.key().equals(RETRY_COUNT_KEY)) {
                num = (Integer) this.serializer.deserialize(header.value());
            } else {
                if (header.key().equals(ORGINAL_TOPIC)) {
                    z = true;
                }
                arrayList.add(header);
            }
        }
        Integer valueOf = Integer.valueOf(num.intValue() + 1);
        arrayList.add(new RecordHeader(RETRY_COUNT_KEY, this.serializer.serialization(valueOf)));
        if (!z) {
            arrayList.add(new RecordHeader(ORGINAL_TOPIC, this.serializer.serialization(consumerRecord.topic())));
        }
        String calcRetryTopic = calcRetryTopic(consumerRecord.topic(), valueOf);
        this.retryQueueMsgProducer.publishKafkaMessage(new ProducerRecord<>(calcRetryTopic, Integer.valueOf(consumerRecord.partition() % this.retryQueuePartitionCount.get(calcRetryTopic).intValue()), (Long) null, consumerRecord.key(), consumerRecord.value(), arrayList)).get();
    }

    private String calcRetryTopic(String str, Integer num) {
        return contactRetryTopicName(Integer.valueOf(calcRetryTimeLevel(num)));
    }

    private String contactRetryTopicName(Integer num) {
        return getConsumerId() + "_reconsume_" + num;
    }

    private int calcRetryTimeLevel(Integer num) {
        int i = 0;
        while (i < this.retryLevelThreshold.length && num.intValue() >= this.retryLevelThreshold[i].intValue()) {
            i++;
        }
        return i;
    }

    private int getRetryTimeLevelFromTopicName(String str) {
        return Integer.parseInt(str.substring(str.lastIndexOf("_") + 1));
    }

    public String getConsumerId() {
        return this.cfg.getNativeCfg().getProperty("group.id");
    }
}
