Airflow

0.0_
|2022. 5. 13. 17:33

airflow는 파이썬으로 동작한다. 

 

airflow 설치 

데이터 파이프라인을 dag라고 부른다 - ETL을 파이썬 코딩으로 구현된 코드 

그 DAG는 task의 집합임. 

 

airflow는 하나의 서버로 구성되지만 job의 수가 늘어나면 서버를 더 세팅해야 함. 다수의 서버로 airflow 클러스터를 운영할 수 있음. 

airflow 클러스터 형태로 구성이 된 airflow를 쓰고 싶으면 클라우드 서버를 쓰는 것이 낫다. 

서버가 많으면 많을 수록 그 위에서 돌릴 수 있는 dag가 많아지는 것임. 

이 서버가 무슨 이슈로 안 돌아가면 airflow를 쓸 수 없어서 문제가 있을 수 있다. 

내부 DB 필요함 -> history 기록용 

sqlite에 dag 정보를 기록함 < 이걸 쓰면 개발하기 힘들고 보통 postgresql나 mysql 설치해서 씀 

 

Airflow 데이터 파이프라인을 생각해 보면 

1. 어떤 데이터 소스에 연결할 것인가 

2. 어떤 destination에 저장할 것인가 

 

3rd party services 와 연동하는 것이 쉽다. airflow 2.0 을 사용 중 

 

airflow에는 다섯 가지의 컴포넌트가 있다. 

1. Web Server

2. Scheduler : 몇 시 몇 분에 실행되어야 한다. 1시간에 한 번씩 실행해야 한다. 일요일에 실행되어야 한다 등 

3. Workder: DAG 가 실행이 된다: DAG에 포함되어 있는 Task들을 실행하는 것이 Worker다. 

4. Database (Sqlite가 기본으로 설치됨) : 성능이 좋은 postgresql나 mysql 설치하는 것이 좋다. 

5. Queue (멀티 노드 구성인 경우에만 사용됨) 

 

노드가 하나일 경우에는 worker, scheduler, webserver가 한 군데에 저장됨. 

데이터가 작을 때는 서버 한 대로 충분하다 

worker는 falsk로 구현되어 있음. 

 

Scale up: 서버를 더 좋은 사양으로 바꾸는 것 

Scale out: workder node 추가 

 

노드가 많으면 많을 수록 DAG를 사용하기 쉬움 

DAG?

데이터 파이프라인. ETL 

다수의 task 

directed acyclic graph 

airflow는 루프를 도는 사이클은 지원하지 않음 단방향으로 감 

task의 집함

 

Task

: 데이터의 오퍼레이터

postgres query, hive query, S3 Read/Write, Spark Job, Shell Script 

굉장히 범용적인 오퍼레이터들이 많고, 그걸 갖다 쓰면 된다. 

서포트도 좋음 

 

t1, t2, t3 라는 Task가 있고 t1이 끝나는 순간 t2, t3 두 개의 task 가 동시에 실행되도록 DAG를 짤 수 있다. 

 

Airflow의 장단점

ETL 관리하고, 작성을 생산성 있게 만드려고 만들어진 툴임

데이터 엔지니어들한테는 좋은 기능들임

 

대신에 러닝커브가 좀 있다. 

클러스터 노드로 들어가게 되면 관리하기가 힘들기 때문에 클라우드를 써야 함 

디버깅하고 개발하기 힘든 부분이 있음 

 

from airflow import DAG

DAG가 실패하면 연락하는 메일. 실패하면 다시 실행할 건지 유무도 있음 

중요한 파라미터들이 좀 있음. 

 

DAG object 만들어야 함 

test_dag = DAG(

"dag_v1", # DAG name 

# schedule (same as cronjob)

shcedule_interval="0 9  ***",

# 매일 아홉 시 0분에 시작한다. 

# 0 * *** 면 매일 0시에 시작한다는 뜻임 

# 특정 달, 특정일, 특정 요일에 쓸 수 있음 

# 0, 30 9 * * * 매일 30분마다 ETL

# 앞에 DAG가 끝나면 트리거를 줄 수 있음 앞의 데이터가 뭘 받아야 실행되면 upstream 모듈이 끝나면 나를 트리거 해라 라는 식으로 부를 수 있음. 

#timezone을 서울로 바꿔야 함 

 

Operators Creation Example 

t1이 끝나면 t2, t3가 동시에 작동되게 하고 싶음 

t1 >> t2

t1 >> t3 

bashoperator 

 이 태스크가 실행이 되면 리눅스 커맨드 라인에서 실행되게 함 

  task 라는 것은 오퍼레이터를 써서 코드를 만드는 것임 

t1 >> [t2, t3]

t2.set_upstream(t1)

t3.set_upstream(t2)

 

start = DummyOperator (아무의미도 없음. 시작 끝 구분하려고 그러는 거) 

 

start >> t1 >> end

start >> t2 >> end 

start >> [t1, t2] >> end

 

데이터 파이프라인을 만들 때 고려할 점 

이상과 현실간의 괴리 

데이터 파이프라인은 많은 이유로 실패함

- 버그, 데이터 소스상의 이슈, 데이터 파이프라인들간의 의존도에 이해도 부족

데이터 파이프라인의 수가 늘어나면 유지보수 비용이 기하급수적으로 늘어남 

 

가장 좋은 방법

(1)

가능하면 데이터가 작을 경우 매번 통채로 복사해서 테이블 만들기 (Full Refresh)

Incremental update만 가능하다면, 대상 데이터소스가 갖춰야 할 몇 가지 조건이 있음 (바뀐 부분만 업데이트하기)

- 데이터 소스가 프로덕션 데이터베이스 테이블이라면 다음 필드가 필요함

  • created (데이터 업데이트 관점에서 필요하지는 않음) 
  • modified
  • deleted 

데이터 소스가 API라면 특정 날짜를 기준으로 새로 생성되거나 업데이트된 레코드들을 읽어올 수 있어야 함 

 

(2)

 멱등성(Idempotency)

동일한 입력데이터로 파이프라인을 한 번 실행하나 백 번 실행하나 내용이 동일해야 한다 

백 번 실행하면 중복이 백 개가 생기는지. 

input이 같은데 output이 다르면 안 된다. 

 

(3)

실패한 데이터 파이프라인을 재실행이 쉬어야 함 

과거 데이터를 다시 채우는 과정(Backfill)이 쉬어야 함 - 이미 읽어온 데이터가 있는데 포맷 데이터건 뭐건 다시 읽어오는 거 

Airflow는 이 부분(특히 backfill)에 강점을 갖고 있음 

  • DAG의 catchup 파라미터가 True가 되어야 하고 start_Date end_date이 적절하게 설정되어야 함 
  • 대상 테이블이 incremental update가 되는 경우에만 의미가 있음 
    • execution_date 파라미터를 사용해서 업데이트되는 날짜 혹은 시간을 알아내게 코드를 작성해야 함 
      • 현재 시간을 기준으로 업데이트 대상을 선택하는 것은 안티 패턴. 

 

(4) 

데이터 파이프라인의 수가 늘어나면 뭐가 필요한지 뭐가 안 필요한지 버리는 데이터가 뭔지 분석하는 게 중요함 

주기적으로 안 쓰는 데이터 파이프라인, 대쉬보드를 클린업하는 작업이 필요함 

 

(5)

데이터 파이프라인 사고시마다 사고 리포트 쓰기 

중요 데이터 파이프라인의 입력과 출력을 체크하기 

  • 아주 간단하게 입력 레코드의 수와 출력 레코드의 수가 몇 개인지 체크하는 것부터 시작 
  • 써머리 테이블을 만들어내고 primary key 가 존재한다면 Primary key uniqueness가 보장되는지 체크하는 것이 필요함
  • 중복 레코드 체크 

Daily Incremental Update를 구현해야 한다면? 

  • 하루에 한 번씩 돌면서 update하는 거. full refresh 하면 편한데 아닐 경우
  • 2020년 11월 7일부터 매일매일 하루치 데이터를 읽어온다고 가정해 보자 
  • 이 경우 언제부터 해당 ETL이 동작해야 하나?
    • 2020년 11월 8일 
  • 2020년 11월일날 동작하지만 읽어와야하는 데이터의 날짜는?
    • 2020년 11월 7일
    • Airflow의 start_date는 시작 날짜라기 보다는 읽어 와야 하는 날짜임 (incremental update 잡인 경우) 

Incremental하게 1년치 데이터를 Backfill 해야한다면?

  • 어떻게 ETL을 구현해놓으면 이런 일이 편해질까?
  • 해결방법 1
    • 기존 ETL 코드를 조금 수정해서 지난 1년치 데이터에 대해 돌린다
    • 실수하기 쉽고 수정하는데 시간이 걸림
  • 해결방법 2 
    • 시스템적으로 이걸 쉽게 해주는 방법을 구현한다
    • 날짜를 지정해주면 그 기간에 대한 데이터를 다시 읽어온다
      • Airflow의 접근방식
        • 모든 DAG 실행에는 “execution_date” 이 지정되어 있음
        • execution_date으로  채워야하는 날짜와 시간이 넘어옴
        • 이를 바탕으로 데이터를 갱신하도록 코드를 작성해야함

execution_date를 결정하게 코드를 짜면 좋다.