Looping Over Twitter Search Terms with ForEachAdoLoop
Why this walkthrough
There is a Biml snippet, BigData: Reading a Twitter feed, that pulls Twitter search results into a SQL Server staging table. It's a great demonstration of Biml's reach, but in its original form it supports only a single hard-coded search term. This walkthrough extends that snippet so it iterates over a configurable list of search terms, turning the original "single-term reader" into a reusable Twitter search engine driven entirely by metadata in a SQL table.
Before you start
You'll need the original snippet working in your environment as the starting point. The original is here: BigData: Reading a Twitter feed.
NOTE: I am working with SQL Server 2008 on my local machine. If you simply copy and paste my code into the original snippet it will NOT work, because the connections are different: any connection named "CnOleDBAdventureWorks2012" in the original code is named "Staging" in mine.
1. Create the search-terms table
To start, create a table to hold the search terms. Fire up SQL Server Management Studio and create a new table in the same database that dbo.TwitterLog resides in. You can use the following code to create the table:
CREATE TABLE dbo.SearchTerms (
SearchTermId bigint IDENTITY(1,1) PRIMARY KEY,
SearchTerm nvarchar(140) NULL
)
2. Populate the search-terms table
Add the search terms you wish to use to the SearchTerm column of the table.
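A minimal sketch of this step, assuming the table was created as dbo.SearchTerms in step 1. The three terms are placeholders chosen for illustration, not values from the original snippet. Two details from the full listing are worth keeping in mind: the script component prefixes each term with %23 (an encoded #), so terms are stored without a leading hash, and TwitterLog.SearchTerm is nvarchar(128), so terms should stay within 128 characters.

```sql
-- Placeholder terms for illustration only; replace with your own.
-- Stored without a leading '#': the script component prepends %23 itself.
INSERT INTO dbo.SearchTerms (SearchTerm)
VALUES (N'biml'),
       (N'ssis'),
       (N'sqlserver');

-- Review what the package will iterate over.
SELECT SearchTermId, SearchTerm
FROM dbo.SearchTerms
ORDER BY SearchTermId;
```

The multi-row VALUES form works on SQL Server 2008 and later.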
3. Adapt the snippet for multiple search terms
The original code can be changed to allow looping through the table and searching Twitter for each term.
a. In the Connections element, remove the single OleDbConnection entry and replace it with the following two entries:
<OleDbConnection Name="Config" ConnectionString="Data Source=localhost;Initial Catalog=TwitterSearch;Provider=SQLNCLI10.1;Integrated Security=SSPI;Connect Timeout=30;"/>
<OleDbConnection Name="Staging" ConnectionString="Data Source=localhost;Initial Catalog=TwitterSearch;Provider=SQLNCLI10.1;Integrated Security=SSPI;Connect Timeout=30;"/>
Explanation: The "Config" connection is used to connect to the SearchTerms table and fetch the terms to be searched. The "Staging" connection is used to write the data retrieved from Twitter into the TwitterLog table.
b. Next, navigate down in the code to the Twitter.Reader Package element and replace its opening <Variables> tag with the following code:
<Variables>
<Variable Name="SearchTermsCollection" DataType="Object"></Variable>
</Variables>
<Tasks>
<ExecuteSQL Name="Get Search Terms From Database" ConnectionName="Config" ResultSet="Full">
<DirectInput>SELECT SearchTerm from SearchTerms</DirectInput>
<Results>
<Result Name="0" VariableName="User.SearchTermsCollection" />
</Results>
</ExecuteSQL>
<ForEachAdoLoop ConstraintMode="Linear" Name="Iterate Search Terms" SourceVariableName="User.SearchTermsCollection">
<Variables>
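Explanation: This code declares an Object variable, SearchTermsCollection, which the Execute SQL task fills with the search terms from the SearchTerms table. The ForEachAdoLoop element at the end of the pasted code then loops through those terms. The trailing <Variables> tag opens the loop's own variable block, which now holds the original LastTweetNumber and SearchTerm variables.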
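SearchTerm is declared as NULL-able and has no unique constraint, so the table can end up holding blank or duplicate rows, and each row returned by the query becomes one loop iteration. One possible variation on the DirectInput query, offered as a sketch and not as part of the original snippet, filters those rows out:

```sql
-- Hypothetical replacement for the query in "Get Search Terms From Database".
-- LEN() ignores trailing spaces and returns NULL for NULL input, so this
-- single predicate drops both NULL and blank terms.
-- It uses > and avoids the not-equal operator, so it can be pasted into
-- the DirectInput element without XML escaping.
SELECT DISTINCT SearchTerm
FROM dbo.SearchTerms
WHERE LEN(SearchTerm) > 0;
```

The result still has SearchTerm as its only column, so nothing else in the package needs to change.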
c. To make the SearchTerm variable pull the search term from the SearchTermsCollection object, add the following code after the closing tag of the Variables element that contains the LastTweetNumber and SearchTerm variables:
<VariableMappings>
<VariableMapping Name="0" VariableName="User.SearchTerm" />
</VariableMappings>
Explanation: The mapping copies column 0 of the current row into the SearchTerm variable on each iteration. From there the code continues as before, using the SearchTerm variable to query Twitter for new tweets.
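Inside the loop, the first task looks up the highest TweetNumber already stored for the current term, and the script component passes that value to Twitter as since_id. To preview the value each term will start from, a set-based query along these lines could be run in Management Studio. It is a diagnostic sketch only and is not part of the package:

```sql
-- For every configured term, show the since_id the loop would use.
-- Terms with no rows in TwitterLog yet come back as 0, matching the
-- isnull(max([TweetNumber]),0) lookup in "EST Get Last TweetNumber".
SELECT st.SearchTerm,
       ISNULL(MAX(tl.TweetNumber), 0) AS LastTweetNumber
FROM dbo.SearchTerms AS st
LEFT JOIN dbo.TwitterLog AS tl
       ON tl.SearchTerm = st.SearchTerm
GROUP BY st.SearchTerm
ORDER BY st.SearchTerm;
```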
d. The last thing to add is the closing tags for the ForEachAdoLoop and the new Tasks element we created. To do this, add the following code before the </Package> tag near the end of the code:
</ForEachAdoLoop>
</Tasks>
4. Generate and run the package
Go ahead and generate and build the SSIS package, then run it. In a matter of seconds the Twitter data related to the search terms in the SearchTerms table will be pulled into the TwitterLog table.
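One way to confirm that the run loaded data for each term is a quick count against the staging table. This is a sketch that assumes the table and column names from the TwitterLog script in the full listing:

```sql
-- Rows loaded and the newest tweet number, per search term.
SELECT SearchTerm,
       COUNT(*)         AS TweetCount,
       MAX(TweetNumber) AS NewestTweetNumber
FROM dbo.TwitterLog
GROUP BY SearchTerm
ORDER BY SearchTerm;
```

A term that is missing from the output returned no tweets on this run.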
Recap
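Biml turns what would otherwise be a brittle copy-and-paste exercise into a small, metadata-driven framework: add a row to dbo.SearchTerms and the next run of the package picks up the new term automatically. Because the terms are read from the table at run time, the package does not need to be regenerated.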
Full Biml listing
Here's the complete Biml file used above:
<Biml xmlns="http://schemas.varigence.com/biml.xsd">
<Annotations>
<Annotation>
Search Terms Table Script:
CREATE TABLE [dbo].[SearchTerms] (
[SearchTermId] [bigint] IDENTITY(1,1) PRIMARY KEY,
[SearchTerm] [nvarchar](140) NULL
)
Twitter Log Table Script:
CREATE TABLE [dbo].[TwitterLog](
[TweetNumber] [bigint] NULL,
[TweetID] [nvarchar](128) NULL,
[PublishedDateTime] [nvarchar](50) NULL,
[UpdatedDateTime] [nvarchar](50) NULL,
[TweetContent] [nvarchar](2048) NULL,
[TweetTitle] [nvarchar](2048) NULL,
[TweetURI] [nvarchar](2048) NULL,
[TweeterName] [nvarchar](2048) NULL,
[TweetLanguage] [nvarchar](128) NULL,
[TweetSource] [nvarchar](128) NULL,
[TweetResultType] [nvarchar](128) NULL,
[TweetGeoInfo] [nvarchar](128) NULL,
[TweeterImageLink] [nvarchar](128) NULL,
[TweetLink] [nvarchar](128) NULL,
[SearchTerm] [nvarchar](128) NULL
)
</Annotation>
</Annotations>
<Connections>
<OleDbConnection Name="Config" ConnectionString="Data Source=localhost;Initial Catalog=TwitterSearch;Provider=SQLNCLI10.1;Integrated Security=SSPI;Connect Timeout=30;"/>
<OleDbConnection Name="Staging" ConnectionString="Data Source=localhost;Initial Catalog=TwitterSearch;Provider=SQLNCLI10.1;Integrated Security=SSPI;Connect Timeout=30;"/>
</Connections>
<ScriptProjects>
<ScriptComponentProject Name ="SCS_Twitter_Feed">
<AssemblyReferences>
<AssemblyReference AssemblyPath="Microsoft.SqlServer.DTSPipelineWrap" />
<AssemblyReference AssemblyPath="Microsoft.SqlServer.DTSRuntimeWrap" />
<AssemblyReference AssemblyPath="Microsoft.SqlServer.PipelineHost" />
<AssemblyReference AssemblyPath="Microsoft.SqlServer.TxScript" />
<AssemblyReference AssemblyPath="System.dll" />
<AssemblyReference AssemblyPath="System.AddIn.dll" />
<AssemblyReference AssemblyPath="System.Data.dll" />
<AssemblyReference AssemblyPath="System.Xml.dll" />
</AssemblyReferences>
<OutputBuffers>
<OutputBuffer Name="Output0" IsSynchronous="false">
<Annotations>
<Annotation>
IsSynchronous="false" ==>
When there are no non-synchronous buffers,
the compiler does not emit the CreateNewOutputRows
virtual base method (to match the BIDS/SSDT behavior)
</Annotation>
</Annotations>
<Columns>
<Column Name="TweetNumber" DataType ="Int64"></Column>
<Column Name="TweetID" DataType="String" Length="128"></Column>
<Column Name="PublishedDateTime" DataType="String" Length="50"></Column>
<Column Name="UpdatedDateTime" DataType="String" Length="50"></Column>
<Column Name="TweetContent" DataType="String" Length="2048"></Column>
<Column Name="TweetTitle" DataType="String" Length="2048"></Column>
<Column Name="TweetURI" DataType="String" Length="2048"></Column>
<Column Name="TweeterName" DataType="String" Length="2048"></Column>
<Column Name="TweetLanguage" DataType="String" Length="128"></Column>
<Column Name="TweetSource" DataType="String" Length="128"></Column>
<Column Name="TweetResultType" DataType="String" Length="128"></Column>
<Column Name="TweetGeoInfo" DataType="String" Length="128"></Column>
<Column Name="TweeterImageLink" DataType="String" Length="128"></Column>
<Column Name="TweetLink" DataType="String" Length="128"></Column>
</Columns>
</OutputBuffer>
</OutputBuffers>
<ReadOnlyVariables>
<Variable Namespace="User" VariableName="LastTweetNumber" DataType="Int64"></Variable>
<Variable Namespace="User" VariableName="SearchTerm" DataType="String"></Variable>
</ReadOnlyVariables>
<Files>
<File Path="AssemblyInfo.cs">
using System.Reflection;
using System.Runtime.CompilerServices;
[assembly: AssemblyTitle("SCS_Twitter_Feed")]
[assembly: AssemblyDescription("")]
[assembly: AssemblyConfiguration("")]
[assembly: AssemblyCompany("")]
[assembly: AssemblyProduct("SCS_Twitter_Feed")]
[assembly: AssemblyCopyright("Copyright @ 2012")]
[assembly: AssemblyTrademark("")]
[assembly: AssemblyCulture("")]
[assembly: AssemblyVersion("1.0.*")]
</File>
<File Path="main.cs">
using System;
using System.Data;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using Microsoft.SqlServer.Dts.Runtime.Wrapper;
using System.Xml;
using System.Web;
using System.Net;
[Microsoft.SqlServer.Dts.Pipeline.SSISScriptComponentEntryPointAttribute]
public class ScriptMain : UserComponent
{
public override void CreateNewOutputRows()
{
System.Xml.XmlDocument xml_doc = new System.Xml.XmlDocument();
string TwitterUri = "http://search.twitter.com/search.atom?q=%23"
+ Uri.EscapeDataString(this.Variables.SearchTerm)
+ <![CDATA["&rpp=100&result_type=recent&since_id="]]>
+ this.Variables.LastTweetNumber ;
xml_doc = GetResponse(TwitterUri);
XmlNodeList child_nodes = xml_doc.GetElementsByTagName("entry");
string[] temp;
foreach (XmlNode child in child_nodes)
{
Output0Buffer.AddRow();
Output0Buffer.TweetID = child.ChildNodes.Item(0).InnerText;
temp = child.ChildNodes.Item(0).InnerText.Split(':');
Output0Buffer.TweetNumber = Convert.ToInt64(temp[2]);
Output0Buffer.PublishedDateTime= child.ChildNodes.Item(1).InnerText;
Output0Buffer.TweetLink = child.ChildNodes.Item(2).Attributes["href"].Value.ToString();
Output0Buffer.TweetTitle = child.ChildNodes.Item(3).InnerText;
Output0Buffer.TweetContent =child.ChildNodes.Item(4).InnerText;
Output0Buffer.UpdatedDateTime = child.ChildNodes.Item(5).InnerText;
Output0Buffer.TweeterImageLink = child.ChildNodes.Item(6).Attributes["href"].Value.ToString();
Output0Buffer.TweetGeoInfo = child.ChildNodes.Item(7).InnerText;
Output0Buffer.TweetResultType = child.ChildNodes.Item(8).ChildNodes.Item(0).InnerText;
Output0Buffer.TweetSource = child.ChildNodes.Item(9).InnerText;
Output0Buffer.TweetLanguage= child.ChildNodes.Item(10).InnerText;
Output0Buffer.TweeterName = child.ChildNodes.Item(11).ChildNodes.Item(0).InnerText;
Output0Buffer.TweetURI = child.ChildNodes.Item(11).ChildNodes.Item(1).InnerText;
}
}
public XmlDocument GetResponse(string uri)
{
XmlDocument doc = new XmlDocument();
WebRequest myRequest = WebRequest.Create(new Uri(uri));
IWebProxy proxy = myRequest.Proxy;
if (proxy != null)
{
proxy.GetProxy(myRequest.RequestUri);
}
doc.Load(myRequest.GetResponse().GetResponseStream());
return doc;
}
}
</File>
</Files>
</ScriptComponentProject>
</ScriptProjects>
<Packages>
<Package Name ="Twitter.Reader.Tut" ConstraintMode="Linear">
<Variables>
<Variable Name="SearchTermsCollection" DataType="Object"></Variable>
</Variables>
<Tasks>
<ExecuteSQL Name="Get Search Terms From Database" ConnectionName="Config" ResultSet="Full">
<DirectInput>SELECT SearchTerm from SearchTerms</DirectInput>
<Results>
<Result Name="0" VariableName="User.SearchTermsCollection" />
</Results>
</ExecuteSQL>
<ForEachAdoLoop ConstraintMode="Linear" Name="Iterate Search Terms" SourceVariableName="User.SearchTermsCollection">
<Variables>
<Variable Name ="LastTweetNumber" DataType="Int64" >0</Variable>
<Variable Name="SearchTerm" DataType="String"></Variable>
</Variables>
<VariableMappings>
<VariableMapping Name="0" VariableName="User.SearchTerm" />
</VariableMappings>
<Tasks>
<ExecuteSQL Name ="EST Get Last TweetNumber" ConnectionName="Staging" ResultSet="SingleRow">
<DirectInput> SELECT isnull(max([TweetNumber]),0) FROM [dbo].[TwitterLog] where [SearchTerm] =? </DirectInput>
<Parameters>
<Parameter Name="0" DataType="String" Length="128" Direction ="Input" VariableName ="User.SearchTerm"></Parameter>
</Parameters>
<Results>
<Result Name="0" VariableName="User.LastTweetNumber"></Result>
</Results>
</ExecuteSQL>
<Dataflow Name ="DFT Get Tweets">
<Transformations>
<ScriptComponentSource Name="SCS Twitter Feed">
<ScriptComponentProjectReference ScriptComponentProjectName="SCS_Twitter_Feed"></ScriptComponentProjectReference>
</ScriptComponentSource>
<DerivedColumns Name="DC SearchTerm">
<Columns>
<Column Name="SearchTerm" DataType ="String" Length ="128">@[User::SearchTerm]</Column>
</Columns>
</DerivedColumns>
<OleDbDestination Name="ODD dbo TwitterLog" ConnectionName="Staging">
<ExternalTableOutput Table="[dbo].[TwitterLog]"></ExternalTableOutput>
</OleDbDestination>
</Transformations>
</Dataflow>
</Tasks>
</ForEachAdoLoop>
</Tasks>
</Package>
</Packages>
</Biml>
Based on a snippet by John Minkjan.