package com.thebeastshop.kit.rocketmq.springboot;

import cn.hutool.core.util.ReflectUtil;
import com.thebeastshop.kit.rocketmq.RDelay;
import com.thebeastshop.kit.rocketmq.RMessage;
import com.thebeastshop.kit.rocketmq.RTopicInvoker;
import com.thebeastshop.kit.rocketmq.RocketMQTopicException;
import com.thebeastshop.kit.rocketmq.Topic;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/thebeastshop/kit/rocketmq/springboot/RocketMQConsumer.class */
public class RocketMQConsumer implements RocketMQListener<String>, RocketMQPushConsumerLifecycleListener {
    private final Logger log = LoggerFactory.getLogger(getClass());
    private final Map<String, RTopicInvoker> TOPIC_INVOKER_CACHE = new ConcurrentHashMap();

    public void onMessage(String str) {
    }

    public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
        for (Method method : ReflectUtil.getMethods(getClass())) {
            Class<?>[] parameterTypes = method.getParameterTypes();
            Topic topic = (Topic) method.getAnnotation(Topic.class);
            if (topic != null) {
                String value = topic.value();
                String trim = StringUtils.isBlank(value) ? "" : value.trim();
                try {
                    defaultMQPushConsumer.subscribe(trim, "*");
                    this.TOPIC_INVOKER_CACHE.put(trim, new RTopicInvoker(parameterTypes[0], method));
                } catch (MQClientException e) {
                    throw new RuntimeException("[RocketMQ] 订阅Topic[\"" + trim + "\"; tags = \"*\"]失败: ", e);
                }
            }
        }
        defaultMQPushConsumer.registerMessageListener((list, consumeConcurrentlyContext) -> {
            if (CollectionUtils.isNotEmpty(list)) {
                Iterator it = list.iterator();
                while (it.hasNext()) {
                    MessageExt messageExt = (MessageExt) it.next();
                    try {
                        String topic2 = messageExt.getTopic();
                        RTopicInvoker rTopicInvoker = this.TOPIC_INVOKER_CACHE.get(topic2);
                        if (rTopicInvoker == null) {
                            throw new RocketMQTopicException(topic2);
                        }
                        logMessage(messageExt);
                        rTopicInvoker.invoke(this, messageExt);
                    } catch (Throwable th) {
                        logMessageError(messageExt, consumeConcurrentlyContext, th);
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                }
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
    }

    private void logMessageError(MessageExt messageExt, ConsumeConcurrentlyContext consumeConcurrentlyContext, Throwable th) {
        try {
            String topic = messageExt.getTopic();
            StringBuilder append = new StringBuilder("[RocketMQ] 执行消费者任务时发生错误: ").append(th.getMessage()).append("\n\n");
            append.append("==========================================================================================\n");
            String str = new String(messageExt.getBody());
            String property = messageExt.getProperty(RMessage.KEY_MSG_ID);
            String tags = messageExt.getTags();
            append.append("\tid = ").append(property).append(", topic = ").append(topic).append(", ack index = ").append(consumeConcurrentlyContext.getAckIndex());
            if (StringUtils.isNotEmpty(tags)) {
                append.append(", tags = ").append(tags);
            }
            String timeLevelToString = RDelay.timeLevelToString(messageExt.getDelayTimeLevel());
            if (StringUtils.isNotEmpty(timeLevelToString)) {
                append.append(", delay = ").append(timeLevelToString);
            }
            append.append("\n\tbody: " + str).append("\n");
            append.append("==========================================================================================\n");
            this.log.error(append.toString(), th);
        } catch (Throwable th2) {
            this.log.error("[RocketMQ] 打印错误日志时出现错误: ", th2);
        }
    }

    private void logMessage(MessageExt messageExt) {
        StringBuilder append = new StringBuilder("[RocketMQ 消费者] 接受消息(").append("id = ").append(messageExt.getProperty(RMessage.KEY_MSG_ID)).append(", topic = ").append(messageExt.getTopic());
        String tags = messageExt.getTags();
        if (StringUtils.isNotEmpty(tags)) {
            append.append(", tags = ").append(tags);
        }
        String timeLevelToString = RDelay.timeLevelToString(messageExt.getDelayTimeLevel());
        if (StringUtils.isNotEmpty(timeLevelToString)) {
            append.append(", delay = ").append(timeLevelToString);
        }
        append.append("):\n\t");
        append.append(new String(messageExt.getBody(), StandardCharsets.UTF_8));
        this.log.info(append.toString());
    }
}
