package com.yiqiniu.easytrans.queue.impl.kafka;

import com.yiqiniu.easytrans.protocol.TransactionId;
import com.yiqiniu.easytrans.queue.impl.kafka.KafkaQueueProperties;
import com.yiqiniu.easytrans.queue.producer.EasyTransMsgPublishResult;
import com.yiqiniu.easytrans.queue.producer.EasyTransMsgPublisher;
import com.yiqiniu.easytrans.serialization.ObjectSerializer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/yiqiniu/easytrans/queue/impl/kafka/KafkaEasyTransMsgPublisherImpl.class */
public class KafkaEasyTransMsgPublisherImpl implements EasyTransMsgPublisher {
    private static Logger log = LoggerFactory.getLogger(KafkaEasyTransMsgPublisherImpl.class);
    private Producer<String, byte[]> kafkaProducer;
    private ObjectSerializer serializer;
    private KafkaQueueProperties.ProducerConfig cfg;

    public KafkaEasyTransMsgPublisherImpl(KafkaQueueProperties.ProducerConfig producerConfig, ObjectSerializer objectSerializer) {
        this.serializer = objectSerializer;
        this.cfg = producerConfig;
        this.kafkaProducer = new KafkaProducer(this.cfg.getNativeCfg());
    }

    public EasyTransMsgPublishResult publish(String str, String str2, String str3, Map<String, Object> map, byte[] bArr) {
        String kafkaTopic = QueueKafkaHelper.getKafkaTopic(str, str2);
        int calcMessagePartition = calcMessagePartition(kafkaTopic, (TransactionId) map.get("pTrxId"));
        ArrayList arrayList = new ArrayList(map.size());
        for (Map.Entry<String, Object> entry : map.entrySet()) {
            arrayList.add(new RecordHeader(entry.getKey(), this.serializer.serialization(entry.getValue())));
        }
        try {
            log.info("message sent:" + ((RecordMetadata) this.kafkaProducer.send(new ProducerRecord(kafkaTopic, Integer.valueOf(calcMessagePartition), (Long) null, str3, bArr, arrayList)).get()));
            EasyTransMsgPublishResult easyTransMsgPublishResult = new EasyTransMsgPublishResult();
            easyTransMsgPublishResult.setTopic(str);
            easyTransMsgPublishResult.setMessageId(str3);
            return easyTransMsgPublishResult;
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException("message sent error", e);
        }
    }

    public int calcMessagePartition(String str, TransactionId transactionId) {
        return Math.abs(transactionId.hashCode()) % this.kafkaProducer.partitionsFor(str).size();
    }

    public Future<RecordMetadata> publishKafkaMessage(ProducerRecord<String, byte[]> producerRecord) {
        return this.kafkaProducer.send(producerRecord);
    }
}
