Skip to main content

Content Driven Extraction with Biml: Part 2

Picking Up From Part 1

Part 1 set up a metadata-driven extraction layer with three procedures: one for connections, one for extraction header rows, and one for column mappings. With connections in place, the next step is to generate the extraction package itself, complete with per-row containers, truncations, dataflows, and explicit source-to-target column mappings.

Containers and Tasks per Extraction

The package iterates the extraction header rows and emits one Container per row. Each container holds an ExecuteSQL task that truncates the target table and a Dataflow that loads it. The package itself runs containers in Parallel while each container runs its tasks Linear, which gives parallelism across extractions while preserving truncate-then-load order inside one extraction.

<#@ import namespace="System.Data" #>
<#@ import namespace="System.Data.SqlClient" #>
<#@ template language="C#" tier="2" #>
<#
string metaConnString = @"Data Source=.\sql2019;Persist Security Info=true;Integrated Security=SSPI;Initial Catalog=RetailMeta";
DataTable extractRows = new DataTable();
SqlDataAdapter extractAdapter = new SqlDataAdapter("exec dbo.staging_extractions", metaConnString);
extractAdapter.Fill(extractRows);
#>
<Biml xmlns="http://schemas.varigence.com/biml.xsd">
<Packages>
<Package Name="002_Extractions" ConstraintMode="Parallel">
<Tasks>
<# foreach (DataRow ext in extractRows.Rows) { #>
<Container Name="<#= ext["extraction_id"] #>_<#= ext["entity_definition_tgt"] #>" ConstraintMode="Linear">
<Tasks>
<ExecuteSQL Name="truncate_<#= ext["extraction_id"] #>_<#= ext["entity_definition_tgt"] #>" ConnectionName="<#= ext["connection_tgt"] #>">
<DirectInput>Select 1</DirectInput>
</ExecuteSQL>
<Dataflow Name="extract_<#= ext["extraction_id"] #>_<#= ext["entity_definition_tgt"] #>">
</Dataflow>
</Tasks>
</Container>
<# } #>
</Tasks>
</Package>
</Packages>
</Biml>

The mixed constraint modes are the trick: parallel at the package level, linear inside the container.

Filling the Dataflow

The next snippet replaces the empty Dataflow with the source query, a derived column that stamps a load timestamp, and the destination. Each name and table reference is built from the current extraction row.

<Dataflow Name="extract_<#= ext["extraction_id"] #>_<#= ext["entity_definition_tgt"] #>">
<Transformations>
<OleDbSource
Name="src_<#= ext["extraction_id"] + "_" + ext["entity_schema_source"] + "_" + ext["entity_definition_source"] #>"
ConnectionName="<#= ext["connection_source"] #>">
<DirectInput>
Select * from <#= ext["entity_schema_source"] #>.<#= ext["entity_definition_source"] #>;
</DirectInput>
</OleDbSource>
<DerivedColumns Name="load_dt">
<Columns>
<Column Name="load_dt" DataType="DateTime">@[System::StartTime]</Column>
</Columns>
</DerivedColumns>
<OleDbDestination
Name="tgt_<#= ext["extraction_id"] + "_" + ext["entity_schema_tgt"] + "_" + ext["entity_definition_tgt"] #>"
ConnectionName="<#= ext["connection_tgt"] #>">
<ExternalTableOutput Table="<#= ext["entity_schema_tgt"] #>.<#= ext["entity_definition_tgt"] #>" />
</OleDbDestination>
</Transformations>
</Dataflow>

Explicit Column Mappings

If the package is generated as is, the destination uses default name-based mapping. Where source and target column names differ, an explicit Columns block on OleDbDestination is required. A second metadata procedure returns the mapping rows for the current extraction. A nested loop reads those rows and emits one Column per pair.

<Columns>
<#
DataTable colMaps = new DataTable();
string colSql = "exec [dbo].[staging_column_mappings] " + ext["extraction_id"];
SqlDataAdapter colAdapter = new SqlDataAdapter(colSql, metaConnString);
colAdapter.Fill(colMaps);
foreach (DataRow map in colMaps.Rows) { #>
<Column SourceColumn="<#= map["column_source"] #>" TargetColumn="<#= map["column_tgt"] #>" />
<# } #>
</Columns>

The shape of the nested iteration:

foreach (record ext in dbo.staging_extractions)
{
// emit container, truncate, source, derived column, destination
foreach (record map in dbo.staging_column_mappings(ext.extraction_id))
{
// emit Column SourceColumn / TargetColumn
}
}

The Final Generator

<#@ import namespace="System.Data" #>
<#@ import namespace="System.Data.SqlClient" #>
<#@ template language="C#" tier="2" #>
<#
string metaConnString = @"Data Source=.\sql2019;Persist Security Info=true;Integrated Security=SSPI;Initial Catalog=RetailMeta";
DataTable extractRows = new DataTable();
SqlDataAdapter extractAdapter = new SqlDataAdapter("exec dbo.staging_extractions", metaConnString);
extractAdapter.Fill(extractRows);
#>
<Biml xmlns="http://schemas.varigence.com/biml.xsd">
<Packages>
<Package Name="002_Extractions" ConstraintMode="Parallel" ProtectionLevel="EncryptSensitiveWithUserKey">
<Tasks>
<# foreach (DataRow ext in extractRows.Rows) { #>
<Container Name="<#= ext["extraction_id"] #>_<#= ext["entity_definition_tgt"] #>" ConstraintMode="Linear">
<Tasks>
<ExecuteSQL Name="truncate_<#= ext["extraction_id"] #>_<#= ext["entity_definition_tgt"] #>" ConnectionName="<#= ext["connection_tgt"] #>">
<DirectInput>truncate table <#= ext["entity_schema_tgt"] #>.<#= ext["entity_definition_tgt"] #>;</DirectInput>
</ExecuteSQL>
<Dataflow Name="extract_<#= ext["extraction_id"] #>_<#= ext["entity_definition_tgt"] #>">
<Transformations>
<OleDbSource Name="src_<#= ext["extraction_id"] + "_" + ext["entity_schema_source"] + "_" + ext["entity_definition_source"] #>" ConnectionName="<#= ext["connection_source"] #>">
<DirectInput>
Select * from <#= ext["entity_schema_source"] #>.<#= ext["entity_definition_source"] #>;
</DirectInput>
</OleDbSource>
<DerivedColumns Name="load_dt">
<Columns>
<Column Name="load_dt" DataType="DateTime">@[System::StartTime]</Column>
</Columns>
</DerivedColumns>
<OleDbDestination Name="tgt_<#= ext["extraction_id"] + "_" + ext["entity_schema_tgt"] + "_" + ext["entity_definition_tgt"] #>" ConnectionName="<#= ext["connection_tgt"] #>">
<ExternalTableOutput Table="<#= ext["entity_schema_tgt"] #>.<#= ext["entity_definition_tgt"] #>" />
<Columns>
<#
DataTable colMaps = new DataTable();
string colSql = "exec [dbo].[staging_column_mappings] " + ext["extraction_id"];
SqlDataAdapter colAdapter = new SqlDataAdapter(colSql, metaConnString);
colAdapter.Fill(colMaps);
foreach (DataRow map in colMaps.Rows) { #>
<Column SourceColumn="<#= map["column_source"] #>" TargetColumn="<#= map["column_tgt"] #>" />
<# } #>
</Columns>
</OleDbDestination>
</Transformations>
</Dataflow>
</Tasks>
</Container>
<# } #>
</Tasks>
</Package>
</Packages>
</Biml>

For three extractions, drag and drop is faster than this setup. The point is to grow without rewriting: adding an extraction is a row insert, and adding a column mapping is also a row insert.