소개
안녕하세요
LG CNS 데이터 엔지니어 김현진 입니다.
정보관리기술사를 취득한 이후 지금까지 얻은 지식을 많은 사람들에게 공유하고,
특히 데이터 엔지니어를 희망하고 공부하고 싶은 분 들에게 도움이 되고자 컨텐츠를 제작하고 있습니다.
첫 컨텐츠는 Airflow 마스터 클래스로써
데이터 파이프라인을 만들고 관리할 수 있는 핵심 도구를 초보자도 이해하기 쉽게 만들었습니다.
만나서 반갑습니다 ^^
강의
전체1수강평
- 도움이 많이 됩니다~
최일주
2024.04.24
1
- 유익한 강의에요
jyjy7
2024.03.30
1
- 강의 최고!!
JinHo Kim
2024.01.18
1
게시글
질문&답변
2024.05.01
강의 내용 정리
안녕하세요 이한희님! 네! 물론 괜찮습니다. 도움이 많이 되셨다니 다행입니다 ^^
- 0
- 1
- 49
질문&답변
2024.04.24
파이참에서 외부 파이썬 함수 수행하기
안녕하세요 rosy님 우선 구분해야할 것이, 1) airflow가 common을 인식하지 못한 것은 2) rosy님 로컬에 설치한 파이참에서 from common 부분을 인식하지 못해 빨간줄 그어진 것과 서로 다른 문제입니다. 우선 1)의 원인 먼저 확인을 해볼까요? 우선 ModuleNotFoundError를 내뱉은 것은 정확히는 scheduler 가 내뱉은 에러이고, scheduler 컨테이너가 plugins/common/common_ func.py 를 잘 인식하고 있는지 확인을 해봐야 합니다. WSL에서 스케줄러 컨테이너 안으로 진입한 후 plugins/common/common_ func.py 파일이 잘 있는지부터 확인해볼께요 sudo docker ps sudo docker exec -it {scheduler 컨테이너ID} bash 위 명령으로 스케줄러 컨테이너로 진입한 후 $ cd /opt/airflow/plugins/common $ ls 이렇게 해서 common_ func.py 가 있는지 확인해보세요. 아마 없을겁니다. 그렇다면 docker-compose.yaml 파일에서, 볼륨 연결이 잘 안되었을 가능성이 있습니다. Volumns 항목이 아래처럼 잘 작성되었는지 확인해보세요. volumes: - ${AIRFLOW_PROJ_DIR:-.}/airflow/dags:/opt/airflow/dags - ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs - ${AIRFLOW_PROJ_DIR:-.}/airflow/plugins:/opt/airflow/plugins - ${AIRFLOW_PROJ_DIR:-.}/airflow/files:/opt/airflow/files 다음으로 2)의 문제를 볼까요? 1)과 2)가 서로 다른 문제인 이유는, 위에서 설명드린 것처럼 Airflow가 파일을 못 찾은 것은 컨테이너 안에서 일어난 일입니다. 하지만 2)는 rosy 님의 로컬 컴퓨터에서 일어난 일이죠. 따라서 2)의 해결은 rosy님 pycharm 설정만 잘 해주면 끝납니다. .env 설정은 VScode에서 하는 설정입니다 ^^ 따라서 파이참의 경우 File --> Setting --> Project --> Project Interpreter 메뉴에 들어가면 현재 설정된 파이썬 인터프리터가 보입니다. 그거 선택 후 Show All 누르면 (사진) 이런 메뉴가 나오구요 여기서 선택된 인터프리터 마우스 우클릭하면 Show Interpreter Paths 가 나옵니다. (사진) 그걸 선택하면 아래처럼 현재 인터프리터가 PYTHONPATH로 인식하고 있는 경로들이 나와요. 여기서 + 눌러서 airflow 설치된 디렉토리의 plugins 디렉토리를 add 해주시면 됩니다. (사진) 그럼 rosy 님의 파이참 환경에서 from common 부분을 인식하지 못하는 문제는 해결 될거에요!
- 0
- 2
- 92
질문&답변
2024.04.20
xcom_pull 메서드 사용 질문
안녕하세요 방효석님! 두 번째 task에 오타가 있습니다. key=reuslt1 --> resutl1 key=reuslt2 --> result2 로 수정해서 돌려보시겠어요?
- 0
- 2
- 71
질문&답변
2024.04.20
show_templates() 함수의 키워드 아규먼트 질문
안녕하세요 방효석님! 맞습니다. 정확히는 @task 데커레이터가 넣어준다고 보시면 됩니다. @task 데커레이터가 어떻게 넣어주는지 이해하기 전에 먼저 PythonOperator를 사용할 때를 생각해봅시다. with DAG(... ) as dag: def print_context(**context): print('context start') print(context) print('context end') t1 = PythonOperator( task_id="print_context_from_python_operator", python_callable=print_context ) 이렇게 작성해서 한번 돌려보시면 @task 데커레이터를 쓰지 않아도 context 에 값이 저장되어 print 되는 것을 볼 수 있습니다. 결국 PythonOperator 가 여러 기본 변수들을 context에 넣어준다고 볼 수 있겠죠. 그럼 이제 @task 데커레이터가 어떻게 넣어주는지 알려면 소스코드를 봐야 하는데, 좀 복잡합니다. 먼저 @task 키워드를 쓸때 사용되는 task는 task_decorator_factory 함수를 리턴하는 함수입니다. def python_task( python_callable: Callable | None = None, multiple_outputs: bool | None = None, **kwargs, ) -> TaskDecorator: """ Wrap a function into an Airflow operator. Accepts kwargs for operator kwarg. Can be reused in a single DAG. :param python_callable: Function to decorate :param multiple_outputs: If set to True, the decorated function's return value will be unrolled to multiple XCom values. Dict will unroll to XCom values with its keys as XCom keys. Defaults to False. """ return task_decorator_factory( python_callable=python_callable, multiple_outputs=multiple_outputs, decorated_operator_class=_PythonDecoratedOperator, **kwargs, ) ( https://github.com/apache/airflow/blob/main/airflow/decorators/python.py ) 그리고 실습시 작성했던 아래 코드를 다시 생각해보면, @task(task_id='python_task') def show_templates(**kwargs): from pprint import pprint pprint(kwargs) show_templates() show_templates() 실행시 task 데커레이터의 outer function에 해당하는 python_task 가 실행되게 됩니다. 위에서 python_task는 task_decorator_factory 함수를 리턴한다고 말씀드렸죠? task_decorator_factory 함수는 아래처럼 생겼습니다. 좀 복잡하죠? def task_decorator_factory( python_callable: Callable | None = None, *, multiple_outputs: bool | None = None, decorated_operator_class: type[BaseOperator], **kwargs, ) -> TaskDecorator: """Generate a wrapper that wraps a function into an Airflow operator. Can be reused in a single DAG. :param python_callable: Function to decorate. :param multiple_outputs: If set to True, the decorated function's return value will be unrolled to multiple XCom values. Dict will unroll to XCom values with its keys as XCom keys. If set to False (default), only at most one XCom value is pushed. :param decorated_operator_class: The operator that executes the logic needed to run the python function in the correct environment. Other kwargs are directly forwarded to the underlying operator class when it's instantiated. """ if multiple_outputs is None: multiple_outputs = cast(bool, attr.NOTHING) if python_callable: decorator = _TaskDecorator( function=python_callable, multiple_outputs=multiple_outputs, operator_class=decorated_operator_class, kwargs=kwargs, ) return cast(TaskDecorator, decorator) elif python_callable is not None: raise TypeError("No args allowed while using @task, use kwargs instead") def decorator_factory(python_callable): return _TaskDecorator( function=python_callable, multiple_outputs=multiple_outputs, operator_class=decorated_operator_class, kwargs=kwargs, ) return cast(TaskDecorator, decorator_factory) (https://github.com/apache/airflow/blob/main/airflow/decorators/base.py ) 참고로 task_decorator_factory는 데커레이터를 쉽게 만들게 해주는 함수입니다. task_decorator_factory를 이용하면 custom 한 데커레이터도 만들 수 있습니다. 어쨌든 task_decorator_factory를 이해하는게 중요한데 이 함수의 파라미터 중 decorated_operator_class 파라미터는 어떤 오퍼레이터를 실행하게 할 것인지를 결정합니다. 다시 위의 python_task 함수를 살펴보면 task_decorator_factory 를 리턴할 때 파라미터 중 decorated_operator_class=_PythonDecoratedOperator 를 전달하고 있습니다. _PythonDecoratedOperator 클래스를 보면 DecoratedOperator와 PythonOperator를 다중 상속받고 있습니다. class _PythonDecoratedOperator(DecoratedOperator, PythonOperator): """ Wraps a Python callable and captures args/kwargs when called for execution. :param python_callable: A reference to an object that is callable :param op_kwargs: a dictionary of keyword arguments that will get unpacked in your function (templated) :param op_args: a list of positional arguments that will get unpacked when calling your callable (templated) :param multiple_outputs: If set to True, the decorated function's return value will be unrolled to multiple XCom values. Dict will unroll to XCom values with its keys as XCom keys. Defaults to False. """ ( https://github.com/apache/airflow/blob/main/airflow/decorators/python.py ) 쉽게 생각하면 PythonOperator + DecoratedOperator 라는 두 개의 부모를 상속한 _PythonDecoratedOperator가 정의돼있는데 우리가 @task 데커레이터를 쓰면 _PythonDecoratedOperator가 실행되는 구조입니다. 이 과정에서 결과적으로 PythonOperator를 실행하는 것과 같고 이 과정중에 context가 **kwargs에 전달된다고 보시면 됩니다. 코드가 좀 복잡한데 결국 @task 데커레이터는 PythonOperator를 실행하는 것과 같고 PythonOperator를 썼을 때도 기본 변수를 **kwargs 로 밀어넣어주고 있으므로 원리는 같다. 라고 생각하시면 됩니다.
- 0
- 2
- 68
질문&답변
2024.04.15
ubuntu 응답없음
안녕하세요 rosy님! 이미 wsl ubuntu가 설치되어 있는것 같네요 혹시 기존에 있던 버전도 삭제 가능하신가요? 가능하시다면 초기화하고 20.04 버전으로 설치해보시겠어요? 삭제하는건 아래 명령으로 가능하구요. wslconfig.exe /u Ubuntu wslconfig.exe /u Ubuntu-20.04 20.04 버전으로 설치하는 건 아래 명령어 참고 하셔서 진행해보시기 바랍니다. wsl --install -d Ubuntu-20.04
- 0
- 1
- 66