매개변수(parameter)와 인수(argument)
함수의 매개변수(parameter)란 함수를 호출할 때 인수로 전달된 값을 함수 내부에서 사용할 수 있게 해주는 변수입니다.
또한, 함수의 인수(argument)란 함수가 호출될 때 함수로 값을 전달해주는 변수를 가리킵니다.
대부분의 함수는 하나 이상의 매개변수를 가지며, 매개변수가 없는 함수도 존재합니다.
DB_Connector
import psycopg2 as pgsql
import pymongo as mongo
# DB에서 PostgreSQL 또는 MongoDB 데이터베이스에 연결하기 위한 컨텍스트 관리자 인터페이스를 제공하는 클래스를 정의한다. 이 클래스는 명령문 내에서 PostgreSQL 또는 MongoDB 데이터베이스에 연결하고 상호 작용하는 편리한 방법을 제공하여 with컨텍스트가 종료될 때 연결이 올바르게 닫히도록 한다.
class DBConnector:
# 이 메서드는 __init__ 과 매개변수를 사용하여 클래스를 초기화합니다 . 매개 변수는 데이터베이스가 로컬 시스템(소스 또는 대상 데이터베이스)에 있는지 여부를 지정하고 매개 변수는 PostgreSQL 또는 MongoDB 데이터베이스인지 여부를 지정합니다.
def __init__(self, host, port, user, password, database, location, engine):
self.host = host
self.port = port
self.user = user
self.password = password
self.database = database
self.location = location
self.engine = engine
self.conn = None
if self.location == 'localhost_source': # 만약 location 이 localhost_source 이고
if self.engine == 'postgre': # engine 이 postgre 라면
self.enter_connect = self._postgre_connect # postgre 에 연결한다
elif self.engine == 'mongodb':
self.enter_connect = self._mongodb_connect
else:
raise RuntimeError(f"{self.engine} is not supported")
elif self.location == 'localhost_target':
if self.engine == 'postgre':
self.enter_connect = self._postgre_connect
elif self.engine == 'mongodb':
self.enter_connect = self._mongodb_connect
else:
raise RuntimeError(f"{self.engine} is not supported")
else :
raise RuntimeError(f"{self.location} is not supported")
# 컨텍스트 진입을 위해 문을 사용할 때 메소드가 호출되며, 매개변수에 따라 또는 메소드를 호출하여 데이터베이스 __enter__ 에 연결합니다 . 데이터베이스와 상호 작용하는 데 사용할 수 있는 클래스의 현재 인스턴스를 반환합니다.
def __enter__(self):
self.enter_connect()
return self
# __exit__메서드는 컨텍스트가 종료될 때 호출되며 데이터베이스에 대한 연결을 닫습니다.
def __exit__(self, exc_type, exc_val, exc_tb):
try :
self.conn.close()
except:
pass
# _postgre_connect 메소드는 라이브러리를 사용하여 psycopg2클래스 매개변수로 지정된 PostgreSQL 데이터베이스에 연결합니다.
def _postgre_connect(self):
self.conn = pgsql.connect(host = self.host, port = self.port, user = self.user,
password = self.password, dbname = self.database)
# _mongodb_connect 메소드는 라이브러리를 사용하여 pymongo클래스 매개변수로 지정된 MongoDB 데이터베이스에 연결합니다.
def _mongodb_connect(self):
self.conn = mongo.mongo_client.MongoClient(host = self.host, port = self.port).get_database(self.database)
queries_rdb
queries_rdb 관계형 데이터베이스에서 레코드를 읽고 생성하기 위한 SQL 쿼리가 포함된 Python 사전이 있는 것 같습니다 . 키에는 테이블에서 모든 레코드를 선택하기 위한 쿼리와 테이블에서 모든 레코드를 선택하기 위한 read쿼리의 두 가지 쿼리가 포함되어 있습니다 . 키 에는 각각 및 테이블에 레코드를 삽입하기 위한 두 개의 쿼리가 포함되어 있습니다
테이블 actor에는 아마도 4개의 열이 있고 film테이블에는 13개의 열이 있습니다. 삽입 쿼리는 %s테이블에 삽입될 값에 대해 자리 표시자( )를 사용합니다. 이러한 쿼리는 Python 라이브러리를 사용하여 실행하거나 psycopg2특정 mysql-connector-python데이터베이스 관리 시스템(예: PostgreSQL 또는 MySQL)과 상호 작용하기 위한 것일 수 있습니다.
queries_rdb 는 Python 에서 SQL 명령을 사용하여 데이터베이스와 상호 작용하는데 유용한 도구 .
queries_rdb = {
'read': {
'actor': f'''
SELECT *
FROM actor
;
''',
'film': f'''
SELECT *
FROM film
;
'''
},
'create': {
'actor': '''
INSERT INTO actor VALUES (%s, %s, %s, %s)
;
''',
'film': '''
INSERT INTO film VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
;
'''
}
}
queries_job1 = {
'read': {
'actor': f'''
SELECT *
FROM actor
;
''',
'film': f'''
SELECT *
FROM film
;
''',
'film_actor': f'''
SELECT *
FROM film_actor
;
'''
}
}
queries_job2 = {
'read': {
'actor': '''
SELECT '{batch_month}' AS YYYYMM, *
FROM actor
;
''',
'film': '''
SELECT '{batch_month}' AS YYYYMM, *
FROM film
;
''',
'film_actor': '''
SELECT '{batch_month}' AS YYYYMM, *
FROM film_actor
;
'''
}
}
main.py
Click 라이브러리를 사용하여 명령줄 인터페이스를 정의하는 Python 스크립트입니다. 이 명령은 이름이 지정 start_batch되고 선택적 인수를 허용 -m하거나 --custom-batch-month실행할 배치 작업의 월을 지정합니다. 인수가 제공되지 않으면 스크립트는 배치 월을 이전 달로 설정합니다.
그런 다음 스크립트는 배치 월이 유효한지 확인하고 etl_3배치 월을 인수로 사용하여 파이프라인 컨트롤러 함수를 실행합니다. 파이프라인 컨트롤러 기능 실행 중에 예외가 발생하면 스크립트는 오류 메시지를 인쇄하고 상태 코드 1로 종료합니다. 실행에 성공하면 스크립트는 상태 코드 0으로 종료됩니다.
이 start_batch함수는 입력된 사용자 지정 배치 월과 함수에서 얻은 배치 월을 인쇄합니다 _get_batch_month. 배치 월이 유효하지 않으면 함수는 메시지를 인쇄하고 상태 코드 1로 종료합니다.
이 _get_batch_month함수는 사용자 정의 배치 월 인수가 제공되고 유효한지 확인합니다. 사용자 지정 배치 월이 제공되지 않으면 함수는 배치 월을 이전 달로 설정합니다. 이 함수는 배치 월을 형식으로 반환합니다 YYYYMM.
이 _check_valid_month함수는 형식의 지정된 문자열이 메서드를 YYYYMM사용하여 유효한 월인지 확인합니다 strptime. 문자열이
import click
from datetime import datetime, timedelta
import sys
import setting
from pipeline.controller import etl_1, etl_2, etl_3
@click.command()
@click.option('-m', '--custom-batch-month', type = click.STRING, default='', help='배치작업연월')
def start_batch(custom_batch_month):
print('input :', custom_batch_month)
batch_month = _get_batch_month(custom_batch_month)
print('get_batch_month :', batch_month)
if not batch_month:
print('batch_month is None')
sys.exit(1)
try:
# etl_1(batch_month)
# etl_2(batch_month)
etl_3(batch_month)
except Exception as e:
print(e)
sys.exit(1)
sys.exit(0)
def _get_batch_month(_custom_batch_month):
if _custom_batch_month:
print('custom_batch > batch_month : ', _custom_batch_month)
return _check_valid_month(_custom_batch_month)
first_day = datetime.today().replace(day = 1)
batch_month = first_day - timedelta(days = 1)
return batch_month.strftime('%Y%m')
def _check_valid_month(str_yyyymm):
try:
# print(str_yyyymm)
datetime.strptime(str_yyyymm, '%Y%m')
return str_yyyymm
except Exception as e:
return None
if __name__ == '__main__':
print('start_batch_job')
start_batch()
print('end_batch_job')
<< 파이프라인 >>
controller.py
etl_1이 함수는 쿼리 목록을 사용하여 소스 데이터베이스에서 데이터를 추출하고( extract.rdb_cursor_extractor), 필요에 따라 변환하고, 다른 것을 사용하여 결과를 대상 데이터베이스로 로드하는 파이프라인( )의 첫 번째 단계에 대한 ETL 프로세스를 정의하는 것으로 보입니다. 쿼리 목록( load.rdb_cursor_loader).
_yyyymm이 함수는 함수 에서 계산된 배치 월인 문자열을 입력으로 사용합니다 start_batch. extract.rdb_cursor_extractor이 값은 추출되거나 로드되는 데이터를 필터링하기 위해 및 함수 내에서 어떤 방식으로 사용되지만 load.rdb_cursor_loader해당 함수의 구현을 보지 않고는 방법이 명확하지 않습니다.
이 기능은 ETL 프로세스의 시작과 끝에서 메시지를 출력하며, 이는 실행 중 파이프라인의 진행 상황을 추적하는 데 유용할 수 있습니다.
전반적으로 이 기능은 다단계 ETL 파이프라인의 첫 번째 단계를 정의하는 것처럼 보이지만 추가 컨텍스트가 없으면 구현의 전반적인 효율성 또는 정확성을 평가하기 어렵습니다.
from db.connector import DBConnector
from db.querise_rdb import queries_rdb, queries_job2, queries_job1
from pipeline import extract, transform, load
from settings import DB_SETTINGS
def etl_1(_yyyymm):
print('start_etl_1')
print(_yyyymm)
result = extract.rdb_cursor_extractor(
db_connector= DBConnector(**DB_SETTINGS['source_db_localhost_rdb'])
, _query_list = queries_rdb
)
load.rdb_cursor_loader(
db_connector = DBConnector(**DB_SETTINGS['target_db_localhost_rdb'])
, _query_list = queries_rdb
, _result = result
)
print('end_etl_1')
def etl_2(_yyyymm):
print('start_etl_2')
print(_yyyymm)
result = extract.rdb_pandas_extractor(
db_connector = DBConnector(**DB_SETTINGS['source_db_localhost_rdb'])
, _query_list = queries_job2
, param = {'batch_month' : _yyyymm}
)
load.rdb_pandas_loader(
db_connector = DBConnector(**DB_SETTINGS['target_db_localhost_rdb'])
, _query_list = queries_job2
, _name = ''
, _result = result
)
print('end_etl_2')
def etl_3(_yyyymm):
print('start_etl_3')
result = extract.rdb_pandas_extractor(db_connector = DBConnector(**DB_SETTINGS['source_db_localhost_rdb']), _query_list = queries_job2,
param = {'batch_month' : _yyyymm})
result2 = transform.rdb_pandas_transform(result)
load.rdb_pandas_loader(db_connector = DBConnector(**DB_SETTINGS['target_db_localhost_rdb']), _query_list = queries_job2
, _name = ''
, _result = result)
load.rdb_pandas_loader2(db_connector = DBConnector(**DB_SETTINGS['target_db_localhost_rdb']), _query_list = queries_job2
, _name = ''
, _result = result2)
print('end_etl_3')
'#02.천재교육 빅데이터 > +07.데이터엔지니어링기초' 카테고리의 다른 글
| [천재교육] 데이터 엔지니어링 - 파이프라인, connector, settings (0) | 2023.03.27 |
|---|