묻고 답해요
161만명의 커뮤니티!! 함께 토론해봐요.
인프런 TOP Writers
-
미해결Airflow 마스터 클래스
섹션3-2 외부 파이썬 함수 수행하기 에러코드 질문드립니다.
아래와 같은 에러가 발생했는데, common 파일을 못찾는 것 같습니다. 혹시 해결방법이 있을까요?
-
미해결카프카 완벽 가이드 - 코어편
혹시 ubuntu desktop 버전이 강의를 볼 때 필 수 인가요??
ssh 접속으로 한다고 하면 구지 desktop 안따라해도 될 것 같은데..
-
미해결다양한 사례로 익히는 SQL 데이터 분석
"사용자별 특정 상품 주문시 함께 가장 많이 주문된 다른 상품 추출하기"에서 조건관련..
주문별 고객별 연관 상품 추출 SQL로 구하기 -02 강의를 듣던중 궁금한점이 있어 글을 남기게 되었습니다.임시테이블 temp_01 에서 인데요..고객별 주문별 연관상품 추출하려면 user_id도 같아야하겠지만, order_id(주문번호)도 같다는 조건 하에 self join해야하지 않을까 싶어서요.select a.user_id, a.product_id as prod_01, b.product_id as prod_02from temp_00 ajoin temp_00 b on a.user_id = b.user_idand a.order_id = b.order_id -- 이 부분 추가되어야하지 않을지 궁금합니다.where a.product_id != b.product_id감사합니다.
-
미해결[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
연결 브로커 지정
안녕하세요클러스터에 브로커가 100개,토픽 A에 파티션 5개, 복제 3개인 상황 이라고 가정하겠습니다.프로듀서는 리더 파티션이 존재하는 브로커와 통신을 해야하는데브로커 100개 중 리더 파티션 5개가 분배될 것인데 클라이언트 입장에서는 리더 파티션이 있는 브로커를 모릅니다.이 때 bootstrap.servers에 100개 중 아무 브로커 2개만 적어 주면 알아서 리더 파티션이 있는 브로커를 알려주나요?bootstrap.servers에 몇 개의 브로커를 적는게 올바른가요?
-
미해결Data Engineering Course (1) : 빅데이터 하둡 직접 설치하기
동영상 재생이 안됩니다.
- 학습 관련 질문을 남겨주세요. 상세히 작성하면 더 좋아요! - 먼저 유사한 질문이 있었는지 검색해보세요. - 서로 예의를 지키며 존중하는 문화를 만들어가요. - 잠깐! 인프런 서비스 운영 관련 문의는 1:1 문의하기를 이용해주세요. 동영상 재생이 너무 느립니다. 그나마 엣지에서는 버벅 거리면서 돌아가긴 했는데, 크롬에 최적화 되어 있다고 해서 크롬으로 해보니 동영상 재생 자체가 안되네요. 계속 로딩만 하고..해결 방법이 없을까요?
-
미해결Airflow 마스터 클래스
localhost:8080 에서 로그인이 안됩니다.
위 상황에서 인터넷창에 localhost:8080 후 진입하면 우선 선생님과 같은 로그인 화면이 아닌 다른 UI가 나옵니다. 그리고 초기 아이디/비번으로 설정된 airflow/airflow 로 시도해도 로그인이 안되고 있어서 해결방법을 알수있을까요?
-
해결됨Airflow 마스터 클래스
task 동시성 제한 및 중복 호출 방지
안녕하세요, 수업을 대부분 수강하고 실제 현업에서 사용중에 있는데 문의사항이 있어서 질문 드립니다. 현재 상황은 이렇습니다.DAG 구성- 5분단위 스케줄링 -4개의 task - task1 >> task2 >> task3 >> task4 - 각 task 별 timeout =5분 문제 상황은 task 2번이 한달에 한번씩 data 가 많아지면 5분까지 타임아웃이 걸릴때가 있는 것인데요,이때 그다음 Dag run 이 수행되면서 task 2 번이 동시에 수행 되는 시간이 조금 있는데 그때 데이터 처리가 중복으로 처리되는 현상이 발생하게 됩니다. 그래서 가능하면 task2 을 동시에 돌리는걸 막고 싶었는데요,처음 생각해낸 방법은 task_concurrency 옵션을 task 에 주어서 1개만 돌수 있게 바꾸고 timeout 을 조금더 넉넉하게 주려고 했으나, 만에하나 해당 task 가 10분이상 걸린다면 dag run 이 수행되고있는것 제외 2개가 더 웨이팅을 하는것이 되고, 이게 누적이 될수도 있을것으로 보여서 문제로 인지 했습니다.서비스 적으로 5분내에 돌수 있게 하거나, 아니면 5분 스케줄링을 변경하는 방법을 고려해야 하지만 해당 고려 없이 혹시 airflow 단에서 할수 있는 작업이 있을까요?ex . runninng 중인 task 와 대기중인 task 가 하나정도 있다면 해당 task 는 스킵하는 옵션 등입니다..
-
미해결실리콘밸리 엔지니어와 함께하는 Apache Airflow
airflow와 postgres간의 connection 오류
airflow와 postgres 간의 connection 오류 문제입니다.airflow UI -> admin-> connections에서 postgres 연결설정docker-compose.yaml 설정 dag 코드입력 airflow tasks test postgres_loader execute_sql_query 2023-01-01 시에 오류가 뜹니다ㅠ[2024-06-21T15:40:45.514+0900] {dagbag.py:545} INFO - Filling up the DagBag from /home/kim/airflow/dags [2024-06-21T15:40:45.805+0900] {taskinstance.py:2076} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: postgres_loader.execute_sql_query __airflow_temporary_run_2024-06-21T06:40:45.755970+00:00__ [None]> [2024-06-21T15:40:45.811+0900] {taskinstance.py:2076} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: postgres_loader.execute_sql_query __airflow_temporary_run_2024-06-21T06:40:45.755970+00:00__ [None]> [2024-06-21T15:40:45.812+0900] {taskinstance.py:2306} INFO - Starting attempt 1 of 1 [2024-06-21T15:40:45.812+0900] {taskinstance.py:2388} WARNING - cannot record queued_duration for task execute_sql_query because previous state change time has not been saved [2024-06-21T15:40:45.813+0900] {taskinstance.py:2330} INFO - Executing <Task(PostgresOperator): execute_sql_query> on 2023-01-01 00:00:00+00:00 [2024-06-21T15:40:45.855+0900] {taskinstance.py:2648} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='postgres_loader' AIRFLOW_CTX_TASK_ID='execute_sql_query' AIRFLOW_CTX_EXECUTION_DATE='2023-01-01T00:00:00+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='__airflow_temporary_run_2024-06-21T06:40:45.755970+00:00__' [2024-06-21T15:40:45.858+0900] {taskinstance.py:430} INFO - ::endgroup:: [2024-06-21T15:40:45.870+0900] {sql.py:276} INFO - Executing: INSERT INTO sample_table (key, value) VALUES ('hello', 'world') [2024-06-21T15:40:45.875+0900] {taskinstance.py:441} INFO - ::group::Post task execution logs [2024-06-21T15:40:45.875+0900] {taskinstance.py:2905} ERROR - Task failed with exception Traceback (most recent call last): File "/home/kim/.local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 465, in _execute_task result = _execute_callable(context=context, **execute_callable_kwargs) File "/home/kim/.local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 432, in _execute_callable return execute_callable(context=context, **execute_callable_kwargs) File "/home/kim/.local/lib/python3.10/site-packages/airflow/models/baseoperator.py", line 401, in wrapper return func(self, *args, **kwargs) File "/home/kim/.local/lib/python3.10/site-packages/airflow/providers/common/sql/operators/sql.py", line 277, in execute hook = self.get_db_hook() File "/home/kim/.local/lib/python3.10/site-packages/airflow/providers/common/sql/operators/sql.py", line 188, in get_db_hook return self._hook File "/usr/lib/python3.10/functools.py", line 981, in __get__ val = self.func(instance) File "/home/kim/.local/lib/python3.10/site-packages/airflow/providers/common/sql/operators/sql.py", line 150, in _hook conn = BaseHook.get_connection(conn_id) File "/home/kim/.local/lib/python3.10/site-packages/airflow/hooks/base.py", line 83, in get_connection conn = Connection.get_connection_from_secrets(conn_id) File "/home/kim/.local/lib/python3.10/site-packages/airflow/models/connection.py", line 519, in get_connection_from_secrets raise AirflowNotFoundException(f"The conn_id `{conn_id}` isn't defined") airflow.exceptions.AirflowNotFoundException: The conn_id `my_postgres_connection` isn't defined
-
미해결카프카 완벽 가이드 - 커넥트(Connect) 편
안녕하세요 질문이 있어 문의드립니다.
이미 Kafka Core편은 들었고 현재 Kafka Connect부분중 Sink Connector듣기 전입니다.이 강의를 듣게된 동기이기도 한데요.. DBToDB방식으로 해결하지 않고 Source Connector가 없이 Topic를 만들고 Topic에 Produce하는 방법이 가능한지요? Topic에 생성된 메시지는 Sink Connector 로 DB에 적재되는 구조입니다. Topic의 메시지 구조는 Source Connector에서 생성된 Message 체계로 Topic에 메시지를 적재하는 식으로 처리할려고 합니다.Kafka core에서 배웠던 Produce하는 방법을 이용하는경우입니다. 정리) Source Connector없이 Topic에 메시지를 보내고 이를 SinkConnect로 DB에 적재하는게 가능한지요?감사합니다.
-
미해결카프카 완벽 가이드 - ksqlDB
table의 데이터가 실시간으로 topic에 담기지 않습니다
mysql에 debezium source connector로 topic에 가져온 데이터를 받는 stream을 만들고그 stream을 기반으로 하여CREATE TABLE timeout WITH (KAFKA_TOPIC='timeout' , KEY_FORMAT='AVRO', VALUE_FORMAT='AVRO', PARTITIONS=1) AS > SELECT > order_id -> order_id AS order_id, > TIMESTAMPADD(MILLISECONDS, 9 * 3600 * 1000, PARSE_TIMESTAMP(LATEST_BY_OFFSET(order_datetime), 'yyyy-MM-dd''T''HH:mm:ssX')) AS last_log_time > FROM orders > GROUP BY order_id -> order_id > HAVING ((UNIX_TIMESTAMP(CONVERT_TZ(FROM_UNIXTIME(UNIX_TIMESTAMP()), 'UTC', 'Asia/Seoul')) - UNIX_TIMESTAMP(TIMESTAMPADD(MILLISECONDS, 9 * 3600 * 1000, PARSE_TIMESTAMP(LATEST_BY_OFFSET(order_datetime), 'yyyy-MM-dd''T''HH:mm:ssX')))) / 1000 > 600) > EMIT CHANGES;이런식으로 id별로 마지막 로그 시간이 오고 10분 이상이 지나면 table에 담기도록 만들었습니다처음에 이미 10분이 지난 데이터를 넣으면 table에도 들어가고 topic에도 잘 들어가는데현재시간의 데이터를 넣고 10분이 지나면 table에는 들어가는데 topic에는 들어가지 않습니다table에도 담기고 topic에도 담기려면 어떻게 해야하나요? 아니면 원래 불가능한건가요?기반한 stream은 데이터를 넣으면 곧 바로 stream과 토픽에 잘 들어갑니다.|ORDER_ID |CALCULATED_TIME |LAST_LOG_TIME | +------------------------------------------+------------------------------------------+------------------------------------------+ |1 |78088 |2024-06-16T12:30:00.000 | |2 |69988 |2024-06-16T14:45:00.000 | |3 |72088 |2024-06-16T14:10:00.000 | |4 |32739088 |2023-06-04T12:00:00.000 | |5 |32637088 |2023-06-05T16:20:00.000 | |6 |32567788 |2023-06-06T11:35:00.000 | |7 |69058 |2024-06-16T15:00:30.000 | |8 |68698 |2024-06-16T15:06:30.000 | |9 |66958 |2024-06-16T15:35:30.000 | |10 |65698 |2024-06-16T15:56:30.000 | |11 |66298 |2024-06-16T15:46:30.000 | |12 |4258 |2024-06-17T09:00:30.000 | |13 |3418 |2024-06-17T09:14:30.000 | |14 |1918 |2024-06-17T09:39:30.000 | |15 |2429 |2024-06-17T09:30:59.000 | Query terminated ksql> print result7777; Key format: AVRO or KAFKA_STRING Value format: AVRO rowtime: 2024/06/16 04:23:23.878 Z, key: 1, value: {"CALCULATED_TIME": 12183, "LAST_LOG_TIME": 1718541000000}, partition: 0 rowtime: 2024/06/16 04:23:23.879 Z, key: 2, value: {"CALCULATED_TIME": 4083, "LAST_LOG_TIME": 1718549100000}, partition: 0 rowtime: 2024/06/16 05:10:08.498 Z, key: 3, value: {"CALCULATED_TIME": 6183, "LAST_LOG_TIME": 1718547000000}, partition: 0 rowtime: 2024/06/16 06:06:52.365 Z, key: 4, value: {"CALCULATED_TIME": 32673183, "LAST_LOG_TIME": 1685880000000}, partition: 0 rowtime: 2024/06/16 06:06:52.373 Z, key: 5, value: {"CALCULATED_TIME": 32571183, "LAST_LOG_TIME": 1685982000000}, partition: 0 rowtime: 2024/06/16 06:06:52.377 Z, key: 6, value: {"CALCULATED_TIME": 32501883, "LAST_LOG_TIME": 1686051300000}, partition: 0 rowtime: 2024/06/16 06:09:36.530 Z, key: 7, value: {"CALCULATED_TIME": 3153, "LAST_LOG_TIME": 1718550030000}, partition: 0 rowtime: 2024/06/16 06:15:08.351 Z, key: 8, value: {"CALCULATED_TIME": 2793, "LAST_LOG_TIME": 1718550390000}, partition: 0 rowtime: 2024/06/16 06:41:28.920 Z, key: 9, value: {"CALCULATED_TIME": 1053, "LAST_LOG_TIME": 1718552130000}, partition: 0 rowtime: 2024/06/17 00:23:09.442 Z, key: 12, value: {"CALCULATED_TIME": 1372, "LAST_LOG_TIME": 1718614830000}, partition: 01-9, 12 이미 10분이 지난 데이터 // 그 외 = 데이터가 mysql에 담기고 10분이 지나 table에 담긴 데이터
-
미해결[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
auto_commit_interval_ms_config 질문
- 카프카를 공부하시면서 생긴 질문들을 남겨주세요. 상세히 작성하면 더 좋아요! - 먼저 유사한 질문이 있었는지 검색해보세요. - 서로 예의를 지키며 존중하는 문화를 만들어가요. - 잠깐! 인프런 서비스 운영 관련 문의는 1:1 문의하기를 이용해주세요. 안녕하세요~auto_commit_interval_ms_config 에 대해 궁금한 점이 있는데요 찾아봐도 모호해서 질문드립니다.auto_commit_interval_ms_config = 60이라고 가정했을때위 옵션은 poll() 호출 여부와 관계없이 60초 마다 자동으로 커밋을 해주는건가요?아니면 마지막 자동커밋 발생하고 60초 이후에 poll() 이 호출될때 커밋을 해준다는건가요?
-
미해결Database - SQL
MySQL 실습
CUSTOMER ID FORMATTING- 1: 00001- 2: 00002- 13: 00013SELECT CUSTOMERID, CONCAT(REPEAT('0', 5-LENGTH(CUSTOMERID)), CUSTOMERID)FROM CUSTOMER;MySQL은 문자열 + 하기 연산이 없어서CONCAT 함수를 사용해야 합니다
-
미해결카프카 완벽 가이드 - 코어편
강의 질문
제가 EXCEL파일에 있는 데이터를 카프카를 통해 db에 저장을 해야하는데 이 강의를 들으면 할 수 있을까요?
-
해결됨카프카 완벽 가이드 - ksqlDB
debezium에서 ksqldb로
제가 구상하고있는 구조가 mysql에서 debezium source connector가 topic에 넘기고 ksqldb의 streams나 table로 재구성하여 다른 topic으로 넘긴 후mysql sink database에서 받는다. 라는걸 구상중인데요 ksqldb에서 직접 insert를 하면 json 형식이 아니라서 sink connector가 읽지 못하는거 같습니다.-- debezium.json --{ "name": "debezium", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "localhost", "database.port": "3306", "database.user": "root", "database.password": "1234", "database.allowPublicKeyRetrieval": "true", "database.server.id": "10777", "database.server.name": "debe01", "database.include.list": "debe", "table.include.list": "debe.user", "database.history.kafka.bootstrap.servers": "localhost:9092", "database.history.kafka.topic": "schema-changesde.mysql.oc", "key.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "key.converter.schemas.enable": "false", "value.converter.schemas.enable": "false", "database.connectionTimeZone": "Asia/Seoul", "time.precision.mode": "connect", "transforms": "unwrap", "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", "transforms.unwrap.drop.tombstones": "false" } } 그래서 debezium으로 mysql의 data를 읽어서 topic으로 가져왔는데ksqldb에서 그 data를 읽는 부분에서 막혔습니다강의에 나온거처럼 ksqldb와 debezium을 연동을 해야 가능한건가요?아니면 어떤 방법이 있을까요?
-
미해결따라하며 배우는 도커와 CI환경 [2023.11 업데이트]
Docker Volume 오류(reference, lowercase)
#ERROR1repository 이름이 소문자여야 한다는 에러입니다. 제가 폴더 명을 "Docker"로 해놨었더니 이런 오류가 뜨더라고요. 폴더명을 "docker"로 바꿨더니 해결되었습니다.kim-yaegun@gim-yegeons-MacBook-Air Docker % docker run -p 5001:8080 -v /usr/src/app/node_modules -v $(pwd):/usr/src/app yaegun/nodedocker: invalid reference format: repository name (Docker) must be lowercase.See 'docker run --help'.#ERROR2레퍼런스가 유효하지 않다는 에러입니다.kim-yaegun@gim-yegeons-MacBook-Air docker % docker run -p 5001:8000 -v /usr/src/app/node_modules -v $(pwd):/usr/src/app yaegunkim/nodedocker: invalid reference format.See 'docker run --help'.이건 $(pwd)를"$(pwd)"로 바꾸어 주니 해결되었습니다.kim-yaegun@gim-yegeons-MacBook-Air docker % docker run -p 5001:8000 -v /usr/src/app/node_modules -v "$(pwd)":/usr/src/app yaegunkim/nodeSuccessful
-
미해결따라하며 배우는 도커와 CI환경 [2023.11 업데이트]
[섹션3 - 내가 만든 이미지 기억하기 쉬운 이름 주기] 네이밍/태그 에러
"docker build -t YaegunKim/hello:latest ./"로 빌드를 잘 한 것 같은데 아래와 같은 에러가 나오더라고요.#COMMANDdocker run -it YaegunKim/hello#ERRORUnable to find image 'YaegunKim/hello:latest' locallydocker: Error response from daemon: Get "https://YaegunKim/v2/": dialing YaegunKim:443 container via direct connection because has no HTTPS proxy: resolving host YaegunKim: lookup YaegunKim: no such host.See 'docker run --help'.근데 또 아이디로 run을 하면 잘 되고...#PROBLEM-SOLVING이유는 버전은 "latest"가 아닌 "lastest"로 해서 에러가 났던 것이었습니다. 다시 빌드 하고 실행해보니 hello가 잘 출력되네요 ㅎㅎ
-
미해결Airflow 마스터 클래스
도커를 사용하지 않는 방법
안녕하세요 HPC를 사용하고 있는데 도커가 사용 불가능한 HPC라 우선은 구글링하여 airflow를 설치하고 강의를 듣고 있습니다. 아직 1강인데, 혹시 차후에 도커가 없어서 강의를 못따라가는 상황이 생길까요? 수강신청전에 미리 확인해봤어야 했는데 죄송합니다 ㅜㅜ!
-
미해결실리콘밸리 엔지니어와 함께하는 Apache Airflow
from airflow.sensors.sql import SqlSensor에 대해 질문 있습니다.
선생님이 4:21초에 from airflow.sensors.sql import SqlSensor는 provider에 있는게 아니라 core에 있는 sensor라고 알려주셨는데 airflow 버전 2.9.1에서는 SqlSensor가 apache-airflow-providers-common-sql 패키지에 포함되어 있다고 하는데 그러면 버전 2.9.1에서는 airflow core에 있는 sensor를 사용하지 못하는 건가요??
-
미해결15일간의 빅데이터 파일럿 프로젝트
tail -f flume-cmf-flume-AGENT-server02.hadoop.com.log 오류
tail -f flume-cmf-flume-AGENT-server02.hadoop.com.log 했을때 Creating이나 강의에 말씀한 내용 나오지않고, 아래처럼 나오기만 하는데 Flume Config파일도 정상적이고 재시동도 해봤는데 안되는데 또 조치해야할게 있을까요?
-
해결됨카프카 완벽 가이드 - ksqlDB
CLI로 실행과 코드로 실행하면 결과가 다르게 나옵니다
CREATE STREAM add_stream WITH (KAFKA_TOPIC='column_stream_topic', VALUE_FORMAT='JSON') AS SELECT *, CAST(NULL AS INT) AS new_column FROM test_stream EMIT CHANGES;이렇게 기존 test_stream에서 column을 추가한 add_stream을 만들려고 CLI문을 실행시키면원래 test_stream에 담겨있는 data가 담아져서 나오는데package com.example.service; import io.confluent.ksql.api.client.Client; import io.confluent.ksql.api.client.ClientOptions; import io.confluent.ksql.api.client.ExecuteStatementResult; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; import java.util.concurrent.ExecutionException; @Service public class streamPracticeAdd { @Value("${ksqldb.server.host}") private String ksqlDbHost; private int ksqlDbPort; private Client client; @PostConstruct public void init() { ClientOptions options = ClientOptions.create() .setHost(ksqlDbHost) .setPort(ksqlDbPort); client = Client.create(options); } public void streamsAdd(String columnName, String dataType) { String createStreamKsql = "CREATE STREAM add_stream WITH (KAFKA_TOPIC='column_stream_topic', VALUE_FORMAT='JSON') AS SELECT *, CAST(NULL AS " + dataType + ") AS " + columnName + " FROM test_stream EMIT CHANGES;"; try { ExecuteStatementResult result = client.executeStatement(createStreamKsql).get(); System.out.println("Stream created and data inserted into new topic: " + result.queryId()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } }/kafka/addColumn/new_column/INT 인 API 요청을 줘서 새 stream을 만드는 코드인데실행시키면 기존 column에 새 column까지 추가는 되는데 기존 data가 하나도 들어오지 않습니다.검색을 해봤는데도 잘 안나와서 질문 남깁니다 감사합니다.