IT’s Ha

[Spring Boot] Apache Kafka 실습 본문

Java/Spring Boot

[Spring Boot] Apache Kafka 실습

Deleloper Ha 2024. 2. 5. 23:26
728x90
반응형

SpringBoot로 Apache Kafka 예제를 진행하려고 합니다.


1.  프로젝트 생성

저는 Intellij를 가지고 진행하도록 하겠습니다.
먼저, 프로젝트를 생성합니다. 프로젝트명은 kafkastudy로 만들겠습니다. 그리고 메이븐으로 저는 하였지만, Gradle도 상관없습니다.

다음으로 이동하여, Spring Web과 Spring for Apache Kafka, Lombok, Spring Boot DevTools 이렇게 선택하고 완료 해주세요.

2.  Config 설정

테스트를 하기 위해, 기존 설정되어 있는 Kafka 서버와 생성한 topic을 미리 입력하겠습니다.
설정이 안되어있으면, 이전 Apache Kafka 설치 글을 참고하시면 됩니다.

2024.01.29 - [ETC] - [ETC]Windows에 Apache Kafka 설치

 

[ETC]Windows에 Apache Kafka 설치

안녕하세요. 이번포스팅은 Windows에 Apache 카프카 설치입니다. Apache Kafka 란? Apache Kafka 설치 Zookeeper 설치 Zookeeper & Kafka 실행 Topic 테스트 1. Apache Kafka 란? 아파치 카프카(Apache Kafka)는 분산형 실시간

oppr123.tistory.com

application.properties를 열어 아래와 같이 추가하겠습니다. 

#kafkaServer Config IP:Port
#spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.bootstrap-servers=192.168.1.155:9092
spring.kafka.template.default-topic=dev-topic

test.project.topic=dev-topic
  • spring.kafka.bootstrap-servers는 kafka서버의 연결 설정 선언을 미리 합니다.
  • spring.kafka.template.default-topic는 기본 topic을 설정합니다.

3. Producer Config 생성

아래에 config Package를 생성합니다. 그리고 KafkaProducerConfig 클래스를 생성합니다. 

package org.example.kafkastudy.config;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

public class KafkaProducerConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    public Map<String, Object> prodcerConfig(){
        Map<String,Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return  props;
    }
    @Bean
    public ProducerFactory<String,String> producerFactory(){
        return new DefaultKafkaProducerFactory<>(prodcerConfig());
    }

    @Bean
    public KafkaTemplate<String,String> kafkaTemplate(ProducerFactory<String,String> producerFactory){
        return new KafkaTemplate<>(producerFactory);
    }
}

4. Consumer Config 생성

producerConfig생성과 동일하게 ConsumerConfig도 생성합니다.

package org.example.kafkastudy.config;

import org.apache.kafka.clients.consumer.ConsumerConfig;

import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;

import java.util.HashMap;
import java.util.Map;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${setting.autocommit}")
    private String autoCommit;

    @Value("${setting.earliest}")
    private String earliest;

    @Autowired
    private TaskExecutorConfig taskExecutorConfig;
    public Map<String, Object> ConsumerConfig(){
        Map<String,Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // 그룹 생성
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringSerializer.class);

        //오프셋 수동 관리
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, this.autoCommit);
        //
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, this.earliest );
        // poll 요청을 보내고, 다음 poll 요청을 보내는데 까지의 최대 시간 설정
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 5000);
        return  props;
    }
    @Bean
    public ConsumerFactory<String,String> consumerFactory(){
        return new DefaultKafkaConsumerFactory<>(ConsumerConfig());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> factory(ConsumerFactory<String,String> consumerFactory){
        ConcurrentKafkaListenerContainerFactory<String,String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setConcurrency(3);// 하나의 리스너에 스레드 3개로 처리
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        factory.getContainerProperties().setListenerTaskExecutor(taskExecutorConfig.executor());

        return factory;
    }
}

5. Listener 생성

package org.example.kafkastudy.config;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;

import org.springframework.stereotype.Service;

@Service
@Slf4j
public class KafkaListeners {
    @KafkaListener(topics = "${setting.topics}",groupId = "${spring.kafka.consumer.group-id}")
    public void consume(ConsumerRecord<String, String> consumerRecord, @Header(KafkaHeaders.OFFSET) Long offset
            , Acknowledgment acknowledgment
            , Consumer<?, ?> consumer){
        try {

            log.info("Consumer Data = {}, Offset = {}, Header OffSet = {}, Partition = {}"
                    , consumerRecord.value(), consumerRecord.offset(),offset,consumerRecord.partition());
            //처리 후 커밋
            //해당 비지니스 로직 처리 후 커밋로직 작성
            consumer.commitAsync();
        }
        catch (Exception e){
            log.error(e.getMessage());
        }

    }
}

 

consumer.commitAsync는 해당 offset에 대한 수신을 저장하는 역할은 한다. 주석과 같이 해당 비지니스 로직을 추가하여 커밋을 처리하면, 비지니스 로직 처리중 문제가 발생하였을 경우 예방 할 수 있다. 하지만, 해당 프로세스가 잘못처리하면 큰오류가 발생 할 수 있으니 조심해야합니다.

6. 실행 테스트 

실행한뒤 이러한 메세지가 뜬다면 정상적으로 동작 완료 되었습니다. 
다음, Producer에서 데이터를 보냈을 경우 정상적으로 동작하는지 테스트는 cmd창을 추가 로 오픈한뒤 아래의 kafka 설치 된 주소에서 실행 할 수 있습니다.(위에 참조된 설치글 참고)
kafka 폴더로 접속 후 

bin\windows\kafka-console-producer.bat --bootstrap-server localhost:9092 --topic dev-topic

데이터를 서버로 전송하였을때, Springboot에서도 정상적으로 로그가 올라오는지 확인합니다. 

위와 같이 정상적으로 올라오게 된다면, Consumer가 제대로 동작하는 것을 확인 할 수 있습니다.

오늘은 SpringBoot Apache kafka 실습을 하였습니다. 궁금하신점이나 잘못된 내용있으면, 댓글이나 메일 부탁드립니다. 감사합니다.  

728x90
반응형
Comments