1. Understanding streams
- 소스 테이블에 변경 사항을 캡쳐하여
2. INSERT operation
- 일시 데이터베이스 생성
CREATE OR REPLACE TRANSIENT DATABASE STREAMS_DB;
- 소스 테이블 생성
create or replace table sales_raw_staging(
id varchar,
product varchar,
price varchar,
amount varchar,
store_id varchar);
- 소스 테이블에 데이터 삽입
insert into sales_raw_staging
values
(1,'Banana',1.99,1,1),
(2,'Lemon',0.99,1,1),
(3,'Apple',1.79,1,2),
(4,'Orange Juice',1.89,1,2),
(5,'Cereals',5.98,2,1);
- 소스 테이블의 store_id를 참조하는 테이블 생성
create or replace table store_table(
store_id number,
location varchar,
employees number);
- 참조 테이블에 데이터 삽입
INSERT INTO STORE_TABLE VALUES(1,'Chicago',33);
INSERT INTO STORE_TABLE VALUES(2,'London',12);
- final 테이블 생성 - 소스 테이블과 참조 테이블을 결합
create or replace table sales_final_table(
id int,
product varchar,
price number,
amount int,
store_id int,
location varchar,
employees int);
- final 테이블에 데이터 삽입 - 소스테이블과 참조테이블을 조인
INSERT INTO sales_final_table
SELECT
SA.id,
SA.product,
SA.price,
SA.amount,
ST.STORE_ID,
ST.LOCATION,
ST.EMPLOYEES
FROM SALES_RAW_STAGING SA
JOIN STORE_TABLE ST ON ST.STORE_ID=SA.STORE_ID ;
- stream 생성 - 소스테이블에 위치(소스테이블의 모든 변경사항이 캡처됨)
create or replace stream sales_stream on table sales_raw_staging;
- stream 확인
SHOW STREAMS;
DESC STREAM sales_stream;
- 아직 소스테이블에 어떠한 변화도 일어나지 않았기 때문에 stream 객체가 비어있는 것을 확인
select * from sales_stream;
- 소스테이블의 데이터 확인
select * from sales_raw_staging;
- 소스 테이블에 데이터 삽입
insert into sales_raw_staging
values
(6,'Mango',1.99,1,2),
(7,'Garlic',0.99,1,1);
- 소스테이블에 두 개의 데이터가 들어왔기 때문에 stream 객체에도 두 데이터가 들어옴
select * from sales_stream;
- 소스테이블 확인
select * from sales_raw_staging;
- final 테이블을 업데이트하지 않았기 때문에 변동사항 없음
select * from sales_final_table;
- final 테이블에 데이터 삽입 - stream 객체와 참조테이블을 조인
INSERT INTO sales_final_table
SELECT
SA.id,
SA.product,
SA.price,
SA.amount,
ST.STORE_ID,
ST.LOCATION,
ST.EMPLOYEES
FROM SALES_STREAM SA
JOIN STORE_TABLE ST ON ST.STORE_ID=SA.STORE_ID ;
- stream 객체를 이용하여 final 테이블에 데이터를 삽입했기 때문에 stream 객체는 비어있게 됨
select * from sales_stream;
- 소스테이블에 데이터 삽입
insert into sales_raw_staging
values
(8,'Paprika',4.99,1,2),
(9,'Tomato',3.99,1,2);
- stream 객체에 소스테이블의 변경사항이 생성됨
select * from sales_stream;
- final 테이블에 데이터 삽입 - stream 객체와 참조테이블을 조인
INSERT INTO sales_final_table
SELECT
SA.id,
SA.product,
SA.price,
SA.amount,
ST.STORE_ID,
ST.LOCATION,
ST.EMPLOYEES
FROM SALES_STREAM SA
JOIN STORE_TABLE ST ON ST.STORE_ID=SA.STORE_ID ;
- final 테이블 확인
SELECT * FROM SALES_FINAL_TABLE;
- 소스테이블 확인
SELECT * FROM SALES_RAW_STAGING;
- stream 객체 확인
SELECT * FROM SALES_STREAM;
3. UPDATE operation
example 1
- 소스테이블 변경
UPDATE SALES_RAW_STAGING
SET PRODUCT ='Potato' WHERE PRODUCT = 'Banana';
- 소스테이블 확인 - 데이터가 변경됨
SELECT * FROM SALES_RAW_STAGING;
- stream 객체 확인 - UPDATE를 실행하면 변경된 데이터가 삭제되고 다시 삽입되는 절차가 생김
SELECT * FROM SALES_STREAM;
- merge를 통해 final 테이블에서 업데이트 처리
merge into SALES_FINAL_TABLE F -- Target table to merge changes from source table
using SALES_STREAM S -- Stream that has captured the changes
on f.id = s.id
when matched
and S.METADATA$ACTION ='INSERT'
and S.METADATA$ISUPDATE ='TRUE' -- Indicates the record has been updated
then update
set f.product = s.product,
f.price = s.price,
f.amount= s.amount,
f.store_id=s.store_id;
- final 테이블 확인
SELECT * FROM SALES_FINAL_TABLE;
- 소스테이블 확인
SELECT * FROM SALES_RAW_STAGING;
- stream 객체 확인
SELECT * FROM SALES_STREAM;
example 2
- 소스테이블 변경
UPDATE SALES_RAW_STAGING
SET PRODUCT ='Green apple' WHERE PRODUCT = 'Apple';
- merge를 통해 final 테이블에서 업데이트 처리
merge into SALES_FINAL_TABLE F -- Target table to merge changes from source table
using SALES_STREAM S -- Stream that has captured the changes
on f.id = s.id
when matched
and S.METADATA$ACTION ='INSERT'
and S.METADATA$ISUPDATE ='TRUE' -- Indicates the record has been updated
then update
set f.product = s.product,
f.price = s.price,
f.amount= s.amount,
f.store_id=s.store_id;
- 소스테이블 확인
SELECT * FROM SALES_RAW_STAGING;
- stream 객체 확인
SELECT * FROM SALES_STREAM;
- final 테이블 확인
SELECT * FROM SALES_FINAL_TABLE;
4. DELETE operation
- 소스테이블 확인
SELECT * FROM SALES_RAW_STAGING;
- stream 객체 확인
SELECT * FROM SALES_STREAM;
- final 테이블 확인
SELECT * FROM SALES_FINAL_TABLE;
- 소스테이블에서 데이터 삭제
DELETE FROM SALES_RAW_STAGING
WHERE PRODUCT = 'Lemon';
- merge를 통해 final 테이블에서 삭제 처리
merge into SALES_FINAL_TABLE F -- Target table to merge changes from source table
using SALES_STREAM S -- Stream that has captured the changes
on f.id = s.id
when matched
and S.METADATA$ACTION ='DELETE'
and S.METADATA$ISUPDATE = 'FALSE'
then delete;
5. Process all data changes
- 소스테이블에 데이터 삽입
INSERT INTO SALES_RAW_STAGING VALUES (2,'Lemon',0.99,1,1);
- DELETE, UPDATE, INSERT를 모두 포함하는 merge(stream 객체와 final 테이블에 데이터 변동 확인)
merge into SALES_FINAL_TABLE F -- Target table to merge changes from source table
USING ( SELECT STRE.*,ST.location,ST.employees
FROM SALES_STREAM STRE
JOIN STORE_TABLE ST
ON STRE.store_id = ST.store_id
) S
ON F.id=S.id
when matched -- DELETE condition
and S.METADATA$ACTION ='DELETE'
and S.METADATA$ISUPDATE = 'FALSE'
then delete
when matched -- UPDATE condition
and S.METADATA$ACTION ='INSERT'
and S.METADATA$ISUPDATE = 'TRUE'
then update
set f.product = s.product,
f.price = s.price,
f.amount= s.amount,
f.store_id=s.store_id
when not matched
and S.METADATA$ACTION ='INSERT'
then insert
(id,product,price,store_id,amount,employees,location)
values
(s.id, s.product,s.price,s.store_id,s.amount,s.employees,s.location);
- 소스테이블 데이터 업데이트
UPDATE SALES_RAW_STAGING
SET PRODUCT = 'Lemonade'
WHERE PRODUCT ='Lemon';
- DELETE, UPDATE, INSERT를 모두 포함하는 merge(stream 객체와 final 테이블에 데이터 변동 확인)
merge into SALES_FINAL_TABLE F -- Target table to merge changes from source table
USING ( SELECT STRE.*,ST.location,ST.employees
FROM SALES_STREAM STRE
JOIN STORE_TABLE ST
ON STRE.store_id = ST.store_id
) S
ON F.id=S.id
when matched -- DELETE condition
and S.METADATA$ACTION ='DELETE'
and S.METADATA$ISUPDATE = 'FALSE'
then delete
when matched -- UPDATE condition
and S.METADATA$ACTION ='INSERT'
and S.METADATA$ISUPDATE = 'TRUE'
then update
set f.product = s.product,
f.price = s.price,
f.amount= s.amount,
f.store_id=s.store_id
when not matched
and S.METADATA$ACTION ='INSERT'
then insert
(id,product,price,store_id,amount,employees,location)
values
(s.id, s.product,s.price,s.store_id,s.amount,s.employees,s.location);
- 소스테이블에 데이터 삭제 (stream 객체와 final 테이블에 데이터 변동 확인)
DELETE FROM SALES_RAW_STAGING
WHERE PRODUCT = 'Lemonade';
- DELETE, UPDATE, INSERT를 모두 포함하는 merge
merge into SALES_FINAL_TABLE F -- Target table to merge changes from source table
USING ( SELECT STRE.*,ST.location,ST.employees
FROM SALES_STREAM STRE
JOIN STORE_TABLE ST
ON STRE.store_id = ST.store_id
) S
ON F.id=S.id
when matched -- DELETE condition
and S.METADATA$ACTION ='DELETE'
and S.METADATA$ISUPDATE = 'FALSE'
then delete
when matched -- UPDATE condition
and S.METADATA$ACTION ='INSERT'
and S.METADATA$ISUPDATE = 'TRUE'
then update
set f.product = s.product,
f.price = s.price,
f.amount= s.amount,
f.store_id=s.store_id
when not matched
and S.METADATA$ACTION ='INSERT'
then insert
(id,product,price,store_id,amount,employees,location)
values
(s.id, s.product,s.price,s.store_id,s.amount,s.employees,s.location);
6. Combine streams & tasks
- task 생성
CREATE OR REPLACE TASK all_data_changes
WAREHOUSE = COMPUTE_WH
SCHEDULE = '1 MINUTE'
WHEN SYSTEM$STREAM_HAS_DATA('SALES_STREAM')
AS
merge into SALES_FINAL_TABLE F -- Target table to merge changes from source table
USING ( SELECT STRE.*,ST.location,ST.employees
FROM SALES_STREAM STRE
JOIN STORE_TABLE ST
ON STRE.store_id = ST.store_id
) S
ON F.id=S.id
when matched -- DELETE condition
and S.METADATA$ACTION ='DELETE'
and S.METADATA$ISUPDATE = 'FALSE'
then delete
when matched -- UPDATE condition
and S.METADATA$ACTION ='INSERT'
and S.METADATA$ISUPDATE = 'TRUE'
then update
set f.product = s.product,
f.price = s.price,
f.amount= s.amount,
f.store_id=s.store_id
when not matched
and S.METADATA$ACTION ='INSERT'
then insert
(id,product,price,store_id,amount,employees,location)
values
(s.id, s.product,s.price,s.store_id,s.amount,s.employees,s.location);
- task 시작
ALTER TASK all_data_changes RESUME;
- task 확인
SHOW TASKS;
- 소스테이블에 데이터 삽입
INSERT INTO SALES_RAW_STAGING VALUES (11,'Milk',1.99,1,2);
INSERT INTO SALES_RAW_STAGING VALUES (12,'Chocolate',4.49,1,2);
INSERT INTO SALES_RAW_STAGING VALUES (13,'Cheese',3.89,1,1);
- 소스테이블 데이터 변경
UPDATE SALES_RAW_STAGING
SET PRODUCT = 'Chocolate bar'
WHERE PRODUCT ='Chocolate';
- 소스테이블 데이터 삭제
DELETE FROM SALES_RAW_STAGING
WHERE PRODUCT = 'Mango';
- 소스테이블 확인
SELECT * FROM SALES_RAW_STAGING;
- stream 객체 확인(task가 실행되어 이미 stream에는 데이터가 없음)
SELECT * FROM SALES_STREAM;
- final 테이블 확인(task가 실행되어 이미 final 테이블에 소스테이블의 변동사항이 적용됨)
SELECT * FROM SALES_FINAL_TABLE;
- task history 확인
select *
from table(information_schema.task_history())
order by name asc,scheduled_time desc;
7. Types of streams
- stream 확인 - mode : standard(default)/append-only
SHOW STREAMS;
cf) standard vs append-only
standard : INSERT, UPDATE, DELETE
append-only : INSERT
- 소스테이블 확인
SELECT * FROM SALES_RAW_STAGING;
- standard stream 객체 생성
CREATE OR REPLACE STREAM SALES_STREAM_DEFAULT
ON TABLE SALES_RAW_STAGING;
- append-only stream 객체 생성
CREATE OR REPLACE STREAM SALES_STREAM_APPEND
ON TABLE SALES_RAW_STAGING
APPEND_ONLY = TRUE;
- stream 확인
SHOW STREAMS;
- 소스테이블에 데이터 삽입
INSERT INTO SALES_RAW_STAGING VALUES (14,'Honey',4.99,1,1);
INSERT INTO SALES_RAW_STAGING VALUES (15,'Coffee',4.89,1,2);
INSERT INTO SALES_RAW_STAGING VALUES (15,'Coffee',4.89,1,2);
- stream 객체 확인
SELECT * FROM SALES_STREAM_APPEND;
SELECT * FROM SALES_STREAM_DEFAULT;
- 소스테이블 확인
SELECT * FROM SALES_RAW_STAGING;
- 소스테이블 데이터 삭제
DELETE FROM SALES_RAW_STAGING WHERE ID=7;
- stream 객체 확인 - append-only stream에는 삭제 데이터가 캡처되지 않은 것을 확인할 수 있다.
SELECT * FROM SALES_STREAM_APPEND;
SELECT * FROM SALES_STREAM_DEFAULT;
- 임시테이블을 만들어서 stream 객체안의 데이터 소비
CREATE OR REPLACE TEMPORARY TABLE PRODUCT_TABLE
AS SELECT * FROM SALES_STREAM_DEFAULT;
CREATE OR REPLACE TEMPORARY TABLE PRODUCT_TABLE
AS SELECT * FROM SALES_STREAM_APPEND;
- 소스테이블 데이터 변경
UPDATE SALES_RAW_STAGING
SET PRODUCT = 'Coffee 200g'
WHERE PRODUCT ='Coffee';
- stream 객체 확인
SELECT * FROM SALES_STREAM_APPEND;
SELECT * FROM SALES_STREAM_DEFAULT;
8. Changes clause
- 데이터베이스 생성
CREATE OR REPLACE DATABASE SALES_DB;
- 테이블 생성
create or replace table sales_raw(
id varchar,
product varchar,
price varchar,
amount varchar,
store_id varchar);
- 데이터 삽입
insert into sales_raw
values
(1, 'Eggs', 1.39, 1, 1),
(2, 'Baking powder', 0.99, 1, 1),
(3, 'Eggplants', 1.79, 1, 2),
(4, 'Ice cream', 1.89, 1, 2),
(5, 'Oats', 1.98, 2, 1);
- 변경사항 추적을 위한 테이블 속성 변경
ALTER TABLE sales_raw
SET CHANGE_TRACKING = TRUE;
- time travel을 이용해 30초 전의 변경사항 확인
SELECT * FROM SALES_RAW
CHANGES(information => default)
AT (offset => -0.5*60);
- time stamp 확인
SELECT CURRENT_TIMESTAMP;
- 테이블에 데이터 삽입
INSERT INTO SALES_RAW VALUES (6, 'Bread', 2.99, 1, 2);
INSERT INTO SALES_RAW VALUES (7, 'Onions', 2.89, 1, 2);
- 위에서 확인한 time stamp로 이 시간의 변경사항 확인 - 위에서 두가지 데이터를 삽입했기 때문에 2개의 변경사항을 확인할 수 있음
SELECT * FROM SALES_RAW
CHANGES(information => default)
AT (timestamp => 'your-timestamp'::timestamp_tz);
- 데이터 변경
UPDATE SALES_RAW
SET PRODUCT = 'Toast2' WHERE ID=6;
- 위에서 확인한 time stamp로 이 시간의 변경사항 확인 - 같은 데이터를 변경하면, 최신 변경사항만 확인 가능
SELECT * FROM SALES_RAW
CHANGES(information => default)
AT (timestamp => 'your-timestamp'::timestamp_tz);
- append-only 타입 stream에서는 삽입 외에는 캡처되지 않음
SELECT * FROM SALES_RAW
CHANGES(information => append_only)
AT (timestamp => 'your-timestamp'::timestamp_tz);
- 테이블 생성
CREATE OR REPLACE TABLE PRODUCTS
AS
SELECT * FROM SALES_RAW
CHANGES(information => append_only)
AT (timestamp => 'your-timestamp'::timestamp_tz);
- 생성한 테이블 확인
SELECT * FROM PRODUCTS;
Udemy의 'Snowflake - The Complete Masterclass (2023 Edition)'를 공부한 내용을 바탕으로 작성하였습니다.
'Snowflake > Snowflake - The Complete Masterclass' 카테고리의 다른 글
[Snowflake] Dynamic Data Masking (0) | 2023.02.21 |
---|---|
[Snowflake] Materialized Views (0) | 2023.02.20 |
[Snowflake] Scheduling Tasks (0) | 2023.02.14 |
[Snowflake] Data Sampling (0) | 2023.02.14 |
[Snowflake] Data sharing (0) | 2023.02.13 |