Skip to main content

Metadata Based SSIS with Biml, Part 2

What Part One Left Out

The first cut of the metadata pattern handled a single source database and always loaded every column of every listed table. Real projects rarely fit that mold. They pull from more than one source. They want only a handful of columns from some tables. The metadata layer needs two more pieces of information: which source a table belongs to, and which columns of that table to import.

This walkthrough extends the metadata layer with both, then threads the new information through the same four file pipeline used in part one.

The Expanded Metadata

Two metadata tables live in the target database. The first lists the source connections. The second lists the tables to load, the columns to take from each table, and a flag to disable a row without deleting it:

USE WarehouseDb;
GO

CREATE TABLE dbo.LoadDriver_Connections (
ConnectionName nvarchar(50) NULL,
ServerName nvarchar(50) NULL,
DatabaseName nvarchar(50) NULL
);

CREATE TABLE dbo.LoadDriver_Tables (
ConnectionName nvarchar(50) NULL,
SchemaName nvarchar(50) NULL,
TableName nvarchar(50) NULL,
Columns nvarchar(max) NULL,
Disabled int NULL
);

INSERT INTO dbo.LoadDriver_Connections VALUES
('Ops', 'localhost', 'OperationsDb');

INSERT INTO dbo.LoadDriver_Tables VALUES
('Ops', 'Person', 'Person', '*', 0),
('Ops', 'Person', 'PersonPhone', 'PhoneNumber', 0);

A row with 'Columns' set to '*' loads every column. Anything else is treated as the literal column list to query.

The Updated Helper Class

The shared helper class picks up a few new fields. Most importantly, 'GetTableID' builds a deterministic staging name from the source connection, schema, and table:

using Varigence.Biml.CoreLowerer.SchemaManagement;
using Varigence.Biml.Extensions;
using Varigence.Languages.Biml;
using Varigence.Languages.Biml.Connection;
using System.Data;
using System.Collections.Generic;

public class LoadHelpers
{
public static string TargetDb = "WarehouseDb";
public static string TargetConnString =
"Provider=SQLNCLI11;Server=localhost;Initial Catalog=" + TargetDb + ";Integrated Security=SSPI;";
public static List<string> ImportColumns = new List<string>();

public static string GetTableID(string conn, string schema, string tbl)
{
return schema.ToUpper() == "DBO"
? (conn + "_" + tbl).ToUpper()
: (conn + "_" + schema + "_" + tbl).ToUpper();
}

public static List<string> GetNonEmptyList(AstDbConnectionNode conn, string sql)
{
var dt = ExternalDataAccess.GetDataTable(conn.ConnectionString, sql);
var list = new List<string>();
foreach (DataRow row in dt.Rows) list.Add(row[0].ToString());
if (list.Count == 0) list.Add("NONEMPTYSPACER");
return list;
}

public static ImportOptions DefaultImportOptions()
{
return ImportOptions.ExcludeIdentity
| ImportOptions.ExcludePrimaryKey
| ImportOptions.ExcludeUniqueKey
| ImportOptions.ExcludeColumnDefault
| ImportOptions.ExcludeIndex
| ImportOptions.ExcludeCheckConstraint
| ImportOptions.ExcludeForeignKey;
}
}

Generating Connections From Metadata

The environment file no longer hard codes the source. It loops over 'LoadDriver_Connections' and emits one Biml connection per row, tagging each one as 'IsSource' so the next file can find them:

<#@ template language="C#" #>
<#@ import namespace="System.Data" #>
<#@ code file="../code/LoadHelpers.cs" #>
<Biml xmlns="http://schemas.varigence.com/biml.xsd">
<Connections>
<OleDbConnection Name="Target" ConnectionString="<#=LoadHelpers.TargetConnString#>">
<Annotations>
<Annotation Tag="DbString"><#=LoadHelpers.TargetDb#>.dbo.</Annotation>
</Annotations>
</OleDbConnection>

<#
var connRows = ExternalDataAccess.GetDataTable(
LoadHelpers.TargetConnString,
"SELECT ConnectionName, ServerName, DatabaseName " +
"FROM dbo.LoadDriver_Connections " +
"WHERE ConnectionName <> 'Target' ORDER BY ConnectionName");

foreach (DataRow connRow in connRows.Rows) {
#>
<OleDbConnection Name="<#=connRow[0]#>"
ConnectionString="Provider=SQLNCLI11;Server=<#=connRow[1]#>;Initial Catalog=<#=connRow[2]#>;Integrated Security=SSPI;">
<Annotations>
<Annotation Tag="IsSource">True</Annotation>
</Annotations>
</OleDbConnection>
<# } #>
</Connections>

<Databases>
<Database Name="<#=LoadHelpers.TargetDb#>" ConnectionName="Target" />
</Databases>
<Schemas>
<Schema Name="dbo" DatabaseName="<#=LoadHelpers.TargetDb#>" />
</Schemas>
</Biml>

Adding a source means inserting a row in 'LoadDriver_Connections' and rerunning. The Biml connection appears automatically.

Building Tables With Per Source Filtering

The metadata file is the most heavily reworked. For each connection tagged 'IsSource', it queries 'LoadDriver_Tables' for that connection, runs 'GetDatabaseSchema' against the source, and emits a Biml table for each entry. The column list comes from the metadata: a list of literal names is used to filter the imported columns down. The original schema-qualified source name and the source connection name are stored as annotations so the load file can recover them later:

<#@ template language="C#" tier="2" #>
<#@ import namespace="System.Data" #>
<#@ code file="../code/LoadHelpers.cs" #>
<Biml xmlns="http://schemas.varigence.com/biml.xsd">
<Tables>
<#
var sourceConnections = RootNode.Connections
.OfType<AstDbConnectionNode>()
.Where(c => c.GetTag("IsSource") == "True");

foreach (var srcConn in sourceConnections) {
var tableSql =
"SELECT TableName FROM dbo.LoadDriver_Tables " +
"WHERE ConnectionName = '" + srcConn.Name + "' AND ISNULL(Disabled, 0) = 0";

var importResult = srcConn.GetDatabaseSchema(
null,
LoadHelpers.GetNonEmptyList(
(AstDbConnectionNode)RootNode.Connections["Target"],
tableSql),
LoadHelpers.DefaultImportOptions());

foreach (var srcTable in importResult.TableNodes) {
var stgName = LoadHelpers.GetTableID(srcConn.Name, srcTable.Schema.Name, srcTable.Name);
#>
<Table Name="<#=stgName#>" SchemaName="<#=RootNode.Schemas[0].ScopedName#>">
<Columns>
<#
var columnsSql =
"SELECT Columns FROM dbo.LoadDriver_Tables " +
"WHERE ConnectionName = '" + srcConn.Name + "' " +
"AND TableName = '" + srcTable.Name + "' " +
"AND SchemaName = '" + srcTable.Schema.Name + "' " +
"AND ISNULL(Disabled, 0) = 0";

var columnsLiteral = ExternalDataAccess.GetDataTable(
LoadHelpers.TargetConnString, columnsSql).Rows[0][0].ToString();

LoadHelpers.ImportColumns.Clear();
var probeSql = "SELECT TOP 0 " + columnsLiteral +
" FROM " + srcTable.SchemaQualifiedName;

foreach (DataColumn dc in ExternalDataAccess.GetDataTable(
srcConn.ConnectionString, probeSql).Columns) {
LoadHelpers.ImportColumns.Add(dc.Caption.ToUpper());
}

foreach (var column in srcTable.Columns
.Where(c => LoadHelpers.ImportColumns.IndexOf(c.Name.ToUpper()) != -1)) {
#>
<#=column.GetBiml()#>
<# } #>
</Columns>
<Annotations>
<Annotation AnnotationType="Tag" Tag="SourceSchemaQualifiedName">
<#=srcTable.SchemaQualifiedName#>
</Annotation>
<Annotation AnnotationType="Tag" Tag="Connection">
<#=srcConn.Name#>
</Annotation>
</Annotations>
</Table>
<# } } #>
</Tables>
</Biml>

Resolving the column list against the live source by issuing 'SELECT TOP 0' lets the metadata accept either '*' or an explicit comma separated list. The result is the same: a Biml column list filtered to only the columns the metadata asks for.

Creating the Staging Tables

The create file is identical to part one. The same loop over 'RootNode.Tables' emits one Execute SQL per table and 'GetDropAndCreateDdl' takes care of the rest:

<#@ template language="C#" tier="2" #>
<Biml xmlns="http://schemas.varigence.com/biml.xsd">
<Packages>
<Package Name="01_CreateStaging" ConstraintMode="Parallel">
<Tasks>
<# foreach (var stgTable in RootNode.Tables) { #>
<ExecuteSQL Name="Create <#=stgTable.Name#>" ConnectionName="Target">
<DirectInput><#=stgTable.GetDropAndCreateDdl()#></DirectInput>
</ExecuteSQL>
<# } #>
</Tasks>
</Package>
</Packages>
</Biml>

Loading From the Right Source

The load file changes only its 'OleDbSource' line. Instead of a fixed 'SourceConn', the connection name comes from the table's 'Connection' annotation, and the SELECT uses 'GetColumnList' against the source schema-qualified name pulled from the other annotation:

<#@ template language="C#" tier="4" #>
<Biml xmlns="http://schemas.varigence.com/biml.xsd">
<Packages>
<Package Name="02_PopulateStaging" ConstraintMode="Parallel">
<Tasks>
<# foreach (var stgTable in RootNode.Tables) { #>
<Container Name="Transfer <#=stgTable.Name#>" ConstraintMode="Linear">
<Tasks>
<ExecuteSQL Name="Truncate <#=stgTable.Name#>" ConnectionName="Target">
<DirectInput>TRUNCATE TABLE <#=stgTable.Name#></DirectInput>
</ExecuteSQL>
<Dataflow Name="Copy <#=stgTable.Name#>">
<Transformations>
<OleDbSource Name="RetrieveRows"
ConnectionName="<#=stgTable.GetTag("Connection").Trim()#>">
<DirectInput>
SELECT <#=stgTable.GetColumnList()#>
FROM <#=stgTable.GetTag("SourceSchemaQualifiedName")#>
</DirectInput>
</OleDbSource>
<OleDbDestination Name="LoadRows" ConnectionName="Target">
<TableOutput TableName="<#=stgTable.ScopedName#>" />
</OleDbDestination>
</Transformations>
</Dataflow>
</Tasks>
</Container>
<# } #>
</Tasks>
</Package>
</Packages>
</Biml>

Each table now reads from its own source connection, pulls only the columns its metadata row asked for, and lands the result in the matching staging table.

What the Annotation Layer Buys

Two annotations carry every table specific decision: which source it lives on and what its source path is. The template stays generic and the metadata table stays small. Adding a new source is a row insert. Filtering a table down to a few columns is a column list edit. Disabling a load is flipping the 'Disabled' flag.