inflearn logo
강의

강의

N
챌린지

챌린지

멘토링

멘토링

N
클립

클립

로드맵

로드맵

지식공유

Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA)

Orders Microservice 수정 - Order Kafka Producer

Kafka Connect Sink 사용할 때 LocalDateTime은 어떻게 하나요??

512

Calice

작성한 질문수 6

0

안녕하세요.
강사님의 강의를 듣고 개인 프로젝트에 적용하고 있습니다.

LocalDateTime이 없을 때는 잘 수행되었는데 제 개인 프로젝트 엔티티에는 LocalDateTime이 있습니다. 이 부분에서 자꾸 에러가 나와서 어떻게 처리해야 하는지 알려주시면 감사하겠습니다.

 

다음은 제가 LocalDateTime을 처리하기 위해 따로 만든 엔티티 입니다.

package com.dotd.product.kafkaConnectTest;


import jdk.jfr.Name;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.annotation.CreatedDate;
import org.springframework.data.jpa.domain.support.AuditingEntityListener;

import javax.persistence.*;
import java.time.LocalDateTime;

@Entity
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
@Table(name = "nametest")
@EntityListeners(AuditingEntityListener.class)
public class NameTest {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Integer id;

    private String name;

    @CreatedDate
    @Column(name = "created_at")
    private LocalDateTime createdAt;


}

 

 

 

또한, Kafka Connect Sink가 한 번 에러가 발생하면 state : FAILED로 바뀌면서 작동을 안 하는 거 같은데 어떻게 정상화 하나요..??

 

127.0.0.1:8083/connectors/nametest-sink-connect/status

이 요청을 보냈을 때 이렇게 나옵니다.

 

{
    "name": "nametest-sink-connect",
    "connector": {
        "state": "RUNNING",
        "worker_id": "192.168.0.14:8083"
    },
    "tasks": [
        {
            "id": 0,
            "state": "FAILED",
            "worker_id": "192.168.0.14:8083",
            "trace": "org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\r\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:611)\r\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)\r\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)\r\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)\r\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)\r\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)\r\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\r\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\r\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\r\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\r\n\tat java.base/java.lang.Thread.run(Thread.java:834)\r\nCaused by: org.apache.kafka.connect.errors.ConnectException: java.sql.SQLException: Exception chain:\r\njava.sql.BatchUpdateException: Column 'created_at' cannot be null\r\n\r\n\tat io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:124)\r\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)\r\n\t... 10 more\r\nCaused by: java.sql.SQLException: Exception chain:\r\njava.sql.BatchUpdateException: Column 'created_at' cannot be null\r\n\r\n\tat io.confluent.connect.jdbc.sink.JdbcSinkTask.getAllMessagesException(JdbcSinkTask.java:154)\r\n\tat io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:104)\r\n\t... 11 more\r\n"
        }
    ],
    "type": "sink"
}

 

spring-boot jpa 아키텍처 spring-cloud kafka msa

답변 1

0

인프런 AI 인턴

안녕하세요, 인프런 AI 인턴이에요. Kafka Connect Sink에서 LocalDateTime을 처리하는 방법에 대해서 알려드릴게요.

Kafka Connect Sink는 기본적으로 Avro 형식을 지원합니다. 그래서 LocalDateTime과 같은 Java 8의 새로운 날짜시간 API를 사용할 수 있도록 설정하셔야 합니다.

먼저, Kafka Connect Sink 설정 파일에 다음과 같은 내용을 추가해주세요.

"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "<스키마 레지스트리 URL>",
"value.converter.schema.registry.url": "<스키마 레지스트리 URL>",
"transforms": "ValueToStruct",
"transforms.ValueToStruct.type": "org.apache.kafka.connect.transforms.HoistField$Value"

위 설정에서 <스키마 레지스트리 URL> 부분에는 실제 스키마 레지스트리의 URL을 입력해야 합니다.

그리고 LocalDateTime을 사용하는 필드인 경우, 스키마를 정의해야 합니다. 예를 들어, 다음과 같이 스키마를 정의할 수 있습니다.

{
  "namespace": "com.dotd.product.kafkaConnectTest",
  "type": "record",
  "name": "NameTest",
  "fields": [
    {
      "name": "id",
      "type": "int"
    },
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "createdAt",
      "type": {
        "type": "long",
        "logicalType": "timestamp-millis"
      }
    }
  ]
}

createdAt 필드의 logicalTypetimestamp-millis로 설정하여 LocalDateTime을 변환하도록 합니다.

이렇게 설정을 하면 Kafka Connect Sink가 LocalDateTime을 처리할 수 있게 됩니다.

추가로, Kafka Connect Sink가 에러가 발생하고 state가 FAILED로 전환되면, Kafka Connect의 상태를 확인하고 로그를 분석해야 합니다. 문제를 해결하기 위해서는 로그에서 발생한 에러 메시지를 확인하고 그에 맞는 조치를 취해야 합니다.

Kafka Connect Sink를 정상화하기 위해서는 에러 메시지의 원인을 파악하고 그에 따른 조치를 취해야 합니다. 예를 들어, Sink Connector 설정의 문제, 데이터 소스에 대한 접근 권한 부족 등 여러 가지 원인이 있을 수 있으므로 상세한 에러 메시지를 살펴보고 조치를 취하시면 됩니다.

이러한 경우, 일반적으로 Kafka Connect Sink의 문서나 커뮤니티나 고객센터에 문의하여 도움을 받을 수도 있습니다.

다른 질문이 있으시면 언제든지 물어보세요! 좋은 결과 있기를 바라요.

kafka 업데이트 강의 듣고 시포요

0

85

1

강의 교안

0

73

1

마이크로서비스간 통신 시, 인증 처리

0

79

1

api gateway 에서 인증 처리

0

65

1

섹션 19 질문드립니다

0

53

1

강의 자료 업데이트

0

82

1

부하분산 강의 섹션

0

57

1

강의자료는 어디에서?

0

71

1

강의 자료는 어디서 다운 받을 수 있나요?

0

110

1

전체 사용자 조회시 오류

0

58

1

혹시 pk 외 별도의 id 를 부여한 이유가 있을까요 ??

0

110

2

학습 방향

0

95

2

카프카 커넥터 사용 목적 문의

0

85

2

kafka 강의

0

108

2

서비스 디스커버리 종류

0

87

2

강의 자료에 대해서 궁금해요

0

116

2

GlobalFilter, LoggingFilter가 동작하지 않습니다.

0

90

2

Kafka Source Connect 버전 에러

0

85

2

소스커넥터는 사용안한 거 맞죠?

0

81

2

강의자료 업데이트 문의

0

95

2

강의에서 BCryptPasswordEncoder 에 역할(5-2)

0

56

1

강의 업데이트 계획이 궁금합니다.

0

112

2

MSA 애플리케이션에 Spring Web과 Spring Data JPA를 사용하는 것이 바람직한지 궁금합니다. (MSA 설계와 관련된 질문입니다)

0

161

2

어떤 것이 업데이트 된 건가요?

0

164

2