강의

멘토링

로드맵

Inflearn brand logo image

인프런 커뮤니티 질문&답변

송재근님의 프로필 이미지
송재근

작성한 질문수

스프링부트로 직접 만들면서 배우는 대규모 시스템 설계 - 게시판

Transactional Outbox 테스트

Transactional Outbox 테스트 시간대 5:25

해결된 질문

작성

·

137

0

현재 카프카를 종료하고 API로 2개의 데이터를 넣고 다시 카프카를 켰을 때, delete 가 동작하지 않습니다.

이전에 카프카를 켜놓은 상태에서 API 2개의 데이터 생성하는 경우는 바로 delete쿼리가 날라갔는데, 지금의 경우 어떤 부분에서 동작하지 않는지 잘 모르겠습니다.

 

따라서 압축 파일의 outbox-message-relay에 적으신 코드랑도 비교해봤는데, 동일하게 나오고 있습니다.

일단 카프카를 종료하면 다음과 같은 에러가 나오는데, 강사님과 동일한 에러인지 궁금합니다.

 

bernate: insert into article (board_id,content,created_at,modified_at,title,writer_id,article_id) values (?,?,?,?,?,?,?)
Hibernate: update board_article_count set article_count = article_count + 1 where board_id = ?
Hibernate: select bac1_0.board_id,bac1_0.article_count from board_article_count bac1_0 where bac1_0.board_id=?
2025-02-22T14:21:16.536+09:00  INFO 32925 --- [kuke-board-article-service] [nio-9000-exec-5] k.b.c.outboxmessagerelay.MessageRelay    : [MessageRelay.createOutbox] outboxEvent=OutboxEvent(outbox=Outbox(outboxId=151558979476971520, eventType=ARTICLE_CREATED, payload={"eventId":151558979474350080,"type":"ARTICLE_CREATED","payload":{"articleId":151558979451625472,"title":"hi","content":"my content","boardId":1,"writerId":1,"createdAt":[2025,2,22,14,21,16,530575000],"modifiedAt":[2025,2,22,14,21,16,530575000],"boardArticleCount":9}}, shardKey=4, createdAt=2025-02-22T14:21:16.536478))
Hibernate: select o1_0.outbox_id,o1_0.created_at,o1_0.event_type,o1_0.payload,o1_0.shard_key from outbox o1_0 where o1_0.outbox_id=?
Hibernate: insert into outbox (created_at,event_type,payload,shard_key,outbox_id) values (?,?,?,?,?)
2025-02-22T14:21:16.694+09:00  INFO 32925 --- [kuke-board-article-service] [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Node 1 disconnected.
2025-02-22T14:21:16.694+09:00  WARN 32925 --- [kuke-board-article-service] [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Node may not be available.
2025-02-22T14:21:17.547+09:00 ERROR 32925 --- [kuke-board-article-service] [ mr-pub-event-3] k.b.c.outboxmessagerelay.MessageRelay    : [MessageRelay.publishEvent] outboxEvent=Outbox(outboxId=151558979476971520, eventType=ARTICLE_CREATED, payload={"eventId":151558979474350080,"type":"ARTICLE_CREATED","payload":{"articleId":151558979451625472,"title":"hi","content":"my content","boardId":1,"writerId":1,"createdAt":[2025,2,22,14,21,16,530575000],"modifiedAt":[2025,2,22,14,21,16,530575000],"boardArticleCount":9}}, shardKey=4, createdAt=2025-02-22T14:21:16.536478)

java.util.concurrent.TimeoutException: null
	at java.base/java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1960) ~[na:na]
	at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2095) ~[na:na]
	at kuke.board.common.outboxmessagerelay.MessageRelay.publishEvent(MessageRelay.java:46) ~[main/:na]
	at kuke.board.common.outboxmessagerelay.MessageRelay.publishEvent(MessageRelay.java:37) ~[main/:na]
	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) ~[na:na]
	at java.base/java.lang.reflect.Method.invoke(Method.java:580) ~[na:na]
	at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:354) ~[spring-aop-6.1.11.jar:6.1.11]
	at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:196) ~[spring-aop-6.1.11.jar:6.1.11]
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-6.1.11.jar:6.1.11]
	at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:768) ~[spring-aop-6.1.11.jar:6.1.11]
	at org.springframework.aop.interceptor.AsyncExecutionInterceptor.lambda$invoke$0(AsyncExecutionInterceptor.java:113) ~[spring-aop-6.1.11.jar:6.1.11]
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317) ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[na:na]
	at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]

2025-02-22T14:21:17.695+09:00  INFO 32925 --- [kuke-board-article-service] [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Node 1 disconnected.
2025-02-22T14:21:17.695+09:00  WARN 32925 --- [kuke-board-article-service] [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Node may not be available.
2025-02-22T14:21:18.697+09:00  INFO 32925 --- [kuke-board-article-service] [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Node 1 disconnected.
2025-02-22T14:21:18.697+09:00  WARN 32925 --- [kuke-board-article-service] [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Node may not be available.
Hibernate: select a1_0.article_id,a1_0.board_id,a1_0.content,a1_0.created_at,a1_0.modified_at,a1_0.title,a1_0.writer_id from article a1_0 where a1_0.article_id=?
Hibernate: insert into article (board_id,content,created_at,modified_at,title,writer_id,article_id) values (?,?,?,?,?,?,?)
Hibernate: update board_article_count set article_count = article_count + 1 where board_id = ?
Hibernate: select bac1_0.board_id,bac1_0.article_count from board_article_count bac1_0 where bac1_0.board_id=?
2025-02-22T14:21:19.531+09:00  INFO 32925 --- [kuke-board-article-service] [nio-9000-exec-7] k.b.c.outboxmessagerelay.MessageRelay    : [MessageRelay.createOutbox] outboxEvent=OutboxEvent(outbox=Outbox(outboxId=151558992038912000, eventType=ARTICLE_CREATED, payload={"eventId":151558992036290560,"type":"ARTICLE_CREATED","payload":{"articleId":151558992013565952,"title":"hi","content":"my content","boardId":1,"writerId":1,"createdAt":[2025,2,22,14,21,19,525215000],"modifiedAt":[2025,2,22,14,21,19,525215000],"boardArticleCount":10}}, shardKey=4, createdAt=2025-02-22T14:21:19.531129))
Hibernate: select o1_0.outbox_id,o1_0.created_at,o1_0.event_type,o1_0.payload,o1_0.shard_key from outbox o1_0 where o1_0.outbox_id=?
Hibernate: insert into outbox (created_at,event_type,payload,shard_key,outbox_id) values (?,?,?,?,?)
2025-02-22T14:21:19.699+09:00  INFO 32925 --- [kuke-board-article-service] [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Node 1 disconnected.
2025-02-22T14:21:19.699+09:00  WARN 32925 --- [kuke-board-article-service] [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Node may not be available.
2025-02-22T14:21:20.544+09:00 ERROR 32925 --- [kuke-board-article-service] [ mr-pub-event-4] k.b.c.outboxmessagerelay.MessageRelay    : [MessageRelay.publishEvent] outboxEvent=Outbox(outboxId=151558992038912000, eventType=ARTICLE_CREATED, payload={"eventId":151558992036290560,"type":"ARTICLE_CREATED","payload":{"articleId":151558992013565952,"title":"hi","content":"my content","boardId":1,"writerId":1,"createdAt":[2025,2,22,14,21,19,525215000],"modifiedAt":[2025,2,22,14,21,19,525215000],"boardArticleCount":10}}, shardKey=4, createdAt=2025-02-22T14:21:19.531129)

java.util.concurrent.TimeoutException: null
	at java.base/java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1960) ~[na:na]
	at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2095) ~[na:na]
	at kuke.board.common.outboxmessagerelay.MessageRelay.publishEvent(MessageRelay.java:46) ~[main/:na]
	at kuke.board.common.outboxmessagerelay.MessageRelay.publishEvent(MessageRelay.java:37) ~[main/:na]
	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) ~[na:na]
	at java.base/java.lang.reflect.Method.invoke(Method.java:580) ~[na:na]
	at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:354) ~[spring-aop-6.1.11.jar:6.1.11]
	at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:196) ~[spring-aop-6.1.11.jar:6.1.11]
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-6.1.11.jar:6.1.11]
	at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:768) ~[spring-aop-6.1.11.jar:6.1.11]
	at org.springframework.aop.interceptor.AsyncExecutionInterceptor.lambda$invoke$0(AsyncExecutionInterceptor.java:113) ~[spring-aop-6.1.11.jar:6.1.11]
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317) ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[na:na]
	at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]

 

카프카 종료후 다시 연결해도 폴링은 되는ㄴ데 select만 하고 있고 기존에 outbox에 남아있는 2개의 데이터는 delete처리가 되지 않고 있습니다.

답변 2

1

쿠케님의 프로필 이미지
쿠케
지식공유자

재근님, 안녕하세요!

 

카프카가 종료되었으면, outbox에 저장된 이벤트는 카프카에 발행되지 못하므로, 삭제되지 않고 남아있어야 하는게 맞습니다!

outbox 데이터가 남아있으므로, 카프카가 정상화되었을 때 다시 이벤트가 전송될 수 있음을 의미합니다.

관계형 데이터베이스의 트랜잭션으로 저장되었던 이벤트 데이터가, 카프카로 발행되기 전까지 유실되지 않는 상황입니다.

 

로그를 보면,

Node 1 disconnected.

Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Node may not be available.

위와 같이 공유 주셨습니다.

카프카와의 연결이 끊겼음을 의미하네요.

카프카를 다시 실행하고 정상화되면(실행하고 정상 동작하기 까지 약간의 시간이 걸릴 수 있습니다.),

outbox 데이터는 이벤트가 전송된 후에 정상 삭제될 것 같네요!

 

그리고 카프카 종료 상태에서 쌓여있던 outbox 데이터는, 카프카가 정상화되면 재처리를 해야합니다.

이 부분은 스케줄링 동작으로 수행될 수 있도록 만들어두었습니다.

재처리 작업에 대해서도 코드 작업을 해두신 상황일까요?

로그를 보면 재처리 작업에 대한 데이터베이스 폴링 로그는 안보이네요.

재처리를 위한 폴링도 정상적으로 수행되고 있는 상황인지 디버깅 해보시면 좋을 것 같습니다!

 

혹시 더 궁금한 점 있으시면 편히 문의 주세요!

송재근님의 프로필 이미지
송재근
질문자

하하 샤드키를 %가 아니라 *로 해서 삭제가 되지 않았네요.....1주일동안 고생했는데 허무....

 

 

쿠케님의 프로필 이미지
쿠케
지식공유자

ㅋㅋㅋㅋ 헉.. 오타 찾는 과정 정말 고통스럽고 찾고 나면 허무하죠..

그래도 원인 파악하기 위해서 디버깅 하는 과정은 충분히 유의미했을겁니다!

계속 훈련이 되다 보면 점점 빨라지더라고요.. ㅎㅎㅎ

고생하셨네요, 화이팅입니다!

0

안녕하세요, 인프런 AI 인턴입니다. 송재근님.

현재 송재근님께서 겪고 계신 문제는 Kafka 연결 문제로 인한 TimeoutException입니다. 이로 인해 이벤트가 Kafka에 전달되지 않아 Outbox 테이블에 메시지가 남아있는 것으로 보입니다. 이는 일반적으로 Kafka 프로듀서의 비동기 처리에서 발생할 수 있으며, 아래 방법들로 문제를 해결할 수 있습니다.

해결 방법 제안:

  1. Kafka의 연결 상태 점검:

    • Kafka가 제대로 실행되고 있는지 확인해보세요. 특히 Kafka 서버가 실행 중인지, 해당 포트(예: 9092)가 제대로 열려 있는지 체크해야 합니다.
  2. 프로듀서 설정 확인:

    • 프로듀서의 재시도 설정을 확인하고, 적절한 재시도 횟수와 대기 시간을 설정해보세요. retriesdelivery.timeout.ms 옵션을 활용할 수 있습니다.
  3. 오류 처리:

    • TimeoutException이 발생할 경우, 이를 핸들링하는 로직을 추가하여 로깅하고, 필요시 수동으로 처리를 할 수 있도록 합니다.
  4. Outbox 테이블의 잔여 데이터 처리:

    • Kafka 연결을 복구한 후, Outbox 테이블에 남아있는 이벤트를 처리하는 스케줄러나 별도의 리레이 작업을 통해 삭제 및 발행 작업을 진행할 수 있습니다.

구체적인 설정과 코드를 보시면서 위의 문제 해결 방법이 도움이 되길 바랍니다. 추가적인 정보나 오류 로그가 있다면 제공해주시면 더 도와드릴 수 있을 것 같습니다.

관련된 추가 자료는 아래 유사한 주제를 설명하는 질문을 통해 보실 수 있습니다:

저는 질문자님의 이해를 돕고자 지식공유자님의 기존 답변을 참고하여 유사한 답변 링크를 추천해드리고 있습니다. 현재 베타 기능으로 답변이 만족스럽지 않을 수 있는 점 양해 부탁드립니다. 🙏 추가적으로 궁금한 점이 있으시면, 이어서 질문해 주세요. 곧 지식공유자께서 답변해 주실 것입니다.

송재근님의 프로필 이미지
송재근

작성한 질문수

질문하기