일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | |||||
3 | 4 | 5 | 6 | 7 | 8 | 9 |
10 | 11 | 12 | 13 | 14 | 15 | 16 |
17 | 18 | 19 | 20 | 21 | 22 | 23 |
24 | 25 | 26 | 27 | 28 | 29 | 30 |
- ssh 접속 실패 로그
- OPC
- 스마트공장
- OPC UA Client Write
- springboot Gradle
- ssh log
- Java 버전 여러개 사용
- c#
- mssql table 용량
- SCM
- MSSQL
- CPU 사용량
- delete truncate 차이
- ssh 접속 로그
- Jar 경로
- OPC UA
- JOIN DBMS별 차이점
- OPC Write
- OPC UA Write
- kafka
- c# 강의
- Xamarin
- Gradle JDK Path
- Gradle Jar
- MS-SQL
- kafka listener
- lock
- table용량
- OPC DA
- Gradle 빌드 오류
- Today
- Total
IT’s Ha
[Spring Boot] Apache Kafka 실습 본문
SpringBoot로 Apache Kafka 예제를 진행하려고 합니다.
1. 프로젝트 생성
저는 Intellij를 가지고 진행하도록 하겠습니다.
먼저, 프로젝트를 생성합니다. 프로젝트명은 kafkastudy로 만들겠습니다. 그리고 메이븐으로 저는 하였지만, Gradle도 상관없습니다.
다음으로 이동하여, Spring Web과 Spring for Apache Kafka, Lombok, Spring Boot DevTools 이렇게 선택하고 완료 해주세요.
2. Config 설정
테스트를 하기 위해, 기존 설정되어 있는 Kafka 서버와 생성한 topic을 미리 입력하겠습니다.
설정이 안되어있으면, 이전 Apache Kafka 설치 글을 참고하시면 됩니다.
2024.01.29 - [ETC] - [ETC]Windows에 Apache Kafka 설치
application.properties를 열어 아래와 같이 추가하겠습니다.
#kafkaServer Config IP:Port
#spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.bootstrap-servers=192.168.1.155:9092
spring.kafka.template.default-topic=dev-topic
test.project.topic=dev-topic
- spring.kafka.bootstrap-servers는 kafka서버의 연결 설정 선언을 미리 합니다.
- spring.kafka.template.default-topic는 기본 topic을 설정합니다.
3. Producer Config 생성
아래에 config Package를 생성합니다. 그리고 KafkaProducerConfig 클래스를 생성합니다.
package org.example.kafkastudy.config;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.HashMap;
import java.util.Map;
public class KafkaProducerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
public Map<String, Object> prodcerConfig(){
Map<String,Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
@Bean
public ProducerFactory<String,String> producerFactory(){
return new DefaultKafkaProducerFactory<>(prodcerConfig());
}
@Bean
public KafkaTemplate<String,String> kafkaTemplate(ProducerFactory<String,String> producerFactory){
return new KafkaTemplate<>(producerFactory);
}
}
4. Consumer Config 생성
producerConfig생성과 동일하게 ConsumerConfig도 생성합니다.
package org.example.kafkastudy.config;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import java.util.HashMap;
import java.util.Map;
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${setting.autocommit}")
private String autoCommit;
@Value("${setting.earliest}")
private String earliest;
@Autowired
private TaskExecutorConfig taskExecutorConfig;
public Map<String, Object> ConsumerConfig(){
Map<String,Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// 그룹 생성
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringSerializer.class);
//오프셋 수동 관리
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, this.autoCommit);
//
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, this.earliest );
// poll 요청을 보내고, 다음 poll 요청을 보내는데 까지의 최대 시간 설정
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 5000);
return props;
}
@Bean
public ConsumerFactory<String,String> consumerFactory(){
return new DefaultKafkaConsumerFactory<>(ConsumerConfig());
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> factory(ConsumerFactory<String,String> consumerFactory){
ConcurrentKafkaListenerContainerFactory<String,String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setConcurrency(3);// 하나의 리스너에 스레드 3개로 처리
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
factory.getContainerProperties().setListenerTaskExecutor(taskExecutorConfig.executor());
return factory;
}
}
5. Listener 생성
package org.example.kafkastudy.config;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;
@Service
@Slf4j
public class KafkaListeners {
@KafkaListener(topics = "${setting.topics}",groupId = "${spring.kafka.consumer.group-id}")
public void consume(ConsumerRecord<String, String> consumerRecord, @Header(KafkaHeaders.OFFSET) Long offset
, Acknowledgment acknowledgment
, Consumer<?, ?> consumer){
try {
log.info("Consumer Data = {}, Offset = {}, Header OffSet = {}, Partition = {}"
, consumerRecord.value(), consumerRecord.offset(),offset,consumerRecord.partition());
//처리 후 커밋
//해당 비지니스 로직 처리 후 커밋로직 작성
consumer.commitAsync();
}
catch (Exception e){
log.error(e.getMessage());
}
}
}
consumer.commitAsync는 해당 offset에 대한 수신을 저장하는 역할은 한다. 주석과 같이 해당 비지니스 로직을 추가하여 커밋을 처리하면, 비지니스 로직 처리중 문제가 발생하였을 경우 예방 할 수 있다. 하지만, 해당 프로세스가 잘못처리하면 큰오류가 발생 할 수 있으니 조심해야합니다.
6. 실행 테스트
실행한뒤 이러한 메세지가 뜬다면 정상적으로 동작 완료 되었습니다.
다음, Producer에서 데이터를 보냈을 경우 정상적으로 동작하는지 테스트는 cmd창을 추가 로 오픈한뒤 아래의 kafka 설치 된 주소에서 실행 할 수 있습니다.(위에 참조된 설치글 참고)
kafka 폴더로 접속 후
bin\windows\kafka-console-producer.bat --bootstrap-server localhost:9092 --topic dev-topic
데이터를 서버로 전송하였을때, Springboot에서도 정상적으로 로그가 올라오는지 확인합니다.
위와 같이 정상적으로 올라오게 된다면, Consumer가 제대로 동작하는 것을 확인 할 수 있습니다.
오늘은 SpringBoot Apache kafka 실습을 하였습니다. 궁금하신점이나 잘못된 내용있으면, 댓글이나 메일 부탁드립니다. 감사합니다.
'Java > Spring Boot' 카테고리의 다른 글
[Spring Boot] Gradle Build시 jar파일 위치 설정 (0) | 2024.03.05 |
---|---|
[SpringBoot] Gradle Java 버전 오류 (0) | 2024.03.04 |
[Spring Boot] 배너 벼경 (2) | 2023.10.25 |
[Spring Boot] Spring Boot 프로젝트 시작(IntelliJ IDEA) (5) | 2023.10.24 |