Immersion In Data

Snowflake/Snowflake - The Complete Masterclass

[Snowflake] Loading data

sungjunminn 2023. 1. 30. 17:44

1. Bulk loading 

  • 가장 빈번한 방법
  • 웨어하우스를 사용
  • 스테이지에서 로드
  • 명령을 복사(Copy Command)
  • 변환 가능

 


2. Continuous loading

  • 소량의 데이터를 로드하도록 설계됨
  • 스테이지에서 추가되면 자동으로 실행
  • 분석을 위해 결과를 늦춤
  • Snowpipe 이용(서버리스 기능)

 


3. 스테이지

  • 데이터를 로드할 수 있는 위치를 포함하는 데이터베이스 개체

 

4. 외부 스테이지(External Stage)

  • S3
  • Google Cloud Platform
  • Microsoft Azure
  • 스키마에서 데이터베이스가 생성됨
  • CREATE STAGE 명령을 사용하여 스테이지를 생성함

 

5. 내부 스테이지(Internal Stage)

  • 로컬 스토리지는 Snowflake에 의해 유지됨

 

6. 데이터 로드 순서(Bulk)

  • 데이터베이스 생성
CREATE OR REPLACE DATABASE MANAGE_DB;

 

 

  • 데이터베이스 내 스키마 생성
CREATE OR REPLACE SCHEMA external_stages;

 

 

  • 외부 클라우드 공급자(AWS)용 스테이지 생성(ALTER문으로 credentials 옵션 변경 가능, 모든 사용자에게 열려 있는 버킷이라면 credentials 옵션 불필요)
CREATE OR REPLACE STAGE MANAGE_DB.external_stages.aws_stage
    url='s3://bucketsnowflakes3'
    credentials=(aws_key_id='ABCD_DUMMY_ID' aws_secret_key='1234abcd_key');
ALTER STAGE aws_stage
    SET credentials=(aws_key_id='XYZ_DUMMY_ID' aws_secret_key='987xyz');

 

 

  • DESC문으로 생성한 스테이지 확인
DESC STAGE MANAGE_DB.external_stages.aws_stage;

 

 

  • LIST문으로 스테이지 세부사항 확인(버킷 개수 확인)
LIST @aws_stage;

 

 

  • 로드할 데이터를 파악하여 정확한 컬럼 정보를 가진 테이블 생성
CREATE OR REPLACE TABLE OUR_FIRST_DB.PUBLIC.ORDERS (
    ORDER_ID VARCHAR(30),
    AMOUNT INT,
    PROFIT INT,
    QUANTITY INT,
    CATEGORY VARCHAR(30),
    SUBCATEGORY VARCHAR(30));

 

 

  • 생성한 테이블 확인
SELECT * FROM OUR_FIRST_DB.PUBLIC.ORDERS;

 

 

  • 스테이지에 여러 버킷이 설정되어 있다면, COPY INTO 문으로 받아올 때, 정확한 파일명을 적는 옵션을 추가해야 함
COPY INTO OUR_FIRST_DB.PUBLIC.ORDERS
    FROM @MANAGE_DB.external_stages.aws_stage
    file_format= (type = csv field_delimiter=',' skip_header=1)
    files = ('OrderDetails.csv'); // 특정 파일 지정

 

 

  • .*를 이용하면 .*뒤에오는 문구를 가진 로드함(하지만 같은 위치에 이미 로드한 데이터는 다시 로드하지 않음)
COPY INTO OUR_FIRST_DB.PUBLIC.ORDERS
    FROM @MANAGE_DB.external_stages.aws_stage
    file_format= (type = csv field_delimiter=',' skip_header=1)
    pattern='.*Order.*';

 

 

7. 데이터 변환 순서

example 1 : alias를 이용한 select문을 활용하여 복제

  • 로드할 데이터를 파악하여 정확한 컬럼 정보를 가진 테이블 생성 
CREATE OR REPLACE TABLE OUR_FIRST_DB.PUBLIC.ORDERS_EX (
    ORDER_ID VARCHAR(30),
    AMOUNT INT
    );

 

  • alias를 이용한 select문을 활용하여 데이터 복제
COPY INTO OUR_FIRST_DB.PUBLIC.ORDERS_EX
    FROM (select s.$1, s.$2 from @MANAGE_DB.external_stages.aws_stage s)
    file_format= (type = csv field_delimiter=',' skip_header=1)
    files=('OrderDetails.csv');



example 2 : CASE WHEN절을 통해 새롭게 생긴 컬럼에 데이터 변환

  • 변환할 데이터 테이블을 생성 
CREATE OR REPLACE TABLE OUR_FIRST_DB.PUBLIC.ORDERS_EX (
    ORDER_ID VARCHAR(30),
    AMOUNT INT,
    PROFIT INT,
    PROFITABLE_FLAG VARCHAR(30)
    );

 

  • CASE WHEN절을 통해 다른 컬럼을 조건으로 위에서 생성한 새로운 컬럼(PROFITABLE_FLAG) 데이터를 변환
COPY INTO OUR_FIRST_DB.PUBLIC.ORDERS_EX
    FROM (select 
            s.$1,
            s.$2, 
            s.$3,
            CASE WHEN CAST(s.$3 as int) < 0 THEN 'not profitable' ELSE 'profitable' END 
          from @MANAGE_DB.external_stages.aws_stage s)
    file_format= (type = csv field_delimiter=',' skip_header=1)
    files=('OrderDetails.csv');

 

  • 변환된 데이터 확인
SELECT * FROM OUR_FIRST_DB.PUBLIC.ORDERS_EX;

 

example 3 : substring 구문을 통해 문자열을 변환해 새로운 컬럼 생성

  • 변환할 데이터 테이블 생성
CREATE OR REPLACE TABLE OUR_FIRST_DB.PUBLIC.ORDERS_EX (
    ORDER_ID VARCHAR(30),
    AMOUNT INT,
    PROFIT INT,
    CATEGORY_SUBSTRING VARCHAR(5)
    );

 

  • substring 구문을 통해 시작점과 끝점을 정하여 문자열을 잘라 새로운 컬럼(CATEGORY_SUBSTRING)에 데이터 변환
COPY INTO OUR_FIRST_DB.PUBLIC.ORDERS_EX
    FROM (select 
            s.$1,
            s.$2, 
            s.$3,
            substring(s.$5,1,5) 
          from @MANAGE_DB.external_stages.aws_stage s)
    file_format= (type = csv field_delimiter=',' skip_header=1)
    files=('OrderDetails.csv');

 

  • 변환된 데이터 확인
SELECT * FROM OUR_FIRST_DB.PUBLIC.ORDERS_EX;

 

example 4 : 특정컬럼을 제외한 나머지 컬럼 null값으로 변환

  • 변환할 데이터 테이블 생성 
CREATE OR REPLACE TABLE OUR_FIRST_DB.PUBLIC.ORDERS_EX (
    ORDER_ID VARCHAR(30),
    AMOUNT INT,
    PROFIT INT,
    PROFITABLE_FLAG VARCHAR(30)
    );

 

  • (ORDER_ID,PROFIT)을 추가함으로써 선택한 두 컬럼을 제외한 나머지 컬럼은 null값으로 채워짐

COPY INTO OUR_FIRST_DB.PUBLIC.ORDERS_EX (ORDER_ID,PROFIT)
    FROM (select
            s.$1,
            s.$3
        from @MANAGE_DB.external_stages.aws_stage s)
    file_format= (type = csv field_delimiter=',' skip_header=1)
    files=('OrderDetails.csv');

 

  • 변환된 데이터 확인
SELECT * FROM OUR_FIRST_DB.PUBLIC.ORDERS_EX;

 

example 5 : autoincrement 옵션을 통한 해당 컬럼 규칙에 따라 증가

  • 변환할 데이터 테이블 생성 autoincrement 옵션을 통해 해당 컬럼을 규칙에 따라 증가 
CREATE OR REPLACE TABLE OUR_FIRST_DB.PUBLIC.ORDERS_EX (
    ORDER_ID number autoincrement start 1 increment 1,
    AMOUNT INT,
    PROFIT INT,
    PROFITABLE_FLAG VARCHAR(30)
  
    );

 

COPY INTO OUR_FIRST_DB.PUBLIC.ORDERS_EX (PROFIT,AMOUNT)
    FROM (select 
            s.$2,
            s.$3
          from @MANAGE_DB.external_stages.aws_stage s)
    file_format= (type = csv field_delimiter=',' skip_header=1)
    files=('OrderDetails.csv');

 

  • 변환된 데이터 확인
SELECT * FROM OUR_FIRST_DB.PUBLIC.ORDERS_EX WHERE ORDER_ID > 15;

 

 

8. 오류 처리

  • 스테이지 생성
CREATE OR REPLACE STAGE MANAGE_DB.external_stages.aws_stage_errorex
    url='s3://bucketsnowflakes4';

 

  • 생성한 스테이지 확인
 LIST @MANAGE_DB.external_stages.aws_stage_errorex;

 

  • 테이블 생성 
 CREATE OR REPLACE TABLE OUR_FIRST_DB.PUBLIC.ORDERS_EX (
    ORDER_ID VARCHAR(30),
    AMOUNT INT,
    PROFIT INT,
    QUANTITY INT,
    CATEGORY VARCHAR(30),
    SUBCATEGORY VARCHAR(30));

 

  • 데이터 복제 - PROFIT 컬럼은 INT 형식이지만, 복제한 데이터 중 두 개가 String인 오류를 설정 
COPY INTO OUR_FIRST_DB.PUBLIC.ORDERS_EX
    FROM @MANAGE_DB.external_stages.aws_stage_errorex
    file_format= (type = csv field_delimiter=',' skip_header=1)
    files = ('OrderDetails_error.csv');

 

  • 데이터 확인 - 오류가 발생하고, 해당 테이블에는 아무것도 들어있지 않은 것을 확인할 수 있다. 
SELECT * FROM OUR_FIRST_DB.PUBLIC.ORDERS_EX;

 

example 1 : ON_ERROR = 'CONTINUE'

  • ON_ERROR 옵션을 통해 부분적으로 데이터 복제 - 1500개의 데이터 중에서 오류 2개를 제외한 1498개 로드
COPY INTO OUR_FIRST_DB.PUBLIC.ORDERS_EX
    FROM @MANAGE_DB.external_stages.aws_stage_errorex
    file_format= (type = csv field_delimiter=',' skip_header=1)
    files = ('OrderDetails_error.csv')
    ON_ERROR = 'CONTINUE';

 

  • 데이터 확인
SELECT * FROM OUR_FIRST_DB.PUBLIC.ORDERS_EX;
SELECT COUNT(*) FROM OUR_FIRST_DB.PUBLIC.ORDERS_EX;

 

example 2 : ON_ERROR = 'ABORT_STATEMENT'

  • ON_ERROR = 'ABORT_STATEMENT' 옵션을 통해 전에 로드된 데이터는 스킵하고, 새롭게 추가된 데이터만 로드
COPY INTO OUR_FIRST_DB.PUBLIC.ORDERS_EX
    FROM @MANAGE_DB.external_stages.aws_stage_errorex
    file_format= (type = csv field_delimiter=',' skip_header=1)
    files = ('OrderDetails_error.csv','OrderDetails_error2.csv')
    ON_ERROR = 'ABORT_STATEMENT';

 

  • 해당 테이블 TRUNCATE
TRUNCATE TABLE OUR_FIRST_DB.PUBLIC.ORDERS_EX;

 

  • ON_ERROR = 'ABORT_STATEMENT' 옵션을 가진 COPY문 재실행 - 첫 번째 파일에서만 오류가 발생, 두 번째 파일은 로드되지 않음
COPY INTO OUR_FIRST_DB.PUBLIC.ORDERS_EX
    FROM @MANAGE_DB.external_stages.aws_stage_errorex
    file_format= (type = csv field_delimiter=',' skip_header=1)
    files = ('OrderDetails_error.csv','OrderDetails_error2.csv')
    ON_ERROR = 'ABORT_STATEMENT';

 

  • 데이터 확인 
SELECT * FROM OUR_FIRST_DB.PUBLIC.ORDERS_EX;
SELECT COUNT(*) FROM OUR_FIRST_DB.PUBLIC.ORDERS_EX;

 

  • 해당 테이블 다시 TRUNCATE
TRUNCATE TABLE OUR_FIRST_DB.PUBLIC.ORDERS_EX;

 

example 3 : ON_ERROR = 'SKIP_FILE'

  • ON_ERROR = 'SKIP_FILE' 옵션을 통해 오류가 발생한 파일은 건너뛰고, 오류가 발생하지 않는 파일은 계속 로드함
COPY INTO OUR_FIRST_DB.PUBLIC.ORDERS_EX
    FROM @MANAGE_DB.external_stages.aws_stage_errorex
    file_format= (type = csv field_delimiter=',' skip_header=1)
    files = ('OrderDetails_error.csv','OrderDetails_error2.csv')
    ON_ERROR = 'SKIP_FILE';

 

  • 해당 테이블 다시 TRUNCATE
TRUNCATE TABLE OUR_FIRST_DB.PUBLIC.ORDERS_EX;

 

example 4 : ON_ERROR = 'SKIP_FILE_2'

  • ON_ERROR = 'SKIP_FILE_2' 옵션을 통해 SKIP_FILE_ 다음에 들어가는 숫자에 따라 오류 제한을 표시할 수 있음
    ex) ON_ERROR = 'SKIP_FILE_2' 옵션일 때, 파일의 오류가 3개 이상이라면 건너뜀
COPY INTO OUR_FIRST_DB.PUBLIC.ORDERS_EX
    FROM @MANAGE_DB.external_stages.aws_stage_errorex
    file_format= (type = csv field_delimiter=',' skip_header=1)
    files = ('OrderDetails_error.csv','OrderDetails_error2.csv')
    ON_ERROR = 'SKIP_FILE_2';

 

  • 데이터 확인
SELECT * FROM OUR_FIRST_DB.PUBLIC.ORDERS_EX;
SELECT COUNT(*) FROM OUR_FIRST_DB.PUBLIC.ORDERS_EX;

 

  • 해당 테이블 다시 TRUNCATE
TRUNCATE TABLE OUR_FIRST_DB.PUBLIC.ORDERS_EX;

 

example 5 : ON_ERROR = 'SKIP_FILE_0.5%'

  • ON_ERROR = 'SKIP_FILE_0.5%' 옵션을 통해 SKIP_FILE_ 다음에 들어가는 %에 따라 전체 데이터 개수의 비율에 따라 오류 제한을 표시할 수 있음
COPY INTO OUR_FIRST_DB.PUBLIC.ORDERS_EX
    FROM @MANAGE_DB.external_stages.aws_stage_errorex
    file_format= (type = csv field_delimiter=',' skip_header=1)
    files = ('OrderDetails_error.csv','OrderDetails_error2.csv')
    ON_ERROR = 'SKIP_FILE_0.5%';

 

  • 데이터 확인
SELECT * FROM OUR_FIRST_DB.PUBLIC.ORDERS_EX;

 

 

 

Udemy의 'Snowflake - The Complete Masterclass (2023 Edition)'를 공부한 내용을 바탕으로 작성하였습니다.