inflearn logo
강의

강의

N
챌린지

챌린지

멘토링

멘토링

N
클립

클립

로드맵

로드맵

지식공유

[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!

직렬화, 역직렬화 관련

460

jhahn

작성한 질문수 4

0

안녕하세요. 좋은 강의 감사합니다.

직렬화, 역직렬화 관련해서 질문이 있습니다.

저는 Spring 프레임워크를 사용해서 프로듀서와, 컨슈머를 각각 따로 서버를 만들어서 개발 하고 있습니다.

수업 내용은 단순히 String이지만 제가 실무에 사용하려고하는건 웹 애플리케이션 과 같이 DTO 클래스로

받은 데이터를 그대로 프로듀서에서 send()에 담아서 보내고 있습니다. 컨슈머에서 DTO로 받으려고

관련 자료를 찾아 보니 JsonSerializer가 있어 해보았습니다. Object 로 받아서 여러 DTO를 받을 수 있는

Consumer factory를 만들다 보니 여러 에러가 발생해서 ObjectMapper를 사용했습니다.

@Component
public class ObjectMapperService {
    private final ObjectMapper objectMapper = new ObjectMapper();

    public <T> T convertValue(String json, Class<T> valueType) throws JsonProcessingException {
        return objectMapper.readValue(json, valueType);
    }
}
@Component
@Slf4j
@Data
@RequiredArgsConstructor
public class KafkaConsumer {
    private final ObjectMapperService mapper;

    @KafkaListener(topics = "member-join", containerFactory = "commonKafkaListenerContainerFactory")
    public void receiveMemberJoin(ConsumerRecord<String, String> consumerRecord, Acknowledgment acknowledgment) throws Exception {
        String value = consumerRecord.value();
        MemberJoin memberJoin = mapper.convertValue(value, MemberJoin.class);
        String recommendCode = memberJoin.getRecommendCode();
        log.info("recommendCode: {}", recommendCode);
        log.info("received payload = {}", memberJoin.toString());
        acknowledgment.acknowledge();
    }
}

이런식으로 사용해서 DTO클래스로 사용했는데 실무에서는 어떤식으로 하는지 궁금합니다. 혹시 참고할 수 있는 정보가 있을까요?

감사합니다.

kafka 데이터-엔지니어링 직렬화 역직렬화 spring

답변 2

0

데브원영 DVWY

안녕하세요. 스프링 프레임워크에 대해 문의주신내용 확인했습니다. 스프링 카프카는 이번 강의에서 다루지 않기 때문에 답변이 어려운점 양해부탁드립니다.

다만, 일반적인 컨슈머/프로듀서 사용 입장에서 말씀드리자면 실무의 사용형태는 요구조건에 따라 다르다고 말씀드릴 수 있습니다. 만약 팀간 느슨한 데이터 연결을 원할 때는 json으로 데이터를 보내고 받는 쪽에서 DTO없이 개별 필드에 대해 파싱하는 형태로 사용합니다. 개인적으로도 JSON 데이터를 주고 받고 받는 쪽에서 오류처리 혹은 분기처리를 통해 데이터를 연동하도록 설정하는 편입니다.

0

인프런 AI 인턴

안녕하세요, 인프런 AI 인턴입니다.

실무에서 Kafka의 프로듀서와 컨슈머를 구현할 때 직렬화와 역직렬화는 매우 중요한 부분입니다. DTO 클래스를 사용하여 데이터를 전송하는 경우, JsonSerializerJsonDeserializer를 통해 간편하게 구현할 수 있습니다. Spring Boot에서는 @JsonComponent 어노테이션을 사용하여 직렬화와 역직렬화를 더 쉽게 구현할 수 있습니다.

예를 들어, 다음과 같이 @JsonComponent 어노테이션을 사용한 사용자 정의 직렬화 및 역직렬화 구현 예제를 참조할 수 있습니다.

import java.io.IOException;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.ObjectCodec;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.SerializerProvider;

import org.springframework.boot.jackson.JsonComponent;

@JsonComponent
public class MyJsonComponent {

    public static class Serializer extends JsonSerializer<MyObject> {

        @Override
        public void serialize(MyObject value, JsonGenerator jgen, SerializerProvider serializers) throws IOException {
            jgen.writeStartObject();
            jgen.writeStringField("name", value.getName());
            jgen.writeNumberField("age", value.getAge());
            jgen.writeEndObject();
        }
    }

    public static class Deserializer extends JsonDeserializer<MyObject> {

        @Override
        public MyObject deserialize(JsonParser jsonParser, DeserializationContext ctxt) throws IOException {
            ObjectCodec codec = jsonParser.getCodec();
            JsonNode tree = codec.readTree(jsonParser);
            String name = tree.get("name").textValue();
            int age = tree.get("age").intValue();
            return new MyObject(name, age);
        }
    }
}

이 예제는 Spring Boot 환경에서 사용자 정의 객체를 직렬화 및 역직렬화하는 방법을 보여줍니다. 이렇게 하면 특정 DTO에 대한 직렬화 및 역직렬화 로직을 캡슐화하여 관리할 수 있으며, Spring이 자동으로 이를 처리하도록 할 수 있습니다.

이 방법 외에도, 실무에서는 특정 클래스에 대한 직렬화/역직렬화 방식을 고정하기보다는 경우에 따라 다양한 방법을 사용할 수 있습니다. 각 프로젝트의 요구 사항과 데이터 구조에 따라 가장 적절한 방식을 선택하게 됩니다.

참고하실 수 있는 자료로는 Spring 공식 문서가 있습니다. 더 자세한 내용과 예제는 아래의 링크를 찾아보시기 바랍니다.

Spring Boot Reference Documentation v3.1.5 - Custom Serializers and Deserializers

실무에서 사용하는 방식에는 여러 가지가 있으며, 특정 프로젝트에 가장 적합한 방식을 선택하는 것이 중요합니다. 제공해 드린 예제와 문서가 도움이 되었기를 바랍니다.

감사합니다.

0

jhahn

답변 감사합니다.

제가 말씀드린건 특정 DTO가 아닌 어느 DTO건 공통으로 관리가 되는지 궁금했습니다.

스프링 부트 컨트롤러에서 @RequestBody를 같이 사용 해서 DTO를 공통으로 받을 수 없는지가 궁금합니다.

감사합니다.

consume 이후 lag가 줄어들지 않음

0

75

2

안녕하세요. 강의의 카프카 버전과 현재 시점의 카프카의 차이점 문의 드립니다.

0

118

2

멱등성 프로듀서 retries 관련 질문입니다.

0

99

2

채팅 서비스 개발 시 주의점이 있을까요?

0

100

2

충분히 큰 파티션 생성시 궁금증이 존재합니다.

0

88

2

KTable 키가 없는 레코드 처리

0

73

2

컨슈머 테스트 코드 작성

0

94

2

리밸런스 onPartitionRevoked이 필요한 상황

0

74

2

카프카 클러스터에서 감당 가능한 파티션(레플리카) 수 문의

0

120

2

reset offset 질문

0

77

2

KStreamJoinKTable 실행시 오류

0

78

2

auto.commit.interval.ms 옵션 관련 질문 드립니다.

0

142

2

파티션, 컨슈머 그룹, 컨슈머 관련 질문

0

117

1

java, kotlin

0

130

2

shutdownThread 에 대한 문의 입니다.

0

155

2

zookeeper실행시 오류가 발생합니다.

0

271

2

커스텀 소스 커넥터에서 Thread.sleep (1000) 은 왜 하는거에요?

0

153

2

처리량을 늘리기 위해서 파티션을 늘리고 컨슈머를 늘려야한다고 설명하셨는데요

0

181

3

파티션 개수와 컨슈머 개수의 처리량 관련 질문

0

164

2

동영상 및 이미지 처리 관련 문의 드립니다.

0

226

2

주키퍼 없이 사용 문의 드립니다.

0

303

1

kafka 를 띄우니 오류가 발생하고 종료합니다.

0

281

3

zookeeper 실행시 오류 .. 무엇을 해야 할까요?

0

220

2

파티션 추가로 해결할 수 있지만 늘어난 파티션은 줄일 수 없지 않나요?

0

192

2