Search
🌬️

[Airflow] 기본 개념과 아키텍쳐

태그
날짜
2024/02/26
4 more properties

Workflow Scheduler의 필요성

ETL 모델의 한계 (정적 Scheduling 모델의 한계)

Sequential한 작업 단위에만 적용가능했다
데이터 양 증가 + Machine Learning 등으로 인한 데이터 활용 요구사항 증가
(Machine Learning에는 Python이 많이 이용된다)
하나의 흐름에서 분기가 이루어지거나, 두 개 이상의 흐름에서 하나로 모이는 join 현태의 모델링이 어려움
Use Case
데이터 분석 작업을 위해 이벤트 데이터(실시간)와 metadata를 조인해서 사용해야한다
한 시간 단위 배치작업 → 이벤트 데이터 저장소와 metadata의 dump 저장소가 다르다
metadata dump가 완료 돼도 에빈트 데이터의 저장소에는 이벤트 양에 따라서 데이터가 확정되는 시간이 다르다
이벤트 양이 증가해서 스케쥴링된 시간에 적재가 다 되지 않은 경우

정적 타입 언어의 생산성

ETL에 주로 RDBMS를 사용했고, JDBC를 이용해서 프로그래밍 했다
자바 계열에서 멀티스레딩 프로그래밍을 이용
정적 타입언어로 컴파일 타임에 모든 타입 시스템이 맞아야 하므로 개발 생산성이 떨어짐
데이터 양이 많아지고 NoSQL이 많아졌다 → 굳이 JDBC를 쓸 필요가 없다

Airflow란?

Airflow의 주요 기능

python 기반의 workflow scheduler이다
DAG와 Task라는 단위로 복잡하고 다양하게 구성할 수 있다
동적인 조건으로 trigger 할 수 있다
Scheduler와 Worker가 나누어져 있고, Worker는 확장 가능하다
동시에 수많은 workflow를 실행하고 관리할 수 있다
하나의 python 파일로 복잡한 workflow를 구성할 수 있다
개발 시간이 단축되고 유지 관리 비용도 적다
Container, Resource Management

Airflow의 장점

파이썬 코드로 쉽고 간단하게 Batch 작업, Workflow를 구성할 수 있다
파이썬 코드로 다이나믹한 파이프라인을 구성할 수 있다
파이썬 코드로 workflow를 작성하기 때문에 VCS를 이용해서 버전 관리를 할 수 있다
여러 사람들이 동시에 workflow를 개발하고, 개별 workflow 별로 동작과 설정 등을 관리할 수 있다
Operator로 반복 작업을 줄이고, 다양한 기술스택과 연결할 수 있다
Jinja template으로 parameterize할 수 있다 (동적으로 할당)
배치를 처리하는 인프라를 동적으로 늘리거나 줄일 수 있다
배치 처리 결과에 따라 동적으로 수정할 수 있다
직관적이고 편리한 Web UI 관리 툴로 다양한 배치작업을 쉽게 시각화하거나 디버깅 할 수 있다

Airflow의 한계

다음 기능은 하지 못한다
Streaming 작업
무한히 실행되는 작업
Airflow 외부 요소에 의해 Trigger 되는 Scheduling 방식
따라서 다음 작업은 Airflow에 적합하지 않다
지연을 허용하지 않는 작업의 스케쥴링
Airflow Worker 내에서 고부하 작업의 처리

Airflow Architecture & Components

Scheduler
가장 중요한 요소
DAG (workflow)를 trigger하고 Task를 Executor에 실행하도록 제출하는 역할
1분에 한 번 DAG Directory를 읽어서 조건을 확인하고 Executor에 전달
Executor
Task의 실제 실행을 관리
여러 종류 중 선택할 수 있음
Local Executor: 로컬에서 process를 실행시켜서 task를 실행
Sequential Executor: 로컬에서 한 번에 하나만 동작
Celery Executor: Celery를 이용해서 작업을 Worker에 분배하고 결과를 관리
직접 machine을 pooling해서 관리하는 경우 자주 사용
CeleryKubernetes Executor: k8s 환경에서 Celery를 이용하는 아키텍쳐
Dask Executor: Dask(python을 이용한 분산 작업 관리 시스템) 이용
Kubernetes Executor: k8s 환경에서 task를 pod에 스케쥴링하는 방식 이용
Kubernetes를 이용하는 경우 자주 사용
LocalKubernetes Executor: task를 LocalExecutor 혹은 KubernetesExecutor로 동작시킴
→ Celery Executor와 Kubernetes Executor가 양분화 하는 중
Web Server
ui 제공
Metadata Database와 DAG Directory(python files)에 접근 가능해야한다
DAG Directory
DAG가 정의된 파이썬 파일의 위치
Workers, Scheduler, Web Server 다 봐야한다
Metadata Database
작업에 대한 모든 기록
작업의 정의, 상태, 실행 정보, 로그,
Workers, Scheduler, Web Server 다 봐야한다
Database 버전 확인 필수

Architecture on Celery Executor

Components

Workers: 할당된 태스크를 실행
Scheduler: 실행할 태스크를 Queue에 추가
Web Server: DAG, task 상태를 확인하고 메타데이터 관리, 시각화, admin 명령을 할 수 있는 HTTP Server
Database: DAG, tsks, Variable, connection 등 메타데이터 저장하는 저장소
Celery: 스케쥴링하는 Queue의 방식(의 구현체)
Worker는 Broker를 Consume한다
Scheduler는 매 분마다 DAG를 읽어서 Broker에 Publish한다

Celery Components

Broker: 실행해야할 Command 저장
Result Backend: 완료된 Command 상태 저장
Flower: Celery Queue 및 작업 상태를 모니터링하는 WebUI

Tasks Execution Process

시작 시 두 개의 프로세스가 Running이어야 한다
Scheduler Process
Worker Process
인프라도 준비가 돼 있어야 한다 (Queue Broker, Result Backend)
Queue Broker: Redis로 실습
Result Backend: MySQL로 실습
Task 실행을 위해 Worker Process는 Queue에서 Job를 지정해서 Child Process를 Fork한다