hyunjinkim
@hyunjinkim
受講生
1,361
受講レビュー
93
講義評価
4.9
안녕하세요.
데이터 & AI 분야에서 일하고 있는 15년차 현직자입니다.
정보관리기술사를 취득한 이후 지금까지 얻은 지식을 많은 사람들에게 공유하고자 컨텐츠 제작하고 있습니다.
반갑습니다. :)
Contact: hjkim_sun@naver.com
講義
受講レビュー
- Airflowマスタークラス
- Airflowマスタークラス
投稿
Q&A
consumer가 모든 파티션을 읽지 않는 이슈가 있습니다
프로듀서가 계속 데이터를 올리고 있는데도 컨슈머가 모든 파티션을 컨슘하고 있지 않다면 이 현상은 정상입니다. 캡쳐해주신대로 consumer 마다 파티션 지정은 완료된 상태입니다. 위에 올려주신대로 보면 컨슈머1은 파티션 0, 1, 2 담당하고 있고 컨슈머 2는 파티션 3,4,5 담당하고 있는데 컨슈머1: 파티션 1만 처리 중컨슈머2: 파티션 3, 5 만 처리중 이런 상태이죠? 이건 컨슈머가 컨슘할 때 여러 파티션에서 메시지를 골고루 꺼내오지 않아서 그렇습니다. poll_consumer에서 메시지를 한번에 100개씩 꺼내오도록 설정되어 있을 거에요. 바구니 크기가 100이라 해볼께요. 그럼 컨슈머는 특정 파티션 하나에서 우선 가져올 수 있는 만큼 가져옵니다. 그럼 특정 파티션 하나에서 이미 100개가 다 찹니다. 그래서 다른 파티션에서 가져올 여유가 없어서 지금 마치 하나의 파티션만 컨슘하고 있는 것처럼 보이게 됩니다. (예: 컨슈머1: 파티션 1 / 컨슈머2: 파티션3 ) 그러다가 컨슈머가 어느 순간 파티션을 바꾸어 가지고 오기도 합니다. (예: 컨슈머1: 파티션 1 / 컨슈머2: 파티션5 ) 지금 올려주신걸 보면 3개만 가져오는 걸로 보이는데 정확히는 파티션 3개를 동시 컨슘하고 있는게 아니라 여전히 파티션 2개를 컨슘하고 있는 중입니다. 3개로 보이는 이유는 commit 기록이 남아 있어서 보일 뿐입니다. (예: 파티션 3)저 상황에서 한번 캡쳐 떠보시고, 한 10초 뒤에 캡쳐 다시 떠보시면 파티션 2개만 컨슈머의 current offset이 증가한게 보일거에요. 나머지 1개의 current offset은 그대로 일겁니다. 다만 예전에 찍어놓은 commit 기록이 있어서 보일 뿐이에요. 그래서 밤바미님이 지금 테스트하신게, Producer의 속도가 압도적으로 높은 상황에서 consumer가 따라가지 못하는 상황을 본 것입니다. 일반적으로 kafka 는 컨슈머의 부하량이 더 높습니다. 그래서 컨슈머의 부하 처리에 더 관심이 높고, 컨슈머 프로그램을 파티션 개수만큼 띄우는 이유컨슈머 처리량으 느려지면 파티션을더 늘리고 컨슈머를 그만큼 더 띄우는 이유 모두 컨슈머가 속도를 못 따라갈때 대응하기 위한 방법들입니다. 그래서 컨슈머 6개를 올려보세요. 서버는 중복되어도 상관없습니다. 서버1에서 2개, 서버2에서 2개, 서버3에서 2개 올리시고kafka web ui에서 consumer 의 컨슘 현황 캡쳐해보고, 10초 뒤에 다시 캡쳐해서 current_offset 비교해보시면 파티션 6개 처리되고 있는게 보일거에요. 그리고 consume_consumer.py 에서 로직을 이렇게 바꿔서 테스트해보세요. # 로직 처리 부분 # Kafka 레코드에 대한 전처리, Target Sink 등 수행 self.logger.info(f'message 처리 로직 시작') from collections import defaultdict partition_count = defaultdict(int) for msg in msg_lst: if msg is None: continue if msg.error(): print("Error:", msg.error()) continue partition_count[msg.partition()] += 1 print("Partition message count:") for partition, count in partition_count.items(): print(f"partition {partition}: {count}") #msg_val_lst = [json.loads(msg.value().decode('utf-8')) for msg in msg_lst] #df = pd.DataFrame(msg_val_lst) #print(df[:10]) 그럼 파티션별로 메시지를 몇 개 꺼내오는지 출력해서 확인할 수 있습니다.구동해보시면 파티션 1개에서만 가져오는게 보일 거에요. 간혹 파티션 2개에서 꺼내오는 순간이 있긴한데 대부분 파티션 1개에서만 가져옵니다. 한번 테스트해보시겠어요?
- 0
- 3
- 39
Q&A
consumer가 모든 파티션을 읽지 않는 이슈가 있습니다
안녕하세요 밤바미님!먼저 늦게 답변드려 죄송합니다. 다양한 실험을 하고 계시는군요 ^^ 혹시 컨슈머를 추가하고나서 메시지는 계속 유입되고 있는 상태인가요? 아니면 메시지 유입은 중단된 상태인가요? 만약 메시지 유입이 중단된 상태라면 producer를 기동시켜 보시겠어요?
- 0
- 3
- 39
Q&A
수료증 발급
안녕하세요 payo9003님 수강률 100%로 보이기는 하는데 아직 안되나요? 현재 상태 한번만 더 확인해주시면 제가 인프런에 문의해볼께요.감사합니다.
- 0
- 3
- 38
Q&A
람다 아키텍처에서 speed layer 관련 질문드립니다
안녕하세요! 질문에 순서대로 답변 드릴께요.Speed Layer도 데이터를 처리하기 위해 저장소를 가지고 있습니다. 어딘가 저장이 되긴 합니다. 그래야 서비스 Fail 후 재시작 했을 때 재처리 같은 작업이 가능합니다. 그런데 설명드린 람다 아키텍처는 개념 아키텍처라고 이해하셔야 합니다. 스피드 레이어가 최종 처리한 데이터를 어디에 저장할 것인가는 람다 아키텍처가 알려주지는 않습니다. 누가 알려주는지는 사실 논리 아키텍처를 작성하는 단계에서 아키텍트가 결정하면 됩니다. 서빙 레이어는 배치 레이어가 만든 데이터 + 스피드 레이어가 만든 데이터를 같이 볼 수 있어야 하므로 보통 S3 Object Storage 또는 사용하고 있는 DBMS에 맞는 저장소에 저장하게 됩니다. 예를 들면 서빙 레이어로 Trino 를 쓴다면 스피드 레이어가 처리한 데이터는 Trino에 저장하면 되겠죠? 또는 S3 Object Storage에 저장해도 됩니다. 그럼 질문하신 것처럼 해당 저장소에 스피드 레이어가 처리한 데이터들이 실시간으로 쌓일 겁니다. S3 Object Storage에 쌓인다면 밤바미님 말씀처럼 일주일 정도 지난 것들은 삭제하도록 설정을 걸어놓기도 합니다. 그럼 서빙 레이어가 존재하는 영역에는 스피드 레이어가 쌓아온 일주일치 데이터가 배치 데이터와 함께 공존하게 될 텐데, 중복은 어떻게 제거하냐가 핵심이지요? 사실 이것도 상세 도구를 선택하게 되면 도구마다 어떻게 해결할 것인지, 아키텍처를 어떻게 잡을 지는 천차만별이라 정답이라 할 수 있는 건 없습니다. 최근에는 Parquet 같은 포맷보다, Iceberg 또는 Deltalake 라는 오픈 테이블 포맷을 많이 씁니다. 이런 포맷들은 OLAP성 처리에도 강하면서 OLTP성 처리에도 강한 특성을 보이는데 동일한 Iceberg 테이블에 배치처리 + 스피드 레이어 처리를 반영하게 되면 별도 View 분리없이 하나의 소스만 가지고 쿼리를 할 수도 있습니다. 또는 kafka 의 sink connector 라는 기능을 이용해서 처리된 데이터를 하나의 테이블로 merge 침으로써 배치 레이어가 처리한 테이블과 같은 테이블을 사용할 수도 있습니다. 아니면 스피드 레이어는 A라는 테이블에 데이터를 쌓습니다. 배치 레이어는 B라는 테이블에 데이터를 쌓습니다. B라는 테이블에는 전일자 24시까지의 데이터가 모두 존재한다고 한다면 SELECT * FROM A WHERE TIMESTAMP >= CURRENT_DATEUNION ALL SELECT * FROM B WHERE TIMESTAMP 이런식으로 결합한 뷰를 정의해서 써도 됩니다. 하지만 이렇게 뷰를 분리하는 것보다는 하나의 테이블에 통합 관리하는 방법이 더 좋지 않을까 합니다. 결론은 람다 아키텍처는 개념적인 아키텍처라 너무 거기에 매몰되어서 생각하지 않으시는게 좋습니다. 실제 현업에서는 해결할 수 있는 방법이 다양해서요.시원한 답변은 되지 않으셨겠지만 우선 강의 뒤에 있는 실습을 해보시면서 감을 잡으시는게 좋을거에요. 또 질문 있으면 언제든 남겨주세요 ^^
- 0
- 1
- 28
Q&A
에러 발생 관련 질문드립니다.
안녕하세요 ejs1127님상태를 봤을 때 워커가 제대로 기동이 안된것 같아요. sudo docker compose down 하고 재기동했을 때 어느정도 시간이 지나고 나서 sudo docker ps 명령으로 컨테이너 상태들 캡쳐해서 한번 보여주실래요?
- 0
- 2
- 53
Q&A
vscode 작업화면에 오류가 발생하지 않습니다.
안녕하세요 ejs1127 님 우선 vscode에 기본적인 파이썬 익스텐션이 설치되어 있어야 합니다. 아래에 저 익스텐션이 설치되어 있는지 확인해보세요.(사진) 그리고 vscode에 파이썬 인터프리터가 잡혀있나요? 파이썬 인터프리터가 안잡혀있으면 오탈자 등 체크가 안됩니다. 인터프리터가 잡혀있는지 여부는 오른쪽 하단에 (3.12.3)과 같이 파이썬 버전이 보이면 됩니다. (사진)
- 0
- 2
- 40
Q&A
아키텍처 관련 질문
안녕하세요 nealzs 님 네 맞습니다. 모든 기업이 100% 원천 데이터를 가져와 별도 저장소에 적재해 둔 후 사용하고 있습니다. 15년 전쯤에는 DW 구축이 한창 인기였는데 그때 쓰던 용어로 설명드리자면 원천에서 데이터를 조회하여 ODS 영역이라는 곳에 1차 적재를 해둡니다. ODS 영역은 원천 데이터를 거의 가공없이 가져와 저장해두는 장소입니다 (가져온 시간 등 특정 컬럼은 몇 개 더 만들면서 가져올 수도 있습니다 )ODS 영역의 데이터를 그냥 데이터레이크 또는 레이크하우스라고 볼 수도 있고 필요에 따라 ODS 에 있는 데이터를 1차 가공하여 일명 스타스키마 라고 부르는 구조는 형태로 만들어 저장해 둘수도 있습니다.어쨌든 nealzs 님의 말대로 원천에서 데이터를 가져와 저장해두는 구조는 맞습니다. 답변이 됐을까요?
- 0
- 1
- 49
Q&A
WSL에서 git push 가 안되요 ㅠ
안녕하세요! git push 할 때 username, password 넣으라고 뜨나요?혹시 더 이상 묻지 않고 계속 틀리다고만 나오면, 이미 내부적으로 캐싱된 인증정보를 초기화하셔야해요.WSL에서 아래 명령으로 인증정보를 초기화하고 다시 git push 해보시겠어요?git config --global --unset-all user.password git config --global --unset-all user.token git credential reject 그럼 아마도 다시 username, password를 넣으라고 나올겁니다. 그 때 토큰값 넣으시면되요.혹시 분명 토큰을 잘 넣었는데도 계속 안된다고 하면, 토큰 발급 받을 때 권한을 잘 받았는지도 보셔야해요. 토큰 생성시 repo 부분에 v 체크하셔야 합니다.해보시고 결과 알려주세요 😀
- 0
- 2
- 78
Q&A
plugins 폴더 생성
안녕하세요 전현지님혹시 폴더 생성은 어떻게 하셨어요? vscode 내에서 만들었나요? vscode 에서 생성했던 WSL OS에서 생성하셨든 WSL에서 안보일리는 없는데 아마도 다른 곳에 만드시지 않았나 합니다.아래 인프런 AI 인턴이 말한대로 아래 경로로 만들어져야 합니다. airflow/ ├── dags/ ├── plugins/ ├── shell/ └── your_script.sh혹시 그래도 안되면 좀 더 자세한 증상이나 캡쳐를 올려주시겠어요?
- 0
- 2
- 53
Q&A
chatGPT&Airflow로 블로그 자동 포스팅하기 는 Deprecated 가 필요합니다.
안녕하세요 박찬웅님! 알려주셔서 감사합니다. 원래 다른 블로그로 대체할까 생각도 했지만 시대 흐름상 블로그 자동 포스팅이 좋은 예시는 아닌 것 같아서 올려주신것처럼 Deprecated 처리했습니다. 감사합니다.
- 0
- 2
- 50





