Apache Airflow와 Amazon-S3를 사용한 End-to-End ETL 파이프라인 구축 하는 방법

Apache Airflow와 Amazon-S3를 사용한 End-to-End ETL 파이프라인 구축 하는 방법
TILPosted On Jul 9, 20249 min read

프로젝트 개요

이 프로젝트는 Apache Airflow와 Amazon S3를 사용하여 end-to-end ETL (추출, 변환, 로드) 파이프라인을 개발하는 데 중점을 둡니다.

이 파이프라인은 OpenWeather API에서 날씨 데이터를 검색하여 구조화된 형식으로 변환하고 S3 버킷에로드합니다. 이 프로젝트를 완료하면 희망하는 빈도로 예약된 파이프라인을 실행할 수 있는 완전히 기능하는 파이프라인을 보유하게 됩니다.

프로젝트 구조

  • 날씨 데이터 가져오기: OpenWeather API에서 날씨 데이터를 가져옵니다.
  • 날씨 데이터 변환: API에서 가져온 데이터는 JSON 형식이며, 변환 작업은 JSON 개체에서 데이터프레임을 만들고 데이터프레임을 CSV 파일로 변환하는 작업을 포함합니다.
  • S3에 데이터로드: 변환된 데이터를 S3 버킷에 저장합니다.

사전 준비 및 사용된 도구

  • Apache Airflow: 워크플로우를 프로그래밍 방식으로 작성, 예약, 모니터링할 수 있는 강력하고 유연한 플랫폼입니다.
  • OpenWeather API: 여러 도시의 날씨 데이터를 제공하는 서비스입니다.
  • Amazon S3: Amazon Web Services (AWS)의 확장 가능한 객체 스토리지 서비스입니다.
  • Pandas: 데이터 조작 및 분석을 위해 사용되는 Python 라이브러리입니다.
  • Boto3: Python 개발자가 S3와 같은 Amazon 서비스를 활용하는 소프트웨어를 작성할 수 있게 해주는 AWS SDK for Python입니다.
  • DAG 파일: Apache Airflow에서 작업 및 종속성을 정의하는 작업열로서 사용되는 중요한 개념인 Directed Acyclic Graph(DAG) 파일입니다.

구현

  • API와 연결하여 데이터를 가져오는 DAG 스크립트를 작성하세요. 데이터는 데이터프레임에 저장됩니다. DAG 코드는 이 페이지의 맨 아래에서 찾을 수 있습니다.
  • EC2 인스턴스를 생성하고 인스턴스를 시작하여 콘솔에 연결하세요. 저는 무료티어 AWS를 사용하여 이 인스턴스를 생성했습니다. 사양은 t2.micro 및 우분투 22 버전입니다.

이미지

인스턴스가 실행되면 콘솔에 연결하여 다음을 설치하세요.

sudo apt-get update
sudo install python3-pip
sudo pip install requests pandas boto3 s3fs pyarrow apache-airflow

  1. 한 번 설치되면, Airflow가 올바르게 설치되었는지 확인하세요. airflow 명령어를 사용하여 확인하고 초기 로그인 자격 증명을 위해 스탠드얼론 명령어를 실행하십시오. 자격 증명을 복사하여 나중에 사용하세요.
airflow
airflow standalone
  1. 실행 중인 인스턴스에서 보안으로 이동하여 보안 그룹에 액세스하세요. 인바운드 규칙을 편집하고 새 역할을 생성하세요. "모든 트래픽", "IPv4 어디서나"로 설정하세요.

  2. Airflow 서버와 스케줄러를 시작하세요.

에어플로우 스케줄러 및 에어플로우 웹서버 - 포트 8080

6. 공개 IP 주소를 복사하고 포트를 추가하세요. 예: 172.31.22.254:8080. 이로써 에어플로우 어플리케이션을 열 수 있습니다. 기본 자격 증명을 사용하여 로그인하고 마음에 드는 비밀번호로 재설정하세요.

7. AWS에서 데이터를 저장할 S3 버킷을 만드세요. IAM 역할을 사용하여 권한을 조정하세요. 새 IAM 역할을 만들고 S3EC2에 권한을 부여하세요.

8. DAG 파일에 관련 있는 S3 버킷 이름을 추가하고 저장하세요.

  1. 인스턴스 콘솔에서 서버를 중지하고 명령을 실행하세요. 이렇게 하면 airflow의 DAGs 폴더에 액세스할 수 있어요. 원하는 경우 DAG 파일을 추가하고 필요할 때 수정할 수 있어요.
airflow
cd airflow
ls
sudo nano airflow.cfg

DAGs 폴더에서 파일 이름을 조정하세요. 수정된 버퍼를 저장하고 종료하세요.

  1. 파일을 저장한 후 airflow 명령을 다시 실행하고 로그인하세요. 그러면 airflow 내에서 Dag 파일을 볼 수 있어요. 보이는 형태는 이렇습니다:

아래는 Markdown 형식으로 변경된 테이블입니다.

  1. 파일을 열어서 수동으로 실행할 수 있어야 합니다. Airflow의 내장 그래프 기능을 사용하여 DAG 파일의 상태를 모니터링할 수 있습니다. 초록 테두리는 성공적인 실행을 나타냅니다.

  1. 실행이 성공하면 데이터가 S3 버킷에 표시됩니다.

Airflow-S3

DAG 파일과 설명:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
import requests
import pandas as pd
import boto3
from io import StringIO

# Configuration
API_KEY = 'xxxxxxxxxxxxxxxxxxxxxxxxxxx'  # OpenWeatherAPICITY = 'Arizona'  # 날씨 데이터를 가져올 도시
S3_BUCKET = 'open-weather-s3-bucket'  # 데이터가 저장될 S3 버킷 이름
S3_KEY = 'weather_data/weather.csv'  # S3 오브젝트 키 (버킷 내 파일 경로)

# DAG를 위한 기본 인수
default_args = {
    'owner': 'airflow',  # DAG 소유자
    'depends_on_past': False,  # 작업 인스턴스는 과거 실행에 의존하지 않음
    'start_date': datetime(2024, 7, 5),  # DAG 시작 날짜
    'email_on_failure': False,  # 실패 시 이메일 알림 비활성화
    'email_on_retry': False,  # 재시도 시 이메일 알림 비활성화
    'retries': 1,  # 실패 시 재시도 횟수
    'retry_delay': timedelta(minutes=5),  # 재시도 간의 지연
}

# 스케줄러가 없는 DAG 정의
dag = DAG(
    'OpenWeather_to_s3',
    default_args=default_args,
    description='날씨 데이터를 가져와 변환한 후 S3로 로드합니다.',
    schedule_interval=None,  # schedule_interval을 None으로 설정하여 스케줄러 비활성화
)

def fetch_weather_data():
    """OpenWeather API에서 날씨 데이터 가져오기"""
    url = f"http://api.openweathermap.org/data/2.5/weather?q={CITY}&appid={API_KEY}"  # 도시와 API 키를 포함한 API 엔드포인트
    response = requests.get(url)  # APIGET 요청 보내기
    data = response.json()  # 응답을 JSON으로 변환
    return data  # 데이터 반환

def transform_weather_data(**kwargs):
    """가져온 날씨 데이터 변환하기"""
    ti = kwargs['ti']  # 작업 인스턴스 가져오기
    data = ti.xcom_pull(task_ids='fetch_weather_data')  # XCom을 사용하여 'fetch_weather_data' 작업에서 데이터 가져오기

    weather = {
        'city': data['name'],  # 도시 이름 추출
        'temperature': data['main']['temp'],  # 온도 추출
        'pressure': data['main']['pressure'],  # 기압 추출
        'humidity': data['main']['humidity'],  # 습도 추출
        'weather': data['weather'][0]['description'],  # 날씨 설명 추출
        'wind_speed': data['wind']['speed'],  # 풍속 추출
        'date': datetime.utcfromtimestamp(data['dt']).strftime('%Y-%m-%d %H:%M:%S')  # 타임스탬프를 읽기 가능한 날짜로 변환
    }

    df = pd.DataFrame([weather])  # 날씨 데이터를 판다스 DataFrame으로 변환
    return df  # DataFrame 반환

def load_data_to_s3(**kwargs):
    """변환된 데이터를 S3 버킷에 로드하기"""
    ti = kwargs['ti']  # 작업 인스턴스 가져오기
    df = ti.xcom_pull(task_ids='transform_weather_data')  # XCom을 사용하여 'transform_weather_data' 작업에서 변환된 데이터 가져오기

    csv_buffer = StringIO()  # 인메모리 버퍼 생성
    df.to_csv(csv_buffer, index=False)  # DataFrameCSV로 버퍼에 작성
    print(df)  # DataFrame 출력 (선택 사항)

    s3_resource = boto3.resource('s3')  # boto3 S3 리소스 생성
    s3_resource.Object(S3_BUCKET, S3_KEY).put(Body=csv_buffer.getvalue())  # CSV 데이터를 지정한 S3 버킷 및 키에 업로드

# PythonOperator를 사용하여 작업 정의
fetch_task = PythonOperator(
    task_id='fetch_weather_data',  # 작업 ID
    python_callable=fetch_weather_data,  # 호출 가능한 함수
    dag=dag,  # 작업이 속한 DAG
)

transform_task = PythonOperator(
    task_id='transform_weather_data',  # 작업 ID
    python_callable=transform_weather_data,  # 호출 가능한 함수
    provide_context=True,  # 호출 가능한 함수에 컨텍스트 제공
    dag=dag,  # 작업이 속한 DAG
)

load_task = PythonOperator(
    task_id='load_data_to_s3',  # 작업 ID
    python_callable=load_data_to_s3,  # 호출 가능한 함수
    provide_context=True,  # 호출 가능한 함수에 컨텍스트 제공
    dag=dag,  # 작업이 속한 DAG
)

# 작업 간 의존성 정의 (작업 실행 순서)
fetch_task >> transform_task >> load_task  # 작업 실행 순서 설정: 가져오기 -> 변환 -> 로드

설명:

  • Imports 및 구성: 필요한 라이브러리를 import하고 구성 변수를 설정합니다.

  • 기본 인수: DAG의 기본 인수를 정의합니다. 예를 들어, 소유자, 시작 날짜, 재시도 정책 등이 포함됩니다.

  • DAG 정의: DAG 개체를 설명과 일정 간격으로 생성합니다 (일정을 비활성화하려면 None으로 설정 가능합니다).

  • 작업 함수: 사용할 Python 함수를 작업으로 정의합니다.

  • fetch_weather_data: OpenWeather API에서 날씨 데이터를 가져옵니다.

  • transform_weather_data: 가져온 데이터를 Pandas DataFrame으로 변환합니다.

  • load_data_to_s3: 변환된 데이터를 S3 버킷에 로드합니다.

  1. 작업 생성: PythonOperator를 사용하여 정의된 함수를 호출하는 작업을 생성합니다.

  1. 종속성 설정: 비트 시프트 연산자를 사용하여 작업을 실행할 순서를 정의하세요.

자료들