Skip to content

使用Kafka Template集成Kafka

在Spring Boot项目中集成Kafka,通常会使用org.springframework.kafka包中的KafkaTemplate。以下是一个简单的步骤和示例代码,说明如何在Spring Boot应用中使用KafkaTemplate来发送和接收消息。

1. 添加依赖

首先,在你的pom.xmlbuild.gradle文件中添加Spring Kafka的依赖:

Maven

xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-kafka</artifactId>
</dependency>

Gradle

groovy
implementation 'org.springframework.boot:spring-boot-starter-kafka'

2. 配置Kafka

application.propertiesapplication.yml文件中配置Kafka的属性:

application.properties

properties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup

application.yml

yaml
spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: myGroup

3. 创建KafkaTemplate Bean

在你的配置类中创建一个KafkaTemplate bean:

java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;

@Configuration
public class KafkaConfig {

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>();
    }
}

4. 使用KafkaTemplate发送消息

创建一个服务类来发送消息到Kafka:

java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducerService {

    private final KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String topicName, String message) {
        kafkaTemplate.send(topicName, message);
    }
}

5. 使用KafkaListener接收消息

创建一个监听器来接收Kafka的消息:

java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumerService {

    @KafkaListener(topics = "myTopic")
    public void consume(String message) {
        System.out.println("Received message: " + message);
    }
}

以上就是使用Spring Boot和KafkaTemplate进行Kafka集成的基本步骤。你可以根据需要调整配置和逻辑。