Skip to content

Spring Boot使用Kafka

1、添加依赖

在你的pom.xml或build.gradle文件中添加Spring Kafka和Kafka客户端的依赖。对于Maven,你的pom.xml应该包含以下依赖:

xml
<dependencies>
  <!-- Spring Kafka -->
  <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
      <version>2.7.8</version> <!-- 请根据实际情况选择最新版本 -->
  </dependency>
  
  <!-- Kafka clients -->
  <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.8.0</version> <!-- 请根据实际情况选择最新版本 -->
  </dependency>
</dependencies>

对于Gradle,你的build.gradle应该包含以下依赖:

groovy
dependencies {
    implementation 'org.springframework.kafka:spring-kafka:2.7.8'
    implementation 'org.apache.kafka:kafka-clients:2.8.0'
}

2、配置Kafka

在application.properties或application.yml文件中配置Kafka的属性。例如:

properties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myConsumerGroup
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.producer.acks=all
spring.kafka.producer.retries=3

或者在application.yml中:

yaml
spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: myConsumerGroup
      enable-auto-commit: false
      auto-offset-reset: earliest
    producer:
      acks: all
      retries: 3

3、创建Kafka配置类

创建一个配置类来定义Kafka消费者和生产者的bean。例如:

java
/**
 * Declares the Kafka consumer and producer beans used by the application.
 *
 * <p>Both factories are built from explicit property maps, so each client setting the
 * application depends on must be listed here. The values mirror the
 * {@code spring.kafka.*} example properties (bootstrap servers, group id, auto-commit,
 * offset reset, acks, retries) so that the hand-built factories and the properties file
 * describe the same client behaviour.
 *
 * <p>NOTE(review): user-defined {@code ConsumerFactory}/{@code ProducerFactory} beans are
 * expected to replace Spring Boot's auto-configured ones, in which case the
 * {@code spring.kafka.*} properties are not applied automatically — confirm against the
 * Spring Boot version in use.
 */
@Configuration
public class KafkaConfig {

    /** Broker address shared by the consumer and the producer. */
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";

    /** Consumer group the listener containers join. */
    private static final String CONSUMER_GROUP_ID = "myConsumerGroup";

    /**
     * Builds the factory that creates String-keyed, String-valued consumers.
     *
     * @return a consumer factory configured with the broker address, group id,
     *         String deserializers, auto-commit disabled and offset reset "earliest"
     */
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_ID);
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Keep in step with spring.kafka.consumer.enable-auto-commit / auto-offset-reset.
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return new DefaultKafkaConsumerFactory<>(config);
    }

    /**
     * Builds the listener container factory backed by {@link #consumerFactory()}.
     *
     * @return a container factory whose consumers come from {@code consumerFactory()}
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    /**
     * Builds the factory that creates String-keyed, String-valued producers.
     *
     * @return a producer factory configured with the broker address, String
     *         serializers, acks "all" and 3 retries
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // Keep in step with spring.kafka.producer.acks / retries.
        configProps.put(ProducerConfig.ACKS_CONFIG, "all");
        configProps.put(ProducerConfig.RETRIES_CONFIG, 3);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    /**
     * Builds the template used to publish messages.
     *
     * @return a template whose producers come from {@link #producerFactory()}
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

4、创建Kafka生产者和消费者

使用上面配置的KafkaTemplate和ConcurrentKafkaListenerContainerFactory来发送消息和监听消息。

生产者示例

java
/**
 * Service that publishes String messages through an injected {@code KafkaTemplate}.
 */
@Service
public class KafkaProducer {

    /** Template every outgoing message is handed to. */
    private final KafkaTemplate<String, String> template;

    /**
     * Creates the producer service.
     *
     * @param kafkaTemplate template used for all sends
     */
    public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
        template = kafkaTemplate;
    }

    /**
     * Hands a message to the template for the given topic.
     *
     * <p>The value returned by {@code send} is not inspected here.
     *
     * @param topicName topic to publish to
     * @param message   payload to publish
     */
    public void sendMessage(String topicName, String message) {
        template.send(topicName, message);
    }
}

消费者示例

java
/**
 * Component whose {@link #listen(String)} method is registered as a listener on
 * the {@code myTopic} topic.
 */
@Component
public class KafkaConsumer {

    /** Text printed in front of every received payload. */
    private static final String RECEIVED_PREFIX = "Received message: ";

    /**
     * Prints the received payload to standard output.
     *
     * @param message payload delivered to the listener
     */
    @KafkaListener(topics = "myTopic")
    public void listen(String message) {
        final String line = RECEIVED_PREFIX + message;
        System.out.println(line);
    }
}