게시글
질문&답변
2024.05.19
데이터 엔지니어 업무 초보자가 궁금한 점 질문드립니다..
안녕하세요 권설민님! 저도 Datalake 구축시 빅쿼리를 기반으로 Airflow를 사용했던 적이 있습니다. GCP를 사용하신다니 반갑네요 ^^ 질문에 답변하자면, Airflow와 pandas 조합은 개인적으로 비추천합니다. pandas 가 소규모 데이터에서는 큰 문제없이 동작하는데 데이터 규모가 커지면 CPU를 많이 먹습니다. pandas 아키텍처상 병렬처리도 불가능하구요. 그리고 아키텍처 관점에서도 비추천합니다. Airflow 는 ETL 툴이라 부르기보다는 오케스트레이션 툴이라 불립니다. 즉 Airflow의 워커가 직접 CPU/MEMORY 와 같은 자원을 소모하며 작업을 처리하기 보다는 다른 도구에 작업을 지시하고 관리하는 것에 더 부합하기 때문입니다. 따라서 전처리가 필요하다면 Dataproc 또는 빅쿼리와 같은 도구를 사용하는것이 좋습니다. 그리고 이 문제는 워크플로우의 관리 문제에 앞서서, 데이터레이크의 데이터 흐름을 어떻게 관리할 것이냐에 대한 거버넌스 문제이기도 합니다. (제 생각에 이런 거버넌스를 먼저 확정하는 것이 좋을 듯 합니다) 데이터레이크는 그 정의에 따라 정형/비정형 데이터를 원천 그대로의 모습으로 한 곳에 모아두는 것입니다. 물론 원천 그대로만 두지는 않고 활용을 위해 가공이 필요하지요. 일반적으로 GCS 에 저장해두고, SQL 처리도구로 빅쿼리를 사용하는게 가장 일반적인 모습인것 같습니다. (AWS라면 S3에 저장해두고 glue 로 처리하는 형태) 따라서 설민님께서 고민하시는 파이프라인은 크게 두 가지로 구분할 수 있는데 1) 원천 데이터를 가져와 Airflow가 pandas 로 만들어 메모리상에서 처리 후 GCS에 저장한다. 2) 원천 데이터를 가져와 GCS에 저장해두고 빅쿼리로 처리한다. 어떤게 더 나아 보이나요? 1)은 흔히 과거 DW가 유행하던 시절 ETL 도구가 처리하는 방식입니다. Extract 후 Transformation 하고 마지막에 Load 하는 방식이지요. 2)는 datalake 개념이 도입되면서 ELT 라는 이름으로 처리하는 방식입니다. Extract 후 일단 GCS에 Load하고 필요시 가공(Transformation)하는 것이죠. - 데이터 수집이 몇 개가 아니라 향후 수천개로 늘어난다면 어떤게 더 나을 것 같나요? - 만약 수집한 데이터에 문제가 있어서 제대로 전처리가 되지 않았고, 원인을 파악해야 한다면 어떤 방법이 더 나을까요? - 수집한 데이터가 수십GB 이상에 달해 처리에 오래 걸린다면 어떤 방법이 더 나을까요? - 만약 원천 그대로의 데이터가 필요한 상황이 온다면 어떻게 할까요? 위 4가지 케이스 모두 Airflow가 직접 pandas로 처리하기보다 GCS에 저장 후 빅쿼리로 처리하는게 더 나은 방법입니다. 일반적으로 물리테이블을 만들어서 처리합니다. 빅쿼리도 대용량 테이블을 view를 이용해서 굴비엮듯 다단계로 구성해서 한번에 처리하려고 하다보면 메모리 부족 에러가 뜨기도 합니다. 가급적 물리 테이블을 이용해서 구간별로 처리하는 걸 권장합니다. 이 방법이 중간에 에러로 멈췄을 때 어디서 에러가 발생했는지 추적하기도 좋습니다. 장애 발생 시 어떤 구간에서 발생했으며 Retry 시 저장하면서 가야 정확한 에러 구간에 대해 모니터링이 가능해보여서요.. 이건 전처리 도구에 어떤 툴을 활용해도 에러 구간 확인은 가능할 것 같습니다. 꼭 이것 때문에 빅쿼리를 써야하는건 아닐것 같아요. 이건 정답이 있는지는 저도 모르겠습니다 ^^ 저는 일반적으로 DB를 별도로 만들어 dag에 필요한 내용을 작성해 놓으면 DB를 읽어 dag이 생성되도록 구성하기를 선호합니다. 이 방법은 DB에 내용을 작성하는 방법만 알면 표준화된 DAG 관리가 가능하다는 것이 가장 큰 장점입니다. 뿐만 아니라 DAG 과 task의 구성, DAG간의 선후행 관계 등을 DB쿼리를 통해 한번에 파악/관리할 수 있다는 것도 큰 장점이지요. 단점은 개발자가 Airflow의 고급 기능들을 자유자재로 활용하기가 어렵다는 점입니다. 저는 SI 프로젝트를 주로 하므로 실력차이가 서로 다른 개발자 분들이 와서 개발해도 어느정도 표준화된 DAG 구성을 만들게끔 하고자 이 방법을 주로 사용합니다. 그리고 이렇게 하면 git으로 DAG 관리를 하는게 아니라 DB로 관리를 하게되어 git 표준하고는 상관이 없어집니다. 혹시 git 브랜치 전략같은게 궁금하시다면 DAG과 관련해서도 feature 브랜치, hotfix 브랜치 등과 같은 전략을 통해 관리할 수 있습니다. 신규 DAG을 만든다면 issue 등록 후 feature 브랜치를 따서 DAG 내용 작성하는 것도 가능하구요. 파이프라인에 문제가 있어 DAG 수정을 해야한다면 hotfix 브랜치를 따서 수정하는 것도 좋구요. 하지만 브랜치 전략이라는게 그렇지만 꼭 이런 브랜치 전략을 따를 필요는 없습니다. 마찬가지로 git을 이용 방법도 정답이 없는 문제라 프로젝트 상황을 봐야합니다. 마지막으로.. 빅쿼리는 사용해보시면 장단점이 꽤 분명합니다. 장점은 엄청 빠릅니다. 대규모 데이터도 금방 처리가 되구요. 단점은 RDB와는 달리 인덱스나 PK설정이 안됩니다. 그리고 파티션 설정을 잘해야 비용/성능이 잘 나온다는 것입니다. 그리고 처리한 데이터양을 기준으로 비용이 청구되어서 비용도 생각보다 많이 나오기도 합니다. 그래서 빅쿼리 사용을 권장하긴 하는데 공부를 좀 하셔야 잘 사용하실 수 있을거에요. 아무튼 데이터 엔지니어로써 고민이 많으신듯 합니다. ^^ 누군가 설계해놓은 걸 이행하는 것보다 이런 고민을 직접 많이 하시는게 엄청 도움이 많이 됩니다. 앞으로 더욱 성장하실 것 같네요. 화이팅입니다!
- 0
- 1
- 32
질문&답변
2024.05.01
강의 내용 정리
안녕하세요 이한희님! 네! 물론 괜찮습니다. 도움이 많이 되셨다니 다행입니다 ^^
- 0
- 1
- 58
질문&답변
2024.04.24
파이참에서 외부 파이썬 함수 수행하기
안녕하세요 rosy님 우선 구분해야할 것이, 1) airflow가 common을 인식하지 못한 것은 2) rosy님 로컬에 설치한 파이참에서 from common 부분을 인식하지 못해 빨간줄 그어진 것과 서로 다른 문제입니다. 우선 1)의 원인 먼저 확인을 해볼까요? 우선 ModuleNotFoundError를 내뱉은 것은 정확히는 scheduler 가 내뱉은 에러이고, scheduler 컨테이너가 plugins/common/common_ func.py 를 잘 인식하고 있는지 확인을 해봐야 합니다. WSL에서 스케줄러 컨테이너 안으로 진입한 후 plugins/common/common_ func.py 파일이 잘 있는지부터 확인해볼께요 sudo docker ps sudo docker exec -it {scheduler 컨테이너ID} bash 위 명령으로 스케줄러 컨테이너로 진입한 후 $ cd /opt/airflow/plugins/common $ ls 이렇게 해서 common_ func.py 가 있는지 확인해보세요. 아마 없을겁니다. 그렇다면 docker-compose.yaml 파일에서, 볼륨 연결이 잘 안되었을 가능성이 있습니다. Volumns 항목이 아래처럼 잘 작성되었는지 확인해보세요. volumes: - ${AIRFLOW_PROJ_DIR:-.}/airflow/dags:/opt/airflow/dags - ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs - ${AIRFLOW_PROJ_DIR:-.}/airflow/plugins:/opt/airflow/plugins - ${AIRFLOW_PROJ_DIR:-.}/airflow/files:/opt/airflow/files 다음으로 2)의 문제를 볼까요? 1)과 2)가 서로 다른 문제인 이유는, 위에서 설명드린 것처럼 Airflow가 파일을 못 찾은 것은 컨테이너 안에서 일어난 일입니다. 하지만 2)는 rosy 님의 로컬 컴퓨터에서 일어난 일이죠. 따라서 2)의 해결은 rosy님 pycharm 설정만 잘 해주면 끝납니다. .env 설정은 VScode에서 하는 설정입니다 ^^ 따라서 파이참의 경우 File --> Setting --> Project --> Project Interpreter 메뉴에 들어가면 현재 설정된 파이썬 인터프리터가 보입니다. 그거 선택 후 Show All 누르면 (사진) 이런 메뉴가 나오구요 여기서 선택된 인터프리터 마우스 우클릭하면 Show Interpreter Paths 가 나옵니다. (사진) 그걸 선택하면 아래처럼 현재 인터프리터가 PYTHONPATH로 인식하고 있는 경로들이 나와요. 여기서 + 눌러서 airflow 설치된 디렉토리의 plugins 디렉토리를 add 해주시면 됩니다. (사진) 그럼 rosy 님의 파이참 환경에서 from common 부분을 인식하지 못하는 문제는 해결 될거에요!
- 0
- 2
- 99
질문&답변
2024.04.20
xcom_pull 메서드 사용 질문
안녕하세요 방효석님! 두 번째 task에 오타가 있습니다. key=reuslt1 --> resutl1 key=reuslt2 --> result2 로 수정해서 돌려보시겠어요?
- 0
- 2
- 75
질문&답변
2024.04.20
show_templates() 함수의 키워드 아규먼트 질문
안녕하세요 방효석님! 맞습니다. 정확히는 @task 데커레이터가 넣어준다고 보시면 됩니다. @task 데커레이터가 어떻게 넣어주는지 이해하기 전에 먼저 PythonOperator를 사용할 때를 생각해봅시다. with DAG(... ) as dag: def print_context(**context): print('context start') print(context) print('context end') t1 = PythonOperator( task_id="print_context_from_python_operator", python_callable=print_context ) 이렇게 작성해서 한번 돌려보시면 @task 데커레이터를 쓰지 않아도 context 에 값이 저장되어 print 되는 것을 볼 수 있습니다. 결국 PythonOperator 가 여러 기본 변수들을 context에 넣어준다고 볼 수 있겠죠. 그럼 이제 @task 데커레이터가 어떻게 넣어주는지 알려면 소스코드를 봐야 하는데, 좀 복잡합니다. 먼저 @task 키워드를 쓸때 사용되는 task는 task_decorator_factory 함수를 리턴하는 함수입니다. def python_task( python_callable: Callable | None = None, multiple_outputs: bool | None = None, **kwargs, ) -> TaskDecorator: """ Wrap a function into an Airflow operator. Accepts kwargs for operator kwarg. Can be reused in a single DAG. :param python_callable: Function to decorate :param multiple_outputs: If set to True, the decorated function's return value will be unrolled to multiple XCom values. Dict will unroll to XCom values with its keys as XCom keys. Defaults to False. """ return task_decorator_factory( python_callable=python_callable, multiple_outputs=multiple_outputs, decorated_operator_class=_PythonDecoratedOperator, **kwargs, ) ( https://github.com/apache/airflow/blob/main/airflow/decorators/python.py ) 그리고 실습시 작성했던 아래 코드를 다시 생각해보면, @task(task_id='python_task') def show_templates(**kwargs): from pprint import pprint pprint(kwargs) show_templates() show_templates() 실행시 task 데커레이터의 outer function에 해당하는 python_task 가 실행되게 됩니다. 위에서 python_task는 task_decorator_factory 함수를 리턴한다고 말씀드렸죠? task_decorator_factory 함수는 아래처럼 생겼습니다. 좀 복잡하죠? def task_decorator_factory( python_callable: Callable | None = None, *, multiple_outputs: bool | None = None, decorated_operator_class: type[BaseOperator], **kwargs, ) -> TaskDecorator: """Generate a wrapper that wraps a function into an Airflow operator. Can be reused in a single DAG. :param python_callable: Function to decorate. :param multiple_outputs: If set to True, the decorated function's return value will be unrolled to multiple XCom values. Dict will unroll to XCom values with its keys as XCom keys. If set to False (default), only at most one XCom value is pushed. :param decorated_operator_class: The operator that executes the logic needed to run the python function in the correct environment. Other kwargs are directly forwarded to the underlying operator class when it's instantiated. """ if multiple_outputs is None: multiple_outputs = cast(bool, attr.NOTHING) if python_callable: decorator = _TaskDecorator( function=python_callable, multiple_outputs=multiple_outputs, operator_class=decorated_operator_class, kwargs=kwargs, ) return cast(TaskDecorator, decorator) elif python_callable is not None: raise TypeError("No args allowed while using @task, use kwargs instead") def decorator_factory(python_callable): return _TaskDecorator( function=python_callable, multiple_outputs=multiple_outputs, operator_class=decorated_operator_class, kwargs=kwargs, ) return cast(TaskDecorator, decorator_factory) (https://github.com/apache/airflow/blob/main/airflow/decorators/base.py ) 참고로 task_decorator_factory는 데커레이터를 쉽게 만들게 해주는 함수입니다. task_decorator_factory를 이용하면 custom 한 데커레이터도 만들 수 있습니다. 어쨌든 task_decorator_factory를 이해하는게 중요한데 이 함수의 파라미터 중 decorated_operator_class 파라미터는 어떤 오퍼레이터를 실행하게 할 것인지를 결정합니다. 다시 위의 python_task 함수를 살펴보면 task_decorator_factory 를 리턴할 때 파라미터 중 decorated_operator_class=_PythonDecoratedOperator 를 전달하고 있습니다. _PythonDecoratedOperator 클래스를 보면 DecoratedOperator와 PythonOperator를 다중 상속받고 있습니다. class _PythonDecoratedOperator(DecoratedOperator, PythonOperator): """ Wraps a Python callable and captures args/kwargs when called for execution. :param python_callable: A reference to an object that is callable :param op_kwargs: a dictionary of keyword arguments that will get unpacked in your function (templated) :param op_args: a list of positional arguments that will get unpacked when calling your callable (templated) :param multiple_outputs: If set to True, the decorated function's return value will be unrolled to multiple XCom values. Dict will unroll to XCom values with its keys as XCom keys. Defaults to False. """ ( https://github.com/apache/airflow/blob/main/airflow/decorators/python.py ) 쉽게 생각하면 PythonOperator + DecoratedOperator 라는 두 개의 부모를 상속한 _PythonDecoratedOperator가 정의돼있는데 우리가 @task 데커레이터를 쓰면 _PythonDecoratedOperator가 실행되는 구조입니다. 이 과정에서 결과적으로 PythonOperator를 실행하는 것과 같고 이 과정중에 context가 **kwargs에 전달된다고 보시면 됩니다. 코드가 좀 복잡한데 결국 @task 데커레이터는 PythonOperator를 실행하는 것과 같고 PythonOperator를 썼을 때도 기본 변수를 **kwargs 로 밀어넣어주고 있으므로 원리는 같다. 라고 생각하시면 됩니다.
- 0
- 2
- 72
질문&답변
2024.04.15
ubuntu 응답없음
안녕하세요 rosy님! 이미 wsl ubuntu가 설치되어 있는것 같네요 혹시 기존에 있던 버전도 삭제 가능하신가요? 가능하시다면 초기화하고 20.04 버전으로 설치해보시겠어요? 삭제하는건 아래 명령으로 가능하구요. wslconfig.exe /u Ubuntu wslconfig.exe /u Ubuntu-20.04 20.04 버전으로 설치하는 건 아래 명령어 참고 하셔서 진행해보시기 바랍니다. wsl --install -d Ubuntu-20.04
- 0
- 1
- 74
질문&답변
2024.04.08
docker daemon 실행 문제
안녕하세요 홍창기님! 음 WSL에서는 처음 본 에러네요. 일단 강의 내용과 docker 가이드상 내용이 조금 상이하긴 하지만 진행 절차상 큰 문제는 없어보입니다. 제 생각에 설치 절차상 문제인것보다, WSL에 문제가 있는 것 같습니다. 에러 로그상 봤을 때 현재 사용하시는 WSL이 2 버전이 맞는것 같은데 혹시 다시 한번 확인해보시겠어요? cmd 창에서 아래 명령을 입력하시면 버전을 볼 수 있는데 우측에 VERSION 이 2로 나와야 합니다. wsl -l -v NAME STATE VERSION * Ubuntu-20.04 Running 2 그리고 아래 WSL에 docker 설치를 위한 사전조건인데 한번 확인해보시겠어요? https://docs.docker.com/desktop/install/windows-install/#system-requirements (사진)
- 0
- 1
- 93
질문&답변
2024.04.07
제목과 영상이 다른것 같아요~
안녕하세요 장순돌님! 알려주셔서 너무 감사드립니다! 이걸 왜 이제 발견했을까요 ^^;
- 0
- 1
- 67
질문&답변
2024.04.04
docker-compose.yaml 파일 관련 질문
안녕하세요 방효석님! 결론부터 말씀드리자면 효석님이 생각하신게 맞습니다. yaml 파일 작성시 왼쪽에 들어가는 경로는 git pull 을 통해 소스가 생성되는 최종 경로라고 보시면 됩니다. 윈도우 환경에서는 그게 WSL이고 효석님의 경우 remote 디렉토리입니다. yaml 파일의 오른쪽은 도커 컨테이너가 연결될 경로이므로 윈도우 환경에서는 WSL에 있는 파일이, 효석님의 경우 remote 디렉토리에 있는 파일이 컨테이너로 연결되는 것이죠 . 이해 되셨을까요?
- 0
- 2
- 110
질문&답변
2024.04.01
mac 환경에서 실습 질문
안녕하세요 현지원님! 우선 질문하신 내용에 답변 드리자면 statground/airflow/dags 폴더에 직접 dag 을 작성하시거나 수정하셔서 작업을 해도 되긴합니다. 그런데 그러면 지원님이 dag을 새로 만들거나 operator를 만드는 과정에서 수정&save 를 반복할 때마다 기존 dag 이나 오퍼레이터가 계속 영향받게 됩니다. 사용하는 IDE 툴 설정에 따라 다르긴 하겠지만 save 를 하지 않아도 타이핑하는 내용이 즉각 저장되는 경우가 있습니다. 그러면 영향받는 정도는 더욱 심해지겠죠? 결론적으로는 지원님이 작성하신대로 하셔도 되는데, IDE 툴 설정에서 명시적으로 save 해야 저장되도록 되어 있는지 확인해보고 작업하시는게 좋겠습니다.
- 0
- 2
- 116