Azure Bootcamp 2015 – Twitter, Event Hub, Stream Analytics and Data Factory

Last Saturday the company I work for hosted the Copenhagen edition of the Global Azure Bootcamp. Our day in Copenhagen was focused around three sessions, including one about recommendation engines with Hadoop by Sebastian Brandes and an intro to Docker containers by Morten Christensen.

I was fortunate enough to be able to give a quick 30-minute intro to Event Hub, Stream Analytics and Data Factory. My presentation was meant as a teaser on what you can do with these services in Azure, as many of the participants did not have any experience with them.

As part of the presentation, I provided the participants with some code to get started with the services, and it’s this code I want to share with people on my blog too.

My presentation was divided into two parts. The first part was about analyzing real-time data with Event Hub and Stream Analytics. The example real-time data I used was Twitter data. I created a small console application with the Tweetinvi library that forwarded certain tweets to an event hub.

Data from the event hub was then configured to flow into Stream Analytics, where I did some simple temporal analysis: counting the number of tweets in every ten-second window and writing the timestamp and count to an Azure Table.

-- Example of a query that returns the number of tweets within a 10-second window
SELECT
    System.TimeStamp as T1,  Count(*), Datepart(yy,System.TimeStamp) as P1
INTO
    TweetCountTable
FROM
    tweethubsimple
    Timestamp by Created_At
Where lang = 48 
GROUP BY TumblingWindow(second,10)
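
To make the windowing concrete, here is a minimal C# sketch of what a tumbling window count does conceptually: every event falls into exactly one fixed-size, non-overlapping window, and one count is produced per window. The `TumblingWindowSketch` class and its method are hypothetical names for illustration and are not part of Stream Analytics. The sketch keys each count by the end of its window, which as far as I understand is what System.TimeStamp reports for a windowed aggregate, and it simplifies how events exactly on a window boundary are handled.

```csharp
using System;
using System.Collections.Generic;

public static class TumblingWindowSketch
{
    // Counts timestamps per fixed, non-overlapping window.
    // Keys are window end times; values are the number of events in that window.
    public static SortedDictionary<DateTime, int> CountPerWindow(
        IEnumerable<DateTime> timestamps, TimeSpan window)
    {
        var counts = new SortedDictionary<DateTime, int>();
        foreach (var ts in timestamps)
        {
            // Integer division assigns each event to exactly one window.
            long index = ts.Ticks / window.Ticks;
            var windowEnd = new DateTime((index + 1) * window.Ticks, ts.Kind);
            counts[windowEnd] = counts.TryGetValue(windowEnd, out var c) ? c + 1 : 1;
        }
        return counts;
    }
}
```

Calling `CountPerWindow` with tweets created at 10:00:03, 10:00:09 and 10:00:12 and a ten-second window yields two entries: a count of two for the window ending at 10:00:10 and a count of one for the window ending at 10:00:20.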

Using the Azure Table as a source, I showed how end users can gain insight from the data with the Power BI Designer. The Power BI part was new to many of the developers, and they found it very useful.

Building the first part of the demo was easy; the only real issue I encountered was that the standard representation of the tweet from Tweetinvi couldn’t be deserialized by Stream Analytics. So I had to wrap only the tweet properties interesting to my application in a simpler data structure.
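
As an illustration of that idea, a flattened tweet could look roughly like the sketch below. This is not the class from my demo: `SimpleTweet`, `TweetSerializer` and the property names are hypothetical, chosen to resemble the fields used in the query above, and `System.Text.Json` is an assumption that requires a recent .NET version.

```csharp
using System;
using System.Text.Json;

// Hypothetical flat representation holding only the properties the query needs.
public class SimpleTweet
{
    public string Text { get; set; } = "";
    public int Lang { get; set; }             // numeric language code, as filtered on in the query
    public DateTime Created_At { get; set; }  // used by "Timestamp by" in the query
}

public static class TweetSerializer
{
    // Serializes the flat object to a JSON string with no nested objects,
    // which is the kind of payload that would be sent to the event hub.
    public static string ToJson(SimpleTweet tweet)
    {
        return JsonSerializer.Serialize(tweet);
    }
}
```

The point of the design is that the payload contains only primitive values, so there is nothing nested or cyclic for the downstream consumer to trip over.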

The second part of my demo was focused on Data Factory. Data Factory is, in short, a data orchestration service in Azure that is designed for moving data around in order to prepare it for further analysis. It contains tools for moving data from on-premises systems into the cloud, and it supports calling HDInsight for deep analysis. Furthermore, it’s possible to write your own data pipeline components (activities) in e.g. .NET that can do anything you can imagine.

To demonstrate some of this goodness, I reconfigured my Stream Analytics pipeline to keep only the English tweets from the incoming stream and, instead of storing them in Azure Table Storage, wrote the result to a CSV file in Azure Blob Storage. This was necessary since Data Factory is not capable of working with real-time data from event hubs. With the data at rest in blob storage, I wanted to show how to invoke custom .NET activities from a Data Factory pipeline, a topic I felt would interest a developer-focused audience.

The custom .NET activity I implemented for my demo calls an external web service that does sentiment analysis on the tweet text, giving the tweet a score of 1 if it was positive and -1 if it was negative.

using CsvHelper;
using Microsoft.DataFactories.Runtime;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;
using Microsoft.WindowsAzure.Storage.Table;
using System;
using System.Collections.Generic;
using System.Data.Services.Client;
using System.Diagnostics;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Net;
using System.Runtime.Serialization;
using System.Text;
using System.Threading.Tasks;

namespace SJKP.AzurebootCamp.DataFactoryActivity
{
    public class SentimentAnalysisActivity : IDotNetActivity
    {
        public string apikey;
        public string email;
        public IActivityLogger logger;
        public string url;
        /// <summary>
        /// Calls https://datamarket.azure.com/dataset/aml_labs/lexicon_based_sentiment_analysis to calculate a sentiment for a twitter tweet.
        /// Register at the site to get an apikey.
        /// </summary>
        /// <param name="inputTables"></param>
        /// <param name="outputTables"></param>
        /// <param name="properties"></param>
        /// <param name="logger"></param>
        /// <returns></returns>
        public IDictionary<string, string> Execute(IEnumerable<ResolvedTable> inputTables, IEnumerable<ResolvedTable> outputTables, IDictionary<string, string> properties, IActivityLogger logger)
        {
            this.logger = logger;
            try
            {
                url = properties["url"];
                logger.Write(TraceEventType.Information, "url {0}", url);

                apikey = properties["apikey"];
                logger.Write(TraceEventType.Information, "apikey {0}", apikey);

                email = properties["email"];
                logger.Write(TraceEventType.Information, "email {0}", email);
                

                foreach (var table in inputTables)
                {
                    var connectionString = table.LinkedService.GetConnectionString();
                    var folder = table.Table.GetFolderPath();

                    if (folder == null || connectionString == null)
                        continue;
                    BlobContinuationToken continuationToken = null;
                    CloudStorageAccount inputStorageAccount = CloudStorageAccount.Parse(connectionString);
                    CloudBlobClient inputClient = inputStorageAccount.CreateCloudBlobClient();
                    do
                    {
                        BlobResultSegment result = inputClient.ListBlobsSegmented(folder,
                                                    true,
                                                    BlobListingDetails.Metadata,
                                                    null,
                                                    continuationToken,
                                                    null,
                                                    null);
                        foreach (IListBlobItem listBlobItem in result.Results)
                        {
                            CloudBlockBlob inputBlob = listBlobItem as CloudBlockBlob;
                            
                            if (inputBlob != null)
                            {
                                foreach (var outputtable in outputTables)
                                {

                                    var outputstorageaccount = CloudStorageAccount.Parse(outputtable.LinkedService.GetConnectionString());
                                    var tableName = outputtable.Table.GetTableName();
                                    var tableClient = outputstorageaccount.CreateCloudTableClient();
                                    var outputAzureTable = tableClient.GetTableReference(tableName);
                                    outputAzureTable.CreateIfNotExists();
                                    ProcessTweetBlob(inputBlob, outputAzureTable, folder);
                                }
                            }
                        

                        }
                        continuationToken = result.ContinuationToken;

                    } while (continuationToken != null);
                }
            }

            catch (Exception ex)
            {
                this.logger.Write(TraceEventType.Error, ex.ToString());
            }
            return new Dictionary<string, string>();
        }

        // Calls the sentiment web service for a single tweet text and returns its score.
        public double GetScore(string url, string email, string apiKey, string textToAnalyze)
        {
            var actionUri = new Uri(url);
            var ctx = new DataServiceContext(actionUri);

            // Authenticate with HTTP Basic, using the email as user name and the API key as password.
            var cache = new CredentialCache();
            cache.Add(actionUri, "Basic", new NetworkCredential(email, apiKey));
            ctx.Credentials = cache;

            var query = ctx.Execute<ScoreResult>(actionUri, "POST", true, new BodyOperationParameter("Text", textToAnalyze));
            return query.ElementAt(0).result;
        }

        public void ProcessTweetBlob(CloudBlockBlob inputBlob, CloudTable outputAzureTable, string folder)
        {
            int count = 0;
            List<TweetSentimentScore> scores = new List<TweetSentimentScore>();
            using (var reader = new CsvReader(new StreamReader(inputBlob.OpenRead())))
            {
                while (reader.Read())
                {
                    if (count == 0)
                    {
                        logger.Write(TraceEventType.Information, "First line: [{0}]", string.Join(",", reader.CurrentRecord));
                    }
                    count++;
                    var tweet = reader.GetField(0); //get the tweet
                    var entity = new TweetSentimentScore()
                    {
                        PartitionKey = "tweetsentimentscore",
                        RowKey = Guid.NewGuid().ToString(),
                        Tweet = tweet,
                        SentimentScore = GetScore(url, email, apikey, tweet)
                    };
                    scores.Add(entity);

                    outputAzureTable.Execute(TableOperation.InsertOrReplace(entity)); //Do it one row at a time for demo output

                }
            }

            // Batched alternative (disabled for the demo). An Azure Table batch accepts at most
            // 100 operations, and all entities in it must share the same PartitionKey.
            for (int i = 0; i < scores.Count; i += 100)
            {
                var batchOp = new TableBatchOperation();
                foreach (var score in scores.Skip(i).Take(100))
                {
                    batchOp.Add(TableOperation.InsertOrReplace(score));
                }
                //outputAzureTable.ExecuteBatch(batchOp); //Removed for demo purposes.
            }


            logger.Write(TraceEventType.Information, string.Format(CultureInfo.InvariantCulture,
                                "{0},{1},{2},{3},{4}\n",
                                folder,
                                inputBlob.Name,
                                count,
                                Environment.MachineName,
                                DateTime.UtcNow));
        }
    }

  

    public class ScoreResult
    {
        [DataMember]
        public double result
        {
            get;
            set;
        }
    }

    public class TweetSentimentScore : TableEntity
    {

        public string Tweet { get; set; }
        public double SentimentScore { get; set; }
    }
}

I found a service in the Azure Data Market that was able to do the sentiment analysis. Unfortunately, the service didn’t support batch processing, so analyzing a larger number of tweets with my activity is really slow. But I was able to get the concept working.
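
Because the external service is slow, it can help to exercise the rest of an activity against a local stand-in. The sketch below is a toy lexicon-based scorer, assuming made-up word lists; `ToySentiment` is a hypothetical name, and the logic is far cruder than, and not a reproduction of, the real service. It returns 1 for positive text, -1 for negative text and 0 when neither side wins.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class ToySentiment
{
    // Tiny made-up word lists, for illustration only.
    private static readonly HashSet<string> Positive =
        new HashSet<string>(StringComparer.OrdinalIgnoreCase) { "good", "great", "love", "awesome" };
    private static readonly HashSet<string> Negative =
        new HashSet<string>(StringComparer.OrdinalIgnoreCase) { "bad", "awful", "hate", "terrible" };

    // Returns 1, -1 or 0 depending on whether positive or negative words dominate.
    public static double Score(string text)
    {
        var words = text.Split(new[] { ' ', ',', '.', '!', '?' }, StringSplitOptions.RemoveEmptyEntries);
        int balance = words.Count(w => Positive.Contains(w)) - words.Count(w => Negative.Contains(w));
        return Math.Sign(balance);
    }
}
```

A method like this has the same shape as `GetScore` minus the connection parameters, so it could be swapped in while testing the blob and table handling.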

The options for testing a data pipeline in Azure Data Factory are really bad, which is rather annoying because the developer experience in the preview stage is also somewhat lackluster: the only option for authoring your data pipeline is writing JSON for all pipeline components. So I have included the JSON definitions of the activities, data tables and linked services that I used.

In order to test that I was able to successfully implement a custom activity I also implemented an activity that can post data to requestb.in, which can be useful in debugging scenarios or as sample code.
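
The core of such a debugging helper is just an HTTP POST. The following is a minimal sketch and not the activity from my repo: `DebugPoster` and its methods are hypothetical names, and the bin URL is whatever address the request-inspection service hands you.

```csharp
using System;
using System.Net.Http;
using System.Text;
using System.Threading.Tasks;

public static class DebugPoster
{
    private static readonly HttpClient Client = new HttpClient();

    // Builds a POST request carrying the payload as plain text.
    public static HttpRequestMessage BuildRequest(Uri binUrl, string payload)
    {
        return new HttpRequestMessage(HttpMethod.Post, binUrl)
        {
            Content = new StringContent(payload, Encoding.UTF8, "text/plain")
        };
    }

    // Sends the payload to the bin and returns the HTTP status code.
    public static async Task<int> PostAsync(Uri binUrl, string payload)
    {
        using (var request = BuildRequest(binUrl, payload))
        using (var response = await Client.SendAsync(request))
        {
            return (int)response.StatusCode;
        }
    }
}
```

Posting something like the folder name and blob name from inside an activity makes it easy to see in the bin whether the activity ran and what input it received.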

I also found the following PowerShell command useful for kicking off the data flow when trying to test a new activity.

Set-AzureDataFactoryPipelineActivePeriod -ResourceGroupName <resourceGroupName> -PipelineName <activityName> -DataFactoryName <dataFactoryName> -StartDateTime ([System.DateTime]::UtcNow.AddMinutes(-15)) -EndDateTime ([System.DateTime]::UtcNow.AddMinutes(15)) -ForceRecalculate  

To see all my code from the presentation, check my GitHub repo: https://github.com/sjkp/SJKP.AzureBootcamp2015

Categories: Software Windows Azure

Simon J.K. Pedersen
