package com.thebeastshop.kit.kafka.springboot;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import com.alibaba.fastjson.parser.Feature;
import com.thebeastshop.common.spring.SpringAware;
import com.thebeastshop.common.utils.MetaUtil;
import com.thebeastshop.kit.kafka.utils.KafkaDriver;
import com.thebeastshop.kit.prop.PropConstants;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.kafka.listener.MessageListener;

/* loaded from: input_file:com/thebeastshop/kit/kafka/springboot/KafkaConsumerStarter.class */
public class KafkaConsumerStarter implements InitializingBean {
    private final Logger log = LoggerFactory.getLogger(getClass());
    private static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers";
    private static final String KAFKA_CONSUMER_CONFIG = "kafka.consumer.config";

    public void afterPropertiesSet() throws Exception {
        String properties = PropConstants.getProperties(KAFKA_CONSUMER_CONFIG);
        if (StringUtils.isNotBlank(properties)) {
            try {
                for (SbootKafkaConsumerVO sbootKafkaConsumerVO : (List) JSON.parseObject(properties, new TypeReference<List<SbootKafkaConsumerVO>>() { // from class: com.thebeastshop.kit.kafka.springboot.KafkaConsumerStarter.1
                }, new Feature[0])) {
                    KafkaDriver drive = KafkaDriver.drive(PropConstants.getProperties(KAFKA_BOOTSTRAP_SERVERS));
                    MessageListener messageListener = (MessageListener) SpringAware.registerBean(Class.forName(sbootKafkaConsumerVO.getHandler()));
                    this.log.info("开始启动kafka消费者{},监听的topic为{},模式为{},并行数量为{}", new Object[]{sbootKafkaConsumerVO.getHandler(), sbootKafkaConsumerVO.getTopic(), sbootKafkaConsumerVO.getAckMode(), Integer.valueOf(sbootKafkaConsumerVO.getConcurrency())});
                    drive.startConsumer(sbootKafkaConsumerVO.getTopic(), sbootKafkaConsumerVO.getGroup() == null ? MetaUtil.APP_NAME : sbootKafkaConsumerVO.getGroup(), messageListener, sbootKafkaConsumerVO.getConcurrency(), sbootKafkaConsumerVO.getAckMode());
                }
            } catch (Throwable th) {
                this.log.error("cannot parse consumer config", th);
                throw new RuntimeException("cannot parse consumer config");
            }
        }
    }
}
