Watching YouTube Activity with Apache Streams

We’re finishing up our series of blogs on providers with YouTube. After this we’ll get down to business on building our own social monitoring quasi-app!

Getting YouTube Credentials

Before accessing any API, we of course need to get our credentials.

A good first step in an enterprise like this would be to read the docs on registering an application with the YouTube API, or at least thumb through them.

But if you wanted to read lengthy docs, you probably wouldn’t be checking out this blog, so here’s the short version:

Open the Google Developers Dashboard, where we’re going to “Create a Project”.

[Screenshot: Google Developers Console, creating a new project]

I’m going to call my application “Apache Streams Demo”

[Screenshot: naming the project “Apache Streams Demo”]

The next dialog is where things can go awry. First, right-click “Enable APIs you plan to use” and open that link in a new tab, then click the drop-down arrow next to “Create Credentials” and select “Create Service Account Key” (we’ll come back to API enablement in a minute).

[Screenshot: the “Create Credentials” drop-down]

On the next screen choose “New Service Account” and select the P12 key type (not JSON, which is the default). A dialog will pop up with some key information. I set permissions to owner, but that probably isn’t necessary. After you hit Create, the key will be generated and downloaded.

[Screenshots: creating the service account key and the resulting P12 download]

Set that .p12 file aside somewhere safe. We’ll need it soon.

Finally, we need to enable the services we want to use with this key. Go to the Library tab (or switch to the tab you opened while creating the service account).

You’ll see a long list of all the APIs you can grant access to. We want to focus, for the moment, on the YouTube services.

[Screenshot: the API Library]

Go through each API you want (the YouTubes for this demo) and click “Enable” at the top.

[Screenshot: enabling a YouTube API]

Configurations

Full credit to Steve Blockmon for this well-done notebook.

Import Setup.json

Import YouTube.json

Setup.json

Steve and I are working toward a suite of notebooks aimed at creating a social monitoring quasi-app. In the first paragraph of Setup.json we see the following:

%spark.dep
z.reset()
z.addRepo("apache-snapshots").url("https://repository.apache.org/content/repositories/snapshots").snapshot()
z.load("org.apache.streams:streams-core:0.5-incubating-SNAPSHOT")
z.load("org.apache.streams:streams-converters:0.5-incubating-SNAPSHOT")
z.load("org.apache.streams:streams-pojo:0.5-incubating-SNAPSHOT")
z.load("org.apache.streams:streams-provider-twitter:0.5-incubating-SNAPSHOT")
z.load("org.apache.streams:streams-provider-facebook:0.5-incubating-SNAPSHOT")
z.load("org.apache.streams:streams-provider-youtube:0.5-incubating-SNAPSHOT")
z.load("org.apache.streams:google-gplus:0.5-incubating-SNAPSHOT")

We’re using the Zeppelin Dependency Loader here (you’ll likely see a message noting that it is deprecated). At the time of writing, there seems to be an issue with adding repositories via the GUI (it doesn’t pick up the new repository URL).

z.addRepo(...) does add the new repository, in this case the Apache snapshots repository. The remaining lines load the Apache Streams core modules and providers.

In the next paragraph of the notebook Steve defines a couple of UDFs for dealing with punctuation and spaces in tweets and descriptions. These will come in handy later and are worth taking a moment to briefly grok.

Youtube.json

The first paragraph isn’t too exciting- some simple imports:

%spark
import org.apache.streams.config._
import org.apache.streams.core._
import org.apache.youtube.pojo._

import com.typesafe.config._
import com.youtube.provider._

import java.util.Iterator

The second paragraph is where things start to get interesting. Here we build a HOCON configuration and leverage Zeppelin Dynamic Forms.

val apiKey = z.input("apiKey", "")
val serviceAccountEmailAddress = z.input("serviceAccountEmailAddress", "")
val pathToP12KeyFile = z.input("pathToP12KeyFile", "")

val credentials_hocon = s"""
youtube {
apiKey = $apiKey
oauth {
serviceAccountEmailAddress = "$serviceAccountEmailAddress"
pathToP12KeyFile = "$pathToP12KeyFile"
}
}
"""

This paragraph will yield:

[Screenshot: the credentials dynamic form]

To find your API key and service account email (assuming you didn’t write them down earlier, tsk tsk), go back to your Google Developers Console, open the Credentials tab, and click “Manage service accounts” on the right. On that page you will see your service account and API key. If you have multiple, make sure to use the one that corresponds to the .p12 file you downloaded.

[Screenshot: the Credentials tab showing the service account and API key]

After you enter your credentials and the path to the .p12 file, make sure to click the “Play” button again to re-run the paragraph so it picks up your values.

The next configuration specifies who we want to follow. It is a list of UserIDs.

%spark
val accounts_hocon = s"""
youtube.youtubeUsers = [
# Apache Software Foundation - Topic
{ userId = "UCegQNPmRCAJvCq6JfHUKZ9A"},
# Apache Software Foundation
{ userId = "TheApacheFoundation"},
# Apache Spark
{ userId = "TheApacheSpark" },
# Apache Spark - Topic
{ userId = "UCwhtqOdWyCuqOboj-E1bpFQ"},
# Apache Flink Berlin
{ userId = "UCY8_lgiZLZErZPF47a2hXMA"},
# Apache Syncope
{ userId = "UCkrSQVb5Qzb13crS1kCOiQQ"},
# Apache Accumulo
{ userId = "apacheaccumulo"},
# Apache Hive - Topic
{ userId = "UCIjbkZAX5VlvSKoSzNUHIoQ"},
# Apache HBase - Topic
{ userId = "UCcGNHRiO9bi6BeH5OdhY2Kw"},
# Apache Cassandra - Topic
{ userId = "UC6nsS04n_wBpCDXqSAkFM-w"},
# Apache Hadoop - Topic
{ userId = "UCgRu3LbCjczooTVI9VSvstg"},
# Apache Avro - Topic
{ userId = "UCzHCk8Gl5eP85xz0HwXjkzw"},
# Apache Maven - Topic
{ userId = "UCBS2s2cwx-MW9rVeKwee_VA"},
# Apache Oozie - Topic
{ userId = "UCRyBrviuu3qMNliolYXvC0g"},
]
"""

I would recommend you stick with this list if you don’t have any activity on YouTube, or if you’re just getting started and want to make sure some data is coming in. But in the spirit of creating our own little social monitoring quasi-app, consider replacing the preceding paragraph with this:

%spark
val myChannel = z.input("my YouTube Channel", "")
val myId = z.input("my ID", "")

val accounts_hocon = s"""
youtube.youtubeUsers = [
{ userId = "${myId}"},
{ userId = "${myChannel}"}
]
"""

This gives you a dynamic form in which a user can insert their own YouTube channel and user IDs.

[Screenshot: dynamic form with my channel and user IDs filled in]

You’ll note here I have put in both my user ID and my channel ID. The only difference between adJyExT6JcrA0kvKNHrXZw and UCadJyExT6JcrA0kvKNHrXZw is the leading UC in the latter, denoting that it is a channel. Notice also in the code that you submit both the user ID and the channel ID as userId = .... This is helpful because, as we will see, the channel and user APIs give us slightly different information.

To find channel IDs, just surf around YouTube and go to channel pages. You’ll see the address is something like https://www.youtube.com/channel/UCGsMOj1pUwabpurXSKzfIfg (which is Astronomia Nova), and the UCGsMOj1pUwabpurXSKzfIfg part is the channel ID.

Getting at that data

First we’re going to use ConfigFactory to parse the config strings we just created. This builds our YouTube configuration.

%spark
val reference = ConfigFactory.load()
val credentials = ConfigFactory.parseString(credentials_hocon)
val accounts = ConfigFactory.parseString(accounts_hocon)
val typesafe = accounts.withFallback(credentials).withFallback(reference).resolve()
val youtubeConfiguration = new ComponentConfigurator(classOf[YoutubeConfiguration]).detectConfiguration(typesafe, "youtube");
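
To see what that withFallback chain is doing, here is a tiny standalone sketch with hypothetical keys (not from the notebook): values in the config you start from win, the fallback fills in anything missing, and resolve() finishes substitution.

%spark
// Hypothetical keys, for illustration only: the primary config wins where both
// define a value; the fallback supplies anything the primary is missing.
import com.typesafe.config.ConfigFactory

val primary  = ConfigFactory.parseString("""youtube { apiKey = "from-primary" }""")
val fallback = ConfigFactory.parseString("""youtube { apiKey = "from-fallback", retryMax = 3 }""")

val merged = primary.withFallback(fallback).resolve()
merged.getString("youtube.apiKey")  // "from-primary"
merged.getInt("youtube.retryMax")   // 3, supplied by the fallback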

Next we will pull information about the channel(s) we listed in our accounts_hocon config string. This may be all of the Apache accounts or our personal one. We will create an ArrayBuffer to store all of the responses we get. This should be old hat by now; you’ll notice the API is very similar to the Facebook and Twitter providers from previous posts.

%spark
import com.typesafe.config._
import org.apache.streams.config._
import org.apache.streams.core._
import com.youtube.provider._
import org.apache.youtube.pojo._
import java.util.Iterator

val youtubeChannelProvider = new YoutubeChannelProvider(youtubeConfiguration);
youtubeChannelProvider.prepare(null)
youtubeChannelProvider.startStream()

val channel_buf = scala.collection.mutable.ArrayBuffer.empty[Object]
while (youtubeChannelProvider.isRunning()) {
  val resultSet = youtubeChannelProvider.readCurrent()
  resultSet.size()
  val iterator = resultSet.iterator();
  while (iterator.hasNext()) {
    val datum = iterator.next();
    channel_buf += datum.getDocument
  }
}
channel_buf.size

The important thing to watch for, especially if you’re using something other than the Apache user ID set, is that the output of the previous paragraph doesn’t end with something like res45: Int = 0. If it does, your buffer is empty and you will get errors when you try to make a table. If your buffer is empty, try different user IDs.
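
If you’d rather fail fast than discover an empty buffer later at table-creation time, a minimal guard like this (my addition, not part of the original notebook) can go right after the collection loop:

%spark
// Hypothetical guard (not in the original notebook): stop with a clear message if
// the provider returned nothing, instead of failing later when building the DataFrame.
require(channel_buf.nonEmpty, "channel_buf is empty - check your credentials and user IDs")
println(s"Collected ${channel_buf.size} channel documents")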

Next we’ll get the user activity results.

%spark

val buf = scala.collection.mutable.ArrayBuffer.empty[Object]

val provider = new YoutubeUserActivityProvider(youtubeConfiguration);
provider.prepare(null)
provider.startStream()
while (provider.isRunning()) {
  val resultSet = provider.readCurrent()
  resultSet.size()
  val iterator = resultSet.iterator();
  while (iterator.hasNext()) {
    val datum = iterator.next();
    //println(datum.getDocument)
    buf += datum.getDocument
  }
}
buf.size

Basically the same idea here: as before, make sure that buffer size is greater than 0!

Processing data into something we can play with

Our goal here is to get our data into a format we can play with. The easiest route to this end is to leverage Spark DataFrames / SQL Context and Zeppelin’s table display.

At the end of the last session on Apache Streams, we left untangling all of the messy characters and line feeds as an exercise for the reader. Now we provide the solution: a couple of user-defined functions (UDFs) to strip all of that noise out.

%spark
import org.apache.spark.sql.functions._
import org.apache.spark.sql.UserDefinedFunction
import java.util.regex.Pattern

val toLowerCase = udf {
  (text: String) => text.toLowerCase
}

val removeLineBreaks = udf {
  (text: String) =>
    val regex = "[\\n\\r]"
    val pattern = Pattern.compile(regex)
    val matcher = pattern.matcher(text)

    // Remove all matches, split at whitespace (repeated whitespace is allowed) then join again.
    val cleanedText = matcher.replaceAll(" ").split("[ ]+").mkString(" ")

    cleanedText
}

val removePunctuationAndSpecialChar = udf {
  (text: String) =>
    // Note: the exact character class was truncated in the original post; this
    // approximation strips common punctuation and special characters.
    val regex = "[\\.\\,\\:\\-\\!\\?\\n\\t,\\%\\#\\*\\|\\=\\(\\)\\\"\\>\\<\\/]"
    val pattern = Pattern.compile(regex)
    val matcher = pattern.matcher(text)

    // Remove all matches, split at whitespace (repeated whitespace is allowed) then join again.
    val cleanedText = matcher.replaceAll(" ").split("[ ]+").mkString(" ")

    cleanedText
}

With the cleanup UDFs defined, we normalize the raw channel documents into posts with the YoutubeTypeConverter:

%spark
import org.apache.streams.core.StreamsDatum
import com.youtube.processor._
import scala.collection.JavaConversions._
//Normalize activities -> posts(s)
val YoutubeTypeConverter = new YoutubeTypeConverter()
YoutubeTypeConverter.prepare()

val pages_datums = channel_buf.flatMap(x => YoutubeTypeConverter.process(new StreamsDatum(x)))

Now we create our DataFrame and register it as a temp table. Notice we use our “cleaner function”, aka removePunctuationAndSpecialChar, to clean the summary column. This gives us a nice clean summary while preserving the original actor.summary. If actor.summary doesn’t make sense as a column name yet, hold on until we get to querying the table; it has to do with how Spark queries the nested fields of a JSON document.

%spark
import org.apache.streams.jackson.StreamsJacksonMapper;
import sqlContext._
import sqlContext.implicits._

val mapper = StreamsJacksonMapper.getInstance();
val pages_jsons = pages_datums.map(o => mapper.writeValueAsString(o.getDocument))
val pagesRDD = sc.parallelize(pages_jsons)

val pagesDF = sqlContext.read.json(pagesRDD)

val cleanDF = pagesDF.withColumn("summary", removePunctuationAndSpecialChar(pagesDF("actor.summary")))
cleanDF.registerTempTable("youtube_pages")
cleanDF.printSchema

In the line where we create pages_jsons we map over the pages_datums we created just previously (which was of type scala.collection.mutable.ArrayBuffer[org.apache.streams.core.StreamsDatum], if you’re into that sort of thing), turning each StreamsDatum into a JSON string. We then parallelize that array and do the standard Spark things to create a DataFrame from the RDD. Hocus-pocus, and our channels have now been processed into a temp table called youtube_pages. The process is repeated for our users’ posts.

%spark
import org.apache.streams.core.StreamsDatum
import com.youtube.processor._
import scala.collection.JavaConversions._
//Normalize activities -> posts(s)
val YoutubeTypeConverter = new YoutubeTypeConverter()
YoutubeTypeConverter.prepare()

val useractivity_posts = buf.flatMap(x => YoutubeTypeConverter.process(new StreamsDatum(x)))

And then…

%spark
import org.apache.streams.jackson.StreamsJacksonMapper;
import sqlContext._
import sqlContext.implicits._

val mapper = StreamsJacksonMapper.getInstance();
val jsons = useractivity_posts.map(o => mapper.writeValueAsString(o.getDocument))
val activitiesRDD = sc.parallelize(jsons)

val activitiesDF = sqlContext.read.json(activitiesRDD)

val cleanDF = activitiesDF.withColumn("content", removePunctuationAndSpecialChar(activitiesDF("content")))
cleanDF.registerTempTable("youtube_posts")
cleanDF.printSchema

Same idea, now we have a table called youtube_posts.
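
Optionally, and purely as a sanity check of my own (not part of the original notebook), you can confirm that rows actually landed in both temp tables before moving on:

%spark
// Quick sanity check: both temp tables should report a non-zero count.
sqlContext.sql("select count(*) as n from youtube_pages").show()
sqlContext.sql("select count(*) as n from youtube_posts").show()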

Let’s play.

At the end of the second paragraph for both pages and posts above, we called the .printSchema method on our DataFrame. This is helpful because it shows us … the schema of the DataFrame, which for a non-flat format such as JSON can be very useful. To traverse the schema we use field.subfield.subsubfield notation in our queries. Consider the following SQL statement:

%spark.sql
select actor.id
, actor.displayName
, summary
, actor.extensions.followers
, actor.extensions.posts
, extensions.youtube.statistics.viewCount from youtube_pages

and

%spark.sql
select id
, published
, actor.id
, actor.displayName
, content
, title from youtube_posts

[Screenshot: query results showing followers, posts, and view counts]

Here we see I’m not such a popular guy on YouTube: 1 follower, 23 views, 1 post (a recent Apache Mahout talk). My chance at being one of the great YouTube bloggers has passed me by; such is life.

Conclusions

The story arc of this series on Apache Streams has reached its zenith. If you have been following along, you’ll notice there isn’t really much new information about Apache Streams anymore; the posts are more dedicated to ‘how to get your credentials for provider X’. I was thinking about doing one last one on Google+, since the credentialing is essentially the same and you only need to change a couple of lines of code to say Google instead of YouTube.

In the dramatic conclusion to the Apache Streams mini-saga, we’re going to combine all of these into a ‘quasi-app’ in which you collect all of your data from various providers and start merging it together using Spark SQL, visualizing with Angular, Zeppelin, and D3.js, and applying a little machine learning to see if we can’t learn a little something about ourselves and the social world/bubble we (personally) live in.

Getting to Know Your Friends with Apache Streams

In our last adventure we did a basic example of Apache Streams with Twitter data. This week we’re going to extend that example with Facebook data! Also note: if this seems a little light, it’s because it’s not that different from the last post, and the full explanations are there. Our goal here is not to explain all of Apache Streams (that was last week) but to begin branching out into other providers.

Getting credentialed through Facebook

The first step with any new provider is to get credentials. To get Facebook credentials you’ll need to visit the Facebook App Dashboard. Once you get there and sign up, you’ll need to click “Add new App” (top right). After you’ve created the new app, click “Settings” in the panel to the right. Here you will see the “App ID”, and if you click “Show” to reveal the App Secret, you’ll have the two important strings you’re going to need soon.

The final piece of credentials you’ll need is a userAccessToken. The easiest way to get one is to go to the Graph API Explorer. These are temporary tokens that expire quickly (after an hour or so). This isn’t a “production” grade approach, but it is a good way to test things out and get going. Persistent tokens with Facebook are out of scope for this blog post (and for the things I care about at this point in my life).

Setting up the Stream

First you’ll need to add some dependencies, including the Facebook provider jar:

%spark.dep
z.reset()
z.addRepo("apache-snapshots").url("https://repository.apache.org/content/repositories/snapshots").snapshot()
z.load("org.apache.streams:streams-provider-facebook:0.4-incubating-SNAPSHOT")
z.load("org.apache.streams:streams-converters:0.4-incubating-SNAPSHOT")

Next we create a paragraph to make a cute little GUI for entering our credentials.

%spark.spark
val appId = z.input("appId", "")
val appSecret = z.input("appSecret", "")
val userAccessToken = z.input("userAccessToken", "")

That’s going to create a form where you (or your friends) can copy and paste credentials.

[Screenshot: the credentials input form]

You’ll also want all of your imports… and there are a LOT (likely not all necessary).

%spark.spark

import com.typesafe.config._

import org.apache.streams.config._
import org.apache.streams.converter.ActivityConverterProcessor
import org.apache.streams.core.StreamsProcessor
import org.apache.streams.core._
import org.apache.streams.facebook._
import org.apache.streams.facebook.FacebookUserInformationConfiguration
import org.apache.streams.facebook.FacebookUserstreamConfiguration
import org.apache.streams.facebook.Page
import org.apache.streams.facebook.Post
import org.apache.streams.facebook.processor.FacebookTypeConverter
import org.apache.streams.facebook.provider.FacebookFriendFeedProvider
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.pojo.json.Activity

import scala.collection.JavaConverters
import scala.collection.JavaConversions._
import java.util.Iterator

Next we’ll create the config string. We build it dynamically from the values entered in the little GUI we made. The info section is supposed to list which users you are interested in. I think for this provider it is not important, especially considering the way we got our access token; in this case it basically just gives you everything you are entitled to and nothing else. Specifically, this provider gives you a stream of everything your friends are doing.

%spark

val credentials =
s"""
{
facebook {
oauth {
appId = "$appId"
appSecret = "$appSecret"
userAccessToken = "$userAccessToken"
}

info = [
rawkintrevo
]

}
}
"""
val credentialsConfig = ConfigFactory.parseString(credentials)

val typesafe = ConfigFactory.parseString(credentials)
val config = new ComponentConfigurator(classOf[FacebookUserstreamConfiguration]).detectConfiguration(typesafe, "facebook");
val provider = new FacebookFriendFeedProvider(config, classOf[org.apache.streams.facebook.Post] );

val timeline_buf = scala.collection.mutable.ArrayBuffer.empty[Object]

provider.prepare()
provider.startStream()

This should look very similar to the setup we did for the Twitter stream, except of course we are using the FacebookUserstreamConfiguration and FacebookFriendFeedProvider classes, and our stream will consist of org.apache.streams.facebook.Post objects.

%spark
//while(provider.isRunning()) {
for (i <- 0 to 20) {
  val resultSet = provider.readCurrent()
  resultSet.size()
  val iterator = resultSet.iterator();
  while(iterator.hasNext()) {
    val datum = iterator.next();
    //println(datum.getDocument)
    timeline_buf += datum.getDocument
  }
  println(s"Iteration $i")
}
provider.cleanUp()

The key difference here is that we stop after 21 iterations (the for loop over 0 to 20), which should be PLENTY of data. If we used the while loop instead, this would run forever, updating every time our friends took another action.
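
If you would rather bound the collection by wall-clock time than by iteration count, here is a sketch of one variation (my own, not from the original post), using a Deadline from scala.concurrent.duration:

%spark
// Hypothetical variation: collect until either the provider stops or a time
// budget runs out, instead of a fixed number of readCurrent() calls.
import scala.concurrent.duration._

val deadline = 2.minutes.fromNow
while (provider.isRunning() && deadline.hasTimeLeft()) {
  val resultSet = provider.readCurrent()
  val iterator = resultSet.iterator()
  while (iterator.hasNext()) {
    timeline_buf += iterator.next().getDocument
  }
}
provider.cleanUp()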

Now we have to process our tweets… er, Facebook posts.

%spark
import org.apache.streams.converter.ActivityConverterProcessor
import org.apache.streams.facebook.processor.FacebookTypeConverter
import org.apache.streams.core.StreamsProcessor
import org.apache.streams.pojo.json.Activity
import scala.collection.JavaConverters
import scala.collection.JavaConversions._

import org.apache.streams.pojo.json.Activity
import org.apache.streams.facebook.Post
import org.apache.streams.facebook.Page

//val converter = new ActivityConverterProcessor()
val converter = new FacebookTypeConverter(classOf[Post], classOf[Activity])

converter.prepare()

val status_datums = timeline_buf.map(x => new StreamsDatum(x))
val activity_datums = status_datums.flatMap(x => converter.process(x)).map(x => x.getDocument.asInstanceOf[Activity])

import org.apache.streams.jackson.StreamsJacksonMapper;

val mapper = StreamsJacksonMapper.getInstance();
val activitiesRDD = sc.parallelize(activity_datums.map(o => mapper.writeValueAsString(o)))

val activitiesDF = sqlContext.read.json(activitiesRDD)

activitiesDF.registerTempTable("activities")

The only line that changed here was

//val converter = new ActivityConverterProcessor()
val converter = new FacebookTypeConverter(classOf[Post], classOf[Activity])

In the future, we won’t even need to change that line! But for now, we have to explicitly set the FacebookTypeConverter and let it know we are expecting to see Posts and want Activities (which are Activity Streams 1.0 compliant).

At this point it is pretty much the same as last time. Feel free to check out the schema of the activitiesDF by running

%spark.spark

activitiesDF.printSchema()

If you are comparing to the tweet activities you collected before, you’ll notice there are some places where Facebook post-based activities contain more or less information, but at a high level the schema is similar: actor, likes, rebroadcasts, etc.

Let’s do a simple SQL query that won’t reveal the identity of any of my friends…

%spark.sql

select location.city, count(id) from activities where location is not null group by location.city

[Screenshot: activity counts by city]

And here we see most of my friends and friends of friends hail from Chicago, which makes sense because it is the best city in the country, and my friends aren’t dummies.

We’ll also need to do a little post-processing on some of these columns. For instance, times are all given as strings, so to work with them as dates we can do something like this:

%spark.sql
select a.hour_of_day, sum(rebroadcasts)
from (select hour(from_utc_timestamp(published, "MM/dd/yyyyZHH:mm:ss")) as hour_of_day, rebroadcasts.count as rebroadcasts from activities) a
group by hour_of_day

[Screenshot: rebroadcast counts by hour of day]

Notes

  1. Zeppelin’s Table API doesn’t handle new lines well. If you try to build a table of the messages, it’s not going to work out great. I leave it as an exercise to the user to clean that up (one possible approach is sketched below).
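
For the impatient, here is a minimal sketch of one way to do that cleanup, assuming your activities DataFrame has a string column named content (column names may differ in your schema); it mirrors the removeLineBreaks UDF used in the YouTube post above.

%spark
// Minimal sketch (not from the original post): strip line breaks from a text column
// so Zeppelin's table display behaves. Assumes a string column named "content";
// adjust the column name to match your schema.
import org.apache.spark.sql.functions.udf

val stripLineBreaks = udf { (text: String) =>
  if (text == null) "" else text.replaceAll("[\\n\\r]+", " ")
}

val tableFriendlyDF = activitiesDF.withColumn("content", stripLineBreaks(activitiesDF("content")))
tableFriendlyDF.registerTempTable("activities_clean")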

Link to full notebook

Dipping Your Toes in Apache Streams

Do you know the origin story of the Apache Software Foundation? Let me tell you the short version: In the late 90s most of the internet was being powered by a web-server that no one was maintaining. It was a good product, but no one was feeding and caring for it (e.g. bug fixes, security updates). On BBS boards a number of people were talking about the issue, and how before too long they were going to have to start writing their own web-servers to replace it.

Around this time they realized that none of their respective companies competed on web servers; they were selling books, and flights, and hosting gifs of dancing hamsters. It made more sense for everyone to work together on a common web-server project, as a web server is something everyone needs but no one uses for competitive advantage. Thus the Apache Web Server was born, and with it the Apache Software Foundation and the legally defensible, yet business-friendly license.

This history lesson has a point. Fast forward to the present day and enter Apache Streams (incubating): lots of people across several industries are looking at social data from multiple providers, and everyone builds their own processors to transform it into a useful format, which might mean extracting text from tweets and Facebook posts. No one competes to do a ‘better job’ of extracting data; it’s a straightforward task to connect to a provider’s API and reshape a JSON into a table or other object, but it is something that everyone has to do. That’s where Apache Streams comes in.

Apache Streams is a package of tools for:
1. Connecting to a provider (e.g. Facebook or Twitter)
2. Retrieving data and reshaping it into a W3C common format
3. Storing that data in a specified target (e.g. local file or database)
4. Doing all of this in the JVM or on a distributed engine (e.g. Apache Flink or Apache Spark)
5. Much of this can be done with JSON configurations

We are going to write code to build this stream, but soon (upcoming post) we will show how to do it code free.

About Apache Streams

At a high level, Apache Streams is a framework that collects data from multiple social media providers, transforms the data into a common, W3C-backed format known as Activity Streams, and persists the data to multiple endpoints (e.g. files or databases).

It is also possible to use Apache Streams inline with regular Java/Scala code, which is what we will be demonstrating here today.

Step 1. Configuring

This post assumes you have Apache Zeppelin installed. Other posts on this blog explain how to set up Zeppelin on clusters; you don’t need any of that. A local version of Zeppelin will work just fine for what we’re about to do; see the official Zeppelin site for getting started.

Method 1- The %spark.dep interpreter.

This is the fail-safe method; however, it relies on the deprecated %dep interpreter.

In the first paragraph of the notebook enter the following:

%spark.dep
z.addRepo("apache-snapshots").url("https://repository.apache.org/content/repositories/snapshots").snapshot()
z.load("org.apache.streams:streams-provider-twitter:0.4-incubating-SNAPSHOT")
z.load("org.apache.streams:streams-converters:0.4-incubating-SNAPSHOT")

What we’ve done here is add the Apache snapshots repository, which contains the Streams 0.4 snapshots. A snapshot is the most up-to-date development version of a project. It’s not production grade, but it has all of the latest and greatest features, unstable though they may be. At the time of writing, Apache Streams is in the process of releasing 0.4.

If you get an error:

Must be used before SparkInterpreter (%spark) initialized
Hint: put this paragraph before any Spark code and restart Zeppelin/Interpreter

Go to the interpreters page and restart the interpreter. (Click on “Anonymous” at the top right; a drop-down menu will pop up. Click on “Interpreters”, then on the new page search or scroll down to “Spark”. In the top right of the Spark interpreter tile there are three buttons: “Edit”, “Restart”, and “Remove”. Click “Restart”, then try running the paragraph again.)

Method 2- The config GUI

Follow the directions directly above for accessing the interpreter config GUI in Zeppelin. This time, however, click “Edit”.

You will see Properties and Values. Scroll down and find the property:

zeppelin.dep.additionalRemoteRepository

In its Value field, add the following:

apache-snapshots,https://repository.apache.org/content/repositories/snapshots,true;

This step is equivalent to the z.addRepo(...) line in the other method.

Now scroll down past the Properties and Values to the Dependencies section. In the Artifact column, add org.apache.streams:streams-provider-twitter:0.4-incubating-SNAPSHOT and org.apache.streams:streams-converters:0.4-incubating-SNAPSHOT, hitting the little “+” button to the right after adding each. Finally, save the changes (little button at the bottom), which will restart the interpreter.

Step 2. Getting credentials from Twitter

Each provider will have its own method for authenticating the calls you make for data, and its own process for granting credentials. In the case of Twitter, go to https://apps.twitter.com/.

There are several great blog posts you can find if you Google “create twitter app”. I’ll leave you to go read and learn. It’s a fairly painless process.

When you create the app (or if you already have one), we need the following information:
– Consumer Key
– Consumer Secret
– Access Token
– Access Token Secret

Step 3. Configuring the Stream

First we are going to use Apache Zeppelin’s Dynamic Forms to make it easy for users to copy and paste their Twitter credentials into the program. Could you hard code your credentials? Sure, but this is a demo and we’re showing off.

The UserID is who we are going to follow. It should work with user IDs or Twitter handles; however, at the time of writing there is a bug where only user IDs work. In the meantime, use an online tool to convert usernames into IDs.

You’ll see that we are simply getting values to fill into a JSON-like configuration string format called HOCON. This config string is the makings of the config files that would let you define a stream with very little code. For the sake of teaching, however, we’re going to cut code anyway.

Full setup Paragraph

%spark
val consumerKey = z.input("ConsumerKey", "")
val consumerSecret = z.input("ConsumerSecret", "")
val accessToken = z.input("AccessToken", "")
val accessTokenSecret = z.input("AccessTokenSecret", "")
val userID = z.input("UserID", "")

val hocon = s"""
twitter {
oauth {
consumerKey = "$consumerKey"
consumerSecret = "$consumerSecret"
accessToken = "$accessToken"
accessTokenSecret = "$accessTokenSecret"
}
retrySleepMs = 5000
retryMax = 250
info = [ $userID ]
}
"""

Note: Be careful when sharing this notebook; your credentials (the last ones used) will be stored in the notebook, even though they are not hard-coded.

Step 4. Writing the Pipeline

We want to collect tweets, transform them into ActivityStreams objects, and persist the objects into a Spark DataFrame for querying and analytics.

After taking care of our imports, we will create an ArrayBuffer to which we will add tweets. Note: if you have any problems with the imports, check that you have done Step 1 correctly.

val timeline_buf = scala.collection.mutable.ArrayBuffer.empty[Object]

Next, we are going to parse the config string we created in the last step with a ComponentConfigurator, which will be used to set up the stream. Again, this could all be handled by Streams itself; we are reinventing the wheel slightly for demonstration purposes.

val typesafe = ConfigFactory.parseString(hocon)
val config = new ComponentConfigurator(classOf[TwitterUserInformationConfiguration]).detectConfiguration(typesafe, "twitter");

In the remaining code, we create a Streams provider from our config file, run it and collect tweets into our array buffer.

Full paragraph worth of code for copy+paste:

%spark
import com.typesafe.config._
import java.util.Iterator

import org.apache.streams.config._
import org.apache.streams.core._

import org.apache.streams.twitter.TwitterUserInformationConfiguration
import org.apache.streams.twitter.pojo._
import org.apache.streams.twitter.provider._

val timeline_buf = scala.collection.mutable.ArrayBuffer.empty[Object]
val typesafe = ConfigFactory.parseString(hocon)
val config = new ComponentConfigurator(classOf[TwitterUserInformationConfiguration]).detectConfiguration(typesafe, "twitter");

val provider = new TwitterTimelineProvider(config);
provider.prepare(null)
provider.startStream()

while (provider.isRunning()) {
  val resultSet = provider.readCurrent()
  resultSet.size()
  val iterator = resultSet.iterator();
  while (iterator.hasNext()) {
    val datum = iterator.next();
    //println(datum.getDocument)
    timeline_buf += datum.getDocument
  }
}

Transforming Tweets into Activity Stream Objects

This is only a few lines of fairly self-evident code. We create an ActivityConverterProcessor() and call its prepare method.

Recall that our timeline_buf was an ArrayBuffer of tweets; we are going to map each tweet into a StreamsDatum, because that is the format the converter expects as input. You can see on the next line that our converter processes StreamsDatums.

The net result of this block is a scala.collection.mutable.ArrayBuffer[org.apache.streams.pojo.json.Activity] called activity_datums.

%spark

import org.apache.streams.converter.ActivityConverterProcessor
import org.apache.streams.core.StreamsProcessor
import org.apache.streams.pojo.json.Activity
import scala.collection.JavaConverters
import scala.collection.JavaConversions._

val converter = new ActivityConverterProcessor()
converter.prepare()

val status_datums = timeline_buf.map(x => new StreamsDatum(x))
val activity_datums = status_datums.flatMap(x => converter.process(x)).map(x => x.getDocument.asInstanceOf[Activity])

Step 5. Visualizing the results

This is more of a Spark / Apache Zeppelin trick than anything else. The StreamsJacksonMapper converts an Activity Stream Object into a JSON. We create activitiesRDD as an org.apache.spark.rdd.RDD[String] and in the next two lines create a DataFrame and register it as a temp table, in the usual ways.

%spark
import org.apache.streams.jackson.StreamsJacksonMapper;

val mapper = StreamsJacksonMapper.getInstance();
val activitiesRDD = sc.parallelize(activity_datums.map(o => mapper.writeValueAsString(o)))

val activitiesDF = sqlContext.read.json(activitiesRDD)

activitiesDF.registerTempTable("activities")

Zeppelin is so much fun because now we can use regular SQL in the next paragraph to explore this table. Try this one on for size:

%spark.sql
select ht, count(id) as mentions
from (
select explode(hashtags) as ht, id
from activities
) a

group by ht
order by mentions desc

[Screenshot: hashtag mention counts]

This query gives us a count by hashtag. There are two things you’ll want to review regarding Spark SQL and querying JSONs (a small sketch follows the list):
– explode turns a row containing an array of length N into N new rows, one for each element of the array
– To query a nested column use the style super_col.nested_col
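
Here’s a tiny self-contained sketch of what explode does, using toy data of my own (not from the stream):

%spark
// Toy example: a single row whose hashtags column holds two elements becomes
// two rows after explode, one per element.
import sqlContext.implicits._

val demoDF = Seq(("tweet-1", Array("apache", "streams"))).toDF("id", "hashtags")
demoDF.selectExpr("id", "explode(hashtags) as ht").show()
// yields one row per hashtag: (tweet-1, apache) and (tweet-1, streams)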

Consider the following:

%spark

activitiesDF.printSchema()

Outputs this:

root
|-- actor: struct (nullable = true)
| |-- displayName: string (nullable = true)
| |-- extensions: struct (nullable = true)
| | |-- favorites: long (nullable = true)
| | |-- followers: long (nullable = true)
| | |-- location: string (nullable = true)
| | |-- posts: long (nullable = true)
| | |-- screenName: string (nullable = true)
...

I truncated the output. The point is, if I wanted to query displayName and how many followers the person had, I could do something like this:

%spark.sql

select actor.displayName, actor.extensions.followers from activities limit 20

This query wasn’t very interesting because all of the rows were me, and 70 (my followers count).

[Screenshot: query results]

Another dude’s blog post on querying JSONs, which candidly I didn’t even read

Final Thoughts and next steps

Apache Streams is an exciting young project that has the opportunity to really do some good in the world, by way of reducing wasteful redundancy (everyone has to write their own methods for collecting and homogenizing social media data).

Now you may be thinking: that was a lot of extra work to collect and visualize some tweets. I say to you:
1 – It wasn’t that much extra work
2 – Your tweets are now in the Activity Streams format, which can be easily combined with Facebook / YouTube / GooglePlus / etc. data.

Hopefully this is the first in a series of blog posts that will build on each other. Future posts will show how to connect other service providers, and eventually we will build a collection of tools for collecting and analyzing data about your own social footprint. The series will culminate in a collection of Zeppelin notebooks that allow the user to build an entire UI backed by Streams.

Click this link for full notebook