inflearn logo
강의

강의

N
챌린지

챌린지

멘토링

멘토링

N
클립

클립

로드맵

로드맵

지식공유

Airflow 마스터 클래스

task 동시성 제한 및 중복 호출 방지

해결된 질문

230

qkswl6253

작성한 질문수 4

0

안녕하세요, 수업을 대부분 수강하고 실제 현업에서 사용중에 있는데 문의사항이 있어서 질문 드립니다.

 

현재 상황은 이렇습니다.

DAG 구성

- 5분단위 스케줄링
-4개의 task
- task1 >> task2 >> task3 >> task4
- 각 task 별 timeout =5분

 

문제 상황은 task 2번이 한달에 한번씩 data 가 많아지면 5분까지 타임아웃이 걸릴때가 있는 것인데요,

이때 그다음 Dag run 이 수행되면서 task 2 번이 동시에 수행 되는 시간이 조금 있는데 그때 데이터 처리가 중복으로 처리되는 현상이 발생하게 됩니다. 그래서 가능하면 task2 을 동시에 돌리는걸 막고 싶었는데요,

처음 생각해낸 방법은 task_concurrency 옵션을 task 에 주어서 1개만 돌수 있게 바꾸고 timeout 을 조금더 넉넉하게 주려고 했으나, 만에하나 해당 task 가 10분이상 걸린다면 dag run 이 수행되고있는것 제외 2개가 더 웨이팅을 하는것이 되고, 이게 누적이 될수도 있을것으로 보여서 문제로 인지 했습니다.

서비스 적으로 5분내에 돌수 있게 하거나, 아니면 5분 스케줄링을 변경하는 방법을 고려해야 하지만 해당 고려 없이 혹시 airflow 단에서 할수 있는 작업이 있을까요?

ex . runninng 중인 task 와 대기중인 task 가 하나정도 있다면 해당 task 는 스킵하는 옵션 등입니다..

 

 

python 데이터-엔지니어링 airflow

답변 1

0

김현진

안녕하세요 qkswl6253님

현업에서 잘 사용중이신거 같아 저도 기분이 좋습니다 ^^

우선 위 같은 상황에서 일반적으로 task에 max_active_tis_per_dag=1 옵션을 줄 수 있습니다. 참고로 언급하신 task_concurrency 옵션은 향후 deprecated될 예정이고 사실 max_activive_tis_per_dag 옵션과 같습니다.

그런데 이 옵션을 적용하면 말씀하신 바와 같이 다른 DagRun의 task는 모두 queue에 대기하게 됩니다. 동시에 꼭 하나의 task만 수행되도록 해야한다면 제 생각에 branch operator를 사용해보는게 어떨까 합니다.

1. BranchPythonOperator
2. @task.branch
3. BranchOperator 상속하여 재정의

branch operator 만드는 방법은 위 3가지 중 하나를 이용하고요, 이 branch operator를 이용해 만든 task를 task_branch라 해볼께요.

task_branch의 로직을 어떻게 작성하냐가 어려운데, 방법은 여러 가지가 있지만

task2가 시작/종료되면 시작되었다는 정보를 어딘가에 저장해주면 좋을 듯 합니다. (variable에 특정 key/value로 저장해줘도 좋습니다, 또는 특정 공유 디렉토리에 파일로 기입해도 좋습니다)

그리고 task_branch는 그 정보를 참고해서 task2가 아직 수행중인지 여부를 판단한 후 그 결과에 따라 task branch는 아래처럼 로직을 분기해줄 수 있습니다.

if 'task_2가 미수행중이면':
    return 'task_2'
else:
    return 'task_empty'

 

그리고 EmptyOperator를 하나 이용한다면 (task_empty라 명명) task 연결을 아래처럼 해줄 수 있습니다.

 

task1 >> task_branch >> [task2, task_empty] >> task3 >> task4

다만 task3의 trigger_rule='one_success'로 주어야 합니다. 그래야 task2가 skip되든, task_empty가 skip되든 task3가 수행됩니다. 좀 복잡하긴 하지만 이렇게 작성하면 DagRun의 task2 수행여부에 따라 분기처리가 가능할 듯 합니다.

 

잘 해결되면 좋겠네요 ^^

pykrx 회원제 전환으로 인한 실습 불가

0

114

2

수료증 발급

0

84

3

에러 발생 관련 질문드립니다.

0

83

2

vscode 작업화면에 오류가 발생하지 않습니다.

0

69

2

plugins 폴더 생성

0

76

2

WSL에서 git push 가 안되요 ㅠ

0

115

2

chatGPT&Airflow로 블로그 자동 포스팅하기 는 Deprecated 가 필요합니다.

0

67

2

github에 회원가입이 안되는데 원인이 뭔지 모르겠어요 ㅠ

0

738

2

설치중인데 venv를 꼭 써야할까요?

0

126

2

설치 버전 관련 질문입니다.

0

68

1

우분투 버전 다운받아야하는데 어떤걸로 설치해야할까요?

0

62

1

DAG 만들기 중 airflow 패키지 로드 에러

0

106

2

3.0에서도 수업노트가 성립하는지 확인 부탁드립니다!

0

95

3

task_id 사용법이 뭔가 바뀐 것 같습니다.

0

76

3

email operator 오류 관련

0

66

2

plugins 폴더 관련

0

100

2

bash operator 관련 문의입니다

0

78

3

스케쥴러 - DAG 파싱 부하 줄이는 과정 질문

0

79

2

Dags refresh 주기 관련 질문

0

125

2

wsl 관련 질문입니다.!

0

75

2

macOS에서 docker 설치

0

94

2

템플릿 변수에 대한 오류

0

61

2

custom_image 디렉토리 문의드립니다.

0

54

2

ETL 인터뷰 관련 문의

0

103

2