package org.springframework.kafka.core;

import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.support.LoggingProducerListener;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.kafka.support.SendResult;
import org.springframework.kafka.support.converter.MessageConverter;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.SettableListenableFuture;

/* loaded from: input_file:BOOT-INF/lib/spring-kafka-1.1.2.RELEASE.jar:org/springframework/kafka/core/KafkaTemplate.class */
public class KafkaTemplate<K, V> implements KafkaOperations<K, V> {
    protected final Log logger;
    private final ProducerFactory<K, V> producerFactory;
    private final boolean autoFlush;
    private RecordMessageConverter messageConverter;
    private volatile Producer<K, V> producer;
    private volatile String defaultTopic;
    private volatile ProducerListener<K, V> producerListener;

    public KafkaTemplate(ProducerFactory<K, V> producerFactory) {
        this(producerFactory, false);
    }

    public KafkaTemplate(ProducerFactory<K, V> producerFactory, boolean z) {
        this.logger = LogFactory.getLog(getClass());
        this.messageConverter = new MessagingMessageConverter();
        this.producerListener = new LoggingProducerListener();
        this.producerFactory = producerFactory;
        this.autoFlush = z;
    }

    public String getDefaultTopic() {
        return this.defaultTopic;
    }

    public void setDefaultTopic(String str) {
        this.defaultTopic = str;
    }

    public void setProducerListener(ProducerListener<K, V> producerListener) {
        this.producerListener = producerListener;
    }

    public MessageConverter getMessageConverter() {
        return this.messageConverter;
    }

    public void setMessageConverter(RecordMessageConverter recordMessageConverter) {
        this.messageConverter = recordMessageConverter;
    }

    @Override // org.springframework.kafka.core.KafkaOperations
    public ListenableFuture<SendResult<K, V>> sendDefault(V v) {
        return send(this.defaultTopic, v);
    }

    @Override // org.springframework.kafka.core.KafkaOperations
    public ListenableFuture<SendResult<K, V>> sendDefault(K k, V v) {
        return send(this.defaultTopic, (String) k, (K) v);
    }

    @Override // org.springframework.kafka.core.KafkaOperations
    public ListenableFuture<SendResult<K, V>> sendDefault(int i, K k, V v) {
        return send(this.defaultTopic, i, k, v);
    }

    @Override // org.springframework.kafka.core.KafkaOperations
    public ListenableFuture<SendResult<K, V>> send(String str, V v) {
        return doSend(new ProducerRecord<>(str, v));
    }

    @Override // org.springframework.kafka.core.KafkaOperations
    public ListenableFuture<SendResult<K, V>> send(String str, K k, V v) {
        return doSend(new ProducerRecord<>(str, k, v));
    }

    @Override // org.springframework.kafka.core.KafkaOperations
    public ListenableFuture<SendResult<K, V>> send(String str, int i, V v) {
        return doSend(new ProducerRecord<>(str, Integer.valueOf(i), null, v));
    }

    @Override // org.springframework.kafka.core.KafkaOperations
    public ListenableFuture<SendResult<K, V>> send(String str, int i, K k, V v) {
        return doSend(new ProducerRecord<>(str, Integer.valueOf(i), k, v));
    }

    @Override // org.springframework.kafka.core.KafkaOperations
    public ListenableFuture<SendResult<K, V>> send(Message<?> message) {
        return doSend(this.messageConverter.fromMessage(message, this.defaultTopic));
    }

    @Override // org.springframework.kafka.core.KafkaOperations
    public List<PartitionInfo> partitionsFor(String str) {
        return getTheProducer().partitionsFor(str);
    }

    @Override // org.springframework.kafka.core.KafkaOperations
    public Map<MetricName, ? extends Metric> metrics() {
        return getTheProducer().metrics();
    }

    @Override // org.springframework.kafka.core.KafkaOperations
    public <T> T execute(KafkaOperations.ProducerCallback<K, V, T> producerCallback) {
        return producerCallback.doInKafka(getTheProducer());
    }

    @Override // org.springframework.kafka.core.KafkaOperations
    public void flush() {
        getTheProducer().flush();
    }

    protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) {
        getTheProducer();
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Sending: " + producerRecord);
        }
        final SettableListenableFuture settableListenableFuture = new SettableListenableFuture();
        getTheProducer().send(producerRecord, new Callback() { // from class: org.springframework.kafka.core.KafkaTemplate.1
            /* JADX WARN: Multi-variable type inference failed */
            @Override // org.apache.kafka.clients.producer.Callback
            public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
                if (exc != null) {
                    settableListenableFuture.setException(new KafkaProducerException(producerRecord, "Failed to send", exc));
                    if (KafkaTemplate.this.producerListener != null) {
                        KafkaTemplate.this.producerListener.onError(producerRecord.topic(), producerRecord.partition(), producerRecord.key(), producerRecord.value(), exc);
                        return;
                    }
                    return;
                }
                settableListenableFuture.set(new SendResult(producerRecord, recordMetadata));
                if (KafkaTemplate.this.producerListener == null || !KafkaTemplate.this.producerListener.isInterestedInSuccess()) {
                    return;
                }
                KafkaTemplate.this.producerListener.onSuccess(producerRecord.topic(), producerRecord.partition(), producerRecord.key(), producerRecord.value(), recordMetadata);
            }
        });
        if (this.autoFlush) {
            flush();
        }
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Sent: " + producerRecord);
        }
        return settableListenableFuture;
    }

    private Producer<K, V> getTheProducer() {
        if (this.producer == null) {
            synchronized (this) {
                if (this.producer == null) {
                    this.producer = this.producerFactory.createProducer();
                }
            }
        }
        return this.producer;
    }
}
