Recently, I was working for a client where one of their needs was to extract data in near real-time from SQL Server and expose it to an event driven architecture in Azure. Not the new, fancy SQL Server versions, no… That would have been too easy. No, this could be as old as SQL Server 2008, which, by coincidence, is also the version of SQL Server where the native SQL Server CDC capability was first released.
As you can imagine, it was hard to find any SaaS offerings on the market that supported data extraction from SQL Server 2008 using the native SQL Server CDC capabilities.
As a result, we made the decision to look into how one might go about building this type of application ourselves.
"How hard can it be, right!?"
TL;DR
Note! This code is just to showcase the pure essence of getting changes from a SQL Server using CDC. In a production environment, you would need a much more sofisticated solution with state management, error handling, logging/monitoring and so on.
Before you get any data from this code snippet, you need to setup your SQL Server by enabling CDC on your database and table(s).
To test this code snippet, you need to do the following:
- Create a console application
- Install necessary dependencies: Dapper and - Microsoft.Data.SqlClient
- Copy-paste the code to your Program.cs
- Update
CONNECTION STRING
using Dapper;
using Microsoft.Data.SqlClient;
using Microsoft.IdentityModel.Tokens;
using System.Collections;
using System.Text.Json;
byte[]? _currentMaxLSN = null;
using var conn = new SqlConnection("<CONNECTION STRING>");
var tables = await conn.QueryAsync<Table>("EXEC sys.sp_cdc_help_change_data_capture");
var idx = 0;
while (true)
{
await conn.OpenAsync();
var current = _currentMaxLSN?.CloneByteArray();
if (current == null)
{
current = await conn.QuerySingleAsync<byte[]>("SELECT MIN(start_lsn) FROM cdc.lsn_time_mapping");
}
var newMaxLSN = await conn.QuerySingleAsync<byte[]>("SELECT MAX(start_lsn) FROM cdc.lsn_time_mapping WHERE tran_id <> 0x00");
// if newMaxLSN is greater than _currentMaxLSN
if (((IStructuralComparable)newMaxLSN).CompareTo(current, Comparer<byte>.Default) > 0)
{
if (_currentMaxLSN != null)
{
current = await conn.QuerySingleAsync<byte[]>("SELECT sys.fn_cdc_increment_lsn(@lsn_value)", new { lsn_value = current });
}
foreach (var table in tables)
{
var query = $"SELECT * FROM cdc.fn_cdc_get_all_changes_{table.Capture_Instance}(@from_lsn, @to_lsn, 'all update old')";
var fromLSN = current;
// if start_lsn of CDC table is greater than fromLSN, then fromLSN needs to be changed to the start_lsn of the table.
// The reason for this is that you will get an exception if you try to get changes from a capture instance where from_lsn is less than the start_lsn of the table.
if (((IStructuralComparable)table.Start_LSN).CompareTo(fromLSN, Comparer<byte>.Default) > 0)
{
fromLSN = table.Start_LSN;
}
var changes = await conn.QueryAsync(query, new { from_lsn = fromLSN, to_lsn = newMaxLSN });
Console.WriteLine(
JsonSerializer.Serialize(
changes,
options: new JsonSerializerOptions { WriteIndented = true }));
}
_currentMaxLSN = newMaxLSN;
}
else
{
idx++;
if (idx > 3) idx = 1;
ClearBottomLine();
WriteOnBottomLine(string.Format("No changes found{0}", new string(Enumerable.Repeat('.', idx).ToArray())));
}
await conn.CloseAsync();
await Task.Delay(1000);
void ClearBottomLine()
{
WriteOnBottomLine(new string(' ', Console.WindowWidth));
}
void WriteOnBottomLine(string text)
{
int x = Console.CursorLeft;
int y = Console.CursorTop;
Console.CursorTop = Console.WindowTop + Console.WindowHeight - 1;
Console.Write(text);
// Restore previous position
Console.SetCursorPosition(x, y);
}
}
public record Table
{
public string Capture_Instance { get; init; }
public byte[] Start_LSN { get; init; }
}
Defining constraints
First of all, we had to define what our aim was and what we were willing to put on hold or treat as "bonus features".
One of the constraints was that we were only going to handle incremental loads and on-demand replays based on the available data in the CDC tables in the SQL DB. In other words, no support for full loads or extracting data prior to what had been persisted in the CDC tables.
We also wanted to avoid having any table specific logic such as DDL parsing, data transformation, etc.
Keep it simple!
Setting up SQL Server
To work with the CDC capabilities in SQL Server, you first need to enable CDC on the database and the tables that you want to handle.
Once you have completed these steps, there will be jobs running in the background to capture and cleanup data in the CDC tables. These jobs can be modified to fit your needs where you can control things such as polling interval for the capture job and retention for the cleanup job which controls how long the data should be persisted in the CDC tables before its removed.
"Well then, how do I get changes in near real-time!?"
Identifying Changes
First of all, you need to understand what the LSN (Log Sequence Number) is and what role it plays in the CDC feature.
"In SQL Server, the Log Sequence Number (LSN) is a unique identifier used to track and order transactions and changes within the transaction log." -https://www.linkedin.com/pulse/log-sequence-number-lsn-gopikrishna-pinnamaneni/
You use the LSN to identify if there are any changes in the CDC tables. This is done by persisting the maximum LSN as you extract changes from the CDC tables, then you poll for the maximum LSN and when the value is different from what you have persisted from the previous execution, you know that there are changes which you can retrieve from the CDC tables.
Wait a minute… It's stated here that "Entries may also be logged for which there are no change tables entries." Doesn't that mean there might be new maximum LSN values which are not related to changes in CDC tables?
Yes, that could happen if you were to use the built-in system function called sys.fn_cdc_get_max_lsn(). You could potentially get new maximum LSN values which are not related to any changes in the CDC tables.
One way around this is to query directly against the cdc.lsn_time_mapping table instead with a filter. You are only interested in the maximum LSN where the column "tran_id" is not equal to 0x00. This is the approach that Debezium use in order to optimize identifying changes in CDC tables.
SELECT MAX(start_lsn) FROM cdc.lsn_time_mapping WHERE tran_id <> 0x00
If the value is greater than what you had previously, that means you have 2 LSN values that make up a range of LSN values. It is within this range that you will find the new changes in your CDC tables.
But how do I use this range to get the changes in my CDC tables!?
Getting new changes
When you have 2 LSN values that make up an LSN range, you can use those LSN values to get new changes from your CDC tables.
You have two options, either get ALL the changes within that range or get NET changes within that range.
The difference between the two is basically that if one unique row has been altered multiple times within that range of LSN values, then getting ALL the changes would get all modifications on that single row, while getting NET changes would only get the latest one.
To use the function that will get you net changes on a CDC table, you need to configure the CDC table to support net changes. This also requires either defining the "index_name" when enabling CDC on your table or having a primary key on your table which would then be selected as the index automatically.
While only getting net changes can be nice to have, do pay attention to the fact that this means that an extra index needs to be maintained which could have a negative effect on CDC performance.
I don't care! Just tell me how to get the changes!
Alright, if you choose to support net changes then two functions which are tailor made for your CDC table (or rather it's connected to the capture instance that you defined when enabling CDC for your table) are created. Otherwise, you only get a function to get all changes.
The function to get all changes looks like this: cdc.fn_cdc_get_all_changes_capture_instance where is an optional parameter when enabling CDC for your table. If no value is provided, the capture instance will default to schema_table
.
For net changes, it's like this: cdc.fn_cdc_get_net_changes_capture_instance.
When you call any of these functions, they will require a "from_lsn" and "to_lsn" value. This is where your LSN range comes in.
You also have a third parameter, called "row_filter_option". This parameter works a bit differently depending on whether you use it for getting all or net changes.
For all changes, you can set the third parameter to "all". This will only get you the outcome after an update operation and not the state it was in before the update. If you set it to "all update old", you will get both the state of the row before and after the update operation in the output.
Since performance was key in this project, we didn't really look at getting net changes since enabling that might have a negative effect on performance.
Conclusion
If you only need to extract new changes in near real-time from SQL Server and you don't need to take snapshots of historical data, transform, route and do all kinds of crazy stuff in one single application, then it might be suitable for you to just build a CDC application on your own to get more control and flexibility if something needs to be tailored to your specific needs.
But, if you do need a richer feature set. I would suggest looking at what Debezium has to offer with their open source solution.
Top comments (0)