package com.thebeastshop.dts.sdk.handler;

import cn.hutool.core.util.ObjectUtil;
import com.thebeastshop.bgel.Bgel;
import com.thebeastshop.bgel.BgelContext;
import com.thebeastshop.dts.enums.DTSChannel;
import com.thebeastshop.dts.enums.DTSRetryStatus;
import com.thebeastshop.dts.field.DTSFieldNameMode;
import com.thebeastshop.dts.record.DTSRecord;
import com.thebeastshop.dts.record.DTSRecordBuilder;
import com.thebeastshop.dts.record.DTSRecordField;
import com.thebeastshop.dts.sdk.DTSKafkaManager;
import com.thebeastshop.dts.sdk.utils.DTSMetaObtainer;
import com.thebeastshop.dts.vo.RegisterItem;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.support.Acknowledgment;

/* loaded from: input_file:com/thebeastshop/dts/sdk/handler/AbsDTSBatchHandler.class */
/**
 * Base class for batch DTS handlers.
 *
 * <p>Incoming Kafka records are parsed into {@link DTSRecord}s, filtered against the
 * registered {@link RegisterItem}s (table name plus an optional filter expression), and the
 * surviving records are handed to {@link #dataProcess(List)} in a single call. When
 * {@code dataProcess} throws, each record that was handed to it is pushed to the recycle
 * sender so it can be retried later.
 */
public abstract class AbsDTSBatchHandler implements DTSBatchHandler<List<DTSRecord>> {
    /** Logger bound to the concrete subclass so log lines name the actual handler. */
    private final Logger log = LoggerFactory.getLogger(getClass());

    /** Registered items this handler is matched against; {@code null} until one is set or added. */
    private List<RegisterItem> registerItems;

    /** Field-name mode passed to {@link DTSRecordBuilder#fromJSON} when parsing record payloads. */
    private DTSFieldNameMode fieldNameMode;

    /**
     * Processes the records that passed the table-name and filter-expression checks.
     *
     * @param list the accepted records; may be empty when every record was filtered out
     * @throws Throwable any failure; {@link #dataReceive(List)} logs it and recycles the batch
     */
    @Override
    public abstract void dataProcess(List<DTSRecord> list) throws Throwable;

    /**
     * Logs every received record, filters the batch, and delegates to
     * {@link #dataProcess(List)}. A failure in {@code dataProcess} is logged and does not
     * propagate; instead each accepted record is sent to the recycle channel.
     *
     * @param list the parsed records of one batch
     */
    @Override
    public void dataReceive(List<DTSRecord> list) {
        for (DTSRecord received : list) {
            this.log.info("[BEAST-DTS]接受到数据:{}", received);
        }
        List<DTSRecord> accepted = list.stream().filter(this::accepts).collect(Collectors.toList());
        try {
            dataProcess(accepted);
        } catch (Throwable th) {
            this.log.error(MessageFormat.format("[BEAST-DTS]订阅器{0}出现异常", getClass().getSimpleName()), th);
            // Exceptions such as a bare NullPointerException carry no message; fall back to
            // toString() so the retry record still says what went wrong.
            String reason = th.getMessage() != null ? th.getMessage() : th.toString();
            // Only records that were actually handed to dataProcess are recycled; records
            // rejected by the filter were never processed, so there is nothing to retry.
            for (DTSRecord failed : accepted) {
                DTSKafkaManager.loadRecycleSender().recycle(
                        DTSRecordBuilder.fromRecord(failed)
                                .setAppId(DTSMetaObtainer.APP_NAME)
                                .setChannel(DTSChannel.RECYCLE)
                                .setStatus(DTSRetryStatus.UNTREATED)
                                .setMessage(reason)
                                .buildRetryRecord());
            }
        }
    }

    /**
     * Listener entry point without an acknowledgment; delegates to the inherited
     * implementation.
     *
     * @param list the raw Kafka records of one batch
     */
    public void onMessage(List<ConsumerRecord<String, String>> list) {
        // NOTE(review): the decompiled source read "super.onMessage(...)", which cannot resolve
        // against Object; this assumes the default method is inherited via DTSBatchHandler.
        DTSBatchHandler.super.onMessage(list);
    }

    /**
     * Listener entry point with manual acknowledgment: parses each record's value as JSON into
     * a {@link DTSRecord}, runs {@link #dataReceive(List)}, then acknowledges the batch.
     *
     * @param list           the raw Kafka records of one batch
     * @param acknowledgment handle used to acknowledge the batch after it has been received
     */
    public void onMessage(List<ConsumerRecord<String, String>> list, Acknowledgment acknowledgment) {
        List<DTSRecord> records = new ArrayList<>(list.size());
        for (ConsumerRecord<String, String> consumerRecord : list) {
            records.add(DTSRecordBuilder.fromJSON(this.fieldNameMode, consumerRecord.value()).buildRecord());
        }
        dataReceive(records);
        acknowledgment.acknowledge();
    }

    /**
     * Decides whether a record should be handed to {@link #dataProcess(List)}.
     *
     * @param record the record to check
     * @return {@code false} when no registered item matches the record's table or the item's
     *         filter expression does not evaluate to {@code true}; {@code true} when an item
     *         matches and either has no filter expression or the expression evaluates to
     *         {@code true}
     */
    private boolean accepts(DTSRecord record) {
        RegisterItem item = matchItem(record);
        if (ObjectUtil.isNull(item)) {
            return false;
        }
        String filterEl = item.getFilterEl();
        if (StringUtils.isBlank(filterEl)) {
            return true;
        }
        BgelContext context = new BgelContext();
        context.setEnv(generatorElDataMap(record));
        // A null or non-Boolean result counts as "filtered out" instead of throwing and
        // aborting the whole batch before it can be acknowledged.
        boolean matched = Boolean.TRUE.equals(Bgel.eval(filterEl, context));
        if (!matched) {
            this.log.debug("[BEAST-DTS]数据被EL过滤:{}", record);
        }
        return matched;
    }

    /**
     * Builds the expression environment for a record: field name mapped to field value.
     *
     * @param dTSRecord the record whose fields are exposed to the filter expression
     * @return a new map from each field's key to that field's value
     */
    private Map<String, Object> generatorElDataMap(DTSRecord dTSRecord) {
        Map<String, Object> env = new HashMap<>();
        for (Map.Entry<?, ?> entry : dTSRecord.getFields().entrySet()) {
            env.put((String) entry.getKey(), ((DTSRecordField) entry.getValue()).getValue());
        }
        return env;
    }

    /**
     * Finds the first registered item whose table name equals the record's table name.
     *
     * @param dTSRecord the record to match
     * @return the matching item, or {@code null} when nothing is registered or nothing matches
     */
    private RegisterItem matchItem(DTSRecord dTSRecord) {
        if (this.registerItems == null) {
            return null;
        }
        for (RegisterItem registerItem : this.registerItems) {
            if (registerItem != null
                    && registerItem.getTableName() != null
                    && registerItem.getTableName().equals(dTSRecord.getTableName())) {
                return registerItem;
            }
        }
        return null;
    }

    /**
     * Returns the registered items.
     *
     * @return the backing list itself (not a copy); {@code null} if none was ever set or added
     */
    public List<RegisterItem> getRegisterItems() {
        return this.registerItems;
    }

    /**
     * Replaces the registered items.
     *
     * @param list the new list; stored as-is, without copying
     */
    @Override
    public void setRegisterItems(List<RegisterItem> list) {
        this.registerItems = list;
    }

    /** Removes all registered items; does nothing when the list is {@code null} or empty. */
    @Override
    public void clearRegisterItems() {
        if (CollectionUtils.isNotEmpty(this.registerItems)) {
            this.registerItems.clear();
        }
    }

    /**
     * Appends a registered item, creating the backing list on first use.
     *
     * @param registerItem the item to add
     */
    @Override
    public void addRegisterItem(RegisterItem registerItem) {
        if (this.registerItems == null) {
            this.registerItems = new ArrayList<>();
        }
        this.registerItems.add(registerItem);
    }

    /**
     * Sets the field-name mode used when parsing record payloads.
     *
     * @param dTSFieldNameMode the mode passed to {@link DTSRecordBuilder#fromJSON}
     */
    @Override
    public void setFieldNameMode(DTSFieldNameMode dTSFieldNameMode) {
        this.fieldNameMode = dTSFieldNameMode;
    }
}
