본문 바로가기
#02.천재교육 빅데이터/+07.데이터엔지니어링기초

[천재교육] 데이터 엔지니어링 - 파이프라인, connector, settings

by 돌비오 2023. 3. 27.
728x90
기초 짚어가기

 

1) 라이브러리 (library)

: 모듈과 패키지의 모음
- 표준 라이브러리 (Standard Library) 
    eg) time, sys, os, math, random, urlib 등
- 외부 라이브러리 (Third Party Library)
    eg) requests, scrapy, webbrowser
- 엄밀히 정의하자면, 패키지보다 상위 개념
- 보통 module/package 등이 publish되면 library라고 부름
- library는 특정한 기능을 제공하는 게 아님. 즉, '라이브러리를 실행'할 순 없다. ( you cannot 'run a library')

 

 

2) 패키지 (package)

: 모듈의 묶음; 여러 모듈들을 하나의 상위 폴더에 넣어 놓은 것
(가끔) 라이브러리를 배포하는 메커니즘을 말하기도 함
- . 을 이용하여 파이썬 모듈을 계층적으로 관리할 수 있도록 하는 것
    eg aaa.bbb : aaa는 패키지, bbb는 모듈
- 기능적으로 유사한 모듈의 집합

 

 

3) 모듈 (module)

: 파이썬 파일(.py); script
- import로 불러오는 것이 module

 

 

4) 프레임워크

vs. 라이브러리 vs. API

  흐름 소유 예시
프레임워크 전체적인 흐름을 자체적으로 지님
라이브러리 프로그래머가 전체적인 흐름을 지님 가구

 

 

 

출처)

 

헷갈리는 용어 정리 - 라이브러리/패키지/모듈, 함수/메서드, 매개변수/(전달)인자, 클래스/객체

1. 라이브러리/패키지/모듈 + 프레임워크/API/플랫폼,아키텍처 모든 패키지는 모듈이지만, 모든 모듈이 패키지는 아니다. 즉, 패키지는 모듈의 한 종류이다. '__path__' 속성을 지닌 모듈이 패키지라

mola23.tistory.com

 

 

 

 


파이프라인 (Pipeline)

한 데이터 처리 단계의 출력이 다음 단계의 입력으로 이어지는 형태로 연결된 구조

 

 

이번 시간에 배울 내용은 위 이미지에서 ① 과정.

즉, 통계db → 분석서버 → s3 과정

 

 

 

 


code skeleton
  root          개별 dir 내부      기타
init.py -  
settins.py - 접속을 위해 필요한 여러가지.
main.py - 컨트롤러만 불러옴
db/ init.py  
db/ connector.py  
db/ query.py  
pipeline/ init.py  
pipeline/ controller.py 어느 시점에서 익스트랙트 등 각각의 모듈이 붙어야 한다! 정의
pipeline/ extract.py  
pipeline/ transform.py  
pipeline/ load.py  
messages.py - option
.gitignore - option
.env - option

 

 

 

 

 


class
class test:
    temp0 = "0"                                                  # attribute(변수) 를 선언하고
    temp1 = "1"

    def __init__(self, A, B, C):                            # method(함수) 를 선언한다.
        self.a = A
        self.b = B
        self.c = C

    def text(self):
        return self.a, self.b, self.c

    def text_concat(self):
        return self.a + self.b

    def text_concat_v2(self, temp = temp0):
        return self.a + self.b + temp



test(1, 2, 3).text()
=> (1, 2, 3)


test("1", "2", "3").text_concat()
=> '12'


test("1", "2", "3").text_concat_v2()
=> '120'

 

 

 

connector.py
import psycopg2 as pgsql                     # 파이썬을 위한 포스트그리 라이브러리
import pymongo as mongo                   # 파이썬을 위한 몽고db 라이브러리


# 포스트그리sql 연결
conn = pgsql.connect
(host= "127.0.0.1", port = "5432", user = "postgres", password="ps", dbname = "dvdrental" )

# 몽고DB 연결
conn = mongo.mongo_client.MongoClient
(host= "127.0.0.1", port = 27017, username = "admin", password="ps")





# connector 클래스를 만들어 보자!
class DBConnector:               


# __init__(self)
 객체를 생성할 때 자동으로 호출되는 특수한 메소드.

 반드시 첫 번째 인자는 self 이어야 한다.
 이 메소드 내부에 클래스 변수를 생성할 수 있다.

# 객체를 생성할 때 이름을 사용자가 입력하도록 하려면
 __init__()메소드에 일반 인자를 self 뒤에 주면 된다.

    def __init__(self, host, port, user, password, database, location, engine):
        self.host = host
        self.port = port
        self.user = user
        self.password = password
        self.database = database
        self.location = location
        self.engine = engine
        # self.conn = None

        # if self.location == 'localhost_source':
        #     if self.engine == 'postgre':
        #         self.connect = self._postgre_connect
        #     elif self.engine == 'mongodb':
        #         self.connect = self._mongodb_connect
        #     else:
        #         raise RuntimeError(f"{self.engine} is not supported")
        # elif self.location == 'localhost_target':
        #     if self.engine == 'postgre':
        #         self.connect = self._postgre_connect
        #     elif self.engine == 'mongodb':
        #         self.connect = self._mongodb_connect
        #     else:
        #         raise RuntimeError(f"{self.engine} is not supported")
        # else :
        #     raise RuntimeError(f"{self.location} is not supported")

    # def __enter__(self):
    #     self.connect()
    #     return self

    # def __exit__(self, exc_type, exc_val, exc_tb):
    #     self.conn.close()

    def _postgre_connect(self):
        conn = pgsql.connect
        (host = self.host, port = self.port, user = self.user, password = self.password, dbname = self.database)
        return conn

    def _mongodb_connect(self):
        conn = mongo.mongo_client.MongoClient
        (host = self.host, port = self.port, username = self.user, password = self.password)
        return conn

 

 

 

 

query.py
queries = {
'read': {
        'actor': '''
            SELECT '{batch_month}' AS YYYYMM, * 
            FROM actor 
            WHERE TO_CHAR(CAST(last_update AS DATE), 'YYYYMM') = '{batch_month}'
            ;
        ''',
        'film': '''
            SELECT '{batch_month}' AS YYYYMM, * 
            FROM film 
            WHERE TO_CHAR(CAST(last_update AS DATE), 'YYYYMM') = '{batch_month}'
            ;
        '''},
    'create': {
        'actor': '''
            INSERT INTO actor_back VALUES (%s, %s, %s, %s, %s)
            ;
        ''',
        'film': '''
            INSERT INTO film_back VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            ;
        '''
}
    }

 

 

 

 

settings.py
DB_SETTINGS = {
    'source_db_localhost_rdb': {                        # RDB : dvdrental를 localhost → 어딘가로 보낸다
        'host' : "127.0.0.1",
        'port' : "5432",
        'user' : "postgres",
        'password' : "qwe123",
        'database' : "dvdrental",
        'location' : "localhost_source",
        'engine' : "postgre"
    },
    'source_db_localhost_ddb' : {                         # DDB : book_info를 localhost → 어딘가로 보낸다
        'host' : "127.0.0.1",
        'port' : 27017,
        'user' : "admin",
        'password' : "qwe123",
        'database' : None,
        'location' : "localhost_source",
        'engine' : "mongodb"
    },
    'target_db_localhost_rdb' : {
        'host' : "127.0.0.1",
        'port' : "5432",
        'user' : "postgres",
        'password' : "qwe123",
        'database' : "temp1",
        'location' : "localhost_target",
        'engine' : "postgre"
    },
    'target_db_localhost_ddb' : {
        'host' : "127.0.0.1",
        'port' : 27017,
        'user' : "admin",
        'password' : "qwe123",
        'database' : None,
        'location' : "localhost_target",
        'engine' : "mongodb"
    },
}
728x90