• 카테고리

    질문 & 답변
  • 세부 분야

    데이터 엔지니어링

  • 해결 여부

    해결됨

[섹션2] 메세지 비동기 전송 부분에 기본적인 질문인데요

23.11.03 00:35 작성 조회수 236

0

카프카 관련 질문이라기 보다는.. 자바에 익숙하지 않아서 자바에 관한 질문입니다.

 

kafkaProducer.send(producerRecord, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            logger.info("partiion: " + metadata.partition());
            logger.info("offset: " + metadata.offset());
            logger.info("timestamp: " + metadata.timestamp());
        } else {
            logger.error("exception error from broker: " + exception.getMessage());
        }
    }
});

여기서 sendThread에서 callback에 대한 부분을 호출할때, 이런식으로 동작하는것으로 생각했습니다.

그래서 sendThread에서 broker에서 response를받아 callback에 해당하는 부분을 채워넣을때 이와 같이 동작한다고 생각합니다. (java에 익숙하지 않아서... python코드로 그냥 이해한대로 적어보겠씁니다.. )

 

def responseCallback(record, callback) {
  callback.onCompletion(record, exception)
}

이런식으로 callback 객체의 onCompletion 메서드를 호출하고 받은 정보를 parameter로 넘기는것으로 이해했는데요.

 

근데, lambda형식으로 바꾸게 되면,

kafkaProducer.send(producerRecord, (metadata, exception) -> {
        if (exception == null) {
            logger.info("partiion: " + metadata.partition());
            logger.info("offset: " + metadata.offset());
            logger.info("timestamp: " + metadata.timestamp());
        } else {
            logger.error("exception error from broker: " + exception.getMessage());
        }
    }
});

이렇게 코드를 작성되는데, 이렇게 되면 callback 함수를 호출할때, onCompletion 메서드를 호출을 안하게 되는건가요?

callback(metadata, exception)
이와같이 호출을 하는건가요??

 

lambda에서의 호출방법으로 호출하는건지, 기존의 callback 객체를 호출하는 방식이 맞는건지.. 어떠한 부분이 맞는건지 궁금합니다.

답변 1

답변을 작성해보세요.

1

안녕하십니까,

질문의 개괄적인 내용은 이해했는데, 정확하게 잘 이해했는지 모르겠군요. 읽어 보시고, 원하는 답변이 아니면 다시 말씀해 주십시요.

Kafka 제품을 설계한 개발자가 KafkaProducer의 send(Producer 객체, Callback Interface를 구현한 객체) 메소드를 설계할 때, Callback Interface를 구현한 객체가 아니라 그냥 Callback에서 특정 이벤트에 사용될 메시지를 출력하는 기능 등을 구현한 Event 클래스를 바깥에서 호출하여 객체로 생성한 뒤 입력하는 방식을 적용했어도 될 텐데, 굳이 Callback 인터페이스를 구현한 객체를 입력 받도록 설계한 이유에 대해서 제가 개인적으로 생각한 이유는 아래와 같습니다.

먼저 Java가 처음 탄생할 때 Interface 라는 키워드를 도입했는데, 이는 기존 C++과 같은 객체 지향 언어를 사용하는 개발 그룹에 신선한(?) 충격을 주었습니다. Interface는 말 그대로 '규약'으로 정의 될 수 있으며, 특정 클래스가 이 interace를 구현하겠다고 선언하면 반드시 Interface에서 선언된 메소드들을 구현해 줘야 합니다.

interface에서 선언된 메소드를 반드시 구현해 줘야 한다는 차원에서 얼핏 보면 이를 적용하는 것이 불편해 보일 수 있지만, 이 interface에서 선언된 메소드를 공공연하게 개발자에게 드러내준다는 점과, 이에 대한 구현은 객체별로 다르게 수행할 수 있다는 차원에서 전체적인 제품 구현 측면에서 상당히 효율적인 방법이 되었고, 초기에 Java 언어가 기존 C++ 과 다른 큰 장점으로 어필되었습니다.

만일 Callback 클래스를 KafkaProducer의 send()를 호출하는 사용자가 직접 Event 클래스로 만들어서 구현해야 한다면, 사용자별로 서로 다르게 구현할 수 있기 때문에 이 Event을 사용하는 Thread에서는 어떻게 호출해야 할 지 모를 것이기 때문에 당연히 이 방식은 send( )설계시 제외 되었을 겁니다(이걸 모르셔서 질문하지는 않으셨을 것 같지만, 혹시나 해서 적습니다 ^^).

만일 Kafka 라이브러리에서 Callback 클래스를(Interface가 아니라) Event클래스로 생성하고, 이 클래스의 생성 인자로 OnCompletion()메소드에서 사용될 다양한 내부 멤버 변수들을 입력해서 Event객체로 외부에서 생성한 뒤에 send()메소드에 입력하는 방식의 단점의 경우

  1. onCompletion()을 사용자의 입맛에 맞게 Customization 할 수 없습니다. 물론 내부 멤버 변수들을 잘 설계하고, 기능적인 제약을 규정해서 이를 API에 잘 담는 방식도 좋지만, onCompletion()은 개별 kafka send()를 이용하려는 사용자별로 서로 다르게 구현을 하고 싶은 경우가 더 많을 것입니다(Kafka 라이브러리 설계자는 분명히 그럴거라고 생각하고 Callback Interface 구현 클래스를 입력 받도록 설계했습니다)

     

  2. 특정 Event가 발생할 때마다 Event가 어떻게 구현되어 있는지를 내부 API문서를 참조하거나, 직접 소스 코드를 까봐야 합니다. 하지만 Interface를 이용하면 onCompletion()을 send()를 사용하는 개발자가 직접 기술하므로 내부 라이브러리를 더 깊이 까볼 필요 없이 해당 이벤트의 발생시 이 이벤트를 어떻게 처리할 지를 명확하게 알 수 있습니다.

이러한 장점 때문에 Java에서는 비동기 기반의 Event 처리 시(주로 multi thread 기반) Callback Interface 구현 객체를 활용해서 API가 동작하는 방식을 선호 합니다.

파이썬은 Interface가 없습니다. 하지만 파이썬은 (더 강력한) function 기반의 비동기 처리를 가능하게 해줍니다. 함수의 인자로 사용자가 선언한 function을 일반적인 function 선언이나 lambda expression 기술 등으로 동적으로 입력 받을 수 있습니다.

요약드리면 Callback interface를 구현한 객체나 (파이썬의) function/lambda expression은 반드시 입력 받지 않아도 되며, Kafka라이브러리에서 미리 구현된 Event 클래스를 입력해도 됩니다. 하지만 Kafka API 설계자는 Callback Interface를 사용하는 방식이 좀 더 유연하고 다양한 방식으로 Event를 처리할 수 있기 때문에 해당 방식을 적용했다고 생각합니다.

 

감사합니다.

 

너무 자세한 답변 감사합니다...

 

너무 자세하게 답변해주셨지만 약간 질문이 정확하지 않아서 답변내용이 잘못 간것 같네요.. 죄송합니다.

 

질문의 요지는 callback interface를 넣어주는 인자의 위치가 다르다는데에 있습니다.

첫번째의 callback object를 직접 넘겨주는 방식에서는 onCompletion자체에 인자를 넘겨주고 있습니다.

 

// peseudo-code
kafkaProducer.send(record, new Callback() {
  @Override
  public void onCompletion(RecordMetadata meta, Exception exception) {
    // 여기선 Callback 생성자 쪽에서 인자를 넘겨 받고 있지 않습니다. 
    // onCompletion 자체에서 인자를 받고 있습니다. 
    // 그렇다면 해당 interface를 사용하는 입장에서 아래와 같이 사용될것이라고 생각합니다. 
    // callback.onCompletion(meta, exception) 
    ... 
  }
})

 

반면, lambda를 사용하는 입장에선 onCompletion에 인자를 넘겨주는 방식이 아닌 생성자에 바로 인자를 넘겨주는 방식을 사용하고 있습니다.

 

// peseudo-code
kafkaProducer.send(record, (meta, exception) -> {
    // 여기선 class(object) 가 아닌 lambda 식을 바로 사용했습니다. 
    // 그러면 호출하는 쪽에선 callback.onCompletion(...) 과 다르게
    // 이렇게 사용했을것 같습니다. 
    // callback(meta, exception)
    ... 
  }
})

 

interface에 class를 호출한 것과 lambda식을 사용한 것에 파라미터를 넘겨주는것이 서로 다른 위치에 있고, 그렇다면 호출해서 넘겨주는 곳에서도 다르다고 생각합니다. <- 이게 궁금했습니다.

그래서 class를 호출하는 곳에서는 callback.onCompletion 과 같이 호출할 것이고, lambda식으로 사용하는쪽에서는 callback(...) 이렇게 호출했을 것이고..

그렇다면, 호출하는쪽에서 type에 따라서 호출하는 형태를 분리했을 것이고 이렇게 이해했습다.

이게 맞게 이해한것일까요?

음, 이건 Event Callback을 설계한 사람의 사상이 그럴 뿐입니다. 굳이 생성자로 Callback Interace의 implements 구현체에 인자로 넣어줄 필요없이 Event의 처리 유형에 따라 메소드가 달라지므로 거기에 인자를 넣어서 호출해 주면 될것이라고 생각했기 때문에 그렇게 작성한 것 같습니다.

그리고 lambda에 인자로 넣어주는 것은 객체의 생성 인자가 아니라 오히려 함수의 인자라고 보시는게 맞을 것 같습니다.

 

자세하고 빠른 답변 감사합니다!