Immersion In Data

Snowflake/Snowflake - The Complete Masterclass

[Snowflake] Streams

sungjunminn 2023. 2. 20. 11:30

1. Understanding streams

  • 소스 테이블에 변경 사항을 캡쳐하여 

 



2. INSERT operation

  • 일시 데이터베이스 생성
CREATE OR REPLACE TRANSIENT DATABASE STREAMS_DB;

 

  • 소스 테이블 생성
create or replace table sales_raw_staging(
  id varchar,
  product varchar,
  price varchar,
  amount varchar,
  store_id varchar);

 

  •   소스 테이블에 데이터 삽입 
insert into sales_raw_staging 
    values
        (1,'Banana',1.99,1,1),
        (2,'Lemon',0.99,1,1),
        (3,'Apple',1.79,1,2),
        (4,'Orange Juice',1.89,1,2),
        (5,'Cereals',5.98,2,1);

 

  • 소스 테이블의 store_id를 참조하는 테이블 생성
create or replace table store_table(
  store_id number,
  location varchar,
  employees number);

 

  • 참조 테이블에 데이터 삽입
INSERT INTO STORE_TABLE VALUES(1,'Chicago',33);
INSERT INTO STORE_TABLE VALUES(2,'London',12);

 

  • final 테이블 생성 - 소스 테이블과 참조 테이블을 결합
create or replace table sales_final_table(
  id int,
  product varchar,
  price number,
  amount int,
  store_id int,
  location varchar,
  employees int);

 

  •  final 테이블에 데이터 삽입 - 소스테이블과 참조테이블을 조인
INSERT INTO sales_final_table 
    SELECT 
    SA.id,
    SA.product,
    SA.price,
    SA.amount,
    ST.STORE_ID,
    ST.LOCATION, 
    ST.EMPLOYEES 
    FROM SALES_RAW_STAGING SA
    JOIN STORE_TABLE ST ON ST.STORE_ID=SA.STORE_ID ;

 

  • stream 생성 - 소스테이블에 위치(소스테이블의 모든 변경사항이 캡처됨)
create or replace stream sales_stream on table sales_raw_staging;

 

  • stream 확인
SHOW STREAMS;

DESC STREAM sales_stream;

 

  • 아직 소스테이블에 어떠한 변화도 일어나지 않았기 때문에 stream 객체가 비어있는 것을 확인
select * from sales_stream;

 

  • 소스테이블의 데이터 확인
select * from sales_raw_staging;

 

  • 소스 테이블에 데이터 삽입 
insert into sales_raw_staging  
    values
        (6,'Mango',1.99,1,2),
        (7,'Garlic',0.99,1,1);

 

  • 소스테이블에 두 개의 데이터가 들어왔기 때문에 stream 객체에도 두 데이터가 들어옴
select * from sales_stream;

 

  • 소스테이블 확인
select * from sales_raw_staging;

 

  • final 테이블을 업데이트하지 않았기 때문에 변동사항 없음
select * from sales_final_table;

 

  • final 테이블에 데이터 삽입 - stream 객체와 참조테이블을 조인  
INSERT INTO sales_final_table 
    SELECT 
    SA.id,
    SA.product,
    SA.price,
    SA.amount,
    ST.STORE_ID,
    ST.LOCATION, 
    ST.EMPLOYEES 
    FROM SALES_STREAM SA
    JOIN STORE_TABLE ST ON ST.STORE_ID=SA.STORE_ID ;

 

  • stream 객체를 이용하여 final 테이블에 데이터를 삽입했기 때문에 stream 객체는 비어있게 됨
select * from sales_stream;

 

  • 소스테이블에 데이터 삽입 
insert into sales_raw_staging  
    values
        (8,'Paprika',4.99,1,2),
        (9,'Tomato',3.99,1,2);

 

  • stream 객체에 소스테이블의 변경사항이 생성됨
select * from sales_stream;

 

  • final 테이블에 데이터 삽입 - stream 객체와 참조테이블을 조인  
INSERT INTO sales_final_table 
    SELECT 
    SA.id,
    SA.product,
    SA.price,
    SA.amount,
    ST.STORE_ID,
    ST.LOCATION, 
    ST.EMPLOYEES 
    FROM SALES_STREAM SA
    JOIN STORE_TABLE ST ON ST.STORE_ID=SA.STORE_ID ;

 

  • final 테이블 확인              
SELECT * FROM SALES_FINAL_TABLE;

 

  • 소스테이블 확인
SELECT * FROM SALES_RAW_STAGING;

 

  • stream 객체 확인        
SELECT * FROM SALES_STREAM;

 





3. UPDATE operation



example 1 

  • 소스테이블 변경
UPDATE SALES_RAW_STAGING
SET PRODUCT ='Potato' WHERE PRODUCT = 'Banana';

 

  • 소스테이블 확인 - 데이터가 변경됨
SELECT * FROM SALES_RAW_STAGING;

 

  • stream 객체 확인 - UPDATE를 실행하면 변경된 데이터가 삭제되고 다시 삽입되는 절차가 생김
SELECT * FROM SALES_STREAM;

 

  • merge를 통해 final 테이블에서 업데이트 처리
merge into SALES_FINAL_TABLE F      -- Target table to merge changes from source table
using SALES_STREAM S                -- Stream that has captured the changes
   on  f.id = s.id                 
when matched 
    and S.METADATA$ACTION ='INSERT'
    and S.METADATA$ISUPDATE ='TRUE'        -- Indicates the record has been updated 
    then update 
    set f.product = s.product,
        f.price = s.price,
        f.amount= s.amount,
        f.store_id=s.store_id;

 

  • final 테이블 확인
SELECT * FROM SALES_FINAL_TABLE;

 

  • 소스테이블 확인
SELECT * FROM SALES_RAW_STAGING;

 

  • stream 객체 확인
SELECT * FROM SALES_STREAM;

 


example 2

  • 소스테이블 변경
UPDATE SALES_RAW_STAGING
SET PRODUCT ='Green apple' WHERE PRODUCT = 'Apple';

 

  • merge를 통해 final 테이블에서 업데이트 처리
merge into SALES_FINAL_TABLE F      -- Target table to merge changes from source table
using SALES_STREAM S                -- Stream that has captured the changes
   on  f.id = s.id                 
when matched 
    and S.METADATA$ACTION ='INSERT'
    and S.METADATA$ISUPDATE ='TRUE'        -- Indicates the record has been updated 
    then update 
    set f.product = s.product,
        f.price = s.price,
        f.amount= s.amount,
        f.store_id=s.store_id;

 

  • 소스테이블 확인
SELECT * FROM SALES_RAW_STAGING;

 

  • stream 객체 확인
SELECT * FROM SALES_STREAM;

 

  • final 테이블 확인
SELECT * FROM SALES_FINAL_TABLE;

 

 



4. DELETE operation

  • 소스테이블 확인
SELECT * FROM SALES_RAW_STAGING;

 

  • stream 객체 확인
SELECT * FROM SALES_STREAM;

 

  • final 테이블 확인
SELECT * FROM SALES_FINAL_TABLE;

 

  • 소스테이블에서 데이터 삭제
DELETE FROM SALES_RAW_STAGING
WHERE PRODUCT = 'Lemon';

 

  • merge를 통해 final 테이블에서 삭제 처리
merge into SALES_FINAL_TABLE F      -- Target table to merge changes from source table
using SALES_STREAM S                -- Stream that has captured the changes
   on  f.id = s.id          
when matched 
    and S.METADATA$ACTION ='DELETE' 
    and S.METADATA$ISUPDATE = 'FALSE'
    then delete;

 

 

 

5. Process all data changes

  • 소스테이블에 데이터 삽입
INSERT INTO SALES_RAW_STAGING VALUES (2,'Lemon',0.99,1,1);

 

  • DELETE, UPDATE, INSERT를 모두 포함하는 merge(stream 객체와 final 테이블에 데이터 변동 확인)
merge into SALES_FINAL_TABLE F      -- Target table to merge changes from source table
USING ( SELECT STRE.*,ST.location,ST.employees
        FROM SALES_STREAM STRE
        JOIN STORE_TABLE ST
        ON STRE.store_id = ST.store_id
       ) S
ON F.id=S.id
when matched                        -- DELETE condition
    and S.METADATA$ACTION ='DELETE' 
    and S.METADATA$ISUPDATE = 'FALSE'
    then delete                   
when matched                        -- UPDATE condition
    and S.METADATA$ACTION ='INSERT' 
    and S.METADATA$ISUPDATE  = 'TRUE'       
    then update 
    set f.product = s.product,
        f.price = s.price,
        f.amount= s.amount,
        f.store_id=s.store_id
when not matched 
    and S.METADATA$ACTION ='INSERT'
    then insert 
    (id,product,price,store_id,amount,employees,location)
    values
    (s.id, s.product,s.price,s.store_id,s.amount,s.employees,s.location);

 

  • 소스테이블 데이터 업데이트
UPDATE SALES_RAW_STAGING
SET PRODUCT = 'Lemonade'
WHERE PRODUCT ='Lemon';

 

  • DELETE, UPDATE, INSERT를 모두 포함하는 merge(stream 객체와 final 테이블에 데이터 변동 확인)
merge into SALES_FINAL_TABLE F      -- Target table to merge changes from source table
USING ( SELECT STRE.*,ST.location,ST.employees
        FROM SALES_STREAM STRE
        JOIN STORE_TABLE ST
        ON STRE.store_id = ST.store_id
       ) S
ON F.id=S.id
when matched                        -- DELETE condition
    and S.METADATA$ACTION ='DELETE' 
    and S.METADATA$ISUPDATE = 'FALSE'
    then delete                   
when matched                        -- UPDATE condition
    and S.METADATA$ACTION ='INSERT' 
    and S.METADATA$ISUPDATE  = 'TRUE'       
    then update 
    set f.product = s.product,
        f.price = s.price,
        f.amount= s.amount,
        f.store_id=s.store_id
when not matched 
    and S.METADATA$ACTION ='INSERT'
    then insert 
    (id,product,price,store_id,amount,employees,location)
    values
    (s.id, s.product,s.price,s.store_id,s.amount,s.employees,s.location);

 

  • 소스테이블에 데이터 삭제 (stream 객체와 final 테이블에 데이터 변동 확인)
DELETE FROM SALES_RAW_STAGING
WHERE PRODUCT = 'Lemonade';

 

  • DELETE, UPDATE, INSERT를 모두 포함하는 merge
merge into SALES_FINAL_TABLE F      -- Target table to merge changes from source table
USING ( SELECT STRE.*,ST.location,ST.employees
        FROM SALES_STREAM STRE
        JOIN STORE_TABLE ST
        ON STRE.store_id = ST.store_id
       ) S
ON F.id=S.id
when matched                        -- DELETE condition
    and S.METADATA$ACTION ='DELETE' 
    and S.METADATA$ISUPDATE = 'FALSE'
    then delete                   
when matched                        -- UPDATE condition
    and S.METADATA$ACTION ='INSERT' 
    and S.METADATA$ISUPDATE  = 'TRUE'       
    then update 
    set f.product = s.product,
        f.price = s.price,
        f.amount= s.amount,
        f.store_id=s.store_id
when not matched 
    and S.METADATA$ACTION ='INSERT'
    then insert 
    (id,product,price,store_id,amount,employees,location)
    values
    (s.id, s.product,s.price,s.store_id,s.amount,s.employees,s.location);

 

 





6. Combine streams & tasks

  • task 생성
CREATE OR REPLACE TASK all_data_changes
    WAREHOUSE = COMPUTE_WH
    SCHEDULE = '1 MINUTE'
    WHEN SYSTEM$STREAM_HAS_DATA('SALES_STREAM')
    AS 
merge into SALES_FINAL_TABLE F      -- Target table to merge changes from source table
USING ( SELECT STRE.*,ST.location,ST.employees
        FROM SALES_STREAM STRE
        JOIN STORE_TABLE ST
        ON STRE.store_id = ST.store_id
       ) S
ON F.id=S.id
when matched                        -- DELETE condition
    and S.METADATA$ACTION ='DELETE' 
    and S.METADATA$ISUPDATE = 'FALSE'
    then delete                   
when matched                        -- UPDATE condition
    and S.METADATA$ACTION ='INSERT' 
    and S.METADATA$ISUPDATE  = 'TRUE'       
    then update 
    set f.product = s.product,
        f.price = s.price,
        f.amount= s.amount,
        f.store_id=s.store_id
when not matched 
    and S.METADATA$ACTION ='INSERT'
    then insert 
    (id,product,price,store_id,amount,employees,location)
    values
    (s.id, s.product,s.price,s.store_id,s.amount,s.employees,s.location);

 

  • task 시작
ALTER TASK all_data_changes RESUME;

 

  • task 확인
SHOW TASKS;

 

  • 소스테이블에 데이터 삽입
INSERT INTO SALES_RAW_STAGING VALUES (11,'Milk',1.99,1,2);
INSERT INTO SALES_RAW_STAGING VALUES (12,'Chocolate',4.49,1,2);
INSERT INTO SALES_RAW_STAGING VALUES (13,'Cheese',3.89,1,1);

 

  • 소스테이블 데이터 변경
UPDATE SALES_RAW_STAGING
SET PRODUCT = 'Chocolate bar'
WHERE PRODUCT ='Chocolate';

 

  • 소스테이블 데이터 삭제
DELETE FROM SALES_RAW_STAGING
WHERE PRODUCT = 'Mango';

 

  • 소스테이블 확인 
SELECT * FROM SALES_RAW_STAGING;

 

  • stream 객체 확인(task가 실행되어 이미 stream에는 데이터가 없음)
SELECT * FROM SALES_STREAM;

 

  • final 테이블 확인(task가 실행되어 이미 final 테이블에 소스테이블의 변동사항이 적용됨)
SELECT * FROM SALES_FINAL_TABLE;

 

  • task history 확인
select *
from table(information_schema.task_history())
order by name asc,scheduled_time desc;

 

 

 

 


7. Types of streams

  • stream 확인 - mode : standard(default)/append-only
SHOW STREAMS;

 


cf) standard vs append-only

standard : INSERT, UPDATE, DELETE
append-only : INSERT

 

  • 소스테이블 확인
SELECT * FROM SALES_RAW_STAGING;

 

  • standard stream 객체 생성
CREATE OR REPLACE STREAM SALES_STREAM_DEFAULT
ON TABLE SALES_RAW_STAGING;

 

  • append-only stream 객체 생성
CREATE OR REPLACE STREAM SALES_STREAM_APPEND
ON TABLE SALES_RAW_STAGING 
APPEND_ONLY = TRUE;

 

  • stream 확인
SHOW STREAMS;

 

  • 소스테이블에 데이터 삽입
INSERT INTO SALES_RAW_STAGING VALUES (14,'Honey',4.99,1,1);
INSERT INTO SALES_RAW_STAGING VALUES (15,'Coffee',4.89,1,2);
INSERT INTO SALES_RAW_STAGING VALUES (15,'Coffee',4.89,1,2);

 

  • stream 객체 확인
SELECT * FROM SALES_STREAM_APPEND;
SELECT * FROM SALES_STREAM_DEFAULT;

 

  • 소스테이블 확인
SELECT * FROM SALES_RAW_STAGING;

 

  • 소스테이블 데이터 삭제
DELETE FROM SALES_RAW_STAGING WHERE ID=7;

 

  • stream 객체 확인 - append-only stream에는 삭제 데이터가 캡처되지 않은 것을 확인할 수 있다. 
SELECT * FROM SALES_STREAM_APPEND;
SELECT * FROM SALES_STREAM_DEFAULT;

 

  • 임시테이블을 만들어서 stream 객체안의 데이터 소비
CREATE OR REPLACE TEMPORARY TABLE PRODUCT_TABLE
AS SELECT * FROM SALES_STREAM_DEFAULT;
CREATE OR REPLACE TEMPORARY TABLE PRODUCT_TABLE
AS SELECT * FROM SALES_STREAM_APPEND;

 

  • 소스테이블 데이터 변경
UPDATE SALES_RAW_STAGING
SET PRODUCT = 'Coffee 200g'
WHERE PRODUCT ='Coffee';

 

  • stream 객체 확인
SELECT * FROM SALES_STREAM_APPEND;
SELECT * FROM SALES_STREAM_DEFAULT;



 


8. Changes clause

  • 데이터베이스 생성
CREATE OR REPLACE DATABASE SALES_DB;

 

  • 테이블 생성
create or replace table sales_raw(
id varchar,
product varchar,
price varchar,
amount varchar,
store_id varchar);

 

  • 데이터 삽입
insert into sales_raw
values
(1, 'Eggs', 1.39, 1, 1),
(2, 'Baking powder', 0.99, 1, 1),
(3, 'Eggplants', 1.79, 1, 2),
(4, 'Ice cream', 1.89, 1, 2),
(5, 'Oats', 1.98, 2, 1);

 

  • 변경사항 추적을 위한 테이블 속성 변경
ALTER TABLE sales_raw
SET CHANGE_TRACKING = TRUE;

 

  • time travel을 이용해 30초 전의 변경사항 확인 
SELECT * FROM SALES_RAW
CHANGES(information => default)
AT (offset => -0.5*60);

 

  • time stamp 확인
SELECT CURRENT_TIMESTAMP;

 

  • 테이블에 데이터 삽입
INSERT INTO SALES_RAW VALUES (6, 'Bread', 2.99, 1, 2);
INSERT INTO SALES_RAW VALUES (7, 'Onions', 2.89, 1, 2);

 

  • 위에서 확인한 time stamp로 이 시간의 변경사항 확인 - 위에서 두가지 데이터를 삽입했기 때문에 2개의 변경사항을 확인할 수 있음
SELECT * FROM SALES_RAW
CHANGES(information  => default)
AT (timestamp => 'your-timestamp'::timestamp_tz);

 

  • 데이터 변경
UPDATE SALES_RAW
SET PRODUCT = 'Toast2' WHERE ID=6;

 

  • 위에서 확인한 time stamp로 이 시간의 변경사항 확인 - 같은 데이터를 변경하면, 최신 변경사항만 확인 가능
SELECT * FROM SALES_RAW
CHANGES(information  => default)
AT (timestamp => 'your-timestamp'::timestamp_tz);

 

  • append-only 타입 stream에서는 삽입 외에는 캡처되지 않음
SELECT * FROM SALES_RAW
CHANGES(information  => append_only)
AT (timestamp => 'your-timestamp'::timestamp_tz);

 

  • 테이블 생성
CREATE OR REPLACE TABLE PRODUCTS 
AS
SELECT * FROM SALES_RAW
CHANGES(information  => append_only)
AT (timestamp => 'your-timestamp'::timestamp_tz);

 

  • 생성한 테이블 확인
SELECT * FROM PRODUCTS;




 

 

 

 

 

Udemy의 'Snowflake - The Complete Masterclass (2023 Edition)'를 공부한 내용을 바탕으로 작성하였습니다. 

'Snowflake > Snowflake - The Complete Masterclass' 카테고리의 다른 글

[Snowflake] Dynamic Data Masking  (0) 2023.02.21
[Snowflake] Materialized Views  (0) 2023.02.20
[Snowflake] Scheduling Tasks  (0) 2023.02.14
[Snowflake] Data Sampling  (0) 2023.02.14
[Snowflake] Data sharing  (0) 2023.02.13