1. What is Snowpipe
- 버킷에 파일이 나타나면 자동으로 데이터 로드를 활성화
- 분석에 데이터를 즉시 사용하는 경우 적용
- 웨어하우스 대신 서버리스를 사용
2. High-level steps
- 스테이지 생성 - COPY COMMAND 테스트 - 파이프 생성 - S3 알림
3. Creating stage
- 테이블 생성
CREATE OR REPLACE TABLE OUR_FIRST_DB.PUBLIC.employees (
id INT,
first_name STRING,
last_name STRING,
email STRING,
location STRING,
department STRING
);
- file format 생성(null값 처리)
CREATE OR REPLACE file format MANAGE_DB.file_formats.csv_fileformat
type = csv
field_delimiter = ','
skip_header = 1
null_if = ('NULL','null')
empty_field_as_null = TRUE;
- 스테이지 생성
CREATE OR REPLACE stage MANAGE_DB.external_stages.csv_folder
URL = 's3://snowflakes3bucket112233/csv/snowpipe'
STORAGE_INTEGRATION = s3_int
FILE_FORMAT = MANAGE_DB.file_formats.csv_fileformat;
- 스테이지 확인
LIST @MANAGE_DB.external_stages.csv_folder;
4. Creating pipe
- 스키마 생성
CREATE OR REPLACE SCHEMA MANAGE_DB.pipes;
- COPY Command 쿼리 작동 확인
COPY INTO OUR_FIRST_DB.PUBLIC.employees
FROM @MANAGE_DB.external_stages.csv_folder;
- 파이프 생성
CREATE OR REPLACE pipe MANAGE_DB.pipes.employee_pipe
auto_ingest = TRUE
AS
COPY INTO OUR_FIRST_DB.PUBLIC.employees
FROM @MANAGE_DB.external_stages.csv_folder;
- 파이프 확인하여 notification_channel값 복사(S3에 데이터가 들어오는 이벤트를 감지하기 위해)
DESC pipe employee_pipe;
5. Configure pipe & notifications
- 버킷 - 속성- 이벤트 알림 생성
- 접두사 : 경로를 설정해 지정된 객체만 알림하도록 제한
- 이벤트 유형 : 어떤 이벤트에서 객체가 생성될지 선택
- 대상 : SQS 대기열 선택 및 SQS 대기열 ARN 입력란에 위에서 복사한 notification_channel값 입력
- S3 버킷에 새로운 파일을 업로드 후 쿼리문으로 데이터 들어오는지 확인
SELECT * FROM OUR_FIRST_DB.PUBLIC.employees;
6. Error handling
- 열이 맞지 않는 file format 생성(오류 발생)
CREATE OR REPLACE file format MANAGE_DB.file_formats.csv_fileformat
type = csv
field_delimiter = ','
skip_header = 1
null_if = ('NULL','null')
empty_field_as_null = TRUE;
- COPY Command 실행(오류 발생 : 파일에 단 하나의 열만 있으며, 파이프로 설정했기 때문에 열 구분 기호로 해석되지 않음, 모든 쉼표는 무시되며 열 구분 기호로 해석되지 않음)
COPY INTO OUR_FIRST_DB.PUBLIC.employees
FROM @MANAGE_DB.external_stages.csv_folder;
- S3 버킷에 새로운 파일 업로드
- 파이프라인 새로고침
ALTER PIPE MANAGE_DB.PIPES.EMPLOYEE_PIPE refresh;
- 파이프라인이 작동하는지 확인
SELECT SYSTEM$PIPE_STATUS('employee_pipe');
- 파이프라인 에러메세지 확인(지난 2시간의 에러메세지)
SELECT * FROM TABLE(VALIDATE_PIPE_LOAD(
PIPE_NAME => 'MANAGE_DB.pipes.employee_pipe',
START_TIME => DATEADD(HOUR,-2,CURRENT_TIMESTAMP())));
- COPY Command별 에러메세지 확인
SELECT * FROM TABLE (INFORMATION_SCHEMA.COPY_HISTORY(
table_name => 'OUR_FIRST_DB.PUBLIC.EMPLOYEES',
START_TIME =>DATEADD(HOUR,-2,CURRENT_TIMESTAMP())));
7. Manage pipes
- 파이프라인의 속성 얻기
DESC pipe MANAGE_DB.pipes.employee_pipe;
- 파이프라인의 목록만 얻기
SHOW PIPES;
- 와일드카드로 파이프라인 목록 얻기
SHOW PIPES like '%employee%';
- 데이터베이스를 제한해서 파이프라인 목록 얻기
SHOW PIPES in database MANAGE_DB;
- 스키마를 제한해서 파이프라인 목록 얻기
SHOW PIPES in schema MANAGE_DB.pipes;
- 결합하여 사용
SHOW PIPES like '%employee%' in Database MANAGE_DB;
example 1 : 파이프라인을 변경할 때
- 테이블 생성
CREATE OR REPLACE TABLE OUR_FIRST_DB.PUBLIC.employees2 (
id INT,
first_name STRING,
last_name STRING,
email STRING,
location STRING,
department STRING
);
- 파이프라인 중지
ALTER PIPE MANAGE_DB.pipes.employee_pipe SET PIPE_EXECUTION_PAUSED = true;
- 파이프라인 중지 확인
SELECT SYSTEM$PIPE_STATUS('MANAGE_DB.pipes.employee_pipe');
- 파이프라인 재생성
CREATE OR REPLACE pipe MANAGE_DB.pipes.employee_pipe
auto_ingest = TRUE
AS
COPY INTO OUR_FIRST_DB.PUBLIC.employees2
FROM @MANAGE_DB.external_stages.csv_folder;
- 파이프라인 새로고침
ALTER PIPE MANAGE_DB.pipes.employee_pipe refresh;
- 스테이지 확인
LIST @MANAGE_DB.external_stages.csv_folder;
- 데이터 확인
SELECT * FROM OUR_FIRST_DB.PUBLIC.employees2;
- 버킷안의 데이터를 다시 로드
COPY INTO OUR_FIRST_DB.PUBLIC.employees2
FROM @MANAGE_DB.external_stages.csv_folder;
- 파이프라인 재개(SET PIPE_EXECUTION_PAUSED = false)
ALTER PIPE MANAGE_DB.pipes.employee_pipe SET PIPE_EXECUTION_PAUSED = false;
- 파이프라인 재개 확인
SELECT SYSTEM$PIPE_STATUS('MANAGE_DB.pipes.employee_pipe');
Udemy의 'Snowflake - The Complete Masterclass (2023 Edition)'를 공부한 내용을 바탕으로 작성하였습니다.
'Snowflake > Snowflake - The Complete Masterclass' 카테고리의 다른 글
[Snowflake] Fail Safe (0) | 2023.02.08 |
---|---|
[Snowflake] Time Travel (0) | 2023.02.08 |
[Snowflake] Loading from AWS (0) | 2023.02.06 |
[Snowflake] Performance optimization (0) | 2023.02.03 |
[Snowflake] Loading unstructured data (0) | 2023.02.02 |