/*
 * Decompiled with CFR 0.152.
 */
package com.thebeastshop.kit.rocketmq.springboot;

import cn.hutool.core.util.ReflectUtil;
import com.thebeastshop.kit.rocketmq.RDelay;
import com.thebeastshop.kit.rocketmq.RTopicInvoker;
import com.thebeastshop.kit.rocketmq.RocketMQTopicException;
import com.thebeastshop.kit.rocketmq.Topic;
import com.thebeastshop.kit.rocketmq.springboot.TemplateParserContext;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.expression.BeanExpressionContextAccessor;
import org.springframework.context.expression.BeanFactoryAccessor;
import org.springframework.context.expression.EnvironmentAccessor;
import org.springframework.context.expression.MapAccessor;
import org.springframework.core.env.Environment;
import org.springframework.expression.ExpressionParser;
import org.springframework.expression.ParserContext;
import org.springframework.expression.PropertyAccessor;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext;

public class RocketMQConsumer
implements RocketMQListener<String>,
RocketMQPushConsumerLifecycleListener {
    private final StandardEvaluationContext evaluationContext = new StandardEvaluationContext();
    private final ExpressionParser parser = new SpelExpressionParser();
    private final TemplateParserContext parserContext = new TemplateParserContext();
    @Resource
    private Environment environment;
    private final Logger log = LoggerFactory.getLogger(this.getClass());
    private final Map<String, RTopicInvoker> TOPIC_INVOKER_CACHE = new ConcurrentHashMap<String, RTopicInvoker>();

    @PostConstruct
    public void init() {
        this.evaluationContext.addPropertyAccessor((PropertyAccessor)new BeanExpressionContextAccessor());
        this.evaluationContext.addPropertyAccessor((PropertyAccessor)new BeanFactoryAccessor());
        this.evaluationContext.addPropertyAccessor((PropertyAccessor)new MapAccessor());
        this.evaluationContext.addPropertyAccessor((PropertyAccessor)new EnvironmentAccessor());
        this.evaluationContext.setRootObject((Object)this.environment);
    }

    public void onMessage(String s) {
    }

    public void prepareStart(DefaultMQPushConsumer consumer) {
        Method[] methods;
        Class<?> clazz = this.getClass();
        for (Method method : methods = ReflectUtil.getMethods(clazz)) {
            Class<?>[] paramTypes = method.getParameterTypes();
            Topic topicAnnotation = method.getAnnotation(Topic.class);
            if (topicAnnotation == null) continue;
            String topicVal = topicAnnotation.value();
            String topicTemplate = StringUtils.isBlank((CharSequence)topicVal) ? "" : topicVal.trim();
            String topic = !topicTemplate.contains("${") ? topicTemplate : this.environment.getProperty(this.parser.parseExpression(topicTemplate, (ParserContext)this.parserContext).getExpressionString(), topicTemplate);
            try {
                consumer.subscribe(topic, "*");
                this.log.info("[RocketMQ] \u8ba2\u9605Topic[{}]", (Object)topic);
            }
            catch (MQClientException e) {
                throw new RuntimeException("[RocketMQ] \u8ba2\u9605Topic[\"" + topic + "\"; tags = \"*\"]\u5931\u8d25: ", e);
            }
            Class<?> paramType = paramTypes[0];
            this.TOPIC_INVOKER_CACHE.put(topic, new RTopicInvoker(paramType, method));
        }
        consumer.registerMessageListener((messages, context) -> {
            if (CollectionUtils.isNotEmpty((Collection)messages)) {
                for (MessageExt message : messages) {
                    try {
                        String topic = message.getTopic();
                        RTopicInvoker invoker = this.TOPIC_INVOKER_CACHE.get(topic);
                        if (invoker == null) {
                            throw new RocketMQTopicException(topic);
                        }
                        this.logMessage(message);
                        invoker.invoke(this, message);
                    }
                    catch (Throwable th) {
                        this.logMessageError(message, context, th);
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                }
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
    }

    private void logMessageError(MessageExt message, ConsumeConcurrentlyContext context, Throwable th) {
        try {
            String delay;
            String topic = message.getTopic();
            StringBuilder builder = new StringBuilder("[RocketMQ] \u6267\u884c\u6d88\u8d39\u8005\u4efb\u52a1\u65f6\u53d1\u751f\u9519\u8bef: ").append(th.getMessage()).append("\n\n");
            builder.append("==========================================================================================\n");
            String text = new String(message.getBody());
            String id = message.getMsgId();
            String tags = message.getTags();
            builder.append("\tid = ").append(id).append(", topic = ").append(topic).append(", ack index = ").append(context.getAckIndex());
            if (StringUtils.isNotEmpty((CharSequence)tags)) {
                builder.append(", tags = ").append(tags);
            }
            if (StringUtils.isNotEmpty((CharSequence)(delay = RDelay.timeLevelToString(message.getDelayTimeLevel())))) {
                builder.append(", delay = ").append(delay);
            }
            builder.append("\n\tbody: " + text).append("\n");
            builder.append("==========================================================================================\n");
            this.log.error(builder.toString(), th);
        }
        catch (Throwable oth) {
            this.log.error("[RocketMQ] \u6253\u5370\u9519\u8bef\u65e5\u5fd7\u65f6\u51fa\u73b0\u9519\u8bef: ", oth);
        }
    }

    private void logMessage(MessageExt message) {
        String delay;
        StringBuilder content = new StringBuilder("[RocketMQ \u6d88\u8d39\u8005] \u63a5\u53d7\u6d88\u606f(").append("id = ").append(message.getMsgId()).append(", topic = ").append(message.getTopic());
        String tags = message.getTags();
        if (StringUtils.isNotEmpty((CharSequence)tags)) {
            content.append(", tags = ").append(tags);
        }
        if (StringUtils.isNotEmpty((CharSequence)(delay = RDelay.timeLevelToString(message.getDelayTimeLevel())))) {
            content.append(", delay = ").append(delay);
        }
        content.append("):\n\t");
        content.append(new String(message.getBody(), StandardCharsets.UTF_8));
        this.log.info(content.toString());
    }
}

