본문 바로가기

Data & MarTech

Airflow 로컬 환경 설치 및 테스트

반응형

Airflow 로컬 환경 설치 및 테스트

ETL(Extract, Transform, Load)은 데이터 통합 및 분석의 핵심 프로세스입니다. Apache Airflow는 이러한 ETL 파이프라인을 관리하고 자동화하는 강력한 도구로 자리잡았습니다. 이 글에서는 Airflow를 활용하여 ETL 파이프라인을 설계, 구현, 운영하는 방법을 단계별로 설명하겠습니다.

1. Airflow 소개

Apache Airflow는 데이터 파이프라인을 작성, 스케줄링 및 모니터링하기 위한 오픈 소스 플랫폼입니다. Airflow의 주요 특징은 다음과 같습니다:

  • 다양한 연산 지원: Python을 기반으로 DAG(Directed Acyclic Graph)를 작성하여 다양한 작업을 정의할 수 있습니다.
  • 스케줄링: 원하는 시간에 작업을 자동으로 실행할 수 있습니다.
  • 모니터링: 작업의 상태를 시각적으로 확인하고 관리할 수 있습니다.
  • 확장성: 플러그인을 통해 다양한 데이터 소스와 통합할 수 있습니다.

2. Airflow 설치

Airflow를 설치하려면 Python과 pip이 필요합니다. 기본 설치 과정은 다음과 같습니다:

pip install apache-airflow

Airflow를 설치한 후 초기화 및 웹 서버를 실행합니다:

# 데이터베이스 초기화
airflow db init

# 사용자 생성 (선택 사항)
airflow users create \\\\
    --username admin \\\\
    --firstname FIRST_NAME \\\\
    --lastname LAST_NAME \\\\
    --role Admin \\\\
    --email admin@example.com

# 웹 서버 실행
airflow webserver --port 8080

다른 터미널에서 스케줄러를 실행합니다:

airflow scheduler

3. ETL 파이프라인 설계

ETL 파이프라인을 설계할 때 각 단계는 개별 작업(task)으로 정의됩니다. 예제에서는 간단한 CSV 파일을 추출, 변환, 로드하는 파이프라인을 생성해보겠습니다.

3.1 추출 단계

데이터를 추출하는 작업을 정의합니다. 예를 들어, 원격 서버에서 CSV 파일을 다운로드하는 작업입니다.

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
import requests

def extract_data(**kwargs):
    url = '<https://example.com/data.csv>'
    response = requests.get(url)
    with open('/path/to/save/data.csv', 'wb') as f:
        f.write(response.content)

with DAG(dag_id='etl_pipeline', start_date=datetime(2023, 1, 1), schedule_interval='@daily') as dag:
    extract_task = PythonOperator(
        task_id='extract_data',
        python_callable=extract_data
    

3.2 변환 단계

추출한 데이터를 변환하는 작업을 정의합니다. 예를 들어, 데이터를 정리하고 필요한 형식으로 변경합니다.

import pandas as pd

def transform_data(**kwargs):
    df = pd.read_csv('/path/to/save/data.csv')
    df['new_column'] = df['existing_column'].apply(lambda x: x * 2)
    df.to_csv('/path/to/save/transformed_data.csv', index=False)

transform_task = PythonOperator(
    task_id='transform_data',
    python_callable=transform_data
)

3.3 로드 단계

변환한 데이터를 데이터베이스 또는 데이터 웨어하우스로 로드하는 작업을 정의합니다.

import sqlalchemy

def load_data(**kwargs):
    engine = sqlalchemy.create_engine('postgresql://user:password@localhost:5432/mydatabase')
    df = pd.read_csv('/path/to/save/transformed_data.csv')
    df.to_sql('my_table', engine, if_exists='replace', index=False)

load_task = PythonOperator(
    task_id='load_data',
    python_callable=load_data
)

3.4 작업 연결

모든 작업을 DAG에 추가하고 의존성을 정의합니다.

extract_task >> transform_task >> load_task

4. ETL 파이프라인 실행 및 모니터링

Airflow 웹 UI를 통해 파이프라인의 상태를 모니터링하고 실행 결과를 확인할 수 있습니다. 각 작업의 상태(성공, 실패, 진행 중)를 직관적으로 파악할 수 있으며, 실패한 작업에 대한 로그를 확인하여 문제를 해결할 수 있습니다.

5. 결론

Apache Airflow를 활용하면 ETL 파이프라인을 효율적으로 관리하고 자동화할 수 있습니다. 이 글에서 설명한 예제는 기본적인 ETL 파이프라인을 다루었지만, 실제 환경에서는 다양한 연산과 데이터 소스를 다룰 수 있습니다. Airflow의 강력한 기능을 활용하여 복잡한 데이터 워크플로우를 쉽게 관리해보세요.

참고 자료

반응형