Immersion In Data

Snowflake/Snowflake - The Complete Masterclass

[Snowflake] Snowpipe

sungjunminn 2023. 2. 7. 17:59

1. What is Snowpipe

  • 버킷에 파일이 나타나면 자동으로 데이터 로드를 활성화
  • 분석에 데이터를 즉시 사용하는 경우 적용
  • 웨어하우스 대신 서버리스를 사용

 



2. High-level steps

  • 스테이지 생성 - COPY COMMAND 테스트 - 파이프 생성 - S3 알림

 

 


3. Creating stage

  • 테이블 생성
CREATE OR REPLACE TABLE OUR_FIRST_DB.PUBLIC.employees (
  id INT,
  first_name STRING,
  last_name STRING,
  email STRING,
  location STRING,
  department STRING
  );

 

  • file format 생성(null값 처리)
CREATE OR REPLACE file format MANAGE_DB.file_formats.csv_fileformat
    type = csv
    field_delimiter = ','
    skip_header = 1
    null_if = ('NULL','null')
    empty_field_as_null = TRUE;



  • 스테이지 생성
CREATE OR REPLACE stage MANAGE_DB.external_stages.csv_folder
    URL = 's3://snowflakes3bucket112233/csv/snowpipe'
    STORAGE_INTEGRATION = s3_int
    FILE_FORMAT = MANAGE_DB.file_formats.csv_fileformat;

 

  • 스테이지 확인
LIST @MANAGE_DB.external_stages.csv_folder;

 




4. Creating pipe

  • 스키마 생성
CREATE OR REPLACE SCHEMA MANAGE_DB.pipes;

 

  • COPY Command 쿼리 작동 확인
COPY INTO OUR_FIRST_DB.PUBLIC.employees
FROM @MANAGE_DB.external_stages.csv_folder;

 

  • 파이프 생성
CREATE OR REPLACE pipe MANAGE_DB.pipes.employee_pipe
auto_ingest = TRUE
AS
COPY INTO OUR_FIRST_DB.PUBLIC.employees
FROM @MANAGE_DB.external_stages.csv_folder;



  • 파이프 확인하여 notification_channel값 복사(S3에 데이터가 들어오는 이벤트를 감지하기 위해) 
DESC pipe employee_pipe;

 

 


5. Configure pipe & notifications

  • 버킷 - 속성- 이벤트 알림 생성

 

 

  • 접두사 : 경로를 설정해 지정된 객체만 알림하도록 제한

 

 

  • 이벤트 유형 : 어떤 이벤트에서 객체가 생성될지 선택

 

 

  • 대상 : SQS 대기열 선택 및 SQS 대기열 ARN 입력란에 위에서 복사한 notification_channel값 입력

 

 

  • S3 버킷에 새로운 파일을 업로드 후 쿼리문으로 데이터 들어오는지 확인
SELECT * FROM OUR_FIRST_DB.PUBLIC.employees;

 

 

6. Error handling

  • 열이 맞지 않는 file format 생성(오류 발생)
CREATE OR REPLACE file format MANAGE_DB.file_formats.csv_fileformat
    type = csv
    field_delimiter = ','
    skip_header = 1
    null_if = ('NULL','null')
    empty_field_as_null = TRUE;

 

  • COPY Command 실행(오류 발생 : 파일에 단 하나의 열만 있으며,  파이프로 설정했기 때문에 열 구분 기호로 해석되지 않음, 모든 쉼표는 무시되며 열 구분 기호로 해석되지 않음) 
COPY INTO OUR_FIRST_DB.PUBLIC.employees
FROM @MANAGE_DB.external_stages.csv_folder;

 

  • S3 버킷에 새로운 파일 업로드

 

  • 파이프라인 새로고침
ALTER PIPE MANAGE_DB.PIPES.EMPLOYEE_PIPE refresh;

 

  • 파이프라인이 작동하는지 확인
SELECT SYSTEM$PIPE_STATUS('employee_pipe');

 

  • 파이프라인 에러메세지 확인(지난 2시간의 에러메세지)
SELECT * FROM TABLE(VALIDATE_PIPE_LOAD(
    PIPE_NAME => 'MANAGE_DB.pipes.employee_pipe',
    START_TIME => DATEADD(HOUR,-2,CURRENT_TIMESTAMP())));

 

  • COPY Command별 에러메세지 확인
SELECT * FROM TABLE (INFORMATION_SCHEMA.COPY_HISTORY(
   table_name  =>  'OUR_FIRST_DB.PUBLIC.EMPLOYEES',
   START_TIME =>DATEADD(HOUR,-2,CURRENT_TIMESTAMP())));

 



7. Manage pipes

  • 파이프라인의 속성 얻기
DESC pipe MANAGE_DB.pipes.employee_pipe;

 

  • 파이프라인의 목록만 얻기
SHOW PIPES;

 

  • 와일드카드로 파이프라인 목록 얻기
SHOW PIPES like '%employee%';

 

  • 데이터베이스를 제한해서 파이프라인 목록 얻기
SHOW PIPES in database MANAGE_DB;

 

  • 스키마를 제한해서 파이프라인 목록 얻기
SHOW PIPES in schema MANAGE_DB.pipes;

 

  • 결합하여 사용
SHOW PIPES like '%employee%' in Database MANAGE_DB;




example 1 : 파이프라인을 변경할 때

  • 테이블 생성
CREATE OR REPLACE TABLE OUR_FIRST_DB.PUBLIC.employees2 (
  id INT,
  first_name STRING,
  last_name STRING,
  email STRING,
  location STRING,
  department STRING
  );

 

  • 파이프라인 중지
ALTER PIPE MANAGE_DB.pipes.employee_pipe SET PIPE_EXECUTION_PAUSED = true;

 

  • 파이프라인 중지 확인
SELECT SYSTEM$PIPE_STATUS('MANAGE_DB.pipes.employee_pipe');

 

  • 파이프라인 재생성
CREATE OR REPLACE pipe MANAGE_DB.pipes.employee_pipe
auto_ingest = TRUE
AS
COPY INTO OUR_FIRST_DB.PUBLIC.employees2
FROM @MANAGE_DB.external_stages.csv_folder;

 

  • 파이프라인 새로고침
ALTER PIPE  MANAGE_DB.pipes.employee_pipe refresh;

 

  • 스테이지 확인

 

LIST @MANAGE_DB.external_stages.csv_folder;

 

  • 데이터 확인
SELECT * FROM OUR_FIRST_DB.PUBLIC.employees2;

 

  • 버킷안의 데이터를 다시 로드
COPY INTO OUR_FIRST_DB.PUBLIC.employees2
FROM @MANAGE_DB.external_stages.csv_folder;

 

  • 파이프라인 재개(SET PIPE_EXECUTION_PAUSED = false)
ALTER PIPE MANAGE_DB.pipes.employee_pipe SET PIPE_EXECUTION_PAUSED = false;

 

  • 파이프라인 재개 확인
SELECT SYSTEM$PIPE_STATUS('MANAGE_DB.pipes.employee_pipe');

 

 

 

 

 

Udemy의 'Snowflake - The Complete Masterclass (2023 Edition)'를 공부한 내용을 바탕으로 작성하였습니다. 

'Snowflake > Snowflake - The Complete Masterclass' 카테고리의 다른 글

[Snowflake] Fail Safe  (0) 2023.02.08
[Snowflake] Time Travel  (0) 2023.02.08
[Snowflake] Loading from AWS  (0) 2023.02.06
[Snowflake] Performance optimization  (0) 2023.02.03
[Snowflake] Loading unstructured data  (0) 2023.02.02