DEV Community

Semernitskaya

Details of how Snowflake Streams work

Snowflake is a multicloud Data Warehouse that can be used to build a cloud data platform for your company. To keep track of data changes in a table, Snowflake provides the Table Streams feature.

Snowflake Table Streams (or Snowflake Streams for short) can be used to implement the Change Data Capture (CDC) design pattern. See this article for more details about the CDC pattern.
In this post I'm going to explain some details of how Snowflake Streams work and show how they can be created and used.

Snowflake Streams definition and types

Definition from the Snowflake documentation:

A stream object records data manipulation language (DML) changes made to tables, including inserts, updates, and deletes, as well as metadata about each change, so that actions can be taken using the changed data. ... A table stream ... makes a “change table” available of what changed, at the row level, between two transactional points of time in a table. This allows querying and consuming a sequence of change records in a transactional fashion.

A stream itself does not contain any table data (this can be important if you have GDPR or similar requirements for your system). A stream stores only an offset for the source table, not any actual table columns or data.

When queried, a stream accesses and returns the historic data in the same shape as the source table (i.e. the same column names and ordering) with the following additional columns:

METADATA$ACTION - DML operation type: INSERT or DELETE
METADATA$ISUPDATE - Shows whether the operation was part of an UPDATE statement. An UPDATE operation is represented in the stream as a pair of DELETE and INSERT records with METADATA$ISUPDATE = TRUE
METADATA$ROW_ID - Unique and immutable row ID from the source table
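To make the shape of these records concrete, here is a small Python sketch of how one UPDATE surfaces as a pair of change records. It is a toy model, not Snowflake code: the `ChangeRecord` class and the `update_as_change_records` helper are hypothetical names, and their fields simply mirror the three metadata columns above.

```python
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass
class ChangeRecord:
    """One row returned by a stream: table columns plus metadata columns (toy model)."""
    row: Dict[str, Any]  # column values, same shape as the source table
    action: str          # METADATA$ACTION: "INSERT" or "DELETE"
    is_update: bool      # METADATA$ISUPDATE
    row_id: str          # METADATA$ROW_ID, stable for the row across changes


def update_as_change_records(
    row_id: str, old_row: Dict[str, Any], new_row: Dict[str, Any]
) -> List[ChangeRecord]:
    """Hypothetical helper: an UPDATE becomes a DELETE of the old values plus an INSERT of the new ones."""
    return [
        ChangeRecord(row=new_row, action="INSERT", is_update=True, row_id=row_id),
        ChangeRecord(row=old_row, action="DELETE", is_update=True, row_id=row_id),
    ]


# Example: raising George's fee from 0 to 10
records = update_as_change_records(
    "640590445de028a556d79d6361a8aa59fc3301d3",
    {"ID": 3, "NAME": "George", "FEE": 0},
    {"ID": 3, "NAME": "George", "FEE": 10},
)
for r in records:
    print(r.action, r.is_update, r.row["FEE"])  # INSERT True 10, then DELETE True 0
```

Both records carry the same row ID, which is what lets a consumer pair the old and new versions of a row.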

Snowflake Streams types

| Stream type | Supported tables | Records inserts only | Comment |
| --- | --- | --- | --- |
| Standard | Standard tables, directory tables | No | Tracks all DML changes to the source table, including inserts, updates, and deletes (including table truncates) |
| Append-only | Standard tables | Yes | Tracks row inserts only. Update and delete operations (including table truncates) are not recorded |
| Insert-only | External tables | Yes | Tracks row inserts only. Delete operations that remove rows from an inserted set are not recorded |
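The recording rules in the table can be restated as a simple lookup. This is a hypothetical Python sketch for illustration only: `RECORDED_OPERATIONS` and `recorded_operations` are made-up names, and the operation labels are plain strings rather than Snowflake identifiers.

```python
from typing import List

# Which DML operation types each stream type records (restating the table above)
RECORDED_OPERATIONS = {
    "standard": {"INSERT", "UPDATE", "DELETE", "TRUNCATE"},
    "append_only": {"INSERT"},
    "insert_only": {"INSERT"},  # external tables
}


def recorded_operations(stream_type: str, operations: List[str]) -> List[str]:
    """Keep only the operations a stream of the given type would record."""
    allowed = RECORDED_OPERATIONS[stream_type]
    return [op for op in operations if op in allowed]


ops = ["INSERT", "UPDATE", "DELETE", "TRUNCATE", "INSERT"]
print(recorded_operations("standard", ops))     # all five operations
print(recorded_operations("append_only", ops))  # ['INSERT', 'INSERT']
```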

Snowflake Streams creation

A stream is created with the CREATE STREAM statement; by default, a Standard stream is created.

  • Use APPEND_ONLY = TRUE to create an Append-only stream, and INSERT_ONLY = TRUE to create an Insert-only stream
  • Use SHOW_INITIAL_ROWS = TRUE if you need the new stream to include the rows that were already present in the table when the stream was created
  • You can create several streams on the same table, then read from them and advance their offsets independently (see the next section for more details about the stream offset). Because a stream doesn't store actual data, you can create as many streams on a table as you need without a significant increase in cost. Example:
-- create a table
create or replace table members (
  id number(8) not null,
  name varchar(255) default null,
  fee number(3) null
);

-- create a Standard stream 
create or replace stream member_check on table members;

-- create an Append-only stream
create or replace stream member_check_append on table members append_only=true;
  • Use the SHOW STREAMS command to list the streams you have access to. Example:
SHOW STREAMS;
  • Use the SYSTEM$STREAM_HAS_DATA function to check whether a stream contains any change data. Example:
SELECT SYSTEM$STREAM_HAS_DATA('member_check');
  • The Insert-only stream type is only supported for external tables. It can be created on a regular table, but in that case the stream will act like a Standard stream. Example:
-- for the members table

-- this stream will act like Standard stream
create or replace stream member_check_insert on table members insert_only=true;
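Because a stream holds only an offset, several streams on one table are cheap and move independently. The Python sketch below is a toy model under that assumption, not Snowflake's implementation: `ToyTable` keeps a list of committed changes standing in for the table's version history, and `ToyStream.create` models SHOW_INITIAL_ROWS by choosing where the offset starts (existing rows are represented by the inserts that created them). All names are hypothetical.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class ToyTable:
    # Committed changes in commit order; index i holds the change for version i + 1
    versions: List[str] = field(default_factory=list)

    def commit(self, change: str) -> None:
        self.versions.append(change)

    @property
    def current_version(self) -> int:
        return len(self.versions)


@dataclass
class ToyStream:
    table: ToyTable
    offset: int  # the only state the stream keeps: no copy of the table data

    @classmethod
    def create(cls, table: ToyTable, show_initial_rows: bool = False) -> "ToyStream":
        # SHOW_INITIAL_ROWS = TRUE: start before any existing data,
        # otherwise start at the table's current version
        start = 0 if show_initial_rows else table.current_version
        return cls(table=table, offset=start)

    def pending(self) -> List[str]:
        # Changes after the offset, read from the table's own history
        return self.table.versions[self.offset:]


members = ToyTable()
members.commit("insert Joe")
members.commit("insert Jane")

recent = ToyStream.create(members)                              # offset = 2
everything = ToyStream.create(members, show_initial_rows=True)  # offset = 0
members.commit("insert George")

print(recent.pending())      # ['insert George']
print(everything.pending())  # ['insert Joe', 'insert Jane', 'insert George']
```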

Snowflake Streams querying. Stream offset

We can query a stream the same way we query a view or a table. Changes in the source table become visible in the stream only after they are committed. The stream returns all of the columns from the source table plus its metadata columns. Example:

-- for the members table

-- insert values
insert into members (id,name,fee)
values
(1,'Joe',0),
(2,'Jane',0),
(3,'George',0),
(4,'Betty',0),
(5,'Sally',0);
commit;

-- check streams
select * from member_check;
/**
ID  NAME    FEE METADATA$ACTION METADATA$ISUPDATE   METADATA$ROW_ID
1   Joe 0   INSERT  FALSE   e585f6fd43b0c68492d11c3e9af24198134c011d
2   Jane    0   INSERT  FALSE   878c5702cfe33d390310ec55f0003f320210f721
3   George  0   INSERT  FALSE   b8f4e37b5f8ba669656e8b7e961c473f872d10e9
4   Betty   0   INSERT  FALSE   116dff9927a916f986f4701ead7d73f30b9d2ff7
5   Sally   0   INSERT  FALSE   f739b6f1785779f728a52dfd1db7b2036c44ce96
**/

select * from member_check_append;
/**
ID  NAME    FEE METADATA$ACTION METADATA$ISUPDATE   METADATA$ROW_ID
1   Joe 0   INSERT  FALSE   e585f6fd43b0c68492d11c3e9af24198134c011d
2   Jane    0   INSERT  FALSE   878c5702cfe33d390310ec55f0003f320210f721
3   George  0   INSERT  FALSE   b8f4e37b5f8ba669656e8b7e961c473f872d10e9
4   Betty   0   INSERT  FALSE   116dff9927a916f986f4701ead7d73f30b9d2ff7
5   Sally   0   INSERT  FALSE   f739b6f1785779f728a52dfd1db7b2036c44ce96
**/

As mentioned above, the stream only stores an offset for the source table and uses the table's version history to return the changed records.

Simply querying the stream does not change its offset. If we run the same SELECT statements from the code snippet above again, the same 5 records will be returned.

A stream advances its offset only when it is used in a DML transaction (advancing a stream's offset is similar to committing an offset for a Kafka topic). The standard approach is to read the changed records from the stream and insert them into another table in a DML transaction. Example:

-- for the members table

-- create history table
create or replace table members_history (
  id number(8) not null,
  name varchar(255) default null
);

-- advance the offset of the stream member_check_append
begin;
insert into members_history 
select id, name from member_check_append;
commit;

-- check streams again
-- member_check remains the same, because we haven't advanced its offset yet
select * from member_check;
/**
ID  NAME    FEE METADATA$ACTION METADATA$ISUPDATE   METADATA$ROW_ID
1   Joe 0   INSERT  FALSE   e585f6fd43b0c68492d11c3e9af24198134c011d
2   Jane    0   INSERT  FALSE   878c5702cfe33d390310ec55f0003f320210f721
3   George  0   INSERT  FALSE   b8f4e37b5f8ba669656e8b7e961c473f872d10e9
4   Betty   0   INSERT  FALSE   116dff9927a916f986f4701ead7d73f30b9d2ff7
5   Sally   0   INSERT  FALSE   f739b6f1785779f728a52dfd1db7b2036c44ce96
**/

-- member_check_append is empty because we've advanced the offset for this stream
select * from member_check_append;
/**
ID  NAME    FEE METADATA$ACTION METADATA$ISUPDATE   METADATA$ROW_ID
**/

NOTE
Putting a SELECT from the stream between BEGIN and COMMIT statements won't change the offset of the stream. Example:

-- for the members table

begin;
-- won't advance the offset
select * from member_check;
commit;
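The offset rules above can be modeled in a few lines of Python. This is a toy sketch, not Snowflake's implementation: the change log is a plain list shared by all streams, `ToyStream.select` stands for a query that only reads (with or without BEGIN/COMMIT), and `ToyStream.insert_into` stands for a committed INSERT ... SELECT that reads the stream. The stream names mirror the examples in this post; the class and method names are hypothetical.

```python
from typing import List


class ToyStream:
    """Toy model: a stream is just an offset into the table's committed change log."""

    def __init__(self, change_log: List[str]) -> None:
        self.change_log = change_log  # shared with the source table, not copied
        self.offset = 0

    def select(self) -> List[str]:
        # A plain SELECT only reads; the offset stays where it is
        return self.change_log[self.offset:]

    def insert_into(self, target: List[str]) -> None:
        # A committed DML statement that reads the stream consumes it:
        # rows go to the target and the offset moves to the version that was read
        end = len(self.change_log)
        target.extend(self.change_log[self.offset:end])
        self.offset = end


log = ["Joe", "Jane", "George", "Betty", "Sally"]
member_check = ToyStream(log)
member_check_append = ToyStream(log)
members_history: List[str] = []

member_check.select()
member_check.select()
member_check_append.insert_into(members_history)

print(len(member_check.select()))         # 5: the SELECTs never advanced it
print(len(member_check_append.select()))  # 0: consumed by the INSERT
```

Keeping two independent objects over one shared list mirrors why member_check still shows all five rows after member_check_append has been consumed.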

If you need to advance a stream's offset to the current table version without consuming the stream data in a DML transaction, you can use one of two approaches:

  • Recreate the stream (using the CREATE OR REPLACE command)
  • Insert the current change data into a dummy table. In the INSERT statement, query the stream but include a WHERE clause that filters out all of the change data. Example:
-- for the members table

-- create utility table
create or replace table utility (
  id number(8) not null
);

-- advance the offset of the stream member_check
begin;
insert into utility select id from member_check where false;
commit;

-- check member_check stream again
-- member_check is empty because we've advanced the offset for this stream
select * from member_check;
/**
ID  NAME    FEE METADATA$ACTION METADATA$ISUPDATE   METADATA$ROW_ID
**/

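The WHERE FALSE trick works because the offset moves based on what the DML statement read, not on how many rows it wrote. Here is a minimal Python sketch of that rule, assuming a list stands in for the pending change records and `insert_from_stream` (a hypothetical name) stands in for the committed INSERT statement:

```python
from typing import Callable, List


def insert_from_stream(
    change_log: List[int],
    offset: int,
    target: List[int],
    where: Callable[[int], bool],
) -> int:
    """Toy model of INSERT INTO target SELECT ... FROM stream WHERE <where>.

    Returns the stream's new offset. The offset moves to the end of what the
    statement read, regardless of how many rows the WHERE clause kept.
    """
    end = len(change_log)
    target.extend(row for row in change_log[offset:end] if where(row))
    return end


change_log = [1, 2, 3, 4, 5]  # ids of pending change records
utility: List[int] = []

new_offset = insert_from_stream(change_log, 0, utility, where=lambda row: False)
print(utility, new_offset)  # [] 5
```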

A stream becomes stale when its offset is outside of the data retention period for its source table. When a stream becomes stale, the historical data for the source table is no longer accessible, including any unconsumed change records. To track new change records for the table, recreate the stream. To prevent a stream from becoming stale, consume the stream records within a transaction during the retention period for the table. See more details and related settings in the Snowflake documentation.
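The staleness rule boils down to a date comparison. The sketch below is a simplified model, assuming the effective window is the longer of the table's retention period (DATA_RETENTION_TIME_IN_DAYS) and the extension Snowflake can apply to unconsumed streams (MAX_DATA_EXTENSION_TIME_IN_DAYS, 14 days by default); check the documentation for the exact rules that apply to your account. The function names are hypothetical.

```python
from datetime import datetime, timedelta


def stale_after(offset_time: datetime, retention_days: int, max_extension_days: int = 14) -> datetime:
    # Assumed effective window: the longer of the table's retention period
    # and the extension limit for unconsumed streams
    window_days = max(retention_days, max_extension_days)
    return offset_time + timedelta(days=window_days)


def is_stale(offset_time: datetime, now: datetime, retention_days: int, max_extension_days: int = 14) -> bool:
    # Stale once the offset has fallen out of the window of accessible history
    return now > stale_after(offset_time, retention_days, max_extension_days)


now = datetime(2024, 3, 31)
print(is_stale(datetime(2024, 3, 21), now, retention_days=1))  # False: 10 days < 14-day window
print(is_stale(datetime(2024, 3, 1), now, retention_days=1))   # True: 30 days > 14-day window
```

In practice this means a consumer should read the stream well within that window, for example on a schedule much shorter than the retention period.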

Additional details of Snowflake Streams

  • DDL changes (e.g. ADD COLUMN, DROP COLUMN) don't produce any new records in the stream; even adding a new column with a default value won't generate any UPDATE records. However, both existing and new records returned by the stream reflect the changed set of columns. Example:
-- create a table
create or replace table test (
  id number(8) not null
);

-- create a Standard stream 
create or replace stream test_stream on table test;

-- insert values
insert into test (id)
values (1), (2);
commit;

-- check the stream
select * from test_stream;
/**
ID  METADATA$ACTION METADATA$ISUPDATE   METADATA$ROW_ID
1   INSERT  FALSE   71e6b8e0f43b2c14330ce1a3142d2e8001a93c61
2   INSERT  FALSE   c97db7c45da5efeb895cbd811f210a506600f620
**/

-- add a new column with a default value
alter table test add column fee number default 10 not null;

-- check the stream again => no new updates in the stream, a new column is visible
select * from test_stream;
/**
ID  FEE METADATA$ACTION METADATA$ISUPDATE   METADATA$ROW_ID
1   10  INSERT  FALSE   71e6b8e0f43b2c14330ce1a3142d2e8001a93c61
2   10  INSERT  FALSE   c97db7c45da5efeb895cbd811f210a506600f620
**/
  • For a Standard stream, if a record was inserted and then deleted between two stream reads, it won't be visible at all. Example:
-- for the members table

-- insert a new record and then delete it
insert into members (id,name,fee)
values (6, 'Mike', 0);
commit;
delete from members where id = 6;
commit;

-- check streams
-- member_check is empty because it's a Standard stream
select * from member_check;
/**
ID  NAME    FEE METADATA$ACTION METADATA$ISUPDATE   METADATA$ROW_ID
**/

-- member_check_append contains the new record because it's an Append-only stream
select * from member_check_append;
/**
ID  NAME    FEE METADATA$ACTION METADATA$ISUPDATE   METADATA$ROW_ID
6   Mike    0   INSERT  FALSE   d98b3586efc9e5b17ff13956bdb304094894a4ec
**/
  • Updates that don't actually change any data are not visible in the stream. Example:
-- for the members table

-- update fee column with the same value
update members set fee = 0 where id = 3;
commit;

-- check the stream => no new values
select * from member_check;
/**
ID  NAME    FEE METADATA$ACTION METADATA$ISUPDATE   METADATA$ROW_ID
**/

-- update fee column with the new value
update members set fee = 10 where id = 3;
commit;

-- check the stream => update is visible
select * from member_check;
/**
ID  NAME    FEE METADATA$ACTION METADATA$ISUPDATE   METADATA$ROW_ID
3   George  10  INSERT  TRUE    640590445de028a556d79d6361a8aa59fc3301d3
3   George  0   DELETE  TRUE    640590445de028a556d79d6361a8aa59fc3301d3
**/
  • MERGE operations appear in the stream the same way as the equivalent individual INSERT / UPDATE / DELETE operations
  • If a row was updated multiple times before we read the stream, the stream returns only the net change, i.e. only the latest value of the row. Example:
-- for the members table

-- update fee column several times, update name column
update members set fee = 20 where id = 3;
update members set fee = 30 where id = 3;
update members set name = 'Anna' where id = 3;
commit;

-- check the stream => only the net change is visible: the values at the stream offset (DELETE) and the latest version of the record (INSERT)
select * from member_check;
/**
ID  NAME    FEE METADATA$ACTION METADATA$ISUPDATE   METADATA$ROW_ID
3   Anna    30  INSERT  TRUE    640590445de028a556d79d6361a8aa59fc3301d3
3   George  0   DELETE  TRUE    640590445de028a556d79d6361a8aa59fc3301d3
**/
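Most of the behaviors in this list follow from one idea: a Standard stream reports the net difference between the table as it was at the offset and the table as it is now, while an Append-only stream reports every insert. The Python sketch below is a conceptual model of that idea, not Snowflake's implementation; snapshots are dictionaries keyed by a stand-in for METADATA$ROW_ID, and the function names are hypothetical.

```python
from typing import Dict, List, Tuple

Row = Dict[str, object]
Change = Tuple[str, str, bool, Row]  # (row_id, METADATA$ACTION, METADATA$ISUPDATE, row)


def standard_stream_delta(at_offset: Dict[str, Row], current: Dict[str, Row]) -> List[Change]:
    """Net change between the table snapshot at the offset and the current one."""
    changes: List[Change] = []
    for row_id in sorted(at_offset.keys() | current.keys()):
        old, new = at_offset.get(row_id), current.get(row_id)
        if old is None:      # only in the current snapshot
            changes.append((row_id, "INSERT", False, new))
        elif new is None:    # only in the offset snapshot
            changes.append((row_id, "DELETE", False, old))
        elif old != new:     # in both, with different values
            changes.append((row_id, "INSERT", True, new))
            changes.append((row_id, "DELETE", True, old))
        # Identical values (a no-op update) produce nothing; a row inserted and
        # deleted after the offset is in neither snapshot, so it never appears
    return changes


def append_only_delta(inserts_since_offset: List[Tuple[str, Row]]) -> List[Change]:
    """An Append-only stream reports every insert, even if the row is gone now."""
    return [(row_id, "INSERT", False, row) for row_id, row in inserts_since_offset]


george_then = {"ID": 3, "NAME": "George", "FEE": 0}
anna_now = {"ID": 3, "NAME": "Anna", "FEE": 30}  # after fee = 20, fee = 30, name = 'Anna'
mike = {"ID": 6, "NAME": "Mike", "FEE": 0}       # inserted, then deleted

print(standard_stream_delta({"r3": george_then}, {"r3": anna_now}))
print(append_only_delta([("r6", mike)]))
```

Mike never appears in the Standard delta because he is in neither snapshot, which matches the empty member_check result earlier, while the Append-only delta still reports his insert.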

Conclusion

Snowflake Streams is a tool that lets you implement the CDC pattern in your Data Warehouse without using external ELT tools, or simply lets consumers track changes in some of your tables without incurring significant cost. At the same time, you need to be aware of the limitations of streams, some of which I described in this article: how DDL changes affect a stream, the possibility of a stream becoming stale, and the way changes accumulate in the stream between two reads.


Cover image: Shutterstock/Mariia Tagirova
