묻고 답해요
161만명의 커뮤니티!! 함께 토론해봐요.
인프런 TOP Writers
-
해결됨Airflow 마스터 클래스
dag 스케줄링 관련
안녕하세요? 좋은 강의에 진심으로 감사드립니다. 책으로 공부했으면 아직 헤매고 있을텐데, 친철하게 설명해 주셔서 그동안 들었던 강의를 바탕으로 어제 data.go.kr에서 기상청이 제공하는 대기정체지수를 다운로드 받는 dag을 만들어 수동으로 실행을 해보니, 수동으로는 csv 파일 생성까지 success가 되는 것을 확인하였습니다. 그런데 오늘 새벽에 자동으로 실행이 되도록 컴퓨터를 켜두었었는데, 아침에 보니 돌아가지 않았습니다. ㅠㅠ DAG에 크론 일정은 "0 5 \* \* \*"로 주고 , start_date는 어제 날짜 2023. 11. 5. in_timezone('Asia/Seoul')로 주었었습니다. (매일 새벽 5시에 돌아서 전일자로 업데이트된 데이터를 받고자 하였습니다.) airflow를 직접 사용해 보니 UTC와 KST가 혼재되어서 사용시에 헷갈리기도 하고, 실제로 제가 만들어 보니 권한 문제, 패키지 추가 문제 등 여러가지 난관에 봉착했었습니다만, 여기 질문 게시판을 참조하면서 모든 문제를 해결할 수 있었는데, 일정 셋팅 관련해서는 제가 아주 기본적인 airflow의 시간 개념을 잘 못 이해하고 있는 것 같아서 질문드립니다. 위의 두가지 말고 따로 확인해야 하는 사항이 있는지요? 여기 게시판의 첫번째 질문도 시간개념이었는데, 읽어봐도 제 입장에서는 제 문제 해결방법이 명확해 지는 것 같지는 않아서 이렇게 폐를 끼치게 되었습니다. 여담입니다만,기상청에서 제공하는 API 데이터들은 1루치 정도만 제공되어서, 매일 접속을 해서 처리를 해두지 않으면 데이터가 사라져 버립니다. 수작업으로 하는 것이 예상보다 피곤하였고, 중간중간 잊고 지나치는 경우가 생겨서, 데이터의 완결성이 떨어지는 등 아쉬운 점이 있었습니다. 제 계획은 노트북을 출근전에는 꺼놓고, 퇴근후 집에와서 WSL에 airflow 도커로 올려서 새로 켜놓고, 새벽에 Dag이 일정대로 돌아서, 아침에 일어나면 데이터들이 제가 원하는 형태대로 정리되고, 이메일로 완료되었다고 노티를 받는 것입니다. 가능할 거라고 보시는지요?
-
미해결Airflow 마스터 클래스
task 데코레이터 사용시 궁금증이 있습니다.
task 데코레이터를 사용하면서부터 함수호출값을 받게 변경되는 점이 궁금합니다. 변경 전def foo(): ... py_task = PythonOperator( python_callable=foo)변경 후@task(task_id='py_task') def foo(): ... py_task = foo() 변경 후에는 foo라는 함수원형을 입력하는 것이 아니라 함수를 호출하게 되는데 데코레이터로 감쌌기 때문에 괜찮아지는 건가요?
-
미해결카프카 완벽 가이드 - 커넥트(Connect) 편
JDBC Sink Connector 에서 Topic Commit 처리 문의
안녕하세요 강사님JDBC Sink Connector 에서 Topic 의 파티션이 다수인 경우 Topic 에 대한 Commit 처리를 어떻게 하는지 궁금합니다.예를 들어 CDC Connector 로 저장된 Topic 을 MySQL 로 Sink 하고자 하는 경우이고 Topic 스키마에 당연히 키값은 있고 파티션이 10개 정도 된다고 했을때 Sink Connector 에서 최대 1000 개 데이터를 batch 로 DB에 처리하도록 설정했다면 Topic 에 대한 Commit 처리를 offset 정보를 loop 돌면서 commit 하는걸까요?소스를 참고할수있다면 소스 레벨로 알려주시면 감사하겠습니다.
-
미해결[리뉴얼] 처음하는 SQL과 데이터베이스(MySQL) 부트캠프 [입문부터 활용까지]
jupiter노트북 말고 명령어 실행 할 수 있나요?
이걸 주피터 노트북에서 했던거처럼 python에서 실행하는건가요?
-
미해결따라하며 배우는 도커와 CI환경 [2023.11 업데이트]
travis 에서 aws EB 연결시 오류
travis 에서 정상적으로 배포 되어 s3에 압축파일이 정상적으로 들어갑니다. 또한, EB 환경에서도 실행중인 버전이 Travis-... 로 시작하구요. 그런데 도커 도메인을 클릭시 아무것도 뜨지 않습니다. 환경 Health 확인 시에도 OK 로 정상 이고요.결론은 정상적으로 배포가 불가능합니다.리뉴얼은 도대체 언제 되는건가요
-
해결됨카프카 완벽 가이드 - 코어편
[섹션2] 메세지 비동기 전송 부분에 기본적인 질문인데요
카프카 관련 질문이라기 보다는.. 자바에 익숙하지 않아서 자바에 관한 질문입니다. kafkaProducer.send(producerRecord, new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception == null) { logger.info("partiion: " + metadata.partition()); logger.info("offset: " + metadata.offset()); logger.info("timestamp: " + metadata.timestamp()); } else { logger.error("exception error from broker: " + exception.getMessage()); } } });여기서 sendThread에서 callback에 대한 부분을 호출할때, 이런식으로 동작하는것으로 생각했습니다. 그래서 sendThread에서 broker에서 response를받아 callback에 해당하는 부분을 채워넣을때 이와 같이 동작한다고 생각합니다. (java에 익숙하지 않아서... python코드로 그냥 이해한대로 적어보겠씁니다.. ) def responseCallback(record, callback) { callback.onCompletion(record, exception) }이런식으로 callback 객체의 onCompletion 메서드를 호출하고 받은 정보를 parameter로 넘기는것으로 이해했는데요. 근데, lambda형식으로 바꾸게 되면, kafkaProducer.send(producerRecord, (metadata, exception) -> { if (exception == null) { logger.info("partiion: " + metadata.partition()); logger.info("offset: " + metadata.offset()); logger.info("timestamp: " + metadata.timestamp()); } else { logger.error("exception error from broker: " + exception.getMessage()); } } });이렇게 코드를 작성되는데, 이렇게 되면 callback 함수를 호출할때, onCompletion 메서드를 호출을 안하게 되는건가요?callback(metadata, exception)이와같이 호출을 하는건가요?? lambda에서의 호출방법으로 호출하는건지, 기존의 callback 객체를 호출하는 방식이 맞는건지.. 어떠한 부분이 맞는건지 궁금합니다.
-
미해결[리뉴얼] 처음하는 SQL과 데이터베이스(MySQL) 부트캠프 [입문부터 활용까지]
SAKILA 폴더에서 가져온 SCHEMA와 DATA가 정확히 뭘까요
SCHEMA는 관계 정보고 (DESC명령어로 보는)DATA는 관계에 들어가는 데이터(SELECT 명령어로 보는)인가요?DB정보를 넘길때는 보통 이 두개 파일을 넘기게 되나요?
-
미해결Airflow 마스터 클래스
task가 실행되지 않습니다.
task가 제대로 수행되지 않습니다작업의 로그를 확인하면아래와 같은 메세지만 나타나구요 *** Could not read served logs: Request URL is missing an 'http://' or 'https://' protocol. 아래에서 저와 같은 증상을 겪은 사람을 찾았습니다만링크메모리 사용량을 늘려서 해결했다고만 나타납니다 저는 이미 WSL의 메모리를 8GB에 swap도 2GB도 준 상태구요... airflow의 worker 자체의 에러로그를 찾아보았더니 아래와 같이 권한 문제가 나타납니다[2023-10-31 15:51:22,188: ERROR/ForkPoolWorker-15] [4c24a6e8-1133-46a5-99ac-5fd6bdb3c730] Failed to execute task [Errno 13] Permission denied: '/opt/airflow/logs/dag_id=dags_bash_operator'.zz5414-airflow-worker-1 | Traceback (most recent call last):zz5414-airflow-worker-1 | File "/usr/local/lib/python3.8/pathlib.py", line 1288, in mkdirzz5414-airflow-worker-1 | self._accessor.mkdir(self, mode)zz5414-airflow-worker-1 | FileNotFoundError: [Errno 2] No such file or directory: '/opt/airflow/logs/dag_id=dags_bash_operator/run_id=manual__2023-10-31T15:51:20.844476+00:00/task_id=bash_t1' worker 컨테이너안의 /opt/airflow/logs/dag_id=dags_bash_operator라는 파일에 권한이 없어서 발생하는 문제로 보입니다. 컨테이너 내부 파일의 권한은 어떻게 설정하는지를 모르겠습니다 제가 만든 dag말고도 example_bash_operator도 마찬가지로 실행되지 않고 같은 에러입니다. 해결해주실 수 있을까요?정말 airflow 열심히 배워보려고 했거든요 ㅠㅠ
-
미해결[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
스트림즈dsl의 state.dir에 대해
state.dir을 설명하시다가 /tmp의 생명주기가 다르다고 하셨는데 os 마다 /tmp의 데이터가 삭제되는 조건들이 다르다는 말씀인가요?
-
미해결따라하며 배우는 도커와 CI환경 [2023.11 업데이트]
docker build ./ 했는데 이미지 ID가 안나옵니다
mac os m2 모델을 사용하는데, 질문게시판에 있는대로 buildkit 부분을 건드리려고 설정에서 Docker Engine에 들어가니, 해당부분이 저는 없더라구요. 찾아봐도 이에 대한 언급은 없는데, m1/m2 mac silicon 도커에서는 이미지 Id를 다른 방식으로 찾아야하나요?
-
미해결[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
MSA에서 카프카 사용
학습 목적으로 카프카를 사용 중인데, MSA 구조에서의 카프카 프로듀서, 컨슈머 개념이 잘 이해가 가지 않습니다 ㅠspring boot로 MSA 구조를 구축한 상태입니다. 각 서비스별로 스프링 부트 서버가 존재합니다. 각 서비스가 하나의 데이터베이스 (MySQL 혹은 MongoDB)를 공유하여 사용하려고 합니다. 이 때 스프링 부트가 카프카 토픽에 데이터를 저장하고, 토픽에 있는 데이터를 DB에 저장하여 MSA 환경에서 DB의 일관성을 유지하고자 하는데 이 경우에 카프카를 사용하는 것이 적합할까요?또한 스프링 부트 서버에서 카프카 토픽에 데이터를 주고받을 프로듀서와 컨슈머, MySQL에 토픽의 데이터를 넣고 빼올 프로듀서와 컨슈머 이런식으로 한 서버 당 최소 4개씩을 각각 모두 설정해야하나요?
-
미해결[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
rebalancing 관리 관련 질문이 있습니다
안녕하세요 강사님rebalancing에 질문이 있습니다 commit이 Fail하여 rebalanced나 assigned partitions 같은 에러가 나올떄는보통 어떻게 관리를 하나요?rebalance가 안날순 없다고 알고 있습니다.보통 어떻게 이런 오류를 관리하고 처리하는지 알고싶습니다.따로 consumer를 restart하는 방법도 있나요? 그러면 문제가 될 게 있는지도 궁금합니다.
-
미해결[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
조인관련해서 실행하려면 오류가 뜹니다
jnilib가 없다는 에러가 뜨는데 뭔가 설정을 해야하는 걸까요?Exception in thread "global-table-join-application-76f56ff6-212f-4940-b4fb-fd8379e83d55-GlobalStreamThread" java.lang.UnsatisfiedLinkError: Can't load library: /var/folders/zy/b19yps9j095601_vkghc25wh0000gn/T/librocksdbjni17187958455810980136.jnilib
-
해결됨실리콘밸리 엔지니어와 함께하는 Apache Airflow
섹션1 apache airflow 설치하기 질문
강의 3:17 에서"그대로 카피하셔서 설치하면" 이라고 하셨는데 이게 무슨뜻이죠? 구체적인 방법을 알려주시면 감사하겠습니다.카피해서 터미널에 붙여넣기를 하면 오류가 떠서요
-
미해결Airflow 마스터 클래스
WSL 설치
WSL 설치시 하위시스템이 이미 설치되어있습니다.라고 나오는데 Ubuntu 22.04.1 LTS는 없네요.이미지 다운 어떻게 받아서 설치하나요?
-
해결됨mongoDB 기초부터 실무까지(feat. Node.js)
updateMany에서 user._id를 못찾는 상황
강좌대로 Blog.updateMany({ "user._id": userId }, { "user.name": name })로 하니 블로그 데이터 유저 정보가 변경이 계속 안되고 있습니다 ㅠㅠ 유저 데이터는 정상적으로 변경이 되었는데요 문제점을 모르겠습니다.해당 코드 깃허브 : https://github.com/alinfanclub/KimDevlogServer/blob/updateMany/src/routes/userRouter.js userRouter.put("/:userId", async (req, res) => { try { const { userId } = req.params; if (!mongoose.isValidObjectId(userId)) return res.status(400).send({ err: "invalid userId" }); const { age, name } = req.body; if (!age && !name) return res.status(400).send({ err: "age or name is required" }); if (age && typeof age !== "number") return res.status(400).send({ err: "age must be a number" }); if (name && typeof name.first !== "string" && typeof name.last !== "string") return res.status(400).send({ err: "first and last name are strings" }); // let updateBody = {}; // if(age) updateBody.age = age; // if(name) updateBody.name = name; // const user = await User.findByIdAndUpdate(userId, updateBody, { new: true }); let user = await User.findById(userId); if (age) user.age = age; if (name) { user.name = name; await Blog.updateMany({ "user._id": userId }, { "user.name": name }); } await user.save(); return res.send({ user }); } catch (err) { console.log(err); return res.status(500).send({ err: err.message }); } });
-
미해결데이터베이스 중급(Modeling)
PK에 임의의 식별자(정수형 시퀀스값)부여에 관한 질문드립니다.
영상 마지막에 나온것처럼 PK에 해당하는 칼럼의 값을 프로그래머를 위해 넘겨주어야 한다고 말씀하신거처럼.클라이언트 화면에서는 임의의 식별자 데이터는 렌더링하지는 않지만 사용자(클라이언트 프로그램)가 어떤 데이터를 요청할 때 클라이언트는 해당 데이터(레코드)에 해당하는 PK의 값을 서버에 전달. 과 같은 방식일까요? 질문이 조금 매끄럽지가 않은것같아 좀 더 말해보면에로들어 도서 관리 DB의 도서(Book)테이블에 PK가 도서 번호(1,2,3,4..)이며 나머지 속성은 책 이름, 출판사 등의 속성을 가지고 있고, 책 테이블의 도서 번호를 참조한 자식 관계를 가진 대여 기록 테이블이 있을 때 사용자가 'RDBMS Modeling 기초'라는 책의 대여기록을 보고싶어서 해당 책이름을 클릭하면 내부 코드에서는 클라이언트 코드에서는 해당 책의 PK인 책번호를 서버에게 전달 후 서버는 해당 책번호를 통해 대여기록 테이블과 JOIN하여 클라이언트에 응답. 과 같은 방식이 일반적인지 궁금합니다
-
해결됨카프카 완벽 가이드 - 코어편
브로커가 추가될 때 파티션 재분배
안녕하세요 선생님! 완강 후에 정리하며 이것저것 테스트를 하는 와중에 궁금한게 생겨 질문드립니다. 이미 특정 토픽의 파티션이 브로커들에게 분배된 상태에서, 새로운 브로커가 추가됐을 때 새로운 브로커는 특정 토픽의 파티션을 가질 수 있는 대상으로 선정되지 않는 것 같습니다.새로운 브로커가 추가 됐을 때 새 브로커에도 기존의 토픽의 파티션 재분배를 하는 방법이 있나요?불가능 하다면, 이런 모델을 가지는 이유가 있을까요? 테스트 과정 공유드립니다.broker #1, #2 총 2개 띄운 상태에서 partition 3개, replication 2개의 토픽 생성 (topic-p3r2)Topic: topic-p3r2 Partition: 0 Leader: 2 Replicas: 2,1 Isr: 2,1 Offline: Topic: topic-p3r2 Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2 Offline: Topic: topic-p3r2 Partition: 2 Leader: 2 Replicas: 2,1 Isr: 2,1 Offline:broker #3 추가 후 topic-p3r2 토픽 상태Topic: topic-p3r2 Partition: 0 Leader: 2 Replicas: 2,1 Isr: 2,1 Offline: Topic: topic-p3r2 Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2 Offline: Topic: topic-p3r2 Partition: 2 Leader: 2 Replicas: 2,1 Isr: 2,1 Offline:브로커#3은 후보에도 오르지 않았습니다. 제가 예상했던 건 아래와 같이 브로커#3이 추가되었을 때 브로커#3도 토픽의 파티션을 갖는 것이었습니다.(아래 로그는 제가 상상한 것을 임의로 만든 것입니다)Topic: topic-p3r2 Partition: 0 Leader: 2 Replicas: 2,1 Isr: 2,1 Offline: Topic: topic-p3r2 Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2 Offline: Topic: topic-p3r2 Partition: 2 Leader: 3 Replicas: 3,1 Isr: 3,1 Offline: 감사합니다!!
-
미해결[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
강사님 오류 관련하여 질문이 있습니다.
저는 지금 AIOkafka를 사용하고 있는데 commit()을 해주면 종종commit cannot be completed since the group has already rebalanced이 에러가 나오더라구요찾아보니 aio는 자동으로 리밸런싱 해서 그렇다는데 그렇다면 commit을 어떻게 써야 중복도 안되고 자동 리밸런싱으로 오류도 안생길까요?
-
미해결[리뉴얼] 처음하는 SQL과 데이터베이스(MySQL) 부트캠프 [입문부터 활용까지]
pymysql import 시 찾을 수 없는 모듈이라 나옵니다
pip install 시 설치가 이미 되어있다고 나오는데 import 시에는 찾을 수 없다고 나옵니다...