主题
Spring Boot使用Kafka
1、添加依赖
在你的pom.xml
或build.gradle
文件中添加Spring Kafka和Kafka客户端的依赖。对于Maven,你的pom.xml
应该包含以下依赖:
xml
<dependencies>
<!-- Spring Kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.7.8</version> <!-- 请根据实际情况选择最新版本 -->
</dependency>
<!-- Kafka clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version> <!-- 请根据实际情况选择最新版本 -->
</dependency>
</dependencies>
对于Gradle,你的build.gradle
应该包含以下依赖:
groovy
dependencies {
implementation 'org.springframework.kafka:spring-kafka:2.7.8'
implementation 'org.apache.kafka:kafka-clients:2.8.0'
}
2、配置Kafka
在application.properties
或application.yml
文件中配置Kafka的属性。例如:
properties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myConsumerGroup
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.producer.acks=all
spring.kafka.producer.retries=3
或者在application.yml
中:
yaml
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: myConsumerGroup
enable-auto-commit: false
auto-offset-reset: earliest
producer:
acks: all
retries: 3
3、创建Kafka配置类
创建一个配置类来定义Kafka消费者和生产者的bean。例如:
java
@Configuration
public class KafkaConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "myConsumerGroup");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(config);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
4、创建Kafka生产者和消费者
使用上面配置的KafkaTemplate
和ConcurrentKafkaListenerContainerFactory
来发送消息和监听消息。
生产者示例:
java
@Service
public class KafkaProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendMessage(String topicName, String message) {
this.kafkaTemplate.send(topicName, message);
}
}
消费者示例:
java
@Component
public class KafkaConsumer {
@KafkaListener(topics = "myTopic")
public void listen(String message) {
System.out.println("Received message: " + message);
}
}