본문 바로가기
#02.천재교육 빅데이터/+07.데이터엔지니어링기초

[천재교육] 데이터 엔지니어링(2)

by 돌비오 2023. 3. 29.
728x90

매개변수(parameter)와 인수(argument)

 

함수의 매개변수(parameter)란 함수를 호출할 때 인수로 전달된 값을 함수 내부에서 사용할 수 있게 해주는 변수입니다.
또한, 함수의 인수(argument)란 함수가 호출될 때 함수로 값을 전달해주는 변수를 가리킵니다.
대부분의 함수는 하나 이상의 매개변수를 가지며, 매개변수가 없는 함수도 존재합니다.

 

 

 

DB_Connector
import psycopg2 as pgsql
import pymongo as mongo


# DB에서 PostgreSQL 또는 MongoDB 데이터베이스에 연결하기 위한 컨텍스트 관리자 인터페이스를 제공하는 클래스를 정의한다. 이 클래스는 명령문 내에서 PostgreSQL 또는 MongoDB 데이터베이스에 연결하고 상호 작용하는 편리한 방법을 제공하여 with컨텍스트가 종료될 때 연결이 올바르게 닫히도록 한다.
class
 DBConnector:


# 이 메서드는 __init__ 과 매개변수를 사용하여 클래스를 초기화합니다 . 매개 변수는 데이터베이스가 로컬 시스템(소스 또는 대상 데이터베이스)에 있는지 여부를 지정하고 매개 변수는 PostgreSQL 또는 MongoDB 데이터베이스인지 여부를 지정합니다.
    def __init__(self, host, port, user, password, database, location, engine): 

        self.host = host
        self.port = port
        self.user = user
        self.password = password
        self.database = database
        self.location = location
        self.engine = engine
        self.conn = None

        if self.location == 'localhost_source':                         # 만약 location 이 localhost_source 이고

            if self.engine == 'postgre':                                      # engine 이 postgre 라면
                self.enter_connect = self._postgre_connect      # postgre 에 연결한다

            elif self.engine == 'mongodb':
                self.enter_connect = self._mongodb_connect

            else:
                raise RuntimeError(f"{self.engine} is not supported")


        elif self.location == 'localhost_target':

            if self.engine == 'postgre':
                self.enter_connect = self._postgre_connect

            elif self.engine == 'mongodb':
                self.enter_connect = self._mongodb_connect

            else:
                raise RuntimeError(f"{self.engine} is not supported")


        else :
            raise RuntimeError(f"{self.location} is not supported")




# 컨텍스트 진입을 위해 문을 사용할 때 메소드가 호출되며, 매개변수에 따라 또는 메소드를 호출하여 데이터베이스 __enter__ 에 연결합니다 . 데이터베이스와 상호 작용하는 데 사용할 수 있는 클래스의 현재 인스턴스를 반환합니다.
    def __enter__(self):
        self.enter_connect()
        return self



# __exit__메서드는 컨텍스트가 종료될 때 호출되며 데이터베이스에 대한 연결을 닫습니다.
    def __exit__(self, exc_type, exc_val, exc_tb):

        try :
            self.conn.close()
        except:
            pass

# _postgre_connect 메소드는 라이브러리를 사용하여 psycopg2클래스 매개변수로 지정된 PostgreSQL 데이터베이스에 연결합니다.
    def _postgre_connect(self):
        self.conn = pgsql.connect(host = self.host, port = self.port, user = self.user,
                                                  password = self.password, dbname = self.database)


# _mongodb_connect 메소드는 라이브러리를 사용하여 pymongo클래스 매개변수로 지정된 MongoDB 데이터베이스에 연결합니다.
    def _mongodb_connect(self):
        self.conn = mongo.mongo_client.MongoClient(host = self.host, port = self.port).get_database(self.database)

 

 

 

 

 

queries_rdb

queries_rdb 관계형 데이터베이스에서 레코드를 읽고 생성하기 위한 SQL 쿼리가 포함된 Python 사전이 있는 것 같습니다 . 키에는 테이블에서 모든 레코드를 선택하기 위한 쿼리와 테이블에서 모든 레코드를 선택하기 위한 read쿼리의 두 가지 쿼리가 포함되어 있습니다 . 키 에는 각각 및 테이블에 레코드를 삽입하기 위한 두 개의 쿼리가 포함되어 있습니다 

테이블 actor에는 아마도 4개의 열이 있고 film테이블에는 13개의 열이 있습니다. 삽입 쿼리는 %s테이블에 삽입될 값에 대해 자리 표시자( )를 사용합니다. 이러한 쿼리는 Python 라이브러리를 사용하여 실행하거나 psycopg2특정 mysql-connector-python데이터베이스 관리 시스템(예: PostgreSQL 또는 MySQL)과 상호 작용하기 위한 것일 수 있습니다.

queries_rdb 는 Python 에서 SQL 명령을 사용하여 데이터베이스와 상호 작용하는데 유용한 도구 .


queries_rdb = {
'read': {
        'actor': f'''
            SELECT * 
            FROM actor
            ;
        ''',
        'film': f'''
            SELECT * 
            FROM film
            ;
        '''
        },
    'create': {
        'actor': '''
            INSERT INTO actor VALUES (%s, %s, %s, %s)
            ;
        ''',
        'film': '''
            INSERT INTO film VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            ;
        '''
}
}

queries_job1 = {
    'read': {
        'actor': f'''
            SELECT * 
            FROM actor
            ;
        ''',
        'film': f'''
            SELECT * 
            FROM film
            ;
        ''',
        'film_actor': f'''
            SELECT *
            FROM film_actor
            ;
        '''
        }
    }


queries_job2 = {
    'read': {
        'actor': '''
            SELECT '{batch_month}' AS YYYYMM, * 
            FROM actor 
            ;
        ''',
        'film': '''
            SELECT '{batch_month}' AS YYYYMM, * 
            FROM film 
            ;
        ''',
        'film_actor': '''
            SELECT '{batch_month}' AS YYYYMM, *
            FROM film_actor
            ;
        '''
        }
    }

 

 

 

 

 

 

main.py

 

Click 라이브러리를 사용하여 명령줄 인터페이스를 정의하는 Python 스크립트입니다. 이 명령은 이름이 지정 start_batch되고 선택적 인수를 허용 -m하거나 --custom-batch-month실행할 배치 작업의 월을 지정합니다. 인수가 제공되지 않으면 스크립트는 배치 월을 이전 달로 설정합니다.

그런 다음 스크립트는 배치 월이 유효한지 확인하고 etl_3배치 월을 인수로 사용하여 파이프라인 컨트롤러 함수를 실행합니다. 파이프라인 컨트롤러 기능 실행 중에 예외가 발생하면 스크립트는 오류 메시지를 인쇄하고 상태 코드 1로 종료합니다. 실행에 성공하면 스크립트는 상태 코드 0으로 종료됩니다.

이 start_batch함수는 입력된 사용자 지정 배치 월과 함수에서 얻은 배치 월을 인쇄합니다 _get_batch_month. 배치 월이 유효하지 않으면 함수는 메시지를 인쇄하고 상태 코드 1로 종료합니다.

이 _get_batch_month함수는 사용자 정의 배치 월 인수가 제공되고 유효한지 확인합니다. 사용자 지정 배치 월이 제공되지 않으면 함수는 배치 월을 이전 달로 설정합니다. 이 함수는 배치 월을 형식으로 반환합니다 YYYYMM.

이 _check_valid_month함수는 형식의 지정된 문자열이 메서드를 YYYYMM사용하여 유효한 월인지 확인합니다 strptime. 문자열이

import click
from datetime import datetime, timedelta

import sys
import setting
from pipeline.controller import etl_1, etl_2, etl_3


@click.command()
@click.option('-m', '--custom-batch-month', type = click.STRING, default='', help='배치작업연월')
def start_batch(custom_batch_month):

    print('input :', custom_batch_month)
    batch_month = _get_batch_month(custom_batch_month)
    print('get_batch_month :', batch_month) 

    if not batch_month:
        print('batch_month is None')
        sys.exit(1)
    try:
        # etl_1(batch_month)
        # etl_2(batch_month)
        etl_3(batch_month)
    except Exception as e:
        print(e)
        sys.exit(1)
    sys.exit(0)



def _get_batch_month(_custom_batch_month):
    if _custom_batch_month:
        print('custom_batch > batch_month : ', _custom_batch_month)
        return _check_valid_month(_custom_batch_month)

    first_day = datetime.today().replace(day = 1)
    batch_month = first_day - timedelta(days = 1)
    return batch_month.strftime('%Y%m')



def _check_valid_month(str_yyyymm):
    try:
        # print(str_yyyymm)
        datetime.strptime(str_yyyymm, '%Y%m')
        return str_yyyymm
    except Exception as e:
        return None



if __name__ == '__main__':
    print('start_batch_job')
    start_batch()
    print('end_batch_job') 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 


<< 파이프라인 >>

 

 

controller.py

etl_1이 함수는 쿼리 목록을 사용하여 소스 데이터베이스에서 데이터를 추출하고( extract.rdb_cursor_extractor), 필요에 따라 변환하고, 다른 것을 사용하여 결과를 대상 데이터베이스로 로드하는 파이프라인( )의 첫 번째 단계에 대한 ETL 프로세스를 정의하는 것으로 보입니다. 쿼리 목록( load.rdb_cursor_loader).

_yyyymm이 함수는 함수 에서 계산된 배치 월인 문자열을 입력으로 사용합니다 start_batch. extract.rdb_cursor_extractor이 값은 추출되거나 로드되는 데이터를 필터링하기 위해 및 함수 내에서 어떤 방식으로 사용되지만 load.rdb_cursor_loader해당 함수의 구현을 보지 않고는 방법이 명확하지 않습니다.

이 기능은 ETL 프로세스의 시작과 끝에서 메시지를 출력하며, 이는 실행 중 파이프라인의 진행 상황을 추적하는 데 유용할 수 있습니다.

전반적으로 이 기능은 다단계 ETL 파이프라인의 첫 번째 단계를 정의하는 것처럼 보이지만 추가 컨텍스트가 없으면 구현의 전반적인 효율성 또는 정확성을 평가하기 어렵습니다.

from db.connector import DBConnector
from db.querise_rdb import queries_rdb, queries_job2, queries_job1

from pipeline import extract, transform, load

from settings import DB_SETTINGS



def etl_1(_yyyymm):
    print('start_etl_1')
    print(_yyyymm)

    result = extract.rdb_cursor_extractor(
        db_connector= DBConnector(**DB_SETTINGS['source_db_localhost_rdb'])
        , _query_list = queries_rdb
        )

    load.rdb_cursor_loader(
        db_connector = DBConnector(**DB_SETTINGS['target_db_localhost_rdb'])
        , _query_list = queries_rdb
        , _result = result
        )
    
    print('end_etl_1')


def etl_2(_yyyymm):
    print('start_etl_2')
    print(_yyyymm)

    result = extract.rdb_pandas_extractor(
        db_connector = DBConnector(**DB_SETTINGS['source_db_localhost_rdb'])
        , _query_list = queries_job2
        , param = {'batch_month' : _yyyymm}
    )

    load.rdb_pandas_loader(
        db_connector = DBConnector(**DB_SETTINGS['target_db_localhost_rdb'])
        , _query_list = queries_job2
        , _name = ''
        , _result = result
    )

    print('end_etl_2')




def etl_3(_yyyymm):
    print('start_etl_3')

    result = extract.rdb_pandas_extractor(db_connector = DBConnector(**DB_SETTINGS['source_db_localhost_rdb']), _query_list = queries_job2, 
                                          param = {'batch_month' : _yyyymm})

    result2 = transform.rdb_pandas_transform(result)
    
    load.rdb_pandas_loader(db_connector = DBConnector(**DB_SETTINGS['target_db_localhost_rdb']), _query_list = queries_job2
                                    , _name = ''
                                    , _result = result)
    
    load.rdb_pandas_loader2(db_connector = DBConnector(**DB_SETTINGS['target_db_localhost_rdb']), _query_list = queries_job2
                                    , _name = ''
                                    , _result = result2)

    print('end_etl_3')

 

728x90