Airflow 로컬 환경 설치 및 테스트
ETL(Extract, Transform, Load)은 데이터 통합 및 분석의 핵심 프로세스입니다. Apache Airflow는 이러한 ETL 파이프라인을 관리하고 자동화하는 강력한 도구로 자리잡았습니다. 이 글에서는 Airflow를 활용하여 ETL 파이프라인을 설계, 구현, 운영하는 방법을 단계별로 설명하겠습니다.
1. Airflow 소개
Apache Airflow는 데이터 파이프라인을 작성, 스케줄링 및 모니터링하기 위한 오픈 소스 플랫폼입니다. Airflow의 주요 특징은 다음과 같습니다:
- 다양한 연산 지원: Python을 기반으로 DAG(Directed Acyclic Graph)를 작성하여 다양한 작업을 정의할 수 있습니다.
- 스케줄링: 원하는 시간에 작업을 자동으로 실행할 수 있습니다.
- 모니터링: 작업의 상태를 시각적으로 확인하고 관리할 수 있습니다.
- 확장성: 플러그인을 통해 다양한 데이터 소스와 통합할 수 있습니다.
2. Airflow 설치
Airflow를 설치하려면 Python과 pip이 필요합니다. 기본 설치 과정은 다음과 같습니다:
pip install apache-airflow
Airflow를 설치한 후 초기화 및 웹 서버를 실행합니다:
# 데이터베이스 초기화
airflow db init
# 사용자 생성 (선택 사항)
airflow users create \\\\
--username admin \\\\
--firstname FIRST_NAME \\\\
--lastname LAST_NAME \\\\
--role Admin \\\\
--email admin@example.com
# 웹 서버 실행
airflow webserver --port 8080
다른 터미널에서 스케줄러를 실행합니다:
airflow scheduler
3. ETL 파이프라인 설계
ETL 파이프라인을 설계할 때 각 단계는 개별 작업(task)으로 정의됩니다. 예제에서는 간단한 CSV 파일을 추출, 변환, 로드하는 파이프라인을 생성해보겠습니다.
3.1 추출 단계
데이터를 추출하는 작업을 정의합니다. 예를 들어, 원격 서버에서 CSV 파일을 다운로드하는 작업입니다.
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
import requests
def extract_data(**kwargs):
url = '<https://example.com/data.csv>'
response = requests.get(url)
with open('/path/to/save/data.csv', 'wb') as f:
f.write(response.content)
with DAG(dag_id='etl_pipeline', start_date=datetime(2023, 1, 1), schedule_interval='@daily') as dag:
extract_task = PythonOperator(
task_id='extract_data',
python_callable=extract_data
3.2 변환 단계
추출한 데이터를 변환하는 작업을 정의합니다. 예를 들어, 데이터를 정리하고 필요한 형식으로 변경합니다.
import pandas as pd
def transform_data(**kwargs):
df = pd.read_csv('/path/to/save/data.csv')
df['new_column'] = df['existing_column'].apply(lambda x: x * 2)
df.to_csv('/path/to/save/transformed_data.csv', index=False)
transform_task = PythonOperator(
task_id='transform_data',
python_callable=transform_data
)
3.3 로드 단계
변환한 데이터를 데이터베이스 또는 데이터 웨어하우스로 로드하는 작업을 정의합니다.
import sqlalchemy
def load_data(**kwargs):
engine = sqlalchemy.create_engine('postgresql://user:password@localhost:5432/mydatabase')
df = pd.read_csv('/path/to/save/transformed_data.csv')
df.to_sql('my_table', engine, if_exists='replace', index=False)
load_task = PythonOperator(
task_id='load_data',
python_callable=load_data
)
3.4 작업 연결
모든 작업을 DAG에 추가하고 의존성을 정의합니다.
extract_task >> transform_task >> load_task
4. ETL 파이프라인 실행 및 모니터링
Airflow 웹 UI를 통해 파이프라인의 상태를 모니터링하고 실행 결과를 확인할 수 있습니다. 각 작업의 상태(성공, 실패, 진행 중)를 직관적으로 파악할 수 있으며, 실패한 작업에 대한 로그를 확인하여 문제를 해결할 수 있습니다.
5. 결론
Apache Airflow를 활용하면 ETL 파이프라인을 효율적으로 관리하고 자동화할 수 있습니다. 이 글에서 설명한 예제는 기본적인 ETL 파이프라인을 다루었지만, 실제 환경에서는 다양한 연산과 데이터 소스를 다룰 수 있습니다. Airflow의 강력한 기능을 활용하여 복잡한 데이터 워크플로우를 쉽게 관리해보세요.
참고 자료
'Data & MarTech' 카테고리의 다른 글
[Langchain] AWS Bedrock과 Langchain 활용한 LLM 어플리케이션 개발 (1) | 2024.11.20 |
---|---|
Pandas를 이용해 데이터를 Merge하는 방법과 로깅 활용하기 (0) | 2024.05.29 |
구글시트 API(Google Sheets API) in Python 데이터 업데이트 (1) | 2023.11.16 |
구글시트 API(Google Sheets API) in Python - 데이터 쓰기 (0) | 2023.11.10 |
구글시트 API(Google Sheets API)를 사용한 데이터 조회 (1) | 2023.06.12 |