Immersion In Data

Snowflake/Snowflake - The Complete Masterclass

[Snowflake] Copy options

sungjunminn 2023. 2. 1. 09:19

1. VALIDATION_MODE


RETURN_n_ROWS : 지정된 행 수를 확인하고 반환한다. 처음 발생한 오류에서 실패한다.  
RETURN_ERRORS : Copy Command의 모든 오류를 반환한다. 

 

  • 새로운 database 생성
CREATE OR REPLACE DATABASE COPY_DB;

 

  • 테이블 생성
CREATE OR REPLACE TABLE  COPY_DB.PUBLIC.ORDERS (
    ORDER_ID VARCHAR(30),
    AMOUNT VARCHAR(30),
    PROFIT INT,
    QUANTITY INT,
    CATEGORY VARCHAR(30),
    SUBCATEGORY VARCHAR(30));




example 1 : 에러가 없는 경우

  • 스테이지 생성
CREATE OR REPLACE STAGE COPY_DB.PUBLIC.aws_stage_copy
    url='s3://snowflakebucket-copyoption/size/';

 

  • 스테이지 확인
LIST @COPY_DB.PUBLIC.aws_stage_copy;

 

  • VALIDATION_MODE = RETURN_ERRORS 옵션을 통해 오류가 있다면 모든 오류를 반환하고, 
    오류가 없다면 아직 어떠한 행도 로드되지 않음(= 모든 행을 앞으로 로드할 수 있음)
COPY INTO COPY_DB.PUBLIC.ORDERS
    FROM @aws_stage_copy
    file_format= (type = csv field_delimiter=',' skip_header=1)
    pattern='.*Order.*'
    VALIDATION_MODE = RETURN_ERRORS;

 

  • 테이블에 로드된 데이터가 있는지 확인
SELECT * FROM ORDERS;

 

  • VALIDATION_MODE = RETURN_5_ROWS 옵션을 통해 처음 5개의 행을 유효성 검사를 해서 오류가 없으면 보여줌
    (= 5개 행을 로드할 수 있음)
COPY INTO COPY_DB.PUBLIC.ORDERS
    FROM @aws_stage_copy
    file_format= (type = csv field_delimiter=',' skip_header=1)
    pattern='.*Order.*'
   VALIDATION_MODE = RETURN_5_ROWS;



example 2 : 에러가 있는 경우

  • 새로운 스테이지 생성
CREATE OR REPLACE STAGE COPY_DB.PUBLIC.aws_stage_copy
    url='s3://snowflakebucket-copyoption/returnfailed/';

 

  • 스테이지 확인
LIST @COPY_DB.PUBLIC.aws_stage_copy;

 

  • VALIDATION_MODE = RETURN_ERRORS 옵션을 통해 어떠한 오류가 어디에 발생했는지 유효성 검사
COPY INTO COPY_DB.PUBLIC.ORDERS
    FROM @aws_stage_copy
    file_format= (type = csv field_delimiter=',' skip_header=1)
    pattern='.*Order.*'
    VALIDATION_MODE = RETURN_ERRORS;

 

  • VALIDATION_MODE = RETURN_1_rows 옵션을 통해 오류가 발생하면 반환되는 행을 볼 수 없음
COPY INTO COPY_DB.PUBLIC.ORDERS
    FROM @aws_stage_copy
    file_format= (type = csv field_delimiter=',' skip_header=1)
    pattern='.*Order.*'
    VALIDATION_MODE = RETURN_1_rows;




2. Working with rejected records



example 1 : VALIDATION_MODE 이후에 거부된 파일 저장

  • 새로운 테이블 생성
CREATE OR REPLACE TABLE  COPY_DB.PUBLIC.ORDERS (
    ORDER_ID VARCHAR(30),
    AMOUNT VARCHAR(30),
    PROFIT INT,
    QUANTITY INT,
    CATEGORY VARCHAR(30),
    SUBCATEGORY VARCHAR(30));

 

  • VALIDATION_MODE = RETURN_ERRORS 옵션을 통해 오류가 발생한 지점 확인
COPY INTO COPY_DB.PUBLIC.ORDERS
    FROM @aws_stage_copy
    file_format= (type = csv field_delimiter=',' skip_header=1)
    pattern='.*Order.*'
    VALIDATION_MODE = RETURN_ERRORS;

 

  • CTAS문을 통해 Query Details에서 Query ID를 복사해 새로운 테이블 생성
CREATE OR REPLACE TABLE rejected AS 
select rejected_record from table(result_scan('01aa034e-0000-16ef-0000-45550001d5fa'));



cf) 필요에 따라 가장 마지막에 진행한 query를 추가하여 last_query_id()로 불러서 쓸 수 있음

INSERT INTO rejected
select rejected_record from table(result_scan(last_query_id()));

 

  • 생성한 테이블 확인
SELECT * FROM rejected;



example 2 : 거부된 파일을 VALIDATION_MODE 없이 저장

  • ON_ERROR=CONTINUE 옵션을 통해 로드한 파일 상태 확인
COPY INTO COPY_DB.PUBLIC.ORDERS
    FROM @aws_stage_copy
    file_format= (type = csv field_delimiter=',' skip_header=1)
    pattern='.*Order.*'
    ON_ERROR=CONTINUE;

 

  • validate를 사용하여 오류와 거부된 오류를 확인
select * from table(validate(orders, job_id => '_last'));




example 3 : 거부된 레코드로 작업

  • ,로 구분되어 있는 테이블 확인
SELECT REJECTED_RECORD FROM rejected;

 

  • SPLIT_PART를 통해 하나의 컬럼으로 되어있는 테이블을 CTAS로 새로운 테이블을 생성
CREATE OR REPLACE TABLE rejected_values as
SELECT 
SPLIT_PART(rejected_record,',',1) as ORDER_ID, 
SPLIT_PART(rejected_record,',',2) as AMOUNT, 
SPLIT_PART(rejected_record,',',3) as PROFIT, 
SPLIT_PART(rejected_record,',',4) as QUATNTITY, 
SPLIT_PART(rejected_record,',',5) as CATEGORY, 
SPLIT_PART(rejected_record,',',6) as SUBCATEGORY
FROM rejected;

 

  • 생성한 테이블 확인
SELECT * FROM rejected_values;




3. SIZE_LIMIT


임계값을 초과하면 Copy Command가 로드를 중지함
첫 번째 파일 이후에 이 임계값을 초과하면 다음 파일은 더 이상 로드되지 않음

 

  • 데이터베이스 생성
CREATE OR REPLACE DATABASE COPY_DB;

 

  • 테이블 생성
CREATE OR REPLACE TABLE  COPY_DB.PUBLIC.ORDERS (
    ORDER_ID VARCHAR(30),
    AMOUNT VARCHAR(30),
    PROFIT INT,
    QUANTITY INT,
    CATEGORY VARCHAR(30),
    SUBCATEGORY VARCHAR(30));

 

  • 스테이지 생성
CREATE OR REPLACE STAGE COPY_DB.PUBLIC.aws_stage_copy
    url='s3://snowflakebucket-copyoption/size/';

 

  • 스테이지 확인 - 첫 번재 파일 : 54,600, 두 번째 파일 : 54598
LIST @aws_stage_copy;

 

  • SIZE_LIMIT 옵션을 통해 사이즈 제한 (첫 번째 파일에서 이미 SIZE_LIMIT=20000를 초과하기 때문에 두 번째 파일을 아예 로드되지 않음)
COPY INTO COPY_DB.PUBLIC.ORDERS
    FROM @aws_stage_copy
    file_format= (type = csv field_delimiter=',' skip_header=1)
    pattern='.*Order.*'
    SIZE_LIMIT=20000;





4. RETURN_FAILED_ONLY


로드에 실패한 파일만 반환할지 여부 지정

 

  • 테이블 생성
CREATE OR REPLACE TABLE  COPY_DB.PUBLIC.ORDERS (
    ORDER_ID VARCHAR(30),
    AMOUNT VARCHAR(30),
    PROFIT INT,
    QUANTITY INT,
    CATEGORY VARCHAR(30),
    SUBCATEGORY VARCHAR(30));

 

  • 스테이지 생성
CREATE OR REPLACE STAGE COPY_DB.PUBLIC.aws_stage_copy
    url='s3://snowflakebucket-copyoption/returnfailed/';

 

  • 스테이지 확인
LIST @COPY_DB.PUBLIC.aws_stage_copy;

 

  • RETURN_FAILED_ONLY=TRUE 옵션을 통해 Copy Command 중에서 오류가 있는 파일만 표시 
COPY INTO COPY_DB.PUBLIC.ORDERS
    FROM @aws_stage_copy
    file_format= (type = csv field_delimiter=',' skip_header=1)
    pattern='.*Order.*'
    RETURN_FAILED_ONLY = TRUE;

 

  • ON_ERROR =CONTINUE 옵션과 함께 RETURN_FAILED_ONLY = TRUE 옵션을 사용하여 부분적으로 로드된 파일만 표시
COPY INTO COPY_DB.PUBLIC.ORDERS
    FROM @aws_stage_copy
    file_format= (type = csv field_delimiter=',' skip_header=1)
    pattern='.*Order.*'
    ON_ERROR =CONTINUE
    RETURN_FAILED_ONLY = TRUE;



cf) RETURN_FAILED_ONLY 옵션을 언급하지 않는다면 Default 값인 FALSE로 입력된 것과 같은 결과가 나옴



5. TRUNCATECOLUMNS


컬럼의 형식에 따라 값을 잘라줌

 

  • 테이블 생성
CREATE OR REPLACE TABLE  COPY_DB.PUBLIC.ORDERS (
    ORDER_ID VARCHAR(30),
    AMOUNT VARCHAR(30),
    PROFIT INT,
    QUANTITY INT,
    CATEGORY VARCHAR(10),
    SUBCATEGORY VARCHAR(30));

 

  • 스테이지 생성
CREATE OR REPLACE STAGE COPY_DB.PUBLIC.aws_stage_copy
    url='s3://snowflakebucket-copyoption/size/';

 

  • 스테이지 확인
LIST @COPY_DB.PUBLIC.aws_stage_copy;

 

  • TRUNCATECOLUMNS 옵션을 사용하지 않은 경우 오류 발생(형식과 맞지 않음)
COPY INTO COPY_DB.PUBLIC.ORDERS
    FROM @aws_stage_copy
    file_format= (type = csv field_delimiter=',' skip_header=1)
    pattern='.*Order.*';

 

  • TRUNCATECOLUMNS 옵션을 통해 테이블 형식에 맞게 맞춰줌
COPY INTO COPY_DB.PUBLIC.ORDERS
    FROM @aws_stage_copy
    file_format= (type = csv field_delimiter=',' skip_header=1)
    pattern='.*Order.*'
    TRUNCATECOLUMNS = true;

 

  • 테이블 확인
SELECT * FROM ORDERS;






6. FORCE


이전에 테이블에 로드된 파일이 있어도 강제로 파일을 로드할 수 있게 함 

 

  • 테이블 생성
CREATE OR REPLACE TABLE  COPY_DB.PUBLIC.ORDERS (
    ORDER_ID VARCHAR(30),
    AMOUNT VARCHAR(30),
    PROFIT INT,
    QUANTITY INT,
    CATEGORY VARCHAR(30),
    SUBCATEGORY VARCHAR(30));

 

  • 스테이지 생성
CREATE OR REPLACE STAGE COPY_DB.PUBLIC.aws_stage_copy
    url='s3://snowflakebucket-copyoption/size/';

 

  • 스테이지 확인
LIST @COPY_DB.PUBLIC.aws_stage_copy;

 

  • 테이블 로드
COPY INTO COPY_DB.PUBLIC.ORDERS
    FROM @aws_stage_copy
    file_format= (type = csv field_delimiter=',' skip_header=1)
    pattern='.*Order.*';

 

  • 같은 테이블 로드 - 같은 테이블이기 때문에 Snowflake는 데이터를 로드하지 않음
COPY INTO COPY_DB.PUBLIC.ORDERS
    FROM @aws_stage_copy
    file_format= (type = csv field_delimiter=',' skip_header=1)
    pattern='.*Order.*';

 

  • 테이블 확인
SELECT * FROM ORDERS;

 

  • FORCE = TRUE 옵션을 통해 데이터가 중복되더라도 강제로 데이터를 로드함
COPY INTO COPY_DB.PUBLIC.ORDERS
    FROM @aws_stage_copy
    file_format= (type = csv field_delimiter=',' skip_header=1)
    pattern='.*Order.*'
    FORCE = TRUE;





7. Load history


database의 INFORMATION_SCHEMA - LOAD_HISTORY를 통해 로드한 쿼리를 복사하여 사용할 수 있음

 

  • database 지정
USE COPY_DB;

 

  • load history 보기
SELECT * FROM information_schema.load_history;

 

  • 모든 database에서 load history 보기
SELECT * FROM snowflake.account_usage.load_history;

 

  • 특정 테이블과 스키마를 지정하여 load history 보기
SELECT * FROM snowflake.account_usage.load_history
  where schema_name='PUBLIC' and
  table_name='ORDERS';
SELECT * FROM snowflake.account_usage.load_history
  where schema_name='PUBLIC' and
  table_name='ORDERS' and
  error_count > 0;
SELECT * FROM snowflake.account_usage.load_history
WHERE DATE(LAST_LOAD_TIME) <= DATEADD(days,-1,CURRENT_DATE);








Udemy의 'Snowflake - The Complete Masterclass (2023 Edition)'를 공부한 내용을 바탕으로 작성하였습니다.