Workflow Scheduler의 필요성
ETL 모델의 한계 (정적 Scheduling 모델의 한계)
•
Sequential한 작업 단위에만 적용가능했다
•
데이터 양 증가 + Machine Learning 등으로 인한 데이터 활용 요구사항 증가
◦
(Machine Learning에는 Python이 많이 이용된다)
•
하나의 흐름에서 분기가 이루어지거나, 두 개 이상의 흐름에서 하나로 모이는 join 현태의 모델링이 어려움
•
Use Case
◦
데이터 분석 작업을 위해 이벤트 데이터(실시간)와 metadata를 조인해서 사용해야한다
◦
한 시간 단위 배치작업 → 이벤트 데이터 저장소와 metadata의 dump 저장소가 다르다
◦
metadata dump가 완료 돼도 에빈트 데이터의 저장소에는 이벤트 양에 따라서 데이터가 확정되는 시간이 다르다
▪
이벤트 양이 증가해서 스케쥴링된 시간에 적재가 다 되지 않은 경우
정적 타입 언어의 생산성
•
ETL에 주로 RDBMS를 사용했고, JDBC를 이용해서 프로그래밍 했다
•
자바 계열에서 멀티스레딩 프로그래밍을 이용
•
정적 타입언어로 컴파일 타임에 모든 타입 시스템이 맞아야 하므로 개발 생산성이 떨어짐
•
데이터 양이 많아지고 NoSQL이 많아졌다 → 굳이 JDBC를 쓸 필요가 없다
Airflow란?
Airflow의 주요 기능
•
python 기반의 workflow scheduler이다
•
DAG와 Task라는 단위로 복잡하고 다양하게 구성할 수 있다
•
동적인 조건으로 trigger 할 수 있다
•
Scheduler와 Worker가 나누어져 있고, Worker는 확장 가능하다
◦
동시에 수많은 workflow를 실행하고 관리할 수 있다
•
하나의 python 파일로 복잡한 workflow를 구성할 수 있다
◦
개발 시간이 단축되고 유지 관리 비용도 적다
◦
Container, Resource Management
Airflow의 장점
•
파이썬 코드로 쉽고 간단하게 Batch 작업, Workflow를 구성할 수 있다
•
파이썬 코드로 다이나믹한 파이프라인을 구성할 수 있다
•
파이썬 코드로 workflow를 작성하기 때문에 VCS를 이용해서 버전 관리를 할 수 있다
•
여러 사람들이 동시에 workflow를 개발하고, 개별 workflow 별로 동작과 설정 등을 관리할 수 있다
•
Operator로 반복 작업을 줄이고, 다양한 기술스택과 연결할 수 있다
•
Jinja template으로 parameterize할 수 있다 (동적으로 할당)
•
배치를 처리하는 인프라를 동적으로 늘리거나 줄일 수 있다
•
배치 처리 결과에 따라 동적으로 수정할 수 있다
•
직관적이고 편리한 Web UI 관리 툴로 다양한 배치작업을 쉽게 시각화하거나 디버깅 할 수 있다
Airflow의 한계
다음 기능은 하지 못한다
•
Streaming 작업
•
무한히 실행되는 작업
•
Airflow 외부 요소에 의해 Trigger 되는 Scheduling 방식
따라서 다음 작업은 Airflow에 적합하지 않다
•
지연을 허용하지 않는 작업의 스케쥴링
•
Airflow Worker 내에서 고부하 작업의 처리
Airflow Architecture & Components
•
Scheduler
◦
가장 중요한 요소
◦
DAG (workflow)를 trigger하고 Task를 Executor에 실행하도록 제출하는 역할
◦
1분에 한 번 DAG Directory를 읽어서 조건을 확인하고 Executor에 전달
•
Executor
◦
Task의 실제 실행을 관리
◦
여러 종류 중 선택할 수 있음
▪
Local Executor: 로컬에서 process를 실행시켜서 task를 실행
▪
Sequential Executor: 로컬에서 한 번에 하나만 동작
▪
Celery Executor: Celery를 이용해서 작업을 Worker에 분배하고 결과를 관리
•
직접 machine을 pooling해서 관리하는 경우 자주 사용
▪
CeleryKubernetes Executor: k8s 환경에서 Celery를 이용하는 아키텍쳐
▪
Dask Executor: Dask(python을 이용한 분산 작업 관리 시스템) 이용
▪
Kubernetes Executor: k8s 환경에서 task를 pod에 스케쥴링하는 방식 이용
•
Kubernetes를 이용하는 경우 자주 사용
▪
LocalKubernetes Executor: task를 LocalExecutor 혹은 KubernetesExecutor로 동작시킴
→ Celery Executor와 Kubernetes Executor가 양분화 하는 중
•
Web Server
◦
ui 제공
◦
Metadata Database와 DAG Directory(python files)에 접근 가능해야한다
•
DAG Directory
◦
DAG가 정의된 파이썬 파일의 위치
◦
Workers, Scheduler, Web Server 다 봐야한다
•
Metadata Database
◦
작업에 대한 모든 기록
◦
작업의 정의, 상태, 실행 정보, 로그,
◦
Workers, Scheduler, Web Server 다 봐야한다
◦
Database 버전 확인 필수
Architecture on Celery Executor
Components
•
Workers: 할당된 태스크를 실행
•
Scheduler: 실행할 태스크를 Queue에 추가
•
Web Server: DAG, task 상태를 확인하고 메타데이터 관리, 시각화, admin 명령을 할 수 있는 HTTP Server
•
Database: DAG, tsks, Variable, connection 등 메타데이터 저장하는 저장소
•
Celery: 스케쥴링하는 Queue의 방식(의 구현체)
•
Worker는 Broker를 Consume한다
•
Scheduler는 매 분마다 DAG를 읽어서 Broker에 Publish한다
Celery Components
•
Broker: 실행해야할 Command 저장
•
Result Backend: 완료된 Command 상태 저장
•
Flower: Celery Queue 및 작업 상태를 모니터링하는 WebUI
Tasks Execution Process
•
시작 시 두 개의 프로세스가 Running이어야 한다
◦
Scheduler Process
◦
Worker Process
•
인프라도 준비가 돼 있어야 한다 (Queue Broker, Result Backend)
◦
Queue Broker: Redis로 실습
◦
Result Backend: MySQL로 실습
•
Task 실행을 위해 Worker Process는 Queue에서 Job를 지정해서 Child Process를 Fork한다