Change Data Capture Source to Staging with Azure Data Factory
BimlFlex provides automated support for SQL Server Change Data Capture (CDC) when using Azure Data Factory (ADF) as the integration engine. This document covers the ADF-specific implementation patterns for CDC source extraction.
For general information about CDC concepts and SQL Server configuration, see Using SQL Server CDC as a Source with SSIS. For Microsoft's documentation on CDC, see About Change Data Capture (SQL Server).
Supported Target Platforms
BimlFlex generates ADF CDC pipelines that support loading to multiple target platforms:
- Azure Synapse Analytics
- Snowflake
- Databricks
- Azure SQL Database
- Azure SQL Managed Instance
- SQL Server
Prerequisites
Before implementing CDC with ADF in BimlFlex, ensure the following are in place:
- CDC is enabled on the source SQL Server database
- CDC is enabled on individual tables to be captured
- BimlCatalog database is deployed and accessible from ADF
- ADF linked services are configured for source and target connections
Project Configuration
A CDC Project for ADF is defined with an appropriate ADF integration template (e.g., ADF: Source -> Target).
The CDC specification is defined on the included objects. For each object, set the Object Type to either:
- CDC All: Uses cdc.fn_cdc_get_all_changes_<capture_instance> to retrieve all changes
- CDC Last: Uses cdc.fn_cdc_get_net_changes_<capture_instance> to retrieve only the net result of changes
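The mapping from Object Type to CDC function can be sketched as a small lookup. This is an illustration only, not BimlFlex's generator code: the helper name `cdc_function_name` and the capture instance `dbo_Customer` are hypothetical (SQL Server's default capture instance name is `<schema>_<table>`).

```python
# Hypothetical helper: maps a BimlFlex Object Type to the SQL Server CDC
# table-valued function named in the list above.
CDC_FUNCTION_PREFIX = {
    "CDC All": "cdc.fn_cdc_get_all_changes_",
    "CDC Last": "cdc.fn_cdc_get_net_changes_",
}


def cdc_function_name(object_type: str, capture_instance: str) -> str:
    """Return the CDC function name for an object type and capture instance."""
    try:
        prefix = CDC_FUNCTION_PREFIX[object_type]
    except KeyError:
        # Non-CDC objects belong in a separate project, so reject them here.
        raise ValueError(f"not a CDC object type: {object_type!r}") from None
    return prefix + capture_instance


print(cdc_function_name("CDC Last", "dbo_Customer"))
```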
If there are non-CDC objects from the source, they need to be added to a separate project. Only CDC objects can be in a CDC project.
Pipeline Architecture
BimlFlex generates ADF pipelines with a structured flow that handles both initial loads and incremental CDC loads.
Pipeline Flow
LogExecutionStart
|
v
GetParameters (IfCondition)
|
+---> ERR_GetParameters (on failure)
|
v
MainActivity (IfCondition)
|
+---> ERR_MainActivity (on failure)
|
v
MainProcess (IfCondition)
|
+---> ERR_MainProcess (on failure)
|
v
LogExecutionEnd + File Archive/Cleanup
GetParameters Activity Block
The GetParameters block retrieves and manages LSN (Log Sequence Number) watermarks for CDC tracking.
| Activity | Purpose |
|---|---|
| Lkp__from_lsn | Retrieves the stored high watermark (last loaded LSN) from BimlCatalog |
| Lkp__to_lsn | Executes sys.fn_cdc_get_max_lsn() to find the current maximum LSN |
| Lkp__min_lsn | Executes sys.fn_cdc_get_min_lsn('<capture_instance>') to find the minimum available LSN |
| SetIsFullLoad | Determines if this is an initial load based on IsInitialLoad parameter or stored watermark |
| Set__to_lsn | Sets the target LSN for this load cycle |
| Set__from_lsn | Adjusts the starting watermark if it falls below the minimum available LSN |
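The decision logic of this block can be modeled roughly as follows. It is a sketch under stated assumptions rather than the generated pipeline: the names `LoadWindow` and `resolve_load_window` are hypothetical, LSNs are treated as plain integers, and a missing or zero stored watermark is assumed to mean that nothing has been loaded yet.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class LoadWindow:
    is_full_load: bool
    from_lsn: int
    to_lsn: int


def resolve_load_window(
    stored_from_lsn: Optional[int],  # result of Lkp__from_lsn, None if absent
    max_lsn: int,                    # result of Lkp__to_lsn
    min_lsn: int,                    # result of Lkp__min_lsn
    is_initial_load: bool,           # IsInitialLoad pipeline parameter
) -> LoadWindow:
    # SetIsFullLoad. Assumption: a missing or zero watermark means "never loaded".
    is_full_load = is_initial_load or not stored_from_lsn

    # Set__to_lsn: this load cycle ends at the current maximum LSN.
    to_lsn = max_lsn

    # Set__from_lsn: if the stored watermark has fallen below the oldest LSN
    # still available for the capture instance, start from that minimum.
    from_lsn = stored_from_lsn or 0
    if not is_full_load and from_lsn < min_lsn:
        from_lsn = min_lsn

    return LoadWindow(is_full_load, from_lsn, to_lsn)


print(resolve_load_window(50, 500, 100, False))
```

Keeping the three lookups as inputs and the three Set activities as one pure function makes the edge cases (no watermark, stale watermark) easy to reason about in isolation.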
MainActivity Block
The MainActivity block executes the actual data copy operation using either an initial load query or an incremental CDC query.
Initial Load Query: Reads from the base table when IsFullLoad is true:
SELECT [Column1], [Column2], ...
'@{formatDateTime(pipeline().parameters.BatchStartTime)}' AS [FlexRowEffectiveFromDate],
'@{activity('LogExecutionStart').output.firstRow.ExecutionID}' AS [FlexRowAuditId],
'RecordSource' AS [FlexRowRecordSource]
FROM [Schema].[TableName]
Incremental Load Query: Uses the CDC function when IsFullLoad is false:
SELECT [Column1], [Column2], ...
CONVERT(DATETIME2(7), sys.fn_cdc_map_lsn_to_time([__$start_lsn])) AS [FlexRowEffectiveFromDate],
'@{activity('LogExecutionStart').output.firstRow.ExecutionID}' AS [FlexRowAuditId],
'RecordSource' AS [FlexRowRecordSource]
FROM [cdc].[fn_cdc_get_net_changes_Schema_TableName](
CONVERT(BINARY(10), CONVERT(BIGINT, '@{variables('__from_lsn')}'), 1),
CONVERT(BINARY(10), CONVERT(BIGINT, '@{variables('__to_lsn')}'), 1),
'all')
Parameters
Pipeline Parameters
| Parameter | Type | Description |
|---|---|---|
| IsInitialLoad | Bool | When true, forces initial load behavior (reads from base table) |
| BatchStartTime | String | Timestamp for batch execution, used for initial load effective dates |
| BatchExecutionID | String | Parent batch execution ID for logging hierarchy |
| RecordSource | String | Record source identifier for lineage tracking |
Pipeline Variables
| Variable | Type | Description |
|---|---|---|
| __from_lsn | Integer | Current high watermark stored as BIGINT (converted from binary LSN) |
| __to_lsn | Integer | Target watermark for this load cycle |
| IsFullLoad | Boolean | Computed flag indicating initial vs incremental load |
| InitialQuery | String | Dynamically constructed query for full loads |
| IncrementalQuery | String | Dynamically constructed CDC query for delta loads |
| HasRowsCopied | Boolean | Indicates if the copy activity returned rows |
Metadata Column Mappings
BimlFlex automatically derives the following metadata columns in ADF CDC pipelines:
| Column | ADF Derivation |
|---|---|
| FlexRowEffectiveFromDate | Initial: BatchStartTime parameter; Incremental: sys.fn_cdc_map_lsn_to_time(__$start_lsn) |
| FlexRowAuditId | ExecutionID from LogExecutionStart activity output |
| FlexRowRecordSource | Configured record source value from parameters |
CDC Function Usage
Get Net Changes vs Get All Changes
BimlFlex uses the Object Type setting to determine which CDC function to use:
- CDC Last (Get Net Changes): Returns only the final state of each row within the LSN range. Suitable for scenarios where only the latest value matters.
- CDC All (Get All Changes): Returns every change record within the LSN range. Required when tracking the complete change history is necessary.
Handling Empty Result Sets
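A key challenge with CDC functions is that they raise an error, rather than return an empty result set, when the requested LSN range is not valid, which can happen when no new changes exist since the last load. BimlFlex addresses this by checking if changes exist before executing the CDC query: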
@or(greater(int(variables('__to_lsn')), int(variables('__from_lsn'))), variables('IsFullLoad'))
This expression ensures the MainActivity block only executes when:
- There are changes to process (to_lsn > from_lsn), OR
- This is a full/initial load
If no changes exist, the pipeline exits gracefully without attempting to call the CDC function.
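For readers less familiar with the ADF expression language, the guard above is equivalent to the following Python predicate. The function name `should_run_main_activity` is hypothetical; the LSN arguments are passed as strings because the expression wraps both variables in int().

```python
def should_run_main_activity(from_lsn: str, to_lsn: str, is_full_load: bool) -> bool:
    """Mirror @or(greater(int(__to_lsn), int(__from_lsn)), IsFullLoad)."""
    return int(to_lsn) > int(from_lsn) or is_full_load


# New changes exist: run the incremental copy.
print(should_run_main_activity("100", "250", False))
# Watermarks are equal and this is not a full load: skip the copy.
print(should_run_main_activity("250", "250", False))
# A full load always runs, regardless of the watermarks.
print(should_run_main_activity("250", "250", True))
```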
Error Handling and Logging
BimlFlex ADF pipelines include comprehensive logging through the BimlCatalog:
| Stored Procedure | Purpose |
|---|---|
| [adf].[LogExecutionStart] | Initializes execution tracking, returns ExecutionID |
| [adf].[LogExecutionEnd] | Marks successful completion |
| [adf].[LogExecutionError] | Records error details for failed activities |
| [adf].[LogRowCount] | Tracks rows processed by copy activities |
| [adf].[GetConfigVariable] | Retrieves stored watermark values |
| [adf].[SetConfigVariable] | Persists watermark values after successful loads |
Error handling activities (ERR_GetParameters, ERR_MainActivity, ERR_MainProcess) are configured to execute on failure of their respective parent activities, ensuring errors are logged before the pipeline fails.
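The log-then-fail behavior can be pictured with a small control-flow model. This is not how ADF executes activities internally; it is a hedged sketch in which `run_pipeline` and `failing_copy` are hypothetical names and a Python list stands in for the BimlCatalog logging procedures.

```python
from typing import Callable, List, Tuple


def run_pipeline(stages: List[Tuple[str, Callable[[], None]]], log: List[str]) -> None:
    """Run stages in order; on failure, log via ERR_<stage> and re-raise."""
    log.append("LogExecutionStart")
    for name, action in stages:
        try:
            action()
        except Exception as exc:
            # Stand-in for the ERR_<stage> activity calling LogExecutionError.
            log.append(f"ERR_{name}: {exc}")
            raise  # the pipeline still fails after the error is recorded
    log.append("LogExecutionEnd")


def failing_copy() -> None:
    raise RuntimeError("copy failed")


events: List[str] = []
try:
    run_pipeline(
        [
            ("GetParameters", lambda: None),
            ("MainActivity", failing_copy),
            ("MainProcess", lambda: None),
        ],
        events,
    )
except RuntimeError:
    pass

print(events)
```

In this run the error entry for MainActivity is recorded, MainProcess never starts, and no LogExecutionEnd entry is written.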
LSN Storage and Conversion
In ADF pipelines, LSN values are stored and manipulated as BIGINT integers rather than binary values. This simplifies ADF expression handling while maintaining accuracy.
The conversion process:
- Retrieval: sys.fn_cdc_get_max_lsn() returns binary, converted to BIGINT using CONVERT(BIGINT, ..., 1)
- Storage: BIGINT value stored in BimlCatalog via SetConfigVariable
- Usage: BIGINT converted back to binary for CDC function calls using CONVERT(BINARY(10), CONVERT(BIGINT, value), 1)
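The round trip can be modeled outside SQL Server to see what the conversion does to the bytes. This sketch assumes that SQL Server keeps the low-order 8 bytes when casting a longer binary value to BIGINT and left-pads with zero bytes when casting an integer to BINARY(10); the function names and the sample LSN are hypothetical.

```python
def lsn_to_bigint(lsn: bytes) -> int:
    """Model CONVERT(BIGINT, <binary(10)>): keep the low-order 8 bytes."""
    if len(lsn) != 10:
        raise ValueError("an LSN is exactly 10 bytes")
    return int.from_bytes(lsn[-8:], byteorder="big", signed=True)


def bigint_to_lsn(value: int) -> bytes:
    """Model CONVERT(BINARY(10), <bigint>): left-pad 8 bytes with two zero bytes."""
    return b"\x00\x00" + value.to_bytes(8, byteorder="big", signed=True)


# Sample LSN whose two high-order bytes are zero: the round trip is exact.
lsn = bytes.fromhex("0000002A000001B80003")
as_bigint = lsn_to_bigint(lsn)
print(as_bigint, bigint_to_lsn(as_bigint) == lsn)

# If the two high-order bytes were non-zero, they would not survive.
wide = bytes.fromhex("0001002A000001B80003")
print(bigint_to_lsn(lsn_to_bigint(wide)) == wide)
```

Under these assumptions the stored BIGINT identifies the LSN exactly as long as the two high-order bytes of the binary(10) value are zero.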
Settings and Configurations
PsaDeltaUseHashDiff
When set, this BimlFlex setting adds a RowHashDiff column to CDC loads. This enables full comparison when reinitializing CDC from the source, useful for:
- Recovering from missed CDC windows
- Validating CDC data against source tables
- Re-synchronization scenarios
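The idea behind a row hash comparison can be sketched as follows. The hashing algorithm, column separator, and null handling that BimlFlex actually uses are configuration details not covered here, so SHA-1, the "~" separator, the empty-string null replacement, and the function name `row_hash_diff` are all assumptions made for illustration.

```python
import hashlib
from typing import Iterable, Optional


def row_hash_diff(values: Iterable[Optional[object]], separator: str = "~") -> str:
    """Hash a row's column values into one comparable string (assumed scheme)."""
    # Assumption: NULLs become empty strings and values are joined by a separator.
    text = separator.join("" if v is None else str(v) for v in values)
    return hashlib.sha1(text.encode("utf-8")).hexdigest()


# Compare a previously staged row with the same key re-read from the source.
staged = row_hash_diff(["Alice", "Oslo", 3])
source = row_hash_diff(["Alice", "Bergen", 3])

# Differing hashes mean the row changed while CDC was not being captured.
print(staged != source)
```

Comparing one hash per row avoids a column-by-column comparison when a full reload has to be reconciled against data that was already staged.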
Considerations for Re-synchronization
The initial load assumes a clean load from the source system to an empty target. Incremental loads rely on the CDC change tables for loading only identified changes.
If the source process experiences outages or the CDC retention window is exceeded, re-synchronization may be required. This typically involves:
- Resetting the stored watermark to force an initial load
- Implementing a custom delete detection process from the base table
- Using the PsaDeltaUseHashDiff setting to enable hash comparison
Additional Notes
LSN Precision
LSN values are stored as BIGINT for tracking SQL Server log positions. A SQL Server LSN is binary(10), two bytes wider than the 8-byte BIGINT, so the conversion preserves the original LSN exactly only while its two high-order bytes are zero.
CDC Capture Delay
There is an inherent delay between when changes occur in source tables and when they appear in CDC change tables. This is due to the asynchronous nature of the CDC capture process reading from the SQL Server transaction log. Consider this delay when designing near-real-time integration scenarios.
Timestamp Ordering
For multiple rapid changes to a table, several change records may have the same effective timestamp. BimlFlex uses the sys.fn_cdc_map_lsn_to_time() function to derive timestamps from the LSN, providing consistent ordering based on the actual log sequence.