Change Data Capture Source to Staging with Azure Data Factory

BimlFlex provides automated support for SQL Server Change Data Capture (CDC) when using Azure Data Factory (ADF) as the integration engine. This document covers the ADF-specific implementation patterns for CDC source extraction.

For general information about CDC concepts and SQL Server configuration, see Using SQL Server CDC as a Source with SSIS. For Microsoft's documentation on CDC, see About Change Data Capture (SQL Server).

Supported Target Platforms

BimlFlex generates ADF CDC pipelines that support loading to multiple target platforms:

  • Azure Synapse Analytics
  • Snowflake
  • Databricks
  • Azure SQL Database
  • Azure SQL Managed Instance
  • SQL Server

Prerequisites

Before implementing CDC with ADF in BimlFlex, ensure the following are in place:

  • CDC is enabled on the source SQL Server database
  • CDC is enabled on individual tables to be captured
  • BimlCatalog database is deployed and accessible from ADF
  • ADF linked services are configured for source and target connections

Project Configuration

A CDC Project for ADF is defined with an appropriate ADF integration template (e.g., ADF: Source -> Target).

The CDC specification is defined on the included objects. For each object, set the Object Type to either:

  • CDC All: Uses cdc.fn_cdc_get_all_changes_<capture_instance> to retrieve all changes
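  • CDC Last: Uses cdc.fn_cdc_get_net_changes_<capture_instance> to retrieve only the net result of changes (SQL Server creates this function only when the capture instance is enabled with @supports_net_changes = 1)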

If there are non-CDC objects from the source, they need to be added to a separate project. Only CDC objects can be in a CDC project.
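The object-type rules above can be modelled as a small lookup plus a project check. This is a hypothetical Python sketch, not BimlFlex code: CdcObject, cdc_function_name and validate_cdc_project are illustrative names, and the sample capture instance dbo_Customer assumes SQL Server's default <schema>_<table> naming.

```python
# Hypothetical model of the CDC project rules above; not BimlFlex code.
from dataclasses import dataclass

# Object Type -> CDC query function prefix, as listed above.
CDC_FUNCTION_PREFIX = {
    "CDC All": "cdc.fn_cdc_get_all_changes_",
    "CDC Last": "cdc.fn_cdc_get_net_changes_",
}


@dataclass
class CdcObject:
    name: str
    object_type: str       # e.g. "CDC All", "CDC Last", or a non-CDC type such as "Table"
    capture_instance: str  # e.g. "dbo_Customer" (default <schema>_<table> name)


def cdc_function_name(obj: CdcObject) -> str:
    """Return the CDC query function that an object of this type reads from."""
    prefix = CDC_FUNCTION_PREFIX.get(obj.object_type)
    if prefix is None:
        raise ValueError(f"{obj.name} is not a CDC object")
    return prefix + obj.capture_instance


def validate_cdc_project(objects: list[CdcObject]) -> None:
    """Raise if a CDC project contains any non-CDC object."""
    non_cdc = [o.name for o in objects if o.object_type not in CDC_FUNCTION_PREFIX]
    if non_cdc:
        raise ValueError(f"move these objects to a separate project: {non_cdc}")
```

A project mixing a CDC Last object with a plain table would fail validation, which matches the rule that non-CDC objects belong in a separate project.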

Pipeline Architecture

BimlFlex generates ADF pipelines with a structured flow that handles both initial loads and incremental CDC loads.

Pipeline Flow

LogExecutionStart
|
v
GetParameters (IfCondition)
|
+---> ERR_GetParameters (on failure)
|
v
MainActivity (IfCondition)
|
+---> ERR_MainActivity (on failure)
|
v
MainProcess (IfCondition)
|
+---> ERR_MainProcess (on failure)
|
v
LogExecutionEnd + File Archive/Cleanup
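The control flow in the diagram can be sketched as a sequence of blocks where a failure runs the matching ERR_ handler and stops the run. This is a simplified Python model under stated assumptions: run_pipeline and the log list are hypothetical stand-ins for ADF activities and the BimlCatalog, and file archiving and cleanup are omitted.

```python
# Simplified model of the activity chain; names are hypothetical stand-ins.
from typing import Callable


def run_pipeline(blocks: list[tuple[str, Callable[[], None]]], log: list[str]) -> bool:
    """Run blocks in order; on failure, log ERR_<block> and stop the run."""
    log.append("LogExecutionStart")
    for name, activity in blocks:
        try:
            activity()
        except Exception as exc:
            # Mirrors the ERR_ activity: record the error before failing.
            log.append(f"ERR_{name}: {exc}")
            return False
        log.append(name)
    # Only reached when every block succeeded (archive/cleanup omitted).
    log.append("LogExecutionEnd")
    return True
```

Because the function returns as soon as a block fails, LogExecutionEnd is written only for runs in which GetParameters, MainActivity and MainProcess all succeed.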

GetParameters Activity Block

The GetParameters block retrieves and manages LSN (Log Sequence Number) watermarks for CDC tracking.

| Activity | Purpose |
|---|---|
| Lkp__from_lsn | Retrieves the stored high watermark (last loaded LSN) from the BimlCatalog |
| Lkp__to_lsn | Executes sys.fn_cdc_get_max_lsn() to find the current maximum LSN |
| Lkp__min_lsn | Executes sys.fn_cdc_get_min_lsn('<capture_instance>') to find the minimum available LSN |
| SetIsFullLoad | Determines whether this is an initial load, based on the IsInitialLoad parameter or the stored watermark |
| Set__to_lsn | Sets the target LSN for this load cycle |
| Set__from_lsn | Adjusts the starting watermark if it falls below the minimum available LSN |
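The decisions made by SetIsFullLoad, Set__to_lsn and Set__from_lsn can be approximated as follows. This sketch assumes LSNs have already been converted to BIGINT-style integers and that a missing stored watermark (None) means nothing has been loaded yet; compute_lsn_window and LsnWindow are hypothetical names.

```python
# Sketch of the GetParameters decisions, assuming integer (BIGINT-style) LSNs
# and that a stored watermark of None means nothing has been loaded yet.
from typing import NamedTuple, Optional


class LsnWindow(NamedTuple):
    is_full_load: bool
    from_lsn: int
    to_lsn: int


def compute_lsn_window(
    stored_watermark: Optional[int],
    max_lsn: int,
    min_lsn: int,
    is_initial_load: bool,
) -> LsnWindow:
    # SetIsFullLoad: forced by the parameter, or implied by a missing watermark.
    is_full_load = is_initial_load or stored_watermark is None
    # Set__to_lsn: the current maximum LSN is the upper bound for this cycle.
    to_lsn = max_lsn
    # Set__from_lsn: never start below the oldest LSN still in the change tables.
    if stored_watermark is None:
        from_lsn = min_lsn
    else:
        from_lsn = max(stored_watermark, min_lsn)
    return LsnWindow(is_full_load, from_lsn, to_lsn)
```

Raising from_lsn to the minimum available LSN keeps the CDC function call within the range that still exists, but any changes already purged from the change tables are skipped, which is one of the re-synchronization cases described later in this document.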

MainActivity Block

The MainActivity block executes the actual data copy operation using either an initial load query or an incremental CDC query.

Initial Load Query: Reads from the base table when IsFullLoad is true:

SELECT [Column1], [Column2], ...,
'@{formatDateTime(pipeline().parameters.BatchStartTime)}' AS [FlexRowEffectiveFromDate],
'@{activity('LogExecutionStart').output.firstRow.ExecutionID}' AS [FlexRowAuditId],
'RecordSource' AS [FlexRowRecordSource]
FROM [Schema].[TableName]

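Incremental Load Query: Uses the CDC function when IsFullLoad is false. The example below is for a CDC Last object; CDC All objects call cdc.fn_cdc_get_all_changes_<capture_instance> with the same arguments: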

SELECT [Column1], [Column2], ...,
CONVERT(DATETIME2(7), sys.fn_cdc_map_lsn_to_time([__$start_lsn])) AS [FlexRowEffectiveFromDate],
'@{activity('LogExecutionStart').output.firstRow.ExecutionID}' AS [FlexRowAuditId],
'RecordSource' AS [FlexRowRecordSource]
FROM [cdc].[fn_cdc_get_net_changes_Schema_TableName](
CONVERT(BINARY(10), CONVERT(BIGINT, '@{variables('__from_lsn')}'), 1),
CONVERT(BINARY(10), CONVERT(BIGINT, '@{variables('__to_lsn')}'), 1),
'all')
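Once the ADF expressions have been resolved to literal values, choosing between the two query shapes above amounts to swapping the effective-date expression and the FROM clause. The Python helper below is a hypothetical sketch: build_source_query and its parameters are illustrative, the CDC function name is passed in so the same code covers CDC All and CDC Last objects, and values are interpolated without escaping, so it assumes trusted inputs.

```python
# Hypothetical helper mirroring the initial and incremental query shapes.
# Values are interpolated without escaping: trusted inputs only.
def build_source_query(
    is_full_load: bool,
    columns: list[str],
    base_table: str,        # e.g. "[dbo].[Customer]"
    cdc_function: str,      # e.g. "fn_cdc_get_net_changes_dbo_Customer"
    batch_start_time: str,
    execution_id: str,
    record_source: str,
    from_lsn: int,
    to_lsn: int,
) -> str:
    select_list = ", ".join(f"[{c}]" for c in columns)
    if is_full_load:
        # Initial load: every row gets the batch start time as its effective date.
        effective = f"'{batch_start_time}'"
        source = f"FROM {base_table}"
    else:
        # Incremental load: effective date is mapped from each change's LSN.
        effective = "CONVERT(DATETIME2(7), sys.fn_cdc_map_lsn_to_time([__$start_lsn]))"
        source = (
            f"FROM [cdc].[{cdc_function}](\n"
            f"CONVERT(BINARY(10), CONVERT(BIGINT, '{from_lsn}'), 1),\n"
            f"CONVERT(BINARY(10), CONVERT(BIGINT, '{to_lsn}'), 1),\n"
            "'all')"
        )
    return (
        f"SELECT {select_list},\n"
        f"{effective} AS [FlexRowEffectiveFromDate],\n"
        f"'{execution_id}' AS [FlexRowAuditId],\n"
        f"'{record_source}' AS [FlexRowRecordSource]\n"
        f"{source}"
    )
```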

Parameters

Pipeline Parameters

| Parameter | Type | Description |
|---|---|---|
| IsInitialLoad | Bool | When true, forces initial load behavior (reads from base table) |
| BatchStartTime | String | Timestamp for batch execution, used for initial load effective dates |
| BatchExecutionID | String | Parent batch execution ID for logging hierarchy |
| RecordSource | String | Record source identifier for lineage tracking |

Pipeline Variables

| Variable | Type | Description |
|---|---|---|
| __from_lsn | Integer | Current high watermark stored as BIGINT (converted from binary LSN) |
| __to_lsn | Integer | Target watermark for this load cycle |
| IsFullLoad | Boolean | Computed flag indicating initial vs incremental load |
| InitialQuery | String | Dynamically constructed query for full loads |
| IncrementalQuery | String | Dynamically constructed CDC query for delta loads |
| HasRowsCopied | Boolean | Indicates if the copy activity returned rows |

Metadata Column Mappings

BimlFlex automatically derives the following metadata columns in ADF CDC pipelines:

| Column | ADF Derivation |
|---|---|
| FlexRowEffectiveFromDate | Initial: BatchStartTime parameter; Incremental: sys.fn_cdc_map_lsn_to_time(__$start_lsn) |
| FlexRowAuditId | ExecutionID from LogExecutionStart activity output |
| FlexRowRecordSource | Configured record source value from parameters |

CDC Function Usage

Get Net Changes vs Get All Changes

BimlFlex uses the Object Type setting to determine which CDC function to use:

  • CDC Last (Get Net Changes): Returns only the final state of each row within the LSN range. Suitable for scenarios where only the latest value matters.
  • CDC All (Get All Changes): Returns every change record within the LSN range. Required when the complete change history must be tracked.
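To make the difference concrete, the sketch below applies both behaviours to an in-memory list of changes. It is a simplified model, not SQL Server's exact algorithm: it uses the __$operation codes 1 (delete), 2 (insert) and 4 (update, after image), and it does not model every edge case, such as a delete followed by a re-insert of the same key.

```python
# Simplified, illustrative model of "all changes" vs "net changes".
# Operation codes follow __$operation: 1 = delete, 2 = insert, 4 = update (after).
from typing import Any

Change = tuple[int, int, Any, dict]  # (lsn, operation, key, row values)


def all_changes(changes: list[Change]) -> list[Change]:
    """CDC All: every change in the LSN range, in log order."""
    return sorted(changes, key=lambda c: c[0])


def net_changes(changes: list[Change]) -> list[Change]:
    """CDC Last: at most one row per key, reflecting its final state."""
    first: dict = {}
    last: dict = {}
    for change in all_changes(changes):
        key = change[2]
        first.setdefault(key, change)
        last[key] = change
    result = []
    for key, final in last.items():
        started_with_insert = first[key][1] == 2
        if started_with_insert and final[1] == 1:
            continue  # inserted and deleted inside the range: no net change
        # A row inserted in the range is reported as an insert of its final values.
        op = 2 if started_with_insert else final[1]
        result.append((final[0], op, key, final[3]))
    return result
```

In this model, a row that is inserted and then updated in the same range appears once as an insert with its final values, while a row inserted and then deleted disappears entirely from the net result but still contributes two records to the all-changes result.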

Handling Empty Result Sets

A key challenge with CDC functions is that they can fail when no changes exist in the specified LSN range. BimlFlex addresses this by checking if changes exist before executing the CDC query:

@or(greater(int(variables('__to_lsn')), int(variables('__from_lsn'))), variables('IsFullLoad'))

This expression ensures the MainActivity block only executes when:

  • There are changes to process (to_lsn > from_lsn), OR
  • This is a full/initial load

If no changes exist, the pipeline exits gracefully without attempting to call the CDC function.
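The guard expression translates directly into Python, which can help when reasoning about edge cases. Here a plain dictionary stands in for the pipeline variables, and should_run_main_activity is a hypothetical name.

```python
# Python equivalent of the ADF guard:
#   @or(greater(int(variables('__to_lsn')), int(variables('__from_lsn'))), variables('IsFullLoad'))
def should_run_main_activity(variables: dict) -> bool:
    """Run the main activity if new LSNs exist or this is a full load."""
    has_new_changes = int(variables["__to_lsn"]) > int(variables["__from_lsn"])
    return has_new_changes or variables["IsFullLoad"]
```

Equal watermarks with IsFullLoad set to false skip the main activity, while a full load always runs regardless of the LSN values.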

Error Handling and Logging

BimlFlex ADF pipelines include comprehensive logging through the BimlCatalog:

| Stored Procedure | Purpose |
|---|---|
| [adf].[LogExecutionStart] | Initializes execution tracking, returns ExecutionID |
| [adf].[LogExecutionEnd] | Marks successful completion |
| [adf].[LogExecutionError] | Records error details for failed activities |
| [adf].[LogRowCount] | Tracks rows processed by copy activities |
| [adf].[GetConfigVariable] | Retrieves stored watermark values |
| [adf].[SetConfigVariable] | Persists watermark values after successful loads |

Error handling activities (ERR_GetParameters, ERR_MainActivity, ERR_MainProcess) are configured to execute on failure of their respective parent activities, ensuring errors are logged before the pipeline fails.
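The division of work between GetConfigVariable and SetConfigVariable implies an ordering rule: the new watermark is persisted only after the load succeeds. The sketch below illustrates that rule with an in-memory store; ConfigStore, load_cycle and the watermark key are hypothetical and do not reflect the real stored procedure signatures.

```python
# In-memory stand-in for the BimlCatalog watermark procedures (hypothetical names).
from typing import Callable, Optional


class ConfigStore:
    def __init__(self) -> None:
        self._values: dict[str, int] = {}

    def get_config_variable(self, name: str) -> Optional[int]:
        return self._values.get(name)

    def set_config_variable(self, name: str, value: int) -> None:
        self._values[name] = value


def load_cycle(store: ConfigStore, name: str, to_lsn: int, copy: Callable[[], int]) -> int:
    """Run the copy; persist to_lsn only if the copy completes without raising."""
    rows = copy()  # an exception here leaves the stored watermark untouched
    store.set_config_variable(name, to_lsn)
    return rows
```

Persisting after the copy means a failed run is simply retried from the previous watermark on the next execution.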

LSN Storage and Conversion

In ADF pipelines, LSN values are stored and manipulated as BIGINT integers rather than binary values. This simplifies ADF expression handling while maintaining accuracy.

The conversion process:

  1. Retrieval: sys.fn_cdc_get_max_lsn() returns a binary(10) value, which is converted to BIGINT using CONVERT(BIGINT, ..., 1)
  2. Storage: the BIGINT value is stored in the BimlCatalog via SetConfigVariable
  3. Usage: the BIGINT value is converted back to binary for CDC function calls using CONVERT(BINARY(10), CONVERT(BIGINT, value), 1)
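The conversion steps can be mirrored in Python to check when the round trip is exact. This sketch assumes that CONVERT(BIGINT, ...) keeps the rightmost eight bytes of the binary(10) LSN and that CONVERT(BINARY(10), ...) pads the eight BIGINT bytes with leading zero bytes; lsn_to_bigint and bigint_to_lsn are hypothetical names.

```python
# Models the assumed SQL Server conversions between binary(10) LSNs and BIGINT.
def lsn_to_bigint(lsn: bytes) -> int:
    """Keep the rightmost 8 bytes as a signed 64-bit integer (assumed behaviour)."""
    if len(lsn) != 10:
        raise ValueError("an LSN is 10 bytes")
    return int.from_bytes(lsn[-8:], "big", signed=True)


def bigint_to_lsn(value: int) -> bytes:
    """Encode as 8 signed bytes and left-pad with two zero bytes to binary(10)."""
    return value.to_bytes(8, "big", signed=True).rjust(10, b"\x00")
```

An LSN such as 0000002A:000001F0:0003 survives the round trip, whereas one whose first two bytes are non-zero does not, and a value whose third byte has its high bit set becomes a negative BIGINT.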

Settings and Configurations

PsaDeltaUseHashDiff

When enabled, this BimlFlex setting adds a RowHashDiff column to CDC loads. This enables a full row comparison when CDC is reinitialized from the source, which is useful for:

  • Recovering from missed CDC windows
  • Validating CDC data against source tables
  • Re-synchronization scenarios
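A RowHashDiff-style comparison can be sketched as hashing each row's attribute values and comparing the hashes per key. The hash algorithm (SHA-256), the | delimiter and the helper names row_hash and changed_keys are assumptions for illustration; BimlFlex's own hashing configuration is not shown here.

```python
# Illustrative row-hash comparison; algorithm and formatting are assumptions.
import hashlib


def row_hash(row: dict, columns: list[str]) -> str:
    # None and "" hash identically here, a simplification of real null handling.
    payload = "|".join("" if row[c] is None else str(row[c]) for c in columns)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def changed_keys(source: dict, target: dict, columns: list[str]) -> set:
    """Keys present on both sides whose hashed attribute values differ."""
    return {
        key
        for key in source.keys() & target.keys()
        if row_hash(source[key], columns) != row_hash(target[key], columns)
    }
```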

Considerations for Re-synchronization

The initial load assumes a clean load from the source system to an empty target. Incremental loads rely on the CDC change tables for loading only identified changes.

If the source process experiences outages or the CDC retention window is exceeded, re-synchronization may be required. This typically involves:

  1. Resetting the stored watermark to force an initial load
  2. Implementing a custom delete detection process from the base table
  3. Using the PsaDeltaUseHashDiff setting to enable hash comparison
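The first two re-synchronization steps can be outlined as follows. This is a hypothetical sketch: resynchronize is an illustrative name, it assumes that clearing the stored watermark (setting it to None) triggers an initial load, and it detects deletes by comparing full key sets read from the base table and the target.

```python
# Hypothetical outline of re-synchronization steps 1 and 2.
from typing import Optional


def resynchronize(
    watermarks: dict[str, Optional[int]],
    capture_instance: str,
    source_keys: set,
    target_keys: set,
) -> set:
    # Step 1: clearing the stored watermark forces the next run to be an
    # initial load (assumption: a missing watermark triggers a full load).
    watermarks[capture_instance] = None
    # Step 2: keys the target still holds but the source no longer has were
    # deleted while CDC data was unavailable.
    return target_keys - source_keys
```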

Additional Notes

LSN Precision

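LSN values are stored as BIGINT. A binary(10) LSN is 80 bits wide while BIGINT holds 64 bits, so the round trip is exact only while the two leading bytes of the LSN are zero (a virtual log file sequence number below 65,536), and numeric comparisons of the stored values additionally assume the converted value stays positive.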

CDC Capture Delay

There is an inherent delay between when changes occur in source tables and when they appear in CDC change tables. This is due to the asynchronous nature of the CDC capture process reading from the SQL Server transaction log. Consider this delay when designing near-real-time integration scenarios.

Timestamp Ordering

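For multiple rapid changes to a table, several change records may share the same effective timestamp. BimlFlex derives timestamps from the LSN with the sys.fn_cdc_map_lsn_to_time() function, which returns the time recorded for the transaction that owns the LSN, so changes in the same transaction always share a timestamp. Where exact ordering matters, the LSN values themselves (__$start_lsn, and __$seqval within a transaction) preserve the actual log sequence.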
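When several records share a derived timestamp, ordering by the LSN values themselves preserves the log sequence. The sketch assumes each record carries its __$start_lsn and __$seqval values (as returned for a CDC All object) as 10-byte big-endian bytes under the hypothetical keys start_lsn and seqval; fixed-length bytes compare in the same order as the underlying LSNs.

```python
# Order change records by commit LSN, then by sequence value within a transaction.
def order_changes(records: list[dict]) -> list[dict]:
    """Sort on (start_lsn, seqval); equal-length bytes compare like the LSNs."""
    return sorted(records, key=lambda r: (r["start_lsn"], r["seqval"]))
```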