묻고 답해요
161만명의 커뮤니티!! 함께 토론해봐요.
인프런 TOP Writers
-
미해결Airflow 마스터 클래스
Email operator로 메일 전송하기 오류 내용에 대한 도움 부탁드립니다.
안녕하세요. 선생님.Email operator 메일 전송 dag 수행 시 아래 오류 발생합니다.시간 되실 때 내용에 대해 도움 부탁드립니다.*** /opt/airflow/logs/dag_id=dags_email_operator/run_id=manual__2025-02-19T05:59:40.395069+00:00/task_id=send_email_task/attempt=1.log ▲▲▲ Log group end [2025-02-19, 14:59:44 KST] {local_task_job_runner.py:123} ▼ Pre task execution logs [2025-02-19, 14:59:44 KST] {taskinstance.py:2613} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: dags_email_operator.send_email_task manual__2025-02-19T05:59:40.395069+00:00 [queued]> [2025-02-19, 14:59:44 KST] {taskinstance.py:2613} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: dags_email_operator.send_email_task manual__2025-02-19T05:59:40.395069+00:00 [queued]> [2025-02-19, 14:59:44 KST] {taskinstance.py:2866} INFO - Starting attempt 1 of 1 [2025-02-19, 14:59:44 KST] {taskinstance.py:2889} INFO - Executing <Task(EmailOperator): send_email_task> on 2025-02-19 05:59:40.395069+00:00 [2025-02-19, 14:59:44 KST] {warnings.py:112} WARNING - /home/**/.local/lib/python3.12/site-packages/***/task/task_runner/standard_task_runner.py:70: DeprecationWarning: This process (pid=117) is multi-threaded, use of fork() may lead to deadlocks in the child. pid = os.fork() [2025-02-19, 14:59:44 KST] {standard_task_runner.py:72} INFO - Started process 118 to run task [2025-02-19, 14:59:44 KST] {standard_task_runner.py:104} INFO - Running: ['***', 'tasks', 'run', 'dags_email_operator', 'send_email_task', 'manual__2025-02-19T05:59:40.395069+00:00', '--job-id', '121', '--raw', '--subdir', 'DAGS_FOLDER/dags_email_operator.py', '--cfg-path', '/tmp/tmp4a9ijz8k'] [2025-02-19, 14:59:44 KST] {standard_task_runner.py:105} INFO - Job 121: Subtask send_email_task [2025-02-19, 14:59:44 KST] {task_command.py:467} INFO - Running <TaskInstance: dags_email_operator.send_email_task manual__2025-02-19T05:59:40.395069+00:00 [running]> on host 4742acded404 [2025-02-19, 14:59:44 KST] {taskinstance.py:3132} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='dags_email_operator' AIRFLOW_CTX_TASK_ID='send_email_task' AIRFLOW_CTX_EXECUTION_DATE='2025-02-19T05:59:40.395069+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2025-02-19T05:59:40.395069+00:00' [2025-02-19, 14:59:44 KST] {logging_mixin.py:190} INFO - Task instance is in running state [2025-02-19, 14:59:44 KST] {logging_mixin.py:190} INFO - Previous state of the Task instance: queued [2025-02-19, 14:59:44 KST] {logging_mixin.py:190} INFO - Current task name:send_email_task state:running start_date:2025-02-19 05:59:44.157879+00:00 [2025-02-19, 14:59:44 KST] {logging_mixin.py:190} INFO - Dag name:dags_email_operator and current dag run status:running [2025-02-19, 14:59:44 KST] {taskinstance.py:731} ▲▲▲ Log group end
-
해결됨카프카 완벽 가이드 - ksqlDB
Compact Topic에 대하여
안녕하세요 항상 좋은 강의 해주셔서 감사합니다.다름이 아니라 제가 프로젝트 중에 유저가 알림 설정한 데이터를 kafka로 받아서 시세 데이터와 조인하여알림 메세지를 생성하려고합니다. 유저 알림 데이터는 boolean값을 가진 활성화여부와 삭제라는 기능이있습니다.그래서 kafka내에서 유저 알림 데이터를 최신화하기 위해서 찾아본 결과 compact topic이라는것을 발견하였고 실습중에 있습니다만 결과가 좋지않아서 질문드립니다. sudo docker exec -it kafka-1 kafka-topics --create --bootstrap-server 192.168.56.101:29092 --topic user-alert-set --partitions 6 --replication-factor 2 --config "cleanup.policy=compact" --config "min.compaction.lag.ms=5000" --config "max.compaction.lag.ms=10000 " --config "delete.retention.ms=3000" --config "segment.ms=3000" --config "min.cleanable.dirty.ratio=0.01"이런식으로 토픽을 생성하여 실습한 결과1. Key A: value1 넣음 -> [A:value1] 2. Key A: value2 넣음 -> [A:value1, A:value2] 3. Key B: value1 넣음 -> [ A:value2, B:value1] 4. Key B: value2 넣음 -> [A:value2, B:value1, B:value2]이렇게 진행되었습니다. 삭제를 위한 tombstone메세지 또한 위와같은 형식으로 진행되었습니다.제가 이해한 바로는 같은 키의 값이 들어오면 들어온 값으로 최신화 혹은 삭제를 하는 설정으로 이해를 해서이 결과가 저는 이해가 되지않습니다. gpt한테 물어보니 로그 컴팩션은 "head"와 "tail" 두 부분으로 나뉩니다:Tail: 이미 컴팩션이 완료된 부분 (깔끔한 상태)Head: 아직 컴팩션되지 않은 활성 부분 (더티 상태)예시:1. A:value1 입력 -> [A:value1] 2. A:value2 입력 -> [A:value1, A:value2] (컴팩션 발생) -> [A:value2] // A는 tail 부분으로 이동 3. B:value1 입력 -> [A:value2, B:value1] // B는 head 부분에 위치 즉, A는 이미 컴팩션되어 tail 부분에 최신 값만 있고, B는 아직 head 부분에 있어서 컴팩션되지 않은 상태로 남아있는 것입니다.이는 정상적인 동작이며, B도 시간이 지나면 컴팩션될 것입니다.라는 답변을 받았지만 시간이 지나도 키가 B인 데이터는 여전히 두개로 남아있었습니다.혹시 이유를 아신다면 설명해주시면 감사하겠습니다.
-
미해결[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
zookeeper실행시 오류가 발생합니다.
bin/zookeeper-server-start.sh config/zookeeper.properties 이 명령어를 통해서 주키퍼를 실행시킬 떄 오류가 발생합니다. 삭제했다가 다시 설치도 반복했음에도 불구하고 계속 오류가 발생합니다. 도와주세요..
-
미해결[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
커스텀 소스 커넥터에서 Thread.sleep (1000) 은 왜 하는거에요?
섹션 10. 커스텀 소스 커넥터 22:49 보고있는데요. poll 함수의 상단에 Thread.sleep(1000) 을 하고있는데 왜 1초의 딜레이를 주는지 궁금합니다.
-
해결됨카프카 완벽 가이드 - 커넥트(Connect) 편
table.name.format에 관하여..
안녕하세요 선생님. 강의 잘 보고 있습니다.JDBC Sink Connector의 데이터베이스 스키마 설정 문제가 있네요. 찾아보니 JDBC Sink Connector의 내부적인 문제 같습니다... 구글링을 해보니 table.name.format에서 .구분자를 기준으로 데이터베이스 스키마를 지정해주는 것이 저를 포함하여 문제가 있는 사람들도 있던데... 결론은 그냥 되는 config로 사용하자 입니다. 아래의 내용은 혹여나 저와 같은 문제가 생기신다면 참고해 주세요. 공식 포럼과 이슈를 확인해봐도 명확하게 문제의 원인이 무엇인지에 대한 설명은 없습니다. 해결 방안만 있어요.. JDBC Sink Connector 생성하여 Key값을 가지는 Customers 토픽에서 테이블로 데이터 Sink해당 github 링크의 코드를 확인해보시면 connection.url에서 이미 스키마 om_sink를 바라보고 있는데, table.name.format 설정에서 또다시 om_sink 하위에 table을 만드려는 시도에 Exception이 발생합니다. CREATE TABLE om_sink.`om_sink`.`customers_sink_base`라는 SQL 문을 실행하게 되고 이에 exception이 발생합니다. 에러 발생=> om_sink의 om_sink의 customers_sink_base 테이블 생성을 시도함.CREATE TABLE `om_sink`.`om_sink`.`customers_sink_base` -- 이렇게 실행됨. 따라서 connection.url 또는, table.name.format 둘 중 한 군데에는 om_sink를 빼줘야 할 것 같아요. 수정 후 정상 동작
-
미해결따라하며 배우는 도커와 CI환경 [2023.11 업데이트]
The instance profile aws-elasticbeanstalk-ec2-role associated with the environment does not exist.
https://ginghambagle.tistory.com/162이분 블로그 참고해서 오류 해소했습니다.
-
미해결따라하며 배우는 도커와 CI환경 [2023.11 업데이트]
travis ci가 이제 유료화가 된 것 같습니다;;;
이거 무료로 할 수 있는 방법이 있을까요? 인강 진행이 안되네요.
-
미해결15일간의 빅데이터 파일럿 프로젝트
가상환경 내보내기
파일만 가지고 가서 다른곳에서 실행하고 싶은데 ISO 이미지 포함이 안되도 되나요?? 위 설정대로 하면 가져오기만 해도 실행이 될까요? 초반에 .vbox 머신추가했던것 처럼 하고 싶습니다. 수업 듣는 내내 궁금했는데 저기가 원천으로 사용하는 5개 테이블의 create 문을 알고싶습니다. 어떤 컬럼이 있고 각 컬럼들은 무엇을 의미하는지는 강의나 교안에 잘 안나와 있어서 한눈에 안들어 올때가 있었습니다. 테이블정의서를 그려서 좀 보고 싶습니다.
-
미해결15일간의 빅데이터 파일럿 프로젝트
SpoolDIR 폴더로 옮기
mv를 했는데 파일이 사라지기만 하고 옮겨지지 않습니다
-
미해결15일간의 빅데이터 파일럿 프로젝트
cpu usage 에러
모니터링을 하고 싶은데 위 에러가 뜨더니 이후엔 Magnagement를 켜면 나머지 기능들에 에러가 나서 작동이 아예 안되는 수준입니다.
-
미해결15일간의 빅데이터 파일럿 프로젝트
쿼리 실행시 10000 에러
이런 에러가 자주 나는데 왜 나는 걸까요? 쿼리실행시 가끔 떠서 여쭤봅니다
-
미해결15일간의 빅데이터 파일럿 프로젝트
탐색 과정 중 주제2 진행중 발생 에러
ascii 관련 에러가 났는데 어느 부분에서 고쳐야하는지 모르겠어요
-
미해결15일간의 빅데이터 파일럿 프로젝트
Hue(휴) 설치중 에러
-
미해결다양한 사례로 익히는 SQL 데이터 분석
퍼널 질문드립니다.
안녕하세요해당 데이터셋에는 퍼널의 순서가 있다보니 각단계별 전환율을 구할 수 있었는데요, 다른 데이터의 경우 퍼널의 순서가 없다면 어떻게 분석을 해야할까요? 예를들어 "로그인 > 장바구니 >구매"순서가 아닌"로그인 > 구매" 이렇게 장바구니와 구매를 선택할수 있는 허브역할을 할 수도 있다고 생각되는데요.이부분에서 개인적으로 공부하는데 막히더군요
-
미해결Airflow 마스터 클래스
filesensor가 해당 파일이 있는데 찾지 못하는 경우
안녕하세요, 영상 따라 실습중에 궁금한 점이 있습니다.filesensor로 확인하려는 파일이 존재하는데도 task가 계속 reschedule되고 있는 원인을 모르겠습니다.😭
-
미해결카프카 완벽 가이드 - 코어편
acks 1 이면 비동기가 아니지 않나요?!
제가 공부하다가 든 의문에 대해서 자문자답?을 해봤습니다!혹시 제가 잘못 이해하거나 틀린 개념이 있다면 알려주실 수 있을까요?나의 의문! 비동기 전송할 때, acks = 1 설정이면 Producer는 Leader broker가 메시지 A를 정상적으로 받았는지에 대한 ==Ack 메시지를 받은 후 다음 메시지인 메시지 B를 바로 전송==. 이러면 async 전송이 아닌거 아니야?내 생각어디까지 비동기 처리인지에 대한 경계가 불분명해서 생긴 오해!!1. Main Thread (producer.send() 호출하는 thread) for (seq in 0..19) { producer.send(topic, seq, record) }메인 스레드는 blocking 없이 연속적으로 메시지 전송! -> 비동기 전송!producer 내부[main thread] -> [RecordAccumulator] -> [sender thread] -> [broker]RecordAccumulator는 메시지들을 버퍼링하고 배치로 모음2. sender thread 동작 (acks 1 기준)0번 메시지 전송 -> ack 대기 -> ack 수신 완료1번 메시지 전송 -> ack 대기 -> ack 수신 완료...sender thread는 이젠 메시지의 ack를 받을 때가지 다음 메시지를 보내지 않는다하지만 main thread는 이와 무관하게 계속해서 메시지를 RecordAccumulator에 추가할 수 있다! 결론 main thread 입장에서는 메시지 응답을 기다리지 않고 계속 보낼 수 있으니까 비동기 전송 -> 메시지는 batch에 차곡차곡 쌓임실제 메시지를 전송하는 sender thread는 acks 1 설정으로 leader broker가 보낸 ack를 수신해야 다음 메시지 전송 이렇게 이해했는데 제가 이해한게 맞을까요,,,?
-
미해결Airflow 마스터 클래스
블로그 api 관련 질문
chatGPT&Airflow로 블로그 자동 포스팅하기 강의를 듣고 실습을 하려다가tistory가 api 서비스를 중단되었는 것을 알았는데 혹시 대체 블로그가 있는지 문의드립니다.
-
미해결카프카 완벽 가이드 - 코어편
Producer의 메시지 비동기화 전송 구현 강좌 내용 중 질문
안녕하세요뒷 강의에서 뭔가 말씀해주실것같긴한데성급해서 먼저 질문드립니다. 동기, 비동기 발송 시 설정한 ack 모드(1 혹은 -1)에 따라 Exception이 발생하면 send thread에서 재전송을 한다고 말씀해주셨는데요 강의 중 작성해주신 아래 코드에서 else부분에서 error 처리(캐치)부분을 넣는것과 별개로재전송은 따로 이루어진다 라고 이해하면될까요? // KafkaProducer message send kafkaProducer.send(producerRecord, (metadata, exception) -> { if(exception == null) { // 로그출력부분 } else { // 에러 처리부분 logger.error("exception error from broker" + exception.getMessage()); } });
-
미해결Airflow 마스터 클래스
dag_run 주기적으로 삭제
안녕하세요 강사님! 실습중에 궁금한게 생겨서 질문드립니다! dag이 실행될때 저장되는 dag_run 데이터는 주기적으로 삭제해도 airflow 스케줄 실행에 문제가 발생하진 않나요?? postgres도 EC2 내부에 docker-compose로 띄워놔서 혹시나 주기적으로 지워주면 리소스를 줄일 수 있을까 해서요!
-
미해결Airflow 마스터 클래스
강의 내용이 일부 잘린 것으로 보입니다!
안녕하세요! 항상 강의 잘 듣고 있습니다다른 강의와 달리 실습 코드에 대한 설명 없이 바로 airflow 실행으로 화면이 넘어가는 것으로 보입니다!혹 강의 영상의 일부가 잘린 것인가 하여 문의를 드립니다(해당 영상 4분 30초 기준)