Watching YouTube Activity with Apache Streams

We’re finishing up our series of blogs on providers with YouTube. After this we’ll get down to business on building our own social monitoring quasi-app!

Getting YouTube Credentials

Before accessing any API, we of course need to get our credentials.

A good first step in an enterprise like this would be to read the docs, or at least thumb through them, regarding registering an application with the YouTube API.

But if you wanted to read lengthy docs, you probably wouldn’t be checking out this blog- so here’s the short version:

Open the Google Developers Dashboard, where we’re going to “Create a Project”.

Screenshot from 2016-12-06 13-25-05.png

I’m going to call my application “Apache Streams Demo”

Screenshot from 2016-12-06 13-26-13.png

The next dialog is where things can go awry.  First, right click “Enable APIs you plan to use” and open that link in a new tab, then click the drop down arrow next to “Create Credentials” and then select “Create Service Account Key” (we’ll come back to API enablement in a minute).

Screenshot from 2016-12-06 13-30-46.png

In the next screen choose ‘New Service Account’ and select the P12 key type (not JSON, which is the default).  A dialog will pop up with some key information. I set permissions to owner, but that probably isn’t necessary. After you hit Create, the key will be generated and downloaded.

Screenshot from 2016-12-06 13-47-00.png

Screenshot from 2016-12-06 13-50-32.png

Set that .p12 file aside somewhere safe. We’ll need it soon.

Finally, we need to enable the services we want with this key.  Go to the Library tab (or open the ‘new tab’ you opened when you were creating the service account).

You’ll see a long list of all the APIs you can grant access to.  We want to focus, for the moment, on the YouTube services.

Screenshot from 2016-12-08 13-47-13.png

Go through each API you want (the YouTubes for this demo) and click “Enable” at the top.

Screenshot from 2016-12-08 14-20-27.png

Configurations

Full credit to Steve Blockmon for these well-done notebooks.

Import Setup.json

Import YouTube.json

Setup.json

Steve and I are working toward a suite of notebooks aimed at creating a Social Monitoring Quasi-App.  The first paragraph of Setup.json looks like this:

%spark.dep
z.reset()
z.addRepo("apache-snapshots").url("https://repository.apache.org/content/repositories/snapshots").snapshot()
z.load("org.apache.streams:streams-core:0.5-incubating-SNAPSHOT")
z.load("org.apache.streams:streams-converters:0.5-incubating-SNAPSHOT")
z.load("org.apache.streams:streams-pojo:0.5-incubating-SNAPSHOT")
z.load("org.apache.streams:streams-provider-twitter:0.5-incubating-SNAPSHOT")
z.load("org.apache.streams:streams-provider-facebook:0.5-incubating-SNAPSHOT")
z.load("org.apache.streams:streams-provider-youtube:0.5-incubating-SNAPSHOT")
z.load("org.apache.streams:google-gplus:0.5-incubating-SNAPSHOT")

We’re using the Zeppelin Dependency Loader here (you’ll likely see a message noting that it is deprecated). At the time of writing, there seems to be an issue with adding repositories via the GUI (it doesn’t pick up the new repository URL).

z.addRepo(...) does add the new repository- in this case the Apache SNAPSHOT repository. The remaining lines load various Apache Streams providers.

In the next paragraph of the notebook Steve defines a couple of UDFs for dealing with punctuation and spaces in tweets and descriptions. These will come in handy later and are worth taking a moment to briefly grok.

YouTube.json

The first paragraph isn’t too exciting- some simple imports:

%spark
import org.apache.streams.config._
import org.apache.streams.core._
import org.apache.youtube.pojo._

import com.typesafe.config._
import com.youtube.provider._

import java.util.Iterator

The second paragraph is where things start to get interesting. Here we build a HOCON configuration, and we also leverage Zeppelin Dynamic Forms.

val apiKey = z.input("apiKey", "")
val serviceAccountEmailAddress = z.input("serviceAccountEmailAddress", "")
val pathToP12KeyFile = z.input("pathToP12KeyFile", "")

val credentials_hocon = s"""
youtube {
apiKey = $apiKey
oauth {
serviceAccountEmailAddress = "$serviceAccountEmailAddress"
pathToP12KeyFile = "$pathToP12KeyFile"
}
}
"""

This paragraph will yield:

screenshot-from-2016-12-06-15-08-39

To find your API Key and service account (assuming you didn’t write them down earlier- tsk tsk), go back to your Google Developers Console, open the Credentials tab, and click “manage service accounts” on the right.  On this page you will see your service account and API Key. If you have multiple, make sure to use the one that corresponds to the .p12 file you downloaded.

screenshot-from-2016-12-06-15-14-27

After you enter your credentials and the path to the .p12 file, make sure to click the “Play” button again to re-run the paragraph and pick up your credentials.

The next configuration specifies who we want to follow. It is a list of UserIDs.

%spark
val accounts_hocon = s"""
youtube.youtubeUsers = [
# Apache Software Foundation - Topic
{ userId = "UCegQNPmRCAJvCq6JfHUKZ9A"},
# Apache Software Foundation
{ userId = "TheApacheFoundation"},
# Apache Spark
{ userId = "TheApacheSpark" },
# Apache Spark - Topic
{ userId = "UCwhtqOdWyCuqOboj-E1bpFQ"},
# Apache Flink Berlin
{ userId = "UCY8_lgiZLZErZPF47a2hXMA"},
# Apache Syncope
{ userId = "UCkrSQVb5Qzb13crS1kCOiQQ"},
# Apache Accumulo
{ userId = "apacheaccumulo"},
# Apache Hive - Topic
{ userId = "UCIjbkZAX5VlvSKoSzNUHIoQ"},
# Apache HBase - Topic
{ userId = "UCcGNHRiO9bi6BeH5OdhY2Kw"},
# Apache Cassandra - Topic
{ userId = "UC6nsS04n_wBpCDXqSAkFM-w"},
# Apache Hadoop - Topic
{ userId = "UCgRu3LbCjczooTVI9VSvstg"},
# Apache Avro - Topic
{ userId = "UCzHCk8Gl5eP85xz0HwXjkzw"},
# Apache Maven - Topic
{ userId = "UCBS2s2cwx-MW9rVeKwee_VA"},
# Apache Oozie - Topic
{ userId = "UCRyBrviuu3qMNliolYXvC0g"},
]
"""

I would recommend you stick with this list if you don’t have any activity on YouTube, or you’re just getting started and want to make sure some data is coming in.  But in the spirit of creating our own little social monitoring quasi-app, consider replacing the preceding paragraph with this:

%spark
val myChannel = z.input("my YouTube Channel", "")
val myId = z.input("my ID", "")

val accounts_hocon = s"""
youtube.youtubeUsers = [
{ userId = "${myId}"},
{ userId = "${myChannel}"}
]
"""

This gives you a dynamic form in which a user can insert their own YouTube channel ID.

Screen Shot 2016-12-09 at 3.32.42 PM.png

You’ll note here I have put in both my user ID and channel ID; the only difference between adJyExT6JcrA0kvKNHrXZw and UCadJyExT6JcrA0kvKNHrXZw is the leading UC in the latter, denoting that it is a channel. Further, if you look at the code, you submit both the user ID and the channel ID as userId=.... This may be helpful because, as we will see, the channel and user APIs give us slightly different information.

To find channel IDs just surf around YouTube and go to channel pages. You’ll see the address is something like https://www.youtube.com/channel/UCGsMOj1pUwabpurXSKzfIfg (which is Astronomia Nova), and the UCGsMOj1pUwabpurXSKzfIfg part is the channel ID.

Getting at that data

First we’re going to use a ConfigFactory to parse the config strings we just built. This is going to create our YouTube configuration.

%spark
val reference = ConfigFactory.load()
val credentials = ConfigFactory.parseString(credentials_hocon)
val accounts = ConfigFactory.parseString(accounts_hocon)
val typesafe = accounts.withFallback(credentials).withFallback(reference).resolve()
val youtubeConfiguration = new ComponentConfigurator(classOf[YoutubeConfiguration]).detectConfiguration(typesafe, "youtube");

Next we will pull information about the Channel(s) we noted in our accounts_hocon config string. This may be all of the Apache accounts or our personal one. We will create an ArrayBuffer to store all of the responses we get. This should be old hat by now- you’ll notice the API is very similar to the Facebook and Twitter providers from previous posts.

%spark
import com.typesafe.config._
import org.apache.streams.config._
import org.apache.streams.core._
import com.youtube.provider._
import org.apache.youtube.pojo._
import java.util.Iterator

val youtubeChannelProvider = new YoutubeChannelProvider(youtubeConfiguration);
youtubeChannelProvider.prepare(null)
youtubeChannelProvider.startStream()

val channel_buf = scala.collection.mutable.ArrayBuffer.empty[Object]
while( youtubeChannelProvider.isRunning()) {
  val resultSet = youtubeChannelProvider.readCurrent()
  resultSet.size()
  val iterator = resultSet.iterator();
  while(iterator.hasNext()) {
    val datum = iterator.next();
    channel_buf += datum.getDocument
  }
}
channel_buf.size

The important thing to watch for, especially if you are using something other than the Apache userId set, is that the output of the previous paragraph doesn’t end with something like res45: Int = 0. If it does, your buffer is empty and you will get errors when you try to make a table; try using different user IDs.
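If you want to fail fast rather than eyeball that output, a one-line check along these lines (a sketch) does the job:

%spark
// Optional sanity check: stop here if the channel provider returned nothing
require(channel_buf.nonEmpty, "channel_buf is empty - check your credentials and userIds before building tables")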

Next we’ll get the user activity results.

%spark

val buf = scala.collection.mutable.ArrayBuffer.empty[Object]

val provider = new YoutubeUserActivityProvider(youtubeConfiguration);
provider.prepare(null)
provider.startStream()
while(provider.isRunning()) {
  val resultSet = provider.readCurrent()
  resultSet.size()
  val iterator = resultSet.iterator();
  while(iterator.hasNext()) {
    val datum = iterator.next();
    //println(datum.getDocument)
    buf += datum.getDocument
  }
}
buf.size

Basically the same idea here- just like last time, make sure that buffer size is greater than 0!

Processing data into something we can play with

Our goal here is to get our data into a format we can play with. The easiest route to this end is to leverage Spark DataFrames / SQL Context and Zeppelin’s table display.

At the end of the last session on Apache Streams, we left as an exercise to the user a way to untangle all of the messy characters and line feeds. Now we provide the solution, a couple of User Defined Functions (UDFs) to strip all of that bad noise out.

%spark
import org.apache.spark.sql.functions._
import org.apache.spark.sql.UserDefinedFunction
import java.util.regex.Pattern

val toLowerCase = udf {
  (text: String) => text.toLowerCase
}

val removeLineBreaks = udf {
  (text: String) =>
    val regex = "[\\n\\r]"
    val pattern = Pattern.compile(regex)
    val matcher = pattern.matcher(text)

    // Remove all matches, split at whitespace (repeated whitespace is allowed) then join again.
    val cleanedText = matcher.replaceAll(" ").split("[ ]+").mkString(" ")

    cleanedText
}

val removePunctuationAndSpecialChar = udf {
  (text: String) =>
    val regex = "[\\.\\,\\:\\-\\!\\?\\n\\t,\\%\\#\\*\\|\\=\\(\\)\\\"\\>\\<]"
    val pattern = Pattern.compile(regex)
    val matcher = pattern.matcher(text)

    // Remove all matches, split at whitespace (repeated whitespace is allowed) then join again.
    val cleanedText = matcher.replaceAll(" ").split("[ ]+").mkString(" ")

    cleanedText
}

With the UDFs defined, the next paragraph runs the channel buffer through the YoutubeTypeConverter, normalizing each raw response into Activity Streams objects.

%spark
import org.apache.streams.core.StreamsDatum
import com.youtube.processor._
import scala.collection.JavaConversions._
//Normalize activities -> posts(s)
val YoutubeTypeConverter = new YoutubeTypeConverter()
YoutubeTypeConverter.prepare()

val pages_datums = channel_buf.flatMap(x => YoutubeTypeConverter.process(new StreamsDatum(x)))

Now we create our DataFrame and register it as a temp table. Notice we use our “cleaner function”, aka removePunctuationAndSpecialChar, to clean the summary column. This gives us a nice clean summary while preserving the original actor.summary. If actor.summary doesn’t make sense as a column name yet, it will when we get to querying the table- it has to do with how Spark queries the nested fields of a JSON document.

%spark
import org.apache.streams.jackson.StreamsJacksonMapper;
import sqlContext._
import sqlContext.implicits._

val mapper = StreamsJacksonMapper.getInstance();
val pages_jsons = pages_datums.map(o => mapper.writeValueAsString(o.getDocument))
val pagesRDD = sc.parallelize(pages_jsons)

val pagesDF = sqlContext.read.json(pagesRDD)

val cleanDF = pagesDF.withColumn("summary", removePunctuationAndSpecialChar(pagesDF("actor.summary")))
cleanDF.registerTempTable("youtube_pages")
cleanDF.printSchema

In the line where we create pages_jsons we do a map over the pages_datums we had created just previously (which was type scala.collection.mutable.ArrayBuffer[org.apache.streams.core.StreamsDatum], if you’re into that sort of thing), changing each StreamsDatum into a JSON. We then parallelize that array, and do the standard Spark things to create a DataFrame from the RDD. Hocus-pocus, and our channels have now been processed into a temp table called youtube_pages. The process is repeated for our users’ posts.

%spark
import org.apache.streams.core.StreamsDatum
import com.youtube.processor._
import scala.collection.JavaConversions._
//Normalize activities -> posts(s)
val YoutubeTypeConverter = new YoutubeTypeConverter()
YoutubeTypeConverter.prepare()

val useractivity_posts = buf.flatMap(x => YoutubeTypeConverter.process(new StreamsDatum(x)))

And then…

%spark
import org.apache.streams.jackson.StreamsJacksonMapper;
import sqlContext._
import sqlContext.implicits._

val mapper = StreamsJacksonMapper.getInstance();
val jsons = useractivity_posts.map(o => mapper.writeValueAsString(o.getDocument))
val activitiesRDD = sc.parallelize(jsons)

val activitiesDF = sqlContext.read.json(activitiesRDD)

val cleanDF = activitiesDF.withColumn("content", removePunctuationAndSpecialChar(activitiesDF("content")))
cleanDF.registerTempTable("youtube_posts")
cleanDF.printSchema

Same idea, now we have a table called youtube_posts.
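Before playing, a quick sanity check (a sketch) confirms both temp tables actually contain rows:

%spark
// Quick row counts for the two temp tables we just registered
sqlContext.sql("select count(*) as pages from youtube_pages").show()
sqlContext.sql("select count(*) as posts from youtube_posts").show()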

Let’s play.

At the end of the second paragraph for pages and posts above we called the .printSchema method on our DataFrame. This is helpful because it shows us … the schema of the DataFrame, which in a non-flat format such as JSON can be very useful. In general, we traverse this schema with field.subfield.subsubfield to query nested values. Consider the following SQL statement:

%spark.sql
select actor.id
, actor.displayName
, summary
, actor.extensions.followers
, actor.extensions.posts
, extensions.youtube.statistics.viewCount from youtube_pages

and

%spark.sql
select id
, published
, actor.id
, actor.displayName
, content
, title from youtube_posts

Screen Shot 2016-12-09 at 4.19.19 PM.png

Here we see I’m not such a popular guy on YouTube: 1 follower, 23 views, 1 post (a recent Apache Mahout talk). My chance at being one of the great YouTube bloggers has passed me by; such is life.

Conclusions

The story arc of this series on Apache Streams has reached its zenith. If you have been following along, you’ll notice there isn’t really much new information about Apache Streams anymore- the posts are more dedicated to ‘how to get your credentials for provider X’. I was thinking about doing one last one on Google+, since the credentialing is essentially the same and you only need to change a couple of lines of code to say Google instead of YouTube.

In the dramatic conclusion to the Apache Streams mini-saga, we’re going to combine all of these together into a ‘quasi-app’ in which you collect all of your data from various providers and start merging it together using Spark SQL, visualizing with Angular, Zeppelin, and D3Js, and doing a little machine learning to see if we can’t learn a little something about ourselves and the social world/bubble we (personally) live in.

Deep Magic, Volume 3: Eigenfaces

This week we’re going to really show off how easy it is to “roll our own” algorithms in Apache Mahout by looking at Eigenfaces. This algorithm is really easy and fun in Mahout because Mahout comes with a first-class distributed stochastic singular value decomposition, Mahout SSVD.

This is going to be a big job, and assumes you have HDFS and a small cluster to work with. You may be able to run this on your laptop, but I would recommend following the last post on setting up a small cloud-based Hadoop cluster with Apache Zeppelin.

Eigenfaces are an image equivalent(ish) of eigenvectors, if you recall your high school linear algebra classes. If you don’t recall, read Wikipedia; otherwise, the short version is that they are a set of ‘faces’ whose linear combinations can be used to represent other faces.
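Loosely, in symbols rather than prose: any face in the collection is approximately the mean face plus a weighted sum of a handful of eigenfaces,

\mathrm{face}_j \approx \bar{x} + \sum_{i=1}^{k} w_{ji}\, \mathrm{eigenface}_i

where k is small compared to the number of pixels, and the weights w_{ji} are recovered per image by the decomposition we compute below (they end up as the rows of drmU).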

Step 1. Get Data

The first thing we’re going to do is collect a set of 13,232 face images (250×250 pixels) from the Labeled Faces in the Wild data set.

Because some of these shell commands take a little while to run, it is worth taking a moment to set shell.command.timeout.millisecs in the sh interpreter to 600000.

In my first paragraph, I’m going to make a directory for the final eigenface images that Zeppelin’s HTML loader can use. Then I’m going to download and untar the dataset. Finally, I’ll put the dataset into the /tmp folder in HDFS.

%sh

mkdir zeppelin-0.7.0-SNAPSHOT/webapps/webapp/eigenfaces

wget http://vis-www.cs.umass.edu/lfw/lfw-deepfunneled.tgz
tar -xzf lfw-deepfunneled.tgz
hdfs dfs -put /home/guest/lfw-deepfunneled /tmp/lfw-deepfunneled

Step 2. Add dependency JARs

We use the Scala package scrimage to do our image processing, so we’ll want to add those jars. Also, there is a bug we’re working on in Mahout where broadcasting vectors doesn’t work so well in shell / Zeppelin mode. To get around this, I have added the transformer I need to a custom jar of algorithms I use.

MAHOUT-1892: Broadcasting Vectors in Mahout-Shell

If that issue is still open, you’ll need to clone and build my branch (or make your own jar).

git clone https://github.com/rawkintrevo/mahout
cd mahout
git checkout mahout-1856
mvn clean package -DskipTests

Finally, assuming you are using Big Insights on Hadoop Cloud, you’ll need to copy the jar to the cluster.

scp algos/target/mahout-algos_2.10-0.12.3-SNAPSHOT.jar username@bi-hadoop-prod-XXX.bi.services.us-south.bluemix.net:/home/guest/mahout-algos_2.10-0.12.3-SNAPSHOT.jar

Changing username to your username and bi-hadoop-prod-XXX.bi.services.us-south.bluemix.net to your server address.

Back in Zeppelin, load the dependencies:

%sparkMahout.dep

z.load("com.sksamuel.scrimage:scrimage-core_2.10:2.1.0")
z.load("com.sksamuel.scrimage:scrimage-io-extra_2.10:2.1.0")
z.load("com.sksamuel.scrimage:scrimage-filters_2.10:2.1.0")

// add EXPERIMENTAL mahout algos
// https://github.com/rawkintrevo/mahout/tree/mahout-1856/algos
z.load("/home/guest/mahout-algos_2.10-0.12.3-SNAPSHOT.jar")

Step 3. Setup the Mahout Context

This step imports the Mahout packages and sets up the Mahout Distributed Context

%sparkMahout.spark

import org.apache.mahout.math._
import org.apache.mahout.math.scalabindings._
import org.apache.mahout.math.drm._
import org.apache.mahout.math.scalabindings.RLikeOps._
import org.apache.mahout.math.drm.RLikeDrmOps._
import org.apache.mahout.sparkbindings._

@transient implicit val sdc: org.apache.mahout.sparkbindings.SparkDistributedContext = sc2sdc(sc)

Step 4. Create a DRM of Vectorized Images

When doing image processing, we want a matrix where each image is represented as a vector of numbers, where each number corresponds to a pixel.

%sparkMahout.spark

import com.sksamuel.scrimage._
import com.sksamuel.scrimage.filter.GrayscaleFilter

val imagesRDD:DrmRdd[Int] = sc.binaryFiles("/tmp/lfw-deepfunneled/*/*", 500)
       .map(o => new DenseVector( Image.apply(o._2.toArray)
       .filter(GrayscaleFilter)
       .pixels
       .map(p => p.toInt.toDouble / 10000000)) )
   .zipWithIndex
   .map(o => (o._2.toInt, o._1))

val imagesDRM = drmWrap(rdd= imagesRDD).par(min = 500).checkpoint()

println(s"Dataset: ${imagesDRM.nrow} images, ${imagesDRM.ncol} pixels per image")

sc.binaryFiles is the Spark way to read in our images. We set our partitioning to 500, and map each image into a scrimage object, which drops the color information (we don’t need it for feature mapping) and converts the image to an array of large integers. We scale the integers down and use the array to create a Mahout DenseVector. Finally we zipWithIndex and then use drmWrap to transform our RDD into a DRM.

Step 5. Subtract Means

If vector broadcasting in the shell were working (it may be by the time you read this), this is how we would subtract our means.

%sparkMahout.spark

// How you would do this without help
// Doesn't work re BUG: MAHOUT-1892
val colMeansV = imagesDRM.colMeans
val bcastV = drmBroadcast(colMeansV)

val smImages = imagesDRM.mapBlock(imagesDRM.ncol) {
  case (keys, block) =>
    val copy: Matrix = block.cloned
    copy.foreach(row => row -= bcastV.value)
    (keys, copy)
}

We use the .colMeans method to get the colmeans, and then do a mapBlock on the images matrix, subtracting the mean from each image.

Since at the moment the above does not work, here is the hack to do it.

%sparkMahout.spark

// EXPERIMENTAL https://github.com/apache/mahout/pull/246
import org.apache.mahout.algos.transformer.SubtractMean

// Subtract Mean transforms each row by subtracting the column mean
val smTransformer = new SubtractMean()

smTransformer.fit(imagesDRM) // calculates the column mean
val smImages = smTransformer.transform(imagesDRM) // return new DRM of subtracted means

smImages.checkpoint()

Again, this is only needed as a workaround. If you’re interested in how easy it is to package your own functions into a jar, check out the SubtractMean source code.
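For the curious, a transformer of that general shape might look roughly like the following. This is only a sketch of the fit/transform pattern (the class name, fields, and method signatures are my guesses, not the actual SubtractMean source); it reuses the broadcast-and-mapBlock code from above, which is fine once it is compiled into a jar instead of typed into the shell.

%sparkMahout.spark

import org.apache.mahout.math._
import org.apache.mahout.math.scalabindings._
import org.apache.mahout.math.scalabindings.RLikeOps._
import org.apache.mahout.math.drm._
import org.apache.mahout.math.drm.RLikeDrmOps._

// Sketch of a fit/transform style mean-subtraction transformer
class SubtractMeanSketch(implicit ctx: DistributedContext) {

  var colMeansV: org.apache.mahout.math.Vector = _    // learned column means

  def fit(drmX: DrmLike[Int]): Unit = {
    colMeansV = drmX.checkpoint().colMeans             // one distributed pass to learn the means
  }

  def transform(drmX: DrmLike[Int]): DrmLike[Int] = {
    val bcastV = drmBroadcast(colMeansV)               // broadcasting is fine from compiled code
    drmX.mapBlock(drmX.ncol) { case (keys, block) =>
      val copy: Matrix = block.cloned
      copy.foreach(row => row -= bcastV.value)         // subtract the column means from every row
      (keys, copy)
    }
  }
}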

Step 6. Distributed Stochastic Singular Value Decomposition

Based primarily on Nathan Halko’s dissertation, most of the hard part here is done for us.

%sparkMahout.spark

import org.apache.mahout.math._
import decompositions._
import drm._

val(drmU, drmV, s) = dssvd(smImages, k= 20, p= 15, q = 0)

Here k is the rank of the output, i.e. the number of eigenfaces we want out. p is an oversampling parameter, and q is the number of additional power iterations. Read Nathan’s paper or see the ssvd docs for more information.

The above will take some time- about 35 minutes on a 5-node Big Insights Cloud Cluster.

drmV will contain our eigenfaces (transposed).
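The third output, s, is an in-core vector of the top k singular values; printing it gives a rough feel for how much weight each successive eigenface carries. A quick peek (sketch):

%sparkMahout

// Peek at the singular values returned by dssvd
(0 until s.size).foreach(i => println(s"singular value $i: ${s(i)}"))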

drmU will tell us the composition of each face we fed into the algorithm. For example

%sparkMahout

drmU.collect(0 until 1, ::)

yields

{0:-5.728272924185402E-4,1:0.005311020699576641,2:-0.009218182156949998,3:0.008125182744744356,4:0.001847134204087927,5:-0.01391318137456792,6:0.0038760898878500913,7:0.0022845256274037842,8:0.007046521884887152,9:0.004835772814429175,10:0.00338488781174816,11:0.009318311263249005,12:-0.005181665861179919,13:-0.004665157429436422,14:0.003072181956470255,15:-0.01285733757511248,16:0.005066140593688097,17:-0.016895601017982726,18:-0.012156252679821318,19:-0.008044144986630029 ... }

Which implies the first image ≈ -0.0006 * eigenFace0 + 0.005 * eigenFace1 + -0.009 * eigenFace2 + …
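Put differently, dssvd gives us smImages ≈ U · diag(s) · Vᵀ, so any (mean-subtracted) image can be rebuilt approximately from its row of weights. A small sketch using the in-core side of the DSL:

%sparkMahout

// Rebuild an approximation of the first mean-subtracted image from its weights
val u0 = drmU.collect(0 until 1, ::)   // 1 x k row of weights for image 0
val V  = drmV.collect                  // pixels x k matrix with eigenfaces as columns
val approxImage0 = u0 %*% diagv(s) %*% V.t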

Step 7. Write the Eigenfaces to disk

We want to SEE our eigenfaces! We use scrimage again to help us reconstruct our calculated eigenfaces back in to images.

%sparkMahout.spark

import java.io.File
import javax.imageio.ImageIO

val sampleImagePath = "/home/guest/lfw-deepfunneled/Aaron_Eckhart/Aaron_Eckhart_0001.jpg"
val sampleImage = ImageIO.read(new File(sampleImagePath))  
val w = sampleImage.getWidth
val h = sampleImage.getHeight

val eigenFaces = drmV.t.collect(::,::)
val colMeans = smImages.colMeans

for (i <- 0 until 20){
    val v = (eigenFaces(i, ::) + colMeans) * 10000000
    val output = new Array[com.sksamuel.scrimage.Pixel](v.size)
    for (j <- 0 until v.size) {
        output(j) = Pixel(v.get(j).toInt)
    }
    val image = Image(w, h, output)
    image.output(new File(s"/home/guest/zeppelin-0.7.0-SNAPSHOT/webapps/webapp/eigenfaces/${i}.png"))
}

First we load a sampleImage so we can get the height and width. I could have just opened it up in an Image viewer, but y’know, code. drmV has our eigenfaces as columns, so we transpose that. Also recall we subtracted the column means when we in-processed the images, so we’ll need to add that back.

We do a simple for loop over the eigenfaces, basically undoing the in-processing we did. We then create an array of pixels by iterating through the vector. Finally we create the image with the width and height, and save it to the directory we set up. ZEPPELIN_HOME/webapps/webapp/ is the root directory for the %html interpreter, so we save our images in a directory there (we created this directory at the beginning).

Step 8. Display the Images

Scala is fun, but for simple string and list manipulation, I love Python

%python

r = 4
c = 5
print '%html\n<table style="width:100%">' + "".join(["<tr>" + "".join([ '<td><img src="eigenfaces/%i.png"></td>' % (i + j) for j in range(0, c) ]) + "</tr>" for i in range(0, r * c, r +1 ) ]) + '</table>'

All that does is create an html table of our images.

Screenshot from 2016-11-10 16-26-02.png

And that’s it!

I hope you enjoyed this one as much as I did, please leave comments and let me know how it went for you.

Deep Magic, Volume 2: Absurdly Large OLS with Apache Mahout

In this post we’re going to really show off the coolest (imho) use-case of Apache Mahout – roll your own distributed algorithms.

All of these posts are meant for you to follow along at home, and it is entirely possible you don’t have access to a large YARN cluster. That’s OK. Short story- they’re free on IBM’s BlueMix, where you can:
1. Sign up for a free 30-day account
2. Setup a 5-Node BigInsights (IBMs Hadoop Distro) on Cloud
3. Install Apache Zeppelin and Apache Mahout
4. Run an absurdly large OLS algorithm

The benefit of using a Hadoop as a Service environment is that for analytics, our dataset might not be HUGE enough that uploading is prohibitive, but big enough that we need more horse power than a single node can provide. Being able to spool up a cluster, upload a few files, do some work on it with 5 nodes/240GB RAM/ 48 Virtual processors- and then throw the environment away has a lot of use cases.

In this post we’re going to write an Ordinary Least Squares algorithm, and then run it on a dataset of 100 million observations by 100 columns (dense).

This is one of the simplest algorithms, to get you started. Once you have this power at your fingertips, you can implement any algorithm you find specified in matrix form (which is most of them in academic papers) in Spark or Flink with ease. No longer are you tied to whatever the powers that be running SparkML and FlinkML decide to implement!

Disclosure: I’m working for IBM now; this isn’t an unsolicited sales pitch. The notebook for part 4 will be available and will run on any Zeppelin instance (assuming you’ve followed the directions for setting up Mahout on Apache Spark listed in a previous post). If you are running Zeppelin locally, though, you won’t be able to go as big as we do.

Step 1. Sign up for BlueMix-

Link to Signup

Step 2. Setup a 5-Node BigInsights on Cloud

In short, IBM BigInsights on Cloud is Hadoop-as-a-Service. It’s also fairly simple to set up.

Log in to BlueMix. In the navigation bar you should see “Catalog”. Click that, and then search “BigInsights”.

Screen Shot 2016-10-13 at 1.35.31 PM.png

There are a few different UIs floating around at the moment, if you can’t find a way to create BigInsights, this link might help (must be logged in). Link

Click on BigInsights. In the next window, there should be a button that says ‘Create’ somewhere towards the right or bottom-right. (Old vs. new UI). From this point on, everything should look the same, so I’ll post more screen shots.

On the next screen click “Open”, and on the screen following that click “New Cluster”.

You should now be at a screen that looks like this:

Screen Shot 2016-10-13 at 1.45.09 PM.png

The cluster name/ username / password don’t really matter.

In the following section, make sure to set the following:

  • Number of Data Nodes: 5 (current max for free users)
  • IBM Open Platform Version: IOP 4.2 (4.3 has Spark 2.0- gross).
  • Optional Components: Make sure Spark is checked.

Screen Shot 2016-10-13 at 1.48.40 PM.png

Click “Create” and then go grab some coffee or whatever. It will take about 5-10 minutes to setup.

After you click create you’ll be taken to a screen talking about your cluster.

In the top center you will see SSH Host and a value below that looks something like bi-hadoop-prod-XXXX.bi.services.us-south.bluemix.net, where the XXXX will be four numbers. You will need this value for the next step.

Step 3. Install Apache Zeppelin and Apache Mahout

As a proud new IBM employee, I get unfettered access to these little clusters. To celebrate, I’ve done a little Python witchcraft for quickly installing my favorite services into BigInsights Cloud instances. These scripts also open up some ports in the firewall as needed for WebUIs (especially for Zeppelin, Apache NiFi, and Apache Flink).

The scripts in essence do the following:
– ssh in to the cluster
– download the binaries of the desired program (Zeppelin, NiFi, Flink, etc.)
– untar the program
– Upload some config files specific to BigInsights
– Start the service
– If a WebUI is necessary, a BlueMix app will be created which establishes an ssh tunnel between the world wide web and the port of the service (e.g. 8080 for Zeppelin).

Skip to the end of this section to see what I mean regarding the BlueMix app.

rawkintrevo’s scripts for installing extra services on BigInsights Cloud on github

The following assumes you are running on some sort of Ubuntu. The principles are the same, but you might need to take different steps to make this work on CentOS or OSX.

Obviously you need Python.

You also need the Python modules paramiko and scp, which are used for scripting ssh from Python. To install these on Ubuntu, run the following from the command line:
sudo apt-get install python-paramiko
sudo apt-get install python-scp

Next you will need to install Cloud Foundry and IBM’s Cloud Foundry bonus packs.

Install CloudFoundry

IBM BlueMix Docs

On Ubuntu, it’s going to look something like this-

Download Cloud Foundry Installer All Installers:

wget "https://cli.run.pivotal.io/stable?release=debian64&version=6.22.1&source=github-rel"

Unpack and install Cloud Foundry:

sudo dpkg -i ./cf-cli-*.deb && sudo apt-get install -f

Set BlueMix Plugin Registry Endpoint:

cf add-plugin-repo bluemix-cf-staging https://plugins.ng.bluemix.net

Install BlueMix Plugins:

cf install-plugin active-deploy -r bluemix-cf-staging
cf install-plugin bluemix-admin -r bluemix-cf-staging

Login to BlueMix via CloudFoundry:

cf login -a https://api.ng.bluemix.net

Now, clone my BlueMix extra services repo:

git clone https://github.com/rawkintrevo/bluemix-extra-services

In ./bluemix-extra-services you’ll find a script example_zeppelin_mahout.py. Open it up and set the following variables:
APP_PREFIX : Whatever you set this to, Zeppelin will become available at http://APP_PREFIX-zeppelin.mybluemix.net
SERVER : The name of the SSH Host from the end of Step 2. (not the name you assigned but the ssh address!)
USERNAME : Username you entered when you created the server
PASSWORD : Password you entered when you created the server

Note on S3 notebook repositories

You’ll see some commented-out sections about S3 notebooks. BigInsights free clusters only persist for 14 days. When they expire, so do all of your notebooks, if not persisted. You can use AWS S3 to persist notebooks so they always pop up when you create a new cluster. If you have an AWS account, you can create a bucket and set S3_BUCKET to that value. In that bucket create a folder, and set S3_USERNAME to whatever that folder is called. In that folder, create a folder called notebook. There is a link, also commented out, that explains this further. A little ways down you will see a commented-out line, z.setS3auth(S3_USERNAME, S3_BUCKET). Uncomment that line to have the script update the config files to use your S3 bucket as a notebook repository.

Finally, in S3: click on your name at the top right; there will be a drop-down. Click on “Security and Credentials”. A window will pop up; click “Continue …”. On the page in the center, click on “Access Keys (Access Key ID and Secret Access Key)”. Click on the blue button that says “Create New Access Key”. Click on “Download key file”. Save the file as /path/to/bluemix-extra-services/data/resources/aws/rootkey.csv.

These steps are required. That makes this demo a bit more tedious, but in general it is good, because then all your Zeppelin notebooks follow you wherever you go. (In the future I’ll probably update the script so this isn’t required, and I’ll update the post then.)

NOTE on S3 charges Using S3 will incur a cost… My notebooks cost me about 14 cents per month.

The important thing to do before running this script is to make sure your cluster is fully set up. To do this, in the browser tab where you created your cluster, click the little back arrow to get to the Cluster list. You will have one cluster listed, named whatever you named it. It should have a little green circle and say “Active”. If it is a yellow circle and says “Pending”, wait a little longer or hit the refresh button at the top of the list. If it is a red circle and says “Failed” (this has happened to me about one time in 100), hover over the little gear to the right, click “Delete Cluster”, then create a new one. If that happens, be advised your SSH Host will have changed.

Once the cluster is active and you have updated and saved the Python script, run it- it will give you some standard out that should end in

webapp should be available soon at http://APP_PREFIX-zeppelin.mybluemix.net

where APP_PREFIX is whatever you set it to.

This script is downloading various packages, uploading config files, and finally starting a simple python web app that establishes an SSH tunnel from the webapp to the webUI of the service on the BigInsights cloud cluster.

Link to template webapp.

More information on Bluemix Apps

Step 4. Run an absurdly large OLS algorithm

So you’ve made it this far, eh? Well done. Crack a beer- the rest is pretty easy. Apache Mahout is a library that allows you to quickly ‘roll your own’ algorithms based on matrix representations, and run them on your favorite distributed engine (assuming that engine is either Apache Spark, Apache Flink, or H2O).

Now- we’re assuming you followed Step 3. to create a YARN cluster, and as such you have a Spark interpreter with all of the appropriate Mahout dependencies and settings. If you didn’t follow step 3, that’s ok- just make sure to create the Mahout interpreter following the steps found in this previous post.

The first paragraph you need to run when using the Mahout interpreter in Zeppelin imports Mahout and sets the distributed context.

Initial Mahout Imports

%sparkMahout

import org.apache.mahout.math._
import org.apache.mahout.math.scalabindings._
import org.apache.mahout.math.drm._
import org.apache.mahout.math.scalabindings.RLikeOps._
import org.apache.mahout.math.drm.RLikeDrmOps._
import org.apache.mahout.sparkbindings._

implicit val sdc: org.apache.mahout.sparkbindings.SparkDistributedContext = sc2sdc(sc)

The example we are going to do today very closely follows the Playing With the Mahout Shell post from the Mahout website, except with a twist. In our version we are going to use org.apache.spark.mllib.util.LinearDataGenerator to create a very large data set to crunch.

Synthesize some Linear Data

%sparkMahout

import org.apache.spark.mllib.util.LinearDataGenerator
val n = 100000000
val features = 100
val eps = 0.1 // i'm guessing error term, poorly documented
val partitions = 5500
val intercept = 10.0

val synDataRDD = LinearDataGenerator.generateLinearRDD(sc, n, features, eps, partitions, intercept)

Now that we have a very large dataset, we need to convert it into a Mahout Distributed Row Matrix (think of it as the Mahout equivalent of an RDD). A good primer on Mahout can be found here.

Create DRM from RDD

%sparkMahout
val tempRDD = synDataRDD.zipWithIndex.map( lv => {
  val K = lv._2.toInt
  val x = new DenseVector(lv._1.features.toArray )
  //x = sparkVec2mahoutVec( lv._1.features ) // still doesn't serialize
  val y = lv._1.label
  (K, (y, x))
}).persist

println("----------- Creating DRMs --------------")
// temp RDD to X and y
val drmRddX:DrmRdd[Int] = tempRDD.map(o => (o._1, o._2._2))
val drmX = drmWrap(rdd= drmRddX)
val drmRddY:DrmRdd[Int] = tempRDD.map(o => (o._1, new DenseVector( Array(o._2._1) )))
val drmy = drmWrap(rdd= drmRddY)

Also note, the only reason we are using Spark instead of Flink here is that SparkML comes with this nice linear data generator. Assuming you were loading your data from some other source, the following code will run on Mahout on Spark OR Mahout on Flink.

For those a little rusty on Ordinary Least Squares method of regression:

[Image: the ordinary least squares estimator]
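In case that image doesn’t come through: it is presumably the familiar closed-form estimator, which is exactly what the next three lines compute,

\hat{\beta} = (X^{T}X)^{-1}X^{T}y

i.e. form XᵀX and Xᵀy, then solve for β.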

Mahout OLS

%sparkMahout

val drmXtX = drmX.t %*% drmX
val drmXty = drmX.t %*% drmy
val beta = solve(drmXtX, drmXty)

And that’s it. An R-Like DSL for Scala that runs inside your Spark code and can be copy-pasted to Flink code.
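As a rough sanity check (a sketch using the same DSL), you can compute the residual error of the fit without collecting anything huge; with eps = 0.1 in the generator, the RMSE should land somewhere in that neighborhood.

%sparkMahout

// Rough sanity check: residual error of the fit, computed distributedly
val drmErr = drmy - (drmX %*% beta)
val sse = (drmErr * drmErr).colSums().get(0)
println(s"RMSE: ${math.sqrt(sse / drmErr.nrow)}")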

Play with the paragraph that synthesizes data. This is a good exercise on how Spark partitioning strategies can affect performance. I played with it for an hour or so- 100 million rows by 100 columns was the largest I could get to run on the BigInsights Cloud ( which has approx 240GB RAM and 48 processors ).

In this post we spent a lot of time setting up an environment and not much time doing anything with it. The onus is now on you to go implement algorithms. In posts that follow, I intend to refer back to this post for setting up an environment that has the horsepower to calculate big jobs. Because if you’re not going to be distributed, then you have to ask yourself, why not just do this in R?

tg

When things go wrong.

Two types of errors can happen in general:
– Zeppelin failed to start correctly
– The webapp failed to start

If you go to the address given and see a 404 error (make sure you typed it correctly), odds are the webapp failed. From the dashboard in BlueMix you should see your webapp, click on it and then click on logs. See if there are any clues.

If you go to the address and see a 502 error, Zeppelin didn’t start. Check the standard out from when you ran the program and look for errors.

If you go to the address and see a 503 error, the web app is tunneling correctly, but Zeppelin isn’t serving it data.

In a terminal ssh in to the cluster as follows:

ssh username@ssh-host

where username is the user name you picked in step 2, and ssh-host is the host given in SSH Host.

If you had a 502 error, from here you can manually start Zeppelin with

z*/bin/zeppelin-daemon.sh start

Then try the website again.

The BigInsights cluster is restrictive to say the least. Use tail to see the end of the logs.

tail z*/logs/*out -n 50

Look for something about AWS credentials not being accepted. If that happens, STOP ZEPPELIN

z*/bin/zeppelin-daemon.sh stop

Delete the zeppelin directory

rm -rf zeppelin-0.7.0-SNAPSHOT

Double check the rootkey.csv (maybe re download?) and run the python script again.

If you don’t see anything, at this point it is just standard Zeppelin troubleshooting.

How to get paid a gagillion dollars building crazy big data stuff on nights and weekends.

UPDATE 5-23: The world and ecosystem of Big Data evolves quickly.  Most of these tools have gone through multiple releases since I first penned this article.  I’ve tried to update accordingly. Good hunting.

That title is a lie, probably.

Spark is the hot new thing in big data. Flink will be the hot new thing in big data as internet-of-things, real-time analytics, and other buzzwords go from being starry-eyed promises made in sales pitches to things that actually happen. At the end of the day, most people don’t want to get their hands dirty monkeying around with the mechanics of this stuff; they just want a pretty web interface to use.

So to get paid a gagillion dollars, you basically just start tinkering with this and maybe contribute a bit here and there, then in 2 years when Flink and Spark are the new Microsoft Excel, you’re one of a couple thousand people in the world who have been working with this stuff for over a year. #!/bin/sh (pronounced ‘sha-bang’, more computer jokes, I digress) you’re getting paid a gagillion a year.

Let’s be real. Your proposed analytics stack could do some sci-fi black-magic analytics that perfectly predicts all of your KPIs, lottery numbers, and what color of ChuckTs you should rock with that awesome new dinosaur sweater you just got, but if it doesn’t have a sparkly, friendly, not-scary front-end you’re going to have a hard time getting any traction with it. (If you’re not doing this sort of thing day-to-day, then I’m sorry to say, this is the reality of things: people are inherently uncomfortable with submitting jobs via command line.)

Use Case #2: Having a pretty front end for your Spark / Flink like DataArtisans or DataBricks is nice, but for whatever reason you can’t put your data out on some cloud.

Because that one time that this happened…

So without further ado, I present a nice recipe for setting up Apache Flink, Apache Spark, and Apache Zeppelin (incubating) in big-boy mode. (Big-boy mode: Zeppelin comes pre-packaged with Flink and Spark, but you want to be pointing at full-blown clusters of both of these because, y’know, science.)

Ingredients:

Prep time: 2 hours.

Skill Level: Knows just enough to be dangerous

But really, this is a heavily documented (over-documented?) recipe. It assumes no familiarity with Linux and provides little blurbs and links about what each command is doing.  I am an autodidact when it comes to computers, and while blindly following recipes is nice for getting something going, it doesn’t teach you much, and if the slightest thing goes wrong you are totally lost. So proceed with no fear.

Step 0:

Because jebus-only-knows what kind of weird-o setup any given box could have, I present this recipe on a bare-bones (that is, minimal install) Ubuntu Server 14.04.3 virtual machine.  For demo purposes, this minimizes instances where

  1. something weird is causing unique problems and
  2. I miss a dependency because I have it on my computer, but you may not.

There are lots of tutorials on how to install Ubuntu on a virtual machine, but to be honest, if this step scares you, you should plan on this being a full day project or more.  It’s really nothing more than a recipe, but you’re going to be doing a lot of learning along the way.

Make sure to setup the virtual box to use a bridged network adapter. A good write up on the differences can be found here. In short, in Oracle VirtualBox go in to the machine Settings -> Networking and select Bridged Adapter from the drop down.

Select Bridged Adapter from the drop down menu.

A bridged adapter basically has the fewest restrictions on the virtual machine. We’re going to be installing a lot of things that will be accessing ports on the host machine, and this is just going to be a lot simpler. If you’re curious, go read more.

Gotchya

Throughout this tutorial I’ll point out things that seem trivial but, if you deviate from the script even slightly, can totally derail you. I do this because I am not one to carefully follow directions, and I provide this information for others like me. In this case, the ‘gotchya’ is that if the virtual machine is already running when you change this setting, you need to restart the machine for the changes to take effect.

Install Ubuntu 14.04.3

A good tutorial is here.

Gotchya

I only attest to this recipe working on a bare bones Ubuntu 14.04.3 Server installation (the link to exactly what I used is in the ingredients). If you decide to use another version of Ubuntu or flavor of Linux, you may have to tweak some things. Zeppelin, Flink, and Spark are all written in Java/Scala so theoretically this could be done on OSX or Windows, but you wouldn’t run a cluster on Windows or OSX boxes, and for a number of other reasons, I’ve chosen Ubuntu 14.04.3. Get this working then try to do something weird if you want.

Step 1- Prepping the Box

First, some basic applications that will be required by the programs we are going to use. Software in Ubuntu is managed via apt. Generally speaking, there are three ways to get software:

  1. Download from the repository
  2. Download binaries directly
  3. Download and compile from source

In this episode, we’ll be doing all three. If you’re coming from windows and used to everything being pre-compiled for you with a nice pretty GUI installer… I don’t know what to tell you, other than ‘Welcome to Linux, this is life now.’

Any time you see the sudo apt-get we are telling the computer:

  1. sudo literally: super user do
  2. apt-get install use the package manager to install the requested software.

So we are using apt-get to install:

git

git is a program for software version control. We’ll be using it to download the latest source code for programs we’re going to compile.

sudo apt-get install git
openssh-server

A basic Secure Shell server.

sudo apt-get install openssh-server
OpenJDK 7

The Java Development Kit version 7.

sudo apt-get install openjdk-7-jdk openjdk-7-doc openjdk-7-jre-lib

But Maven 3+ isn’t in the repository at the time of this writing. That is to say, if we use apt-get we will get a version of maven that is too old for what we need. So for maven, we are going to download a binary distribution and manually copy it into place.

Maven 3.1+

In the same way apt-get is a neat way to manage software which is kept in a repository, maven manages code libraries for us. Check out https://maven.apache.org/ for more info.

If you already have maven installed, we can use apt to remove software as well as install it.

sudo apt-get purge maven maven2

*Note: if you don’t have maven installed, this command isn’t going to hurt anything; you’re just going to see an error message about how there was nothing to uninstall.

Installing maven 3.3.3 quick and dirty
Download the maven 3.3.3 binary

wget is a CLI (command line interface) downloader.

wget "http://www.us.apache.org/dist/maven/maven-3/3.3.3/binaries/apache-maven-3.3.3-bin.tar.gz"

Unzip the binary (tar -zxvf) and then move (sudo mv) it to /usr/local

tar -zxvf apache-maven-3.3.3-bin.tar.gz
sudo mv ./apache-maven-3.3.3 /usr/local

Finally, create a symbolic link (ln -s) from /usr/bin to the new version. /usr/bin is one of the places Ubuntu looks for programs by default.

sudo ln -s /usr/local/apache-maven-3.3.3/bin/mvn /usr/bin/mvn

Installing Zeppelin

At the time this article went to press, the main branch of Zeppelin didn’t have support for Flink 0.10 (which at the time of press was the current stable release of Flink). There is a discussion here, but the short version is you either need to hack Zeppelin yourself or use Till Rohrmann’s branch.  For parsimony, I present the second method and leave ‘How to hack out Zeppelin’ as content for another post…

First, git clone Apache Zeppelin. This is the third method of getting software discussed earlier. We’re downloading source code to compile.

git clone https://github.com/tillrohrmann/incubator-zeppelin.git
git clone https://github.com/apache/incubator-zeppelin.git

UPDATE 5-23: Originally we wanted Flink v0.10 and Till’s branch had it; now the real Zeppelin has been updated for Flink 1.0, so we go to the (real) source.

change directory (cd) to incubator-zeppelin

cd incubator-zeppelin

A git can have multiple branches. A good overview is here. We want to checkout the branch that Till made with the Zeppelin configuration for flink-0.10-SNAPSHOT.

git checkout flink-0.10-SNAPSHOT

Now we instruct maven (mvn) to clean and package the source found in the directory (more on maven build life-cycles). Additionally we pass flags to maven instructing it to build against Spark version 1.5 (-Pspark-1.5) and to skip the tests that make sure it compiled correctly.  See Gotchya below.

UPDATE 5-23: This all seems to be working now.  We add flags -Psparkr -Ppyspark -Pspark-1.6 to make Zeppelin build against Spark 1.6 (included since last time), add support for SparkR and support for pyspark.  At the time of writing -Dflink.version=1.0 isn’t necessary, but will hopefully keep this working for a little while longer, especially after Flink v1.1 is released.

mvn clean package -DskipTests -Psparkr -Ppyspark -Pspark-1.6 -Dflink.version=1.0
Gotchya:

I explicitly didn’t use the -Pspark-1.5 flag. If I had, it would have built Zeppelin with an internal Spark interpreter at version 1.5. I was having all sorts of issues when doing this, and finally rolled back to make this a simple-as-possible case. If you want to try your hand at Spark 1.5, then add that flag and in the next section when you install Spark, checkout version 1.5 instead.

The maven build will take a little while (26 minutes for me). When it is done, you should see a message saying BUILD SUCCESS and some statistics.

And finally…drumroll… the moment you’ve all been waiting for … start the Zeppelin daemon.

sudo bin/zeppelin-daemon.sh start
Gotchya

You must use sudo when you start the zeppelin daemon. The onus is on you to remember to do this. It is absolutely possible to start the daemon without sudo and you will be able to run the Flink example listed below; however, the Spark example won’t work. The Zeppelin internal Spark interpreter needs super user privileges for creating databases and other various writes.

UPDATE 5-23: Don’t use sudo .  If you do it once, you’ll have to do it always, and having Zeppelin running as super user is unwise and unnecessary.

Test flight of our new Zeppelin…

First determine the local IP address of the machine hosting Zeppelin.

ifconfig
This is the output from my machine, your numbers will be different.

See that the IP of my machine is 192.168.1.109, yours will be different. In subsequent screenshots, in the browser address you will see this IP, however for those following along at home, you need to use your own IP address.

Open a browser and surf to http://yourip:8080, where yourip is the IP you found in the inet addr: field under the eth0 section. Port 8080 is the default port of the Zeppelin WebUI.

Guten Tag, Zeppelin.

Open the Tutorial Notebook by clicking on Notebook -> Zeppelin Tutorial

Do this.

When you open the tutorial notebook it will ask you to bind the interpreters, just do it by clicking save. Now run all of the examples in the notebook to make sure they are working. You can do this by going to each cell and clicking Shift+Enter or by clicking the little play button at the top of the note.

run all zeppelin

Now we are going to do a couple of simple examples in Flink and Spark. Zeppelin comes pre-built with its own Flink and Spark interpreters, and will use these until we have it pointed at our own cluster (which happens later). For now, we’re going to test some basic functionality of Zeppelin by running a Flink and a Spark word count example against the internal interpreters.

Flink Example

First, create a new notebook. Do this by clicking on Notebook -> Create New Notebook. Name this notebook “flink example”. Zeppelin doesn’t automatically open the notebook you’ve created; you’ll have to click on Notebook again, and the name you gave the new notebook will appear in the list.

You can find a Flink word count gist here. Copy and paste the code from the gist into the Zeppelin note and either hit Shift+Enter or click the play button to run the paragraph.
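If that gist ever goes missing, a minimal Flink word count looks roughly like this (a sketch; it assumes the interpreter exposes the batch environment as benv, the way the Flink Scala shell does):

%flink
// Minimal batch word count against the internal Flink interpreter
val text = benv.fromElements("to be or not to be", "that is the question")
val counts = text.flatMap(_.toLowerCase.split("\\W+"))
  .filter(_.nonEmpty)
  .map((_, 1))
  .groupBy(0)
  .sum(1)
counts.collect().foreach(println)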

Hopefully you see something like this…
Spark Example

Create another new notebook, call this one “spark example”, and open it.

Copy and paste the gist from here.
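If that gist is ever unavailable, a minimal Spark word count looks roughly like this (a sketch, not the contents of the gist):

%spark
// Minimal word count against the internal Spark interpreter
val text = sc.parallelize(Seq("to be or not to be", "that is the question"))
val counts = text.flatMap(_.toLowerCase.split("\\W+"))
  .filter(_.nonEmpty)
  .map((_, 1))
  .reduceByKey(_ + _)
counts.collect().foreach(println)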

Assuming your examples are working go ahead and stop Zeppelin.

bin/zeppelin-daemon.sh stop

Installing Flink and Spark Clusters

There are lots of how-to guides for getting full-blown Flink and Spark clusters set up. For this example, we’re just going to install a standalone instance of each. The important thing in this tutorial is how to get Zeppelin aimed at Flink and Spark instances outside of the ones that come prepackaged. These external versions can be scaled/built/set up to suit your use case.

Download, checkout, and start Flink
Download

We change directory back to our home directory

cd $HOME

Then clone the Apache Flink repository

git clone https://github.com/apache/flink.git

Then check out release-1.0 (this post originally used release-0.10):

git checkout release-0.10
git checkout release-1.0

UPDATE 5-23: We’re on release 1.0 now.  Release 1.1 (which is what the master branch is on) has some cool new stuff like streaming in the shell, but will also break backwards compatibility (e.g. it won’t work here). I have a PR that makes it work, but I’ll save that for a future blog post.

And finally, build the package.

mvn clean package -DskipTests

Building Flink took 20 minutes on my virtual box.

Start Flink

Now start Flink with the following command:

build-target/bin/start-cluster.sh

Now go to http://yourip:8081 and check out the Flink web-ui.

Oh, hello there new friend.

Make sure there is something listed under the task manager. If there is nothing, stop and restart the Flink cluster like this:

build-target/bin/stop-cluster.sh
build-target/bin/start-cluster.sh
Download, checkout, and start Spark
Download

We change directory back to our home directory

cd $HOME

Then clone the Apache Spark repository

git clone https://github.com/apache/spark.git

Then check out branch-1.6 (this post originally used branch-1.4):

git checkout branch-1.4
git checkout branch-1.6

UPDATE 5-23: We’re on branch 1.6 now, and we want SparkR support.

And finally, build the package (the -Psparkr flag adds the SparkR support mentioned above):

mvn clean package -DskipTests -Psparkr

Building Spark took 38 minutes on my virtual box.

Start Spark

In a cluster, you have a boss that is in charge of distributing the work and collecting the results and a worker that is in charge of actually doing the work. In Spark these are referred to as the master and slave respectively.

In Flink we could start an entire standalone cluster in one line. In Spark, we must start the master and the slave individually. We start the master with the flag --webui-port 8082. By default the webui-port is 8080, which is already being used by Zeppelin.

sbin/start-master.sh --webui-port 8082

Now go check out the Spark master web-ui. It will be at http://yourip:8082.

spark://nameofhost:7077

Note the URL listed: spark://ubuntu:7077. My URL is ubuntu because that is the name of my host. The name of your host will be whatever you set it up as during install. Write this URL down, because next we are starting the slave. We have to tell the slave who its master is.

sbin/start-slave.sh spark://yourhostname:7077

The argument spark://yourhostname:7077 lets the slave know who its master is. This is literally the master’s URL. If you have another computer with Spark 1.4 installed you could run this line again (substituting the master machine’s address for ubuntu) and add another computer to your cluster.

Gotchya

For those that are not reading carefully and just copying and pasting: you probably won’t see this for a while anyway, but I want to say again, unless you just happen to have named your host ubuntu, you need to change that to whatever name you found for the Master URL in the Spark Web-UI…

Now go back to your master webui and you should see the slave listed under workers.
spark master with slave

Start Zeppelin

Now everything is technically up and running. All we have left to do, is start Zeppelin back up, tell it to run code against our clusters (instead of the internal interpreters), and check that our examples still work.

Start Zeppelin with the following

cd $HOME
incubator-zeppelin/bin/zeppelin-daemon.sh start

Now go back to the Zeppelin web-ui at http://yourip:8080 and this time click on Interpreters at the top of the screen.

In the Spark section, click the edit button in the top right corner to make the property values editable. The only field that needs to be edited in the Spark interpreter is the master field. Change this value from local[*] to the URL you used to start the slave; mine was spark://ubuntu:7077.

Edit the spark and flink interpreters.

Click ‘Save’, then scroll down to the Flink section. Click ‘edit’ and change the value of host from local to localhost. Click ‘Save’ again.

Now open the Flink notebook we made earlier.

Hit Shift+Enter or hit the play button at the top of the notebook to run the paragraphs. Hopefully the result is the same as before. Now in a new tab, go to the Flink Web-UI at http://yourip:8081. You should see the job has completed successfully on the cluster.

It’s beeeeeautiful!

Now open the spark example notebook from earlier and rerun this as well. After this notebook has run successfully go to the Spark Web-UI at http://yourip:8082 and see the job has run on this cluster.

Is great success.
Gotchya

If the Spark job seems to hang, go to the Spark Web-UI. If there is a job listed under running applications, but there are no workers listed, the slave has died; go back to the command line and run

cd $HOME
spark/sbin/start-slave.sh spark://yourhostname:7077

where yourhostname is the hostname you have been using for the Master URL this whole time.

Necromancing the Spark slave.

Summary

Dude (or dudette), crack a beer. You just set up two of the most cutting-edge big data engines available today in cluster mode, with an up-and-coming, cutting-edge (and pretty) web interface. Seriously, not a trivial task. Have you checked your LinkedIn inbox in the last hour? Because you probably have about 30 recruiters blowing you up.

Seriously though, it took me a couple of days of smashing my head against the wall to make this rig work right and consistently. Seeing as I just saved you so much time, I think the least you could do is head over to, sign up for, and participate in the user mailing lists for these projects.

Open source software is a beautiful thing, but it relies on a strong community. All of the projects, (and many more) could use your help, but especially Zeppelin which is still in incubator status.

(A thank you to the wonderful developers wouldn’t hurt either, they watch the mailing lists).

UPDATE 5-23:  I let this go without updates longer than I should have, and I’m sorry.  To be honest, I probably won’t do it again.  I’m older and wiser now, and the things listed here should remain valid for some time to come. The big changes are:

  • We use the actual Zeppelin branch (not Till’s)
  • We build against Flink 1.0
  • We build against Spark 1.6 with SparkR support.

Happy hacking,

tg