package com.thebeastshop.dts.sender;

import com.thebeastshop.dts.kafka.KafkaDriver;
import com.thebeastshop.dts.record.DTSRecord;
import org.apache.kafka.common.errors.TopicExistsException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/thebeastshop/dts/sender/KafkaSender.class */
public class KafkaSender implements DataSender {
    private static Logger logger = LoggerFactory.getLogger(KafkaSender.class);
    private final String topicName;
    private final KafkaDriver kafkaDriver;
    private final int partitions;

    public KafkaSender(String str, KafkaDriver kafkaDriver, int i) {
        this.topicName = str;
        this.kafkaDriver = kafkaDriver;
        this.partitions = i;
    }

    public void checkAndCreateTopic() throws Exception {
        try {
            this.kafkaDriver.createTopic(this.topicName, this.partitions);
            logger.info("[DTS] 创建Topic '" + this.topicName + "' 成功");
        } catch (TopicExistsException e) {
            logger.info("[DTS] Topic '" + this.topicName + "' 已存在");
        } catch (Throwable th) {
            if (th.getCause() instanceof TopicExistsException) {
                logger.info("[DTS] Topic '" + this.topicName + "' 已存在");
            } else {
                logger.error("[DTS ERROR]: 创建Topic '" + this.topicName + "' 失败, " + th.getMessage());
                throw th;
            }
        }
    }

    @Override // com.thebeastshop.dts.sender.DataSender
    public void send(DTSRecord dTSRecord) {
        this.kafkaDriver.send(dTSRecord.getDbName() + "." + dTSRecord.getTableName(), dTSRecord, this.topicName);
    }

    @Override // com.thebeastshop.dts.sender.DataSender
    public String getTopicName() {
        return this.topicName;
    }

    @Override // com.thebeastshop.dts.sender.DataSender
    public String getAddress() {
        return this.kafkaDriver.getBootstrapServers();
    }
}
