Using JNIs (like OpenCV) in Flink

For a bigger project which I hope to blog about soon, I needed to get the OpenCV Java Native Library (JNI) running in a Flink stream. It was a pain in the ass, so I’m putting this here to help the next person.

First thing I tried… Just doing it.

For OpenCV, you need to statically initialize (I’m probably saying this wrong) the library, so I tried something like this

val stream = env
  .map(record => {
    System.loadLibrary(Core.NATIVE_LIBRARY_NAME)
    // ... OpenCV calls on the record ...
  })

Well, that kicks an error that looks like:

Exception in thread "Thread-143" java.lang.UnsatisfiedLinkError:
Native Library /usr/lib/jni/ already loaded in
another classloader

Ok, that’s fair. Multiple task managers, this is getting loaded all over the place. I get it. I tried moving this around a bunch. No luck.

Second thing I tried… Monkey see- monkey do: The RocksDB way.

In the Flink-RocksDB connector (and other people have given this advice too), the idea is to include the JNI library in the resources of your fat-jar, then write it out to a tmp file and load that.

This, for me, resulted in seemingly one tmp copy being generated for each record processed.


import{File, FileOutputStream}

/* This is an example of an extremely stupid way (and consequently the way
 * Flink does RocksDB) to handle the JNI problem. Basically we include the
 * .so in src/main/resources, so then it creates a tmp version. I deleted it
 * from resources, so this would fail. Only try it for academic purposes,
 * e.g. to see what stupid looks like. */
object NativeUtils {
    // heavily based on
    def loadOpenCVLibFromJar() = {

        val temp = File.createTempFile("libopencv_java330", ".so")

        val inputStream = getClass().getResourceAsStream("/")

        val os = new FileOutputStream(temp)
        var readBytes: Int = 0
        val buffer = new Array[Byte](1024)

        try {
            while ({ readBytes =; readBytes != -1 }) {
                os.write(buffer, 0, readBytes)
            }
        } finally {
            // If read/write fails, close streams safely before throwing an exception
            os.close()
            inputStream.close()
        }

        System.load(temp.getAbsolutePath())
    }
}


Third way: Sanity.

There were more accurately like 300 ways I tried to make this S.O.B. work; I'm really just giving you the waypoints- the major strategies I tried in my journey. This is the solution. This is the 'Tomcat solution' I'd seen referenced throughout my journey but didn't understand what they meant. Hence why I'm writing this blog post.

I created an entirely new module. I called it org.rawkintrevo.cylon.opencv. In that module there is one class.

package org.rawkintrevo.cylon.opencv;

import org.opencv.core.Core;

public class LoadNative {

   static {
      System.loadLibrary(Core.NATIVE_LIBRARY_NAME);
   }

   native void loadNative();
}

I compiled that as a fat jar and dropped it in flink/lib

Then, where I would have run System.loadLibrary(Core.NATIVE_LIBRARY_NAME), I now put Class.forName("org.rawkintrevo.cylon.opencv.LoadNative").

   val stream = env
      .map(record => {
         // System.loadLibrary(Core.NATIVE_LIBRARY_NAME)  // the old way
         Class.forName("org.rawkintrevo.cylon.opencv.LoadNative")
         // ... OpenCV calls on the record ...
      })
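Why this works: a jar dropped in flink/lib is loaded once by Flink's parent classloader, and a static initializer runs exactly once per classloader, no matter how many times Class.forName is called from the map function. A toy demonstration of that mechanism (stand-in class, no real native library involved):

```java
// Toy demonstration: a static initializer runs once per classloader,
// however many times Class.forName is called.
public class StaticInitDemo {
    static int initCount = 0;

    public static class LoadNativeStandIn {
        static {
            // in the real LoadNative this would be:
            // System.loadLibrary(Core.NATIVE_LIBRARY_NAME);
            StaticInitDemo.initCount++;
        }
    }

    public static void main(String[] args) throws Exception {
        // simulate many map() invocations all "loading" the library
        for (int i = 0; i < 5; i++) {
            Class.forName("StaticInitDemo$LoadNativeStandIn");
        }
        System.out.println(initCount); // prints 1: the initializer ran once
    }
}
```

Since every task manager resolves LoadNative through the same parent classloader, the UnsatisfiedLinkError from the first attempt never comes up.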
Further, I copy the OpenCV Java wrapper (opencv/build/bin/opencv-330.jar) and the native library (opencv/build/lib/ into flink/lib.

Then I have great success and profit.

Good hunting,


Introducing Pre-canned Algorithms in Apache Mahout

Apache Mahout v0.13.0 is out, and there are a lot of exciting new features and integrations, including GPU acceleration, Spark 2.x/Scala 2.10 integration (experimental- full blown in 0.13.1), and a new framework for “precanned algorithms”.  In this post we’re going to talk about the new algorithm framework, and how you can contribute to your favorite machine learning on Big Data library.

If you’re not familiar with the Apache Mahout project, it might be helpful to watch this video, but in short- it allows you to quickly and easily write your own algorithms in a mathematically expressive, distributed-back-end-independent (think Apache Spark, Apache Flink, etc.) extension of the Scala language. Now v0.13.0 allows the user to accelerate their distributed cluster with GPUs (this is independent of Spark- ANY cluster can be accelerated), and lays out a framework of pre-canned algorithms.

Key Concepts

The Algorithms Framework in Apache Mahout borrows from the traditions of many of the great machine learning and statistical frameworks available today, but most notably- R and Python’s sklearn.  When reasonable, Mahout makes a good faith effort to draw on the best parts of each of these.

  • sklearn has a very consistent API.
  • R is very flexible.
  • Both are extendable, and encourage users to create and submit their own implementations to be available for other users (via CRAN and Pypi respectively).

Fitters versus Models

The first concept we want to address is the idea of a fitter and a model.  Now that I have set up the Mahout Algorithms framework, I'll instantly point out a major break from the way things are done in R and sklearn. As the great thinkers Ralph Waldo Emerson and the person who wrote PEP-8 said, “A foolish consistency is the hobgoblin of little minds.”

In sklearn, the model and the fitter are contained in the same class.  In R, there is an implicitly similar paradigm…  sometimes. 

A Model is an object which contains the parameter estimates.  The R function lm generates models. In this way, a Fitter in Apache Mahout generates a Model of the same name (by convention). E.g., OrdinaryLeastSquares generates an OrdinaryLeastSquaresModel, which contains the parameter estimates and a .predict(drmX) method for predicting new values based on the model.

Recap: A Fitter Generates a Model. A model is an object that contains the parameter estimates, fit statistics, summary, and a predict() method.

Now if you’re thinking, “but why?”, good on you for questioning things. Why this break from sklearn? Why not let the fitter and the model live in the same object? The answer is because at the end of the day- we are dealing in big data, and we want our models to be serialized as small as is reasonable. If we were to include everything in the same object (the fitter, with the parameter estimates, etc.) then when we saved the model or shipped it over the network we would have to serialize all of the code required to fit the model and ship that with it. This would be somewhat wasteful.
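The payoff can be sketched in plain Java (hypothetical OlsFitter/OlsModel classes, not Mahout's actual API): the object that gets serialized carries only the parameter estimates; the fitting machinery never travels.

```java
import*;

// Sketch of the fitter/model split: predict() needs only the estimates,
// so only the small model object is serialized and shipped.
class OlsModel implements Serializable {
    double[] beta;                        // parameter estimates: all predict() needs
    OlsModel(double[] beta) { this.beta = beta; }

    double predict(double[] x) {
        double yHat = beta[0];            // intercept
        for (int i = 0; i < x.length; i++) yHat += beta[i + 1] * x[i];
        return yHat;
    }
}

class OlsFitter {
    // imagine large working state here: design matrices, buffers, ...
    OlsModel fit() { return new OlsModel(new double[]{1.0, 2.0}); } // toy estimates
}

public class FitterVsModel {
    public static void main(String[] args) throws IOException {
        OlsModel model = new OlsFitter().fit();

        // only the model object goes over the wire
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        new ObjectOutputStream(bos).writeObject(model);

        System.out.println(model.predict(new double[]{3.0})); // 1 + 2*3 = 7.0
    }
}
```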

Class Hierarchy

The following will make the most sense if you understand class hierarchy and class inheritance. If you don’t know/remember these things, now would be a good time to review.

(Diagram: the Mahout algorithms framework class hierarchy)

This isn’t a complete diagram but it is illustrative. For example- all Models have a summary string. All SupervisedModels have a Map of testResults. All RegressorModels have a .predict(...) method, and when ClassifierModel is introduced, they may have a .predict(...) method as well, or perhaps they will have a .classify(...) method.

Preprocessors are treated as unsupervised models. They must also be fit. Consider a StandardScaler, which must be “fit” on a data set to learn the mean and standard deviation.

The hierarchy of fitters is identical.

Hyper-parameter system

Hyper-parameters are passed in fitter functions as symbols. For example:

val model = new OrdinaryLeastSquares[Int]().fit(drmX, drmY, 'calcCommonStatistics → false)

Different methods have different hyper-parameters which may be set. This approach has the advantage of extreme flexibility. It also side-steps the type safety of the Scala language, which, depending on whether you love or hate type-safety, you might consider a good or bad thing. A notable drawback: if you pass a parameter that isn’t used by the method, it will be ignored silently, that is to say it won’t warn you or throw an error. The real threat here is typos- where you think you are doing something like specifying an interceptless regression, but instead of specifying 'addIntercept -> false you accidentally type 'addInterept -> false; then the regression will add an intercept and throw no warning that you’ve committed a typo. (This will possibly be fixed soon.)

Also, both hyperparameter examples given so far have had Boolean values, but the value can be anything. For example, in Cochrane-Orcutt one of the hyperparameters, 'regressor, can be any sub-class of LinearRegressorFitter!
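The silent-ignore hazard is easy to reproduce with any map- or symbol-keyed hyperparameter system. A minimal plain-Java sketch (hypothetical addIntercept lookup, not Mahout's code):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the hazard: a key-value hyperparameter system never reads keys
// it doesn't know about, so a typo silently falls back to the default.
public class HyperParams {
    static boolean addIntercept(Map<String, Object> params) {
        // unknown keys are simply never looked up: no warning, no error
        return (boolean) params.getOrDefault("addIntercept", true);
    }

    public static void main(String[] args) {
        Map<String, Object> ok = new HashMap<>();
        ok.put("addIntercept", false);

        Map<String, Object> typo = new HashMap<>();
        typo.put("addInterept", false);   // typo: ignored silently

        System.out.println(addIntercept(ok));    // false, as requested
        System.out.println(addIntercept(typo));  // true: the default wins
    }
}
```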

In Practice


There are currently three pre-processors available.
* AsFactor which is sometimes referred to as Dummy Variables or One-Hot encoder (Mahout chose R-semantics here over Python)
* StandardScaler which goes by the same name in sklearn and the function scale in R.
* MeanCenter which is very similar to the standard scaler, however it only centers each column. In the future it is possible that MeanCenter will be combined with StandardScaler (as is done in R).

A preprocessor example

A fun tip: the unit tests of any package are full of great examples. This one comes from:

Setup Code
val A = drmParallelize(dense(
(3, 2, 1, 2),
(0, 0, 0, 0),
(1, 1, 1, 1)), numPartitions = 2)

// 0 -> 2, 3 -> 5, 6 -> 9
How to use AsFactor from Apache Mahout
val factorizer: AsFactorModel = new AsFactor().fit(A)
val factoredA = factorizer.transform(A)
val myAnswer = factoredA.collect
Check our results
val correctAnswer = sparse(
svec((3 → 1.0) :: (6 → 1.0) :: (8 → 1.0) :: (11 → 1.0) :: Nil, cardinality = 12),
svec((0 → 1.0) :: (4 → 1.0) :: (7 → 1.0) :: ( 9 → 1.0) :: Nil, cardinality = 12),
svec((1 → 1.0) :: (5 → 1.0) :: (8 → 1.0) :: (10 → 1.0) :: Nil, cardinality = 12))
val epsilon = 1E-6
(myAnswer.norm - correctAnswer.norm) should be <= epsilon

The big call out from the above is that the interface for this preprocessor (the second block of code) is exceptionally clean for a distributed, GPU accelerated, machine learning package.


There are currently two regressors available:
* OrdinaryLeastSquares – Closed form linear regression
* Cochrane-Orcutt – A method for dealing with Serial Correlation

Oh, hooray- another linear regressor for big data. First off- don’t be sassy. Second, OLS in Apache Mahout is closed form- that is to say, it doesn’t rely on Stochastic Gradient Descent to approximate the parameter space β.

Among other things, this means we are able to know the standard errors of our estimates and make a number of statistical inferences, such as the significance of various parameters.

For the initial release of the algorithms framework, OrdinaryLeastSquares was chosen because of its widespread familiarity. CochraneOrcutt was chosen for its relative obscurity (in the Big Data Space). The Cochrane Orcutt procedure is used frequently in econometrics to correct for auto correlation in the error terms. When auto-correlation (sometimes called serial-correlation) is present the standard errors are biased, and so is our statistical inference. The Cochrane Orcutt procedure attempts to correct for this.

It should be noted, implementations of Cochrane-Orcutt in many statistics packages, such as R’s orcutt, iterate this procedure to convergence. This is ill-advised on small data and big data alike. Kutner et al. recommend no more than three iterations of the Cochrane-Orcutt procedure- if suitable parameters are not achieved, the user is advised to use another method to estimate ρ.
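To make the procedure concrete, here is a back-of-envelope plain-Java sketch of one Cochrane-Orcutt step for simple regression (not Mahout's DRM-based implementation), using the first ten rows of the Blaisdell data from the regression example below:

```java
// One Cochrane-Orcutt step: fit OLS, estimate rho from the residuals,
// rho-difference the series, refit, back-transform the intercept.
public class CochraneOrcuttSketch {

    // closed-form OLS of y on x; returns {intercept, slope}
    static double[] ols(double[] x, double[] y) {
        int n = x.length;
        double mx = 0, my = 0;
        for (int i = 0; i < n; i++) { mx += x[i]; my += y[i]; }
        mx /= n; my /= n;
        double sxy = 0, sxx = 0;
        for (int i = 0; i < n; i++) {
            sxy += (x[i] - mx) * (y[i] - my);
            sxx += (x[i] - mx) * (x[i] - mx);
        }
        double slope = sxy / sxx;
        return new double[]{ my - slope * mx, slope };
    }

    // returns {rho, intercept, slope}
    static double[] fit(double[] x, double[] y) {
        double[] b = ols(x, y);                              // step 1: plain OLS
        double[] e = new double[x.length];
        for (int i = 0; i < x.length; i++) e[i] = y[i] - b[0] - b[1] * x[i];

        double num = 0, den = 0;                             // step 2: estimate rho
        for (int i = 1; i < e.length; i++) { num += e[i] * e[i - 1]; den += e[i - 1] * e[i - 1]; }
        double rho = num / den;

        int n = x.length - 1;                                // step 3: rho-difference, refit
        double[] xs = new double[n], ys = new double[n];
        for (int i = 1; i < x.length; i++) {
            xs[i - 1] = x[i] - rho * x[i - 1];
            ys[i - 1] = y[i] - rho * y[i - 1];
        }
        double[] b2 = ols(xs, ys);
        return new double[]{ rho, b2[0] / (1 - rho), b2[1] }; // back-transform intercept
    }

    public static void main(String[] args) {
        double[] x = {127.3, 130.0, 132.7, 129.4, 135.0, 137.1, 141.2, 142.8, 145.5, 145.3};
        double[] y = {20.96, 21.40, 21.96, 21.52, 22.39, 22.76, 23.48, 23.66, 24.10, 24.01};
        double[] r = fit(x, y);
        System.out.println("rho=" + r[0] + " intercept=" + r[1] + " slope=" + r[2]);
    }
}
```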

The point of implementing the CochraneOrcutt procedure was to show that the framework is easily extendable to esoteric statistical/machine-learning methods, and users are encouraged to extend and contribute. Observe the implementation of the algorithm, and after grokking it, the reader will see that the code is quite expressive and tractable, and that the majority of the fit method is dedicated to copying variables of interest into the resulting Model object.

A Regression Example
Setup Code
val alsmBlaisdellCo = drmParallelize( dense(
(20.96,  127.3),
(21.40,  130.0),
(21.96,  132.7),
(21.52,  129.4),
(22.39,  135.0),
(22.76,  137.1),
(23.48,  141.2),
(23.66,  142.8),
(24.10,  145.5),
(24.01,  145.3),
(24.54,  148.3),
(24.30,  146.4),
(25.00,  150.2),
(25.64,  153.1),
(26.36,  157.3),
(26.98,  160.7),
(27.52,  164.2),
(27.78,  165.6),
(28.24,  168.7),
(28.78,  171.7) ))

val drmY = alsmBlaisdellCo(::, 0 until 1)
val drmX = alsmBlaisdellCo(::, 1 until 2)
var coModel = new CochraneOrcutt[Int]().fit(drmX, drmY , ('iterations -> 2))
Regression Tests

Unlike R and sklearn, all regression statistics should be considered optional, and very few are enabled by default. The rationale for this is that when working on big data, calculating common statistics could be costly enough that, unless the user explicitly wants this information, the calculation should be avoided.

The currently available regression tests are
* CoefficientOfDetermination – calculated by default, also known as the R-Square
* MeanSquareError – calculated by default, aka MSE
* DurbinWatson – not calculated by default, a test for the presence of serial correlation.

When a test is run, the convention is the following:

var model = ...
model = new MyTest(model)

The model is then updated with the test result appended to the model’s summary string, and the value of the test result added to the model’s testResults Map.
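A minimal sketch of that convention in plain Java (hypothetical Model class and test, mirroring the summary string and testResults Map described above):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the decorator-style test convention: running a test on a model
// appends to its summary string and adds an entry to its testResults map.
public class TestConvention {
    static class Model {
        String summary = "";
        Map<String, Double> testResults = new HashMap<>();
    }

    // stands in for e.g. wrapping a model in a DurbinWatson test
    static Model myTest(Model m) {
        double value = 1.98;                       // pretend test statistic
        m.testResults.put("MyTest", value);
        m.summary += "MyTest: " + value + "\n";
        return m;
    }

    public static void main(String[] args) {
        Model model = myTest(new Model());
        System.out.println(model.testResults.containsKey("MyTest")); // true
    }
}
```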


Apache Mahout’s algorithm framework was designed to be extended. Even with the few examples given, it should be evident that it is much more extensible than SparkML/MLLib and even sklearn (as all of the native optimization is abstracted away).

While the user may create their own algorithms with great ease- all are strongly encouraged to contribute back to the project. When creating a “contribution grade” implementation of an algorithm, a few considerations must be taken.

  1. The algorithm must be expressed purely in Samsara (the Mahout R-like DSL). That is to say, the algorithm may not utilize any calls specific to an underlying engine such as Apache Spark.
  2. The algorithm must fit into the existing framework or extend the framework as necessary to ‘fit’. For example, we’d love to see a classification algorithm, but one would have to write the Classifier trait (similar to the Regressor trait).
  3. New algorithms must demonstrate a prototype in either R, sklearn, or some other package. That isn’t to say the algorithm must exist (though currently, all algorithms have an analogous R implementation). If there is no function that performs your algorithm, you must create a simple version in another language and include it in the comments of your unit test. This ensures that others can easily see and understand what it is that the algorithm is supposed to do.

Examples of number three abound in the current unit tests. Example


Apache Mahout v0.13.0 offers a number of exciting new features, but the algorithms framework is (biasedly) one of my favorites. It is an entire framework that offers statisticians and data scientists who have until now been intimidated by contributing to open source a green field opportunity to implement their favorite algorithms and commit them to a top-level Apache Software Foundation project.

There has been to date a mutually exclusive choice between ‘powerful, robust, and extendable modeling’ and ‘big data modeling’, each having advantages and disadvantages. It is my sincere hope and belief that the Apache Mahout project will represent the end of that mutual exclusivity.

Lucky Number 0.13.0

Apache Mahout has just released the long awaited 0.13.0 which introduces modular native solvers (e.g. GPU support!).

TensorFlow has done a great job driving the conversation around bringing GPU-accelerated linear algebra to the masses for implementing custom algorithms; however, it has a major drawback in that it prefers to manage its own cluster (or can work on top of Apache Spark). Mahout, on the other hand, was designed to work on top of Spark or with any user-defined distributed back-end (it just so happens Spark is the generic one we recommend for people trying out Mahout- so if you happen to use Spark, great, you’re all set!)

Mahout for Hybrid Spark-GPU clusters

In Mahout, we strive to make everything modular: use something off the shelf, or write your own that is optimized to your environment. In 0.13.0 Mahout has introduced a concept of modular native solvers. The native solver is what gets called when you do any BLAS-like operations (e.g. matrix-matrix multiplication, matrix-vector multiplication, etc.). Many machine learning packages such as R, Python’s sklearn, and Spark’s MLLib utilize Fortran-based solvers for speeding up these types of operations. The two native solvers currently included in Mahout are based on ViennaCL, which allow the user to utilize GPUs (via the org.apache.mahout:mahout-native-viennacl_2.10:0.13.0 artifact) or CPUs (via the org.apache.mahout:mahout-native-viennacl-omp_2.10:0.13.0 artifact).

A quick review on how Mahout works under the hood

To understand how this gets used in practice, let’s review how Mahout does math on a Distributed Row Matrix (DRM). A DRM is really a wrapper for an underlying data structure, for example in Spark a RDD[org.apache.mahout.math.Vector]. When we do some math in Mahout on a DRM, for example

drmA %*% incoreV

We are taking a DRM (which is wrapping an RDD) and taking the dot product of each row. In Spark for instance, each executor is multiplying every row of each partition it has locally by a vector (taking the dot product), this happens in the JVM.
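In plain Java, what each executor effectively does for drmA %*% incoreV looks like this (toy arrays standing in for one partition of the DRM and the broadcast vector):

```java
// Each executor dots every locally-held row against the in-core vector;
// the result is one entry of the output vector per row.
public class RowWiseMatVec {
    static double[] multiply(double[][] rows, double[] v) {
        double[] out = new double[rows.length];
        for (int r = 0; r < rows.length; r++) {
            double dot = 0;
            for (int c = 0; c < v.length; c++) dot += rows[r][c] * v[c];
            out[r] = dot;
        }
        return out;
    }

    public static void main(String[] args) {
        double[][] partition = {{1, 2}, {3, 4}};   // rows held by one executor
        double[] v = {1, 1};                       // the in-core vector
        double[] result = multiply(partition, v);
        System.out.println(result[0] + ", " + result[1]); // 3.0, 7.0
    }
}
```

Without a native solver, that inner loop runs in the JVM; the native solvers exist to push exactly this work somewhere faster.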

When a native solver such as ViennaCL is in use, each executor attempts to use the ViennaCL library to move the data out of the JVM (where the executor is running), utilize a GPU if one exists, load the data onto the GPU, and execute the operation there. (GPUs are supercharged for BLAS operations like this.)

This works in a similar way using ViennaCL-OMP, the difference is that all available CPUs are used.

On a Spark-GPU hybrid cluster

Supposing then you have a Spark cluster, and on each node there are GPUs available- then what you now have is statistics/machine learning on Spark which is GPU accelerated. If you have some other distributed engine that executes in the JVM (such as Apache Flink) then you also get GPU acceleration there!

If you don’t have GPUs on your Spark cluster, you can also use the ViennaCL-OMP package to see significant performance gains. It should be noted though, the optimal ‘tuning’ of one’s Spark jobs changes when using OMP. Since ViennaCL-OMP will utilize all available CPUs, it is advised to set the partitioning of the DRM to be equal to the number of nodes you have. This is because the way Spark normally parallelizes jobs is by using one core per partition to do work- however, since OMP will invoke all cores, you don’t gain anything by ‘over-parallelizing’ your job. (This assumes that setting partitions = number of executors will still allow your full data set to fit in memory.)

Other cool stuff

In addition to the super-awesome GPU stuff, there are also a couple of other fun little additions.

Algorithms Framework

Currently this is very sparse, but represents Mahout moving toward offering a large collection of contributed algorithms via a consistent API similar to Python’s sklearn.

Consider classic OLS.

val drmData = drmParallelize(dense(
  (2, 2, 10.5, 10, 29.509541),  // Apple Cinnamon Cheerios
  (1, 2, 12,   12, 18.042851),  // Cap'n'Crunch
  (1, 1, 12,   13, 22.736446),  // Cocoa Puffs
  (2, 1, 11,   13, 32.207582),  // Froot Loops
  (1, 2, 12,   11, 21.871292),  // Honey Graham Ohs
  (2, 1, 16,   8,  36.187559),  // Wheaties Honey Gold
  (6, 2, 17,   1,  50.764999),  // Cheerios
  (3, 2, 13,   7,  40.400208),  // Clusters
  (3, 3, 13,   4,  45.811716)), numPartitions = 2)

val drmX = drmData(::, 0 until 4)
val drmY = drmData(::, 4 until 5)

import org.apache.mahout.math.algorithms.regression.OrdinaryLeastSquares

var model = new OrdinaryLeastSquares[Int]().fit(drmX, drmY)
model.summary

Which returns:

res3: String = 
Coef.       Estimate        Std. Error      t-score         Pr(Beta=0)
X0  -1.336265388326865  2.6878127323908942  -0.49715717625097144    1.354836239139669
X1  -13.157701320678825 5.393984138816236   -2.4393288860442244 1.9287400286811958
X2  -4.152654199019935  1.7849055635870432  -2.326540005105108  1.9194430753543341
X3  -5.67990809423236   1.886871957793384   -3.0102244462177334 1.960458377021527
(Intercept) 163.17932687840948  51.91529676169986   3.143183937239744   0.03474107366050938

Feels just like R, with the one caveat that X0 is not the intercept- a slight cosmetic issue for the time being.

The following algorithms are included as of 0.13.0:

– AsFactor: Given a column of integers, returns a sparse matrix (also sometimes called “OneHot” encoding).
– MeanCenter: Centers a column on its mean
– StandardScaler: Scales a column to mean= 0 and unit variance.
– OrdinaryLeastSquares: Closed form ordinary least squares regression
Remedial Measures
– CochraneOrcutt: When serial correlation in the error terms is present the test statistics are biased- this procedure attempts to ‘fix’ the estimates
– Coefficient of Determination: Also known as R-Squared
– MeanSquareError
– DurbinWatson: Tests for presence of serial correlation


This is a small package that adds Scala-like methods to org.apache.mahout.math.Vector. Specifically it adds the toArray and toMap methods.

While these may seem trivial, I for one have found them extremely convenient (which is why I contributed them 🙂 )

Spark DataFrame / MLLib convenience wrappers.

Spark can be a really useful tool for getting your data ready for Mahout- however there was a notable lack of methods for easily working with DataFrames or RDDs of org.apache.spark.mllib.linalg.Vector (the kinds of RDD that MLLib likes).

For convenience, the following were added:


val myRDD: RDD[org.apache.spark.mllib.regression.LabeledPoint] = ...
val myDRM = drmWrapMLLibLabeledPoint(myRDD)
// myDRM is a DRM where the 'label' is the last column


val myRDD: RDD[org.apache.spark.mllib.linalg.Vector] = ...
val myDRM = drmWrapMLLibVector(myRDD)


val myDF = ... // a dataframe where all of the values are Doubles
val myDRM = drmWrapDataFrame(myDF)


This was a really hard fought release, and an extra special thanks to Andrew Palumbo and Andrew Musselman. If you see them at a bar, buy them a beer; if you see them at a taco stand, buy them some extra guac.

Watching YouTube Activity with Apache Streams

We’re finishing up our series of blogs on providers with YouTube. After this we’ll get down to business on building our own social monitoring quasi-app!

Getting YouTube Credentials

Before accessing any API, we of course need to get our credentials.

A good first step in an enterprise like this would be to read the docs, or at least thumb through them, regarding registering an application with the YouTube API.

But if you wanted to read lengthy docs, you probably wouldn’t be checking out this blog- so here’s the short version:

Open the Google Developers Dashboard, where we’re going to “Create a Project”.


I’m going to call my application “Apache Streams Demo”


The next dialog is where things can go awry.  First, right click “Enable APIs you plan to use” and open that link in a new tab, then click the drop down arrow next to “Create Credentials” and then select “Create Service Account Key” (we’ll come back to API enablement in a minute).


In the next screen choose ‘New Service Account’ and select P12 download type (not JSON which is the default).  A dialogue will pop up with some key information. I set permissions to owner, but that probably isn’t necessary. After you hit create the key will be generated and downloaded.


Set that .p12 file aside somewhere safe. We’ll need it soon.

Finally, we need to enable the services we want with this key.  Go to the library tab (or open the ‘new tab’ you opened when you were creating the service account).

You’ll see a long list of all the APIs you can grant access to.  We want to focus, for the moment on the YouTube services.


Go through each API you want (the YouTubes for this demo) and click “Enable” at the top.



Full credit to Steve Blackmon for this well done notebook.

Import Setup.json

Import YouTube.json


Steve and I are working toward a suite of notebooks aimed at creating a Social Monitoring Quasi-App.  In the first paragraph of setup.json we see the following paragraph.


We’re using the Zeppelin Dependency Loader here (you’ll likely see a message how this is deprecated). At the time of writing, there seems to be an issue with adding repositories via the GUI (e.g. it doesn’t pick up the new repository URL).

z.addRepo(... does add the new repository- in this case the Apache SNAPSHOT repository. The remaining lines load various Apache Streams providers.

In the next paragraph of the notebook Steve defines a couple of UDFs for dealing with punctuation and spaces in tweets and descriptions. These will come in handy later and are worth taking a moment to briefly grok.


The first paragraph isn’t too exciting- some simple imports:

import org.apache.streams.config._
import org.apache.streams.core._

import com.typesafe.config._

import java.util.Iterator

The second paragraph is where things start to get interesting. Here we are building a HOCON configuration. We are also leveraging Zeppelin Dynamic Forms.

val apiKey = z.input("apiKey", "")
val serviceAccountEmailAddress = z.input("serviceAccountEmailAddress", "")
val pathToP12KeyFile = z.input("pathToP12KeyFile", "")

val credentials_hocon = s"""
youtube {
  apiKey = $apiKey
  oauth {
    serviceAccountEmailAddress = "$serviceAccountEmailAddress"
    pathToP12KeyFile = "$pathToP12KeyFile"
  }
}
"""
This paragraph will yield:


To find your API Key and Service account (assuming you didn’t write them down earlier- tsk tsk), go back to your Google Developers Console, on the Credentials tab, and click “manage service accounts” on the right.  On this page you will see your service account and API Key. If you have multiple- make sure to use the one that corresponds to the .p12 file you downloaded.


After you enter your credentials and path to the .p12 file- make sure to click the “Play” button again to re-run the paragraph and update with your credentials.

The next configuration specifies who we want to follow. It is a list of UserIDs.

val accounts_hocon = s"""
youtube.youtubeUsers = [
# Apache Software Foundation - Topic
{ userId = "UCegQNPmRCAJvCq6JfHUKZ9A"},
# Apache Software Foundation
{ userId = "TheApacheFoundation"},
# Apache Spark
{ userId = "TheApacheSpark" },
# Apache Spark - Topic
{ userId = "UCwhtqOdWyCuqOboj-E1bpFQ"},
# Apache Flink Berlin
{ userId = "UCY8_lgiZLZErZPF47a2hXMA"},
# Apache Syncope
{ userId = "UCkrSQVb5Qzb13crS1kCOiQQ"},
# Apache Accumulo
{ userId = "apacheaccumulo"},
# Apache Hive - Topic
{ userId = "UCIjbkZAX5VlvSKoSzNUHIoQ"},
# Apache HBase - Topic
{ userId = "UCcGNHRiO9bi6BeH5OdhY2Kw"},
# Apache Cassandra - Topic
{ userId = "UC6nsS04n_wBpCDXqSAkFM-w"},
# Apache Hadoop - Topic
{ userId = "UCgRu3LbCjczooTVI9VSvstg"},
# Apache Avro - Topic
{ userId = "UCzHCk8Gl5eP85xz0HwXjkzw"},
# Apache Maven - Topic
{ userId = "UCBS2s2cwx-MW9rVeKwee_VA"},
# Apache Oozie - Topic
{ userId = "UCRyBrviuu3qMNliolYXvC0g"}
]
"""

I would recommend you stick with this list if you don’t have any activity on YouTube, or you’re just getting started and want to make sure some data is coming in.  But in the spirit of creating our own little social monitoring quasi-app, consider replacing the preceding paragraph with this:

val myChannel = z.input("my YouTube Channel", "")
val myId = z.input("my ID", "")

val accounts_hocon = s"""
youtube.youtubeUsers = [
  { userId = "${myId}"},
  { userId = "${myChannel}"}
]
"""

Which is going to give you a dynamic form in which a user can insert their own YouTube channel Id.


You’ll note here I have put in both my user ID and channel ID; the only difference between adJyExT6JcrA0kvKNHrXZw and UCadJyExT6JcrA0kvKNHrXZw is the preceding UC in the latter- denoting it is a channel. Further, if you notice the code- you submit both the user ID and the channel ID as userId=.... This may be helpful because, as we will see, the channel and user APIs give us slightly different information.

To find channel IDs just surf around YouTube and go to channel pages. You’ll see the address is something like (which is Astronomia Nova) and the UCGsMOj1pUwabpurXSKzfIfg part is the channel ID.

Getting at that data

First we’re going to set up a ConfigFactory to load our config strings which we just did. This is going to create our YouTube configuration.

val reference = ConfigFactory.load()
val credentials = ConfigFactory.parseString(credentials_hocon)
val accounts = ConfigFactory.parseString(accounts_hocon)
val typesafe = accounts.withFallback(credentials).withFallback(reference).resolve()
val youtubeConfiguration = new ComponentConfigurator(classOf[YoutubeConfiguration]).detectConfiguration(typesafe, "youtube");
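What the withFallback chain buys us can be sketched in plain Java (toy maps, not the actual Typesafe Config API): each key resolves against the most specific config first, falling back through the chain.

```java
import java.util.*;

// Sketch of accounts.withFallback(credentials).withFallback(reference):
// look each key up in the most specific config first, fall back in order.
public class FallbackChain {
    static String lookup(String key, List<Map<String, String>> chain) {
        for (Map<String, String> config : chain)
            if (config.containsKey(key)) return config.get(key);
        return null;  // unresolved
    }

    public static void main(String[] args) {
        Map<String, String> accounts = new HashMap<>();
        accounts.put("youtube.youtubeUsers", "[ ... ]");

        Map<String, String> credentials = new HashMap<>();
        credentials.put("youtube.apiKey", "my-api-key");

        Map<String, String> reference = new HashMap<>();
        reference.put("youtube.apiKey", "default-key");   // shadowed by credentials

        List<Map<String, String>> chain = Arrays.asList(accounts, credentials, reference);
        System.out.println(lookup("youtube.apiKey", chain));        // my-api-key
        System.out.println(lookup("youtube.youtubeUsers", chain));  // [ ... ]
    }
}
```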

Next we will pull information about the Channel(s) we noted in our accounts_hocon config string. This may be all of the Apache Accounts or our personal one. We will create an ArrayBuffer to store all of the responses we get. This should be old hat by now- you’ll notice the very similar API as accessing the Facebook and Twitter APIs from previous posts.

import com.typesafe.config._
import org.apache.streams.config._
import org.apache.streams.core._
import java.util.Iterator

val youtubeChannelProvider = new YoutubeChannelProvider(youtubeConfiguration)

val channel_buf = scala.collection.mutable.ArrayBuffer.empty[Object]
while (youtubeChannelProvider.isRunning()) {
  val resultSet = youtubeChannelProvider.readCurrent()
  val iterator = resultSet.iterator()
  while (iterator.hasNext()) {
    val datum =
    channel_buf += datum.getDocument
  }
}

The important thing to watch for, especially if using something other than the Apache userID set, is that the output of the previous paragraph doesn’t end with something like res45: Int = 0, because if that is the case your buffer is empty and you will get errors when you try to make a table. If your buffer is empty, try using different user IDs.

Next we’ll get the user activity results.


val buf = scala.collection.mutable.ArrayBuffer.empty[Object]

val provider = new YoutubeUserActivityProvider(youtubeConfiguration)
while (provider.isRunning()) {
  val resultSet = provider.readCurrent()
  val iterator = resultSet.iterator()
  while (iterator.hasNext()) {
    val datum =
    buf += datum.getDocument
  }
}

Basically the same idea here- same as last time, make sure that buffer size is greater than 0!

Processing data into something we can play with

Our goal here is to get our data into a format we can play with. The easiest route to this end is to leverage Spark DataFrames / SQL Context and Zeppelin’s table display.

At the end of the last session on Apache Streams, we left as an exercise to the user a way to untangle all of the messy characters and line feeds. Now we provide the solution, a couple of User Defined Functions (UDFs) to strip all of that bad noise out.

import org.apache.spark.sql.functions._
import org.apache.spark.sql.UserDefinedFunction
import java.util.regex.Pattern

val toLowerCase = udf {
  (text: String) => text.toLowerCase
}

val removeLineBreaks = udf {
  (text: String) => {
    val regex = "[\\n\\r]"
    val pattern = Pattern.compile(regex)
    val matcher = pattern.matcher(text)

    // Remove all matches, split at whitespace (repeated whitespace is allowed) then join again.
    matcher.replaceAll(" ").split("[ ]+").mkString(" ")
  }
}

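For the curious, the same cleanup removeLineBreaks performs, restated as plain Java:

```java
import java.util.regex.Pattern;

// Replace newlines/carriage returns with spaces, then collapse any run of
// repeated spaces down to a single space.
public class CleanText {
    static String removeLineBreaks(String text) {
        String noBreaks = Pattern.compile("[\\n\\r]").matcher(text).replaceAll(" ");
        return String.join(" ", noBreaks.split("[ ]+"));
    }

    public static void main(String[] args) {
        System.out.println(removeLineBreaks("line one\n\nline  two"));
        // -> "line one line two"
    }
}
```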

val removePunctuationAndSpecialChar = udf {
  (text: String) => {
    val regex = "[\\.\\,\\:\\-\\!\\?\\n\\t,\\%\\#\\*\\|\\=\\(\\)\\\"\\>\\<]"
    val pattern = Pattern.compile(regex)
    val matcher = pattern.matcher(text)

    // Remove all matches, split at whitespace, then join again.
    matcher.replaceAll(" ").split("[ ]+").mkString(" ")
  }
}

import org.apache.streams.core.StreamsDatum
import scala.collection.JavaConversions._
// Normalize activities -> post(s)
val YoutubeTypeConverter = new YoutubeTypeConverter()

val pages_datums = channel_buf.flatMap(x => YoutubeTypeConverter.process(new StreamsDatum(x)))

Now we create our DataFrame and register it as a temp table. Notice we use our “cleaner function”, aka removePunctuationAndSpecialChar, to clean the column summary. This gives us a nice clean summary, while preserving the original actor.summary. If actor.summary doesn’t make any sense as a column name, wait until we get to querying the table- it has to do with how Spark queries the nested fields of a JSON.

import org.apache.streams.jackson.StreamsJacksonMapper;
import sqlContext._
import sqlContext.implicits._

val mapper = StreamsJacksonMapper.getInstance();
val pages_jsons = => mapper.writeValueAsString(o.getDocument))
val pagesRDD = sc.parallelize(pages_jsons)

val pagesDF =
pagesDF.printSchema

val cleanDF = pagesDF.withColumn("summary", removePunctuationAndSpecialChar(pagesDF("actor.summary")))
cleanDF.registerTempTable("youtube_pages")

In the line where we create pages_jsons we map over the pages_datums we created just previously (which was of type scala.collection.mutable.ArrayBuffer[org.apache.streams.core.StreamsDatum], if you’re into that sort of thing), changing each StreamsDatum into a JSON. We then parallelize that array, and do the standard Spark things to create a DataFrame from the RDD. Hocus-pocus and our channels have now been processed into a temp table called youtube_pages. The process is repeated for our users’ posts.

import org.apache.streams.core.StreamsDatum
import scala.collection.JavaConversions._
//Normalize activities -> posts(s)
val YoutubeTypeConverter = new YoutubeTypeConverter()

val useractivity_posts = buf.flatMap(x => YoutubeTypeConverter.process(new StreamsDatum(x)))

And then…

import org.apache.streams.jackson.StreamsJacksonMapper;
import sqlContext._
import sqlContext.implicits._

val mapper = StreamsJacksonMapper.getInstance();
val jsons = => mapper.writeValueAsString(o.getDocument))
val activitiesRDD = sc.parallelize(jsons)

val activitiesDF =
activitiesDF.printSchema

val cleanDF = activitiesDF.withColumn("content", removePunctuationAndSpecialChar(activitiesDF("content")))
cleanDF.registerTempTable("youtube_posts")

Same idea, now we have a table called youtube_posts.

Let’s play.

At the end of the second paragraph for pages and posts above we called the .printSchema method on our DataFrame. This is helpful because it shows us … the schema of the DataFrame, which in a non-flat format such as JSON can be very useful. In general we use field.subfield.subsubfield to traverse this schema when querying. Consider the following SQL statement:

select id
, published
, actor.displayName
, summary
, actor.extensions.followers
, actor.extensions.posts
from youtube_pages


select id
, published
, actor.displayName
, content
, title from youtube_posts


Here we see I’m not such a popular guy on YouTube: 1 follower, 23 views, 1 post (a recent Apache Mahout talk). My chance at being one of the great YouTube bloggers has passed me by; such is life.


The story arc of this series on Apache Streams has reached its zenith. If you have been following along, you’ll notice there isn’t really much new information about Apache Streams itself anymore- the posts are more dedicated to ‘how to get your credentials for provider X’. I was thinking about doing one last one on Google+, since the credentialing is essentially the same and you only need to change a couple of lines of code to say Google instead of YouTube.

In the dramatic conclusion to the Apache Streams mini-saga, we’re going to combine all of these together into a ‘quasi-app’ in which you collect all of your data from various providers and start merging it together using Spark SQL, visualizing with Angular, Zeppelin, and D3Js, and a little machine learning to see if we can’t learn a little something about our selves and the social world/bubble we (personally) live in.

Deep Magic, Volume 3: Eigenfaces

This week we’re going to really show off how easy it is to “roll our own” algorithms in Apache Mahout by looking at eigenfaces. This algorithm is really easy and fun in Mahout because Mahout comes with a first-class distributed stochastic singular value decomposition, Mahout SSVD.

This is going to be a big job, and assumes you have HDFS and a small cluster to work with. You may be able to run this on your laptop, but I would recommend following the last post on setting up a small cloud-based Hadoop cluster with Apache Zeppelin.

Eigenfaces are an image equivalent(ish) of the eigenvectors you may recall from your high school linear algebra classes. If you don’t recall: read Wikipedia. Otherwise: eigenfaces are a set of ‘faces’ that, by linear combination, can be used to represent other faces.
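As a toy illustration of that “linear combination” idea (plain Python, not Mahout; the vectors and numbers are made up):

```python
# Toy illustration: a "face" is rebuilt as mean + sum(weight_i * eigenface_i).
# Plain lists stand in for pixel vectors here.
def reconstruct(mean, eigenfaces, weights):
    face = list(mean)
    for w, ef in zip(weights, eigenfaces):
        for i, v in enumerate(ef):
            face[i] += w * v
    return face
```

For example, reconstruct([1.0, 1.0], [[1.0, 0.0], [0.0, 1.0]], [2.0, 3.0]) rebuilds the vector [3.0, 4.0]. A real face vector is just a much longer list of pixel values.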

Step 1. Get Data

The first thing we’re going to do is collect a set of 13,232 face images (250×250 pixels) from the Labeled Faces in the Wild data set.

Because some of these shell commands take a little while to run, it is worth taking a moment to set shell.command.timeout.millisecs in the sh interpreter to 600000.

My first paragraph, I’m going to make a directory for my final eigenface images that Zeppelin’s HTML loader can use. Then I’m going to download and untar the dataset. Finally, I’ll put the dataset into the tmp folder in HDFS.


mkdir zeppelin-0.7.0-SNAPSHOT/webapps/webapp/eigenfaces

# download the Labeled Faces in the Wild (deep-funneled) tarball
tar -xzf lfw-deepfunneled.tgz
hdfs dfs -put /home/guest/lfw-deepfunneled /tmp/lfw-deepfunneled

Step 2. Add dependency JARs

We use the Scala package scrimage to do our image processing, so we’ll want to add those jars. Also, there is a bug we’re working on in Mahout where broadcasting vectors doesn’t work so well in shell / Zeppelin mode. To get around this, I have added the transformer I need to a custom jar of algorithms I use.

MAHOUT-1892: Broadcasting Vectors in Mahout-Shell

If that issue is still open, you’ll need to clone and build my branch (or make your own jar).

git clone
cd mahout
git checkout mahout-1856
mvn clean package -DskipTests

Finally, assuming you are using Big Insights on Hadoop Cloud, you’ll need to copy the jar to the cluster.

scp algos/target/mahout-algos_2.10-0.12.3-SNAPSHOT.jar

Changing username to your username and to your server address.

Back in Zeppelin, load the dependencies:



// add EXPERIMENTAL mahout algos

Step 3. Setup the Mahout Context

This step imports the Mahout packages and sets up the Mahout Distributed Context


import org.apache.mahout.math._
import org.apache.mahout.math.scalabindings._
import org.apache.mahout.math.drm._
import org.apache.mahout.math.scalabindings.RLikeOps._
import org.apache.mahout.math.drm.RLikeDrmOps._
import org.apache.mahout.sparkbindings._

@transient implicit val sdc: org.apache.mahout.sparkbindings.SparkDistributedContext = sc2sdc(sc)

Step 4. Create a DRM of Vectorized Images

When doing image processing, we want a matrix where each image is represented as a vector of numbers, where the numbers correspond to each pixel.


import com.sksamuel.scrimage._
import com.sksamuel.scrimage.filter.GrayscaleFilter

val imagesRDD:DrmRdd[Int] = sc.binaryFiles("/tmp/lfw-deepfunneled/*/*", 500)
       .map(o => new DenseVector( Image.apply(o._2.toArray)
         .filter(GrayscaleFilter)
         .map(p => p.toInt.toDouble / 10000000)) )
       .zipWithIndex
       .map(o => (o._2.toInt, o._1))

val imagesDRM = drmWrap(rdd= imagesRDD).par(min = 500).checkpoint()

println(s"Dataset: ${imagesDRM.nrow} images, ${imagesDRM.ncol} pixels per image")

sc.binaryFiles is the Spark way to read in our images. We set our partitioning to 500, and map each image into a scrimage object, which drops the color information (we don’t need it for feature mapping) and converts the image to an array of large integers. We scale the integers down, and use the array to create a Mahout DenseVector. Finally we zipWithIndex and then use drmWrap to transform our RDD into a DRM.

Step 5. Subtract Means

If vector broadcasting in the shell were working (it may be by the time you read this), this is how we would subtract our means:


// How you would do this without the workaround
// Doesn't work re BUG: MAHOUT-1892
val colMeansV = imagesDRM.colMeans
val bcastV = drmBroadcast(colMeansV)

val smImages = imagesDRM.mapBlock(imagesDRM.ncol) {
  case (keys, block) =>
    val copy: Matrix = block.cloned
    copy.foreach(row => row -= bcastV.value)
    (keys, copy)
}

We use the .colMeans method to get the colmeans, and then do a mapBlock on the images matrix, subtracting the mean from each image.
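What the mean-subtraction step computes, sketched in plain Python on a tiny in-memory matrix (illustrative only; the real version runs block-wise over the distributed DRM):

```python
# Subtract each column's mean from every row - the same transform the
# DRM version performs block-by-block across the cluster.
def subtract_col_means(matrix):
    n = len(matrix)
    means = [sum(col) / n for col in zip(*matrix)]
    return [[x - m for x, m in zip(row, means)] for row in matrix]
```

After this, every column of the matrix has mean zero, which is what the SVD in the next step expects.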

Since at the moment the above does not work, here is the hack to do it.


import org.apache.mahout.algos.transformer.SubtractMean

// Subtract Mean transforms each row by subtracting the column mean
val smTransformer = new SubtractMean() // calculates the column mean
val smImages = smTransformer.transform(imagesDRM) // return new DRM of subtracted means


Again, this is only needed as a work around. If you’re interested in how easy it is to package your own functions into a jar- check out the SubtractMean source code

Step 6. Distributed Stochastic Singular Value Decomposition

Based primarily on Nathan Halko’s dissertation, most of the hard part here is done for us.


import org.apache.mahout.math._
import decompositions._
import drm._

val(drmU, drmV, s) = dssvd(smImages, k= 20, p= 15, q = 0)

Here k is the rank of the output, e.g. the number of eigenfaces we want out. p is the oversampling parameter, and q is the number of additional power iterations. Read Nathan’s paper or see the ssvd docs for more information.
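To get a feel for what k, p, and q do, here is a minimal randomized-SVD sketch in NumPy. This is an illustration of the idea behind dssvd, not Mahout’s actual implementation:

```python
import numpy as np

# Minimal randomized SVD in the spirit of Halko et al.:
#   k = target rank, p = oversampling, q = power iterations.
def rand_svd(A, k, p=10, q=0):
    rng = np.random.default_rng(0)
    # Sample the range of A with a random test matrix of k + p columns.
    Y = A @ rng.normal(size=(A.shape[1], k + p))
    # Power iterations sharpen the range estimate when singular values decay slowly.
    for _ in range(q):
        Y = A @ (A.T @ Y)
    Q, _ = np.linalg.qr(Y)
    # Project A into the small subspace and take an exact SVD there.
    Ub, s, Vt = np.linalg.svd(Q.T @ A, full_matrices=False)
    return (Q @ Ub)[:, :k], s[:k], Vt[:k, :]
```

The only expensive operations are a handful of distributed matrix multiplies and one small in-core SVD, which is why this scales to our 13,232 × 62,500 image matrix.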

The above will take some time- about 35 minutes on a 5-node Big Insights Cloud Cluster.

drmV will contain our eigenfaces (transposed).

drmU will tell us the composition of each face we fed into the algorithm. For example


drmU.collect(0 until 1, ::)


{0:-5.728272924185402E-4,1:0.005311020699576641,2:-0.009218182156949998,3:0.008125182744744356,4:0.001847134204087927,5:-0.01391318137456792,6:0.0038760898878500913,7:0.0022845256274037842,8:0.007046521884887152,9:0.004835772814429175,10:0.00338488781174816,11:0.009318311263249005,12:-0.005181665861179919,13:-0.004665157429436422,14:0.003072181956470255,15:-0.01285733757511248,16:0.005066140593688097,17:-0.016895601017982726,18:-0.012156252679821318,19:-0.008044144986630029 ... }

Which implies the first image = .005 * eigenFace1 + -.009 * eigenFace2 + …

Step 7. Write the Eigenfaces to disk

We want to SEE our eigenfaces! We use scrimage again to help us reconstruct our calculated eigenfaces back in to images.


import javax.imageio.ImageIO

val sampleImagePath = "/home/guest/lfw-deepfunneled/Aaron_Eckhart/Aaron_Eckhart_0001.jpg"
val sampleImage = File(sampleImagePath))
val w = sampleImage.getWidth
val h = sampleImage.getHeight

val eigenFaces = drmV.t.collect(::,::)
val colMeans = smImages.colMeans

for (i <- 0 until 20){
    val v = (eigenFaces(i, ::) + colMeans) * 10000000
    val output = new Array[com.sksamuel.scrimage.Pixel](v.size)
    for (j <- 0 until v.size) {
        output(j) = Pixel(v.get(j).toInt)
    }
    val image = Image(w, h, output)
    image.output(new File(s"/home/guest/zeppelin-0.7.0-SNAPSHOT/webapps/webapp/eigenfaces/${i}.png"))
}

First we load a sampleImage so we can get the height and width. I could have just opened it up in an image viewer, but y’know, code. drmV has our eigenfaces as columns, so we transpose it. Also recall we subtracted the column means when we in-processed the images, so we’ll need to add those back.

We do a simple for loop over the eigenfaces, basically undoing the in-processing we did. We then create an array of pixels by iterating through the vector. Finally we create the image with the width and height, and save it to the directory we set up. ZEPPELIN_HOME/webapps/webapp/ is the root directory for the %html interpreter, so we save our images in a directory there (we created this directory at the beginning).
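The “undo the in-processing” step is just add-the-means-back, rescale, and cast to ints. A tiny plain-Python sketch of that per-vector transform (the scale factor mirrors the 10000000 used above):

```python
# Undo the in-processing: add the column means back, undo the
# down-scaling, and cast each value to a pixel int.
def vector_to_pixels(v, col_means, scale=10000000):
    return [int((x + m) * scale) for x, m in zip(v, col_means)]
```

Each resulting int is then wrapped in a scrimage Pixel in the Scala loop.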

Step 8. Display the Images

Scala is fun, but for simple string and list manipulation, I love Python


r = 4
c = 5
print '%html\n<table style="width:100%">' + "".join(["<tr>" + "".join([ '<td><img src="eigenfaces/%i.png"></td>' % (i + j) for j in range(0, c) ]) + "</tr>" for i in range(0, r * c, c) ]) + '</table>'

All that does is create an html table of our images.


And that’s it!

I hope you enjoyed this one as much as I did, please leave comments and let me know how it went for you.

Getting to Know Your Friends with Apache Streams

In our last adventure we did a basic example of Apache Streams with Twitter data. This week we’re going to extend that example with Facebook data! Also note, if this seems a little light it is because it’s not that different from the last post, and the full explanations are there. Our goal here is not to explain all of Apache Streams (last week) but to begin branching out into other providers.

Getting credentialed through Facebook

The first step with any new provider is going to be getting credentials. To get Facebook credentials you’ll need to visit the Facebook App Dashboard. Once you get there and sign up, you’ll need to click “Add new App” (top right). After you’ve created the new app, click “Settings” in the panel to the right. Here you will see the “App ID”, and if you click “Click show to see App Secret”, you’ll have the two important strings you’re going to need soon.

The final piece of credentials you’ll need to set up is a userAccessToken. The easiest way to do this is to go to Graph API Explorer. These are temporary tokens that expire quickly (after an hour or so). This isn’t a “production” grade approach, but it is a good way to test things out and get going. Persistent tokening with Facebook is out of scope of this blog post and things which I care about at this point in my life.

Setting up the Stream

First you’ll need to add some dependencies, including the Facebook provider jar:


Next we create a paragraph to make a cute little GUI for entering our credentials.

val appId = z.input("appId", "")
val appSecret = z.input("appSecret", "")
val userAccessToken = z.input("userAccessToken", "")

That’s going to create a form where you (or your friends) can copy-paste your credentials.


You’ll also want all of your imports… and there are a LOT. (Likely not all necessary.)


import com.typesafe.config._

import org.apache.streams.config._
import org.apache.streams.converter.ActivityConverterProcessor
import org.apache.streams.core.StreamsProcessor
import org.apache.streams.core._
import org.apache.streams.facebook._
import org.apache.streams.facebook.FacebookUserInformationConfiguration
import org.apache.streams.facebook.FacebookUserstreamConfiguration
import org.apache.streams.facebook.Page
import org.apache.streams.facebook.Post
import org.apache.streams.facebook.processor.FacebookTypeConverter
import org.apache.streams.facebook.provider.FacebookFriendFeedProvider
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.pojo.json.Activity

import scala.collection.JavaConverters
import scala.collection.JavaConversions._
import java.util.Iterator

Next we’ll create the config files. We dynamically create this string based on the values we entered in our little GUI. The info section is supposed to list which users you are interested in. I think for this provider it is not important, especially considering the way we got our userAccessToken. In this case it basically just gives you everything you are entitled to and nothing else. Specifically, this provider gives you a stream of everything your friends are doing.


val credentials = s"""
facebook {
  oauth {
    appId = "$appId"
    appSecret = "$appSecret"
    userAccessToken = "$userAccessToken"
  }
  info = []
}
"""
val credentialsConfig = ConfigFactory.parseString(credentials)

val typesafe = ConfigFactory.parseString(credentials)
val config = new ComponentConfigurator(classOf[FacebookUserstreamConfiguration]).detectConfiguration(typesafe, "facebook");
val provider = new FacebookFriendFeedProvider(config, classOf[org.apache.streams.facebook.Post] );

val timeline_buf = scala.collection.mutable.ArrayBuffer.empty[Object]


This should look very similar to the setup we did for the Twitter stream, except of course we are using FacebookUserstreamConfiguration and the FacebookFriendFeedProvider classes, and our stream will consist of org.apache.streams.facebook.Posts

//while(provider.isRunning()) {
for (i <- 0 to 20) {
  val resultSet = provider.readCurrent()
  val iterator = resultSet.iterator();
  while(iterator.hasNext()) {
    val datum =;
    timeline_buf += datum.getDocument
  }
  println(s"Iteration $i")
}

The key difference here is that we stop after roughly 20 iterations. That should be PLENTY of data. If we used the while loop, this would run forever, updating each time our friends had another action.

Now we have to process our tweets… er, Facebook posts.

import org.apache.streams.converter.ActivityConverterProcessor
import org.apache.streams.facebook.processor.FacebookTypeConverter
import org.apache.streams.core.StreamsProcessor
import org.apache.streams.pojo.json.Activity
import scala.collection.JavaConverters
import scala.collection.JavaConversions._

import org.apache.streams.pojo.json.Activity
import org.apache.streams.facebook.Post
import org.apache.streams.facebook.Page

//val converter = new ActivityConverterProcessor()
val converter = new FacebookTypeConverter(classOf[Post], classOf[Activity])


val status_datums = => new StreamsDatum(x))
val activity_datums = status_datums.flatMap(x => converter.process(x)).map(x => x.getDocument.asInstanceOf[Activity])

import org.apache.streams.jackson.StreamsJacksonMapper;

val mapper = StreamsJacksonMapper.getInstance();
val activitiesRDD = sc.parallelize( => mapper.writeValueAsString(o)))

val activitiesDF =


The only line that changed here was

//val converter = new ActivityConverterProcessor()
val converter = new FacebookTypeConverter(classOf[Post], classOf[Activity])

In the future, we won’t even need to change that line! But for now, we have to explicitly set the FacebookTypeConverter and let it know we are expecting to see Posts and we want Activities (which are Activity Streams 1.0 compliant.)

At this point it is pretty much the same as last time. Feel free to check out the schema of the activitiesDF by running

activitiesDF.printSchema

If you are comparing to the tweet activities you collected before, you’ll notice there are some places where Facebook post-based activities contain more or less information, but at a high level the schema is similar: actor, likes, rebroadcasts, etc.

Let’s do a simple SQL query that won’t reveal the identity of any of my friends…


select location.displayName, count(id) from activities where location is not null group by location.displayName


And here we see most of my friends and friends-of-friends hail from Chicago, which makes sense because it is the best city in the country, and my friends aren’t dummies.

We’ll also need to do a little post-processing on some of these columns. For instance, times are all given as strings- so to work with them as dates we can do something like this:

select a.hour_of_day, sum(rebroadcasts)
from (select hour(from_utc_timestamp(published, "MM/dd/yyyyZHH:mm:ss")) as hour_of_day, rebroadcasts.count as rebroadcasts from activities) a
group by hour_of_day



  1. Zeppelin’s Table API doesn’t handle new lines well. If you try to build a table of the messages, it’s not going to work out great. I leave it as an exercise to the user to clean that up.

Link to full notebook

Dipping Your Toes in Apache Streams

Do you know the origin story of the Apache Software Foundation? Let me tell you the short version: In the late 90s most of the internet was being powered by a web-server that no one was maintaining. It was a good product, but no one was feeding and caring for it (e.g. bug fixes, security updates). On BBS boards a number of people were talking about the issue, and how before too long they were going to have to start writing their own web-servers to replace it.

Around this time they realized, none of their respective companies competed based on web-servers; they were selling books, and flights, and hosting gifs of dancing hamsters. It made more sense for everyone to work together on a common web-server project- as a web server is something everyone needs, but no one uses for competitive advantage. Thus the Apache Web Server was born, and with it the Apache Software Foundation and the legally defensible, yet business friendly copyright.

This history lesson has a point. Fast forward to the present day and enter Apache Streams-incubating: lots of people from several industries are looking at social data from across multiple providers, and everyone builds their own processors to transform it into a useful format- that might be extracting text from tweets and Facebook posts. No one competes to do a ‘better job’ of extracting data- it’s a straightforward task to connect to a provider’s API and reshape a JSON into a table or other object- but it is something that everyone has to do. That’s where Apache Streams comes in.

Apache Streams is a package of tools for:
1. Connecting to a provider (e.g. Facebook or Twitter)
2. Retrieving data and reshaping it into a W3C common format
3. Storing that data in a specified target (e.g. local file or database)
4. Doing this all in the JVM or a distributed engine (e.g. Apache Flink or Apache Spark)
5. Much of this can be done with JSON configurations

We are going to write code to build this stream, but soon (upcoming post) we will show how to do it code free.

About Apache Streams

High level, Apache Streams is a framework that collects data from multiple social media providers, transforms the data into a common, W3C-backed format known as Activity Streams, and persists the data to multiple endpoints (e.g. files or databases).

It is also possible to use Apache Streams inline with regular Java/Scala code, which is what we will be demonstrating here today.

Step 1. Configuring

This post assumes you have Apache Zeppelin installed. Other posts on this blog explain how to set up Zeppelin on clusters- you don’t need any of that. A local version of Zeppelin will work just fine for what we’re about to do- see the official Zeppelin site for getting started.

Method 1- The %spark.dep interpreter.

This is the fail-safe method- however it relies on the deprecated %dep interpreter.

In the first paragraph of the notebook enter the following:


What we’ve done here is add the Apache Snapshots repository, which contains the Streams 0.4 snapshots. A snapshot is the most up-to-date version of a project. It’s not production grade, but it has all of the latest and greatest features- unstable though they may be. At the time of writing, Apache Streams is in the process of releasing 0.4.

If you get an error:

Must be used before SparkInterpreter (%spark) initialized
Hint: put this paragraph before any Spark code and restart Zeppelin/Interpreter

Go to the interpreters page and restart the interpreter. (Click on “Anonymous” at the top right; a drop-down menu will pop up. Click on “Interpreters”, then on the new page search or scroll down to “Spark”. In the top right of the Spark interpreter tile there are three buttons: “Edit”, “Restart”, “Remove”. Click “Restart”, then try running the paragraph again.)

Method 2- The config GUI

Follow the directions directly above for accessing the interpreter config GUI in Zeppelin. This time however, click “Edit”.

You will see Properties and Values. Scroll down and find the property:


In the Values, add the following code.


This step is equivalent to the z.addRepo(... line in the other method.

Now scroll down past the Properties and Values to the Dependencies section. In the Artifact section, add org.apache.streams:streams-provider-twitter:0.4-incubating-SNAPSHOT and org.apache.streams:streams-converters:0.4-incubating-SNAPSHOT, hit the little “+” button to the right after adding each. Finally save the changes (little button at bottom), which will restart the interpreter.

Step 2. Getting credentials from Twitter

Each provider will have its own method for authenticating the calls you make for data, and its own process for granting credentials. In the case of Twitter, go to

There are several great blog posts you can find if you Google “create twitter app”. I’ll leave you to go read and learn. It’s a fairly painless process.

When you create the app, or if you already have an app- we need the following information:
– Consumer Key
– Consumer Secret
– Access Token
– Access Token Secret

Step 3. Configuring the Stream

First we are going to use the Apache Zeppelin’s Dynamic Forms to make it easy for users to copy and paste their Twitter Credentials into the program. Could you hard code your credentials? Sure, but this is a demo and we’re showing off.

The UserID is who we are going to follow. It should work with user IDs or Twitter handles; however, at the time of writing there is a bug where only user IDs work. In the meantime, use an online tool to convert usernames into IDs.

You’ll see that we are simply getting values to fill into a HOCON string (a config format that looks sort of like JSON). This config string is the makings of the config JSONs that would allow you to define a stream with very little code. For the sake of teaching, however, we’re going to cut code anyway.

Full setup Paragraph

val consumerKey = z.input("ConsumerKey", "")
val consumerSecret = z.input("ConsumerSecret", "")
val accessToken = z.input("AccessToken", "")
val accessTokenSecret = z.input("AccessTokenSecret", "")
val userID = z.input("UserID", "")

val hocon = s"""
twitter {
  oauth {
    consumerKey = "$consumerKey"
    consumerSecret = "$consumerSecret"
    accessToken = "$accessToken"
    accessTokenSecret = "$accessTokenSecret"
  }
  retrySleepMs = 5000
  retryMax = 250
  info = [ $userID ]
}
"""

Note: Be careful when sharing this notebook; your credentials (the last ones used) will be stored in the notebook, even though they are not hard-coded.

Step 4. Writing the Pipeline

We want to collect tweets, transform them into ActivityStreams objects, and persist the objects into a Spark DataFrame for querying and analytics.

After taking care of our imports, we will create an ArrayBuffer to which we will add tweets. Note, if you have any problems with the imports- check that you have done Step 1 correctly.

val timeline_buf = scala.collection.mutable.ArrayBuffer.empty[Object]

Next, we are going to parse the config string we created in the last step into a ComponentConfigurator, which will be used to set up the stream. Again, this all could be handled by Streams itself; we are reinventing the wheel slightly for demonstration purposes.

val typesafe = ConfigFactory.parseString(hocon)
val config = new ComponentConfigurator(classOf[TwitterUserInformationConfiguration]).detectConfiguration(typesafe, "twitter");

In the remaining code, we create a Streams provider from our config file, run it and collect tweets into our array buffer.

Full paragraph worth of code for copy+paste

import com.typesafe.config._
import java.util.Iterator

import org.apache.streams.config._
import org.apache.streams.core._

import org.apache.streams.twitter.TwitterUserInformationConfiguration
import org.apache.streams.twitter.pojo._
import org.apache.streams.twitter.provider._

val timeline_buf = scala.collection.mutable.ArrayBuffer.empty[Object]
val typesafe = ConfigFactory.parseString(hocon)
val config = new ComponentConfigurator(classOf[TwitterUserInformationConfiguration]).detectConfiguration(typesafe, "twitter");

val provider = new TwitterTimelineProvider(config);

while(provider.isRunning()) {
  val resultSet = provider.readCurrent()
  val iterator = resultSet.iterator();
  while(iterator.hasNext()) {
    val datum =;
    timeline_buf += datum.getDocument
  }
}

Transforming Tweets into Activity Stream Objects

This is only a few lines of fairly self evident code. We create an ActivityConverterProcessor() and call the prepare method.

Recall, our timeline_buf was an ArrayBuffer of tweets- we are going to map each tweet into a StreamsDatum, because that is the format the converter expects for input. You can see on the next line our converter processes StreamsDatums.

The net result of this block is a scala.collection.mutable.ArrayBuffer[org.apache.streams.pojo.json.Activity] called activity_datums.


import org.apache.streams.converter.ActivityConverterProcessor
import org.apache.streams.core.StreamsProcessor
import org.apache.streams.pojo.json.Activity
import scala.collection.JavaConverters
import scala.collection.JavaConversions._

val converter = new ActivityConverterProcessor()

val status_datums = => new StreamsDatum(x))
val activity_datums = status_datums.flatMap(x => converter.process(x)).map(x => x.getDocument.asInstanceOf[Activity])
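The shape of that map-then-flatMap pipeline, as a toy Python sketch (the stand-in functions here are hypothetical, not the Streams API):

```python
# Hypothetical stand-ins (NOT the Streams API) to show the pipeline shape.
def to_datum(doc):
    return {"document": doc}

def convert(datum):
    # A converter may emit zero or more activities per datum,
    # which is why the real code uses flatMap rather than map.
    return [{"activity": datum["document"].upper()}]

def run_pipeline(raw_docs):
    datums = [to_datum(d) for d in raw_docs]           # map
    return [a for d in datums for a in convert(d)]     # flatMap
```

Wrap each raw document, feed it through the converter, and flatten the per-datum result lists into one collection of activities.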

Step 5. Visualizing the results

This is more of a Spark / Apache Zeppelin trick than anything else. The StreamsJacksonMapper converts an Activity Stream Object into a JSON. We create activitiesRDD as an org.apache.spark.rdd.RDD[String] and in the next two lines create a DataFrame and register it as a temp table, in the usual ways.

import org.apache.streams.jackson.StreamsJacksonMapper;

val mapper = StreamsJacksonMapper.getInstance();
val activitiesRDD = sc.parallelize( => mapper.writeValueAsString(o)))

val activitiesDF =


Zeppelin is so fun, because now we can use regular SQL in the next paragraph to explore this table- try this one on for size:

select ht, count(id) as mentions
from (
select explode(hashtags) as ht, id
from activities
) a

group by ht
order by mentions desc


This query gives us a count by hashtag. There are two things you’ll want to review regarding SparkSQL and querying JSONs:
– explode turns a row containing an array of length N into N new rows, one for each element of the array
– To query a nested column, use the style super_col.nested_col
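If explode is new to you, here is its behavior sketched in plain Python (illustrative only, not Spark’s implementation):

```python
# explode: one output row per element of the array column.
def explode(rows, array_col):
    out = []
    for row in rows:
        for elem in row[array_col]:
            new_row = dict(row)
            new_row[array_col] = elem
            out.append(new_row)
    return out
```

A tweet with three hashtags becomes three rows, each carrying the same id, which is exactly what lets the outer query group and count by hashtag.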

Consider the following:

activitiesDF.printSchema

Outputs this:

|-- actor: struct (nullable = true)
| |-- displayName: string (nullable = true)
| |-- extensions: struct (nullable = true)
| | |-- favorites: long (nullable = true)
| | |-- followers: long (nullable = true)
| | |-- location: string (nullable = true)
| | |-- posts: long (nullable = true)
| | |-- screenName: string (nullable = true)

I truncated the output. The point is, if I wanted to query displayName and how many followers the person had I could do something like this.


select actor.displayName, actor.extensions.favorites from activities limit 20

This query wasn’t very interesting because all of the rows were me, and 70 (my followers count).


Another dude’s blog post on querying JSONs, which candidly I didn’t even read

Final Thoughts and next steps

Apache Streams is an exciting young project that has the opportunity to really do some good in the world, by way of reducing wasteful redundancy (everyone has to write their own methods for collecting and homogenizing social media data).

Now you may be thinking- that was a lot of extra work to collect and visualize some tweets. I say to you:
1 – It wasn’t that much extra work
2 – Your tweets are now in Activity Streams which can be easily combined with Facebook / YouTube / GooglePlus / etc. data.

Hopefully this is going to be the first in a series of blog posts that will build on each other. Future blog posts will show how to connect other service providers, and eventually we will build a collection of tools for collecting and analyzing data about your own social footprint. The series will culminate in a collection of Zeppelin notebooks that allow the user to build an entire UI backed by Streams.

Click this link for full notebook

Deep Magic Volume2: Absurdly Large OLS with Apache Mahout

In this post we’re going to really show off the coolest (imho) use-case of Apache Mahout – roll your own distributed algorithms.

All of these posts are meant for you to follow-along at home, and it is entirely possible, you don’t have access to a large YARN cluster. That’s OK. Short story- they’re free on IBM’s BlueMix, where you can:
1. Sign up for a free 30-day account
2. Setup a 5-Node BigInsights (IBMs Hadoop Distro) on Cloud
3. Install Apache Zeppelin and Apache Mahout
4. Run an absurdly large OLS algorithm

The benefit of using a Hadoop-as-a-Service environment is that, for analytics, our dataset might not be so HUGE that uploading is prohibitive, but big enough that we need more horsepower than a single node can provide. Being able to spool up a cluster, upload a few files, do some work on it with 5 nodes / 240GB RAM / 48 virtual processors, and then throw the environment away has a lot of use cases.

In this post we’re going to write an Ordinary Least Squares algorithm, and then run it on a dataset of 100 million observations by 100 columns (dense).

This is the simplest of algorithms, to get you started. Once you have this power at your fingertips, you can implement any algorithm you find specified in matrix form (which is most of them in academic papers) in Spark or Flink with ease. No longer are you tied to whatever the powers that be behind SparkML and FlinkML decide to implement!

Disclosure: I’m working for IBM now; this isn’t an unsolicited sales pitch. The notebook for Step 4 will be available and will run on any Zeppelin instance (assuming you’ve followed the directions for setting up Mahout on Apache Spark listed in a previous post). If you are running Zeppelin locally, you just won’t be able to go as big as we do.

Step 1. Sign up for BlueMix-

Link to Signup

Step 2. Setup a 5-Node BigInsights on Cloud

In short, IBM BigInsights on Cloud is Hadoop-as-a-Service. It’s also fairly simple to set up.

Log in to BlueMix. In the navigation bar you should see “Catalog”. Click that, and then search “BigInsights”.

Screen Shot 2016-10-13 at 1.35.31 PM.png

There are a few different UIs floating around at the moment, if you can’t find a way to create BigInsights, this link might help (must be logged in). Link

Click on BigInsights. In the next window, there should be a button that says ‘Create’ somewhere towards the right or bottom-right (old vs. new UI). From this point on everything should look the same, so I’ll post more screenshots.

On the next screen click “Open”, and on the screen following that click “New Cluster”.

You should now be at a screen that looks like this:

Screen Shot 2016-10-13 at 1.45.09 PM.png

The cluster name/ username / password don’t really matter.

In the following section, make sure to set the following:

  • Number of Data Nodes: 5 (current max for free users)
  • IBM Open Platform Version: IOP 4.2 (4.3 has Spark 2.0- gross).
  • Optional Components: Make sure Spark is checked.

Screen Shot 2016-10-13 at 1.48.40 PM.png

Click “Create” and then go grab some coffee or whatever. It will take about 5-10 minutes to setup.

After you click create you’ll be taken to a screen talking about your cluster.

In the top center you will see SSH Host and, below it, a value in which the XXXX will be four numbers. You will need this value for the next step.

Step 3. Install Apache Zeppelin and Apache Mahout

As a proud new IBM employee, I get unfettered access to these little clusters. To celebrate, I’ve done a little Python witchcraft for quickly installing my favorite services into BigInsights Cloud instances. These scripts also open up some ports in the firewall as needed for WebUIs (especially for Zeppelin, Apache NiFi, and Apache Flink).

The scripts in essence do the following:
– ssh in to the cluster
– download the binaries of the desired program (Zeppelin, NiFi, Flink, etc.)
– untar the program
– Upload some config files specific to BigInsights
– Start the service
– If a WebUI is necessary, a BlueMix app will be created which establishes an ssh tunnel between the world wide web and the port of the service (e.g. 8080 for Zeppelin).

Skip to the end of this section to see what I mean regarding the BlueMix app.

rawkintrevo’s scripts for installing extra services on BigInsights Cloud on github

The following assumes you are running on some sort of Ubuntu. The principles are the same, but you might need to take different steps to make this work on CentOS or OSX.

Obviously you need Python.

You also need the Python modules paramiko and scp, which provide ssh and scp support in Python. To install these on Ubuntu, from the command line run:
sudo apt-get install python-paramiko
sudo apt-get install python-scp

Next you will need to install Cloud Foundry and IBM’s Cloud Foundry bonus packs.

Install CloudFoundry

IBM BlueMix Docs

On Ubuntu, it’s going to look something like this-

Download the Cloud Foundry installer (all installers):


Unpack and install Cloud Foundry:

sudo dpkg -i ./cf-cli-*.deb && sudo apt-get install -f

Set BlueMix Plugin Registry Endpoint:

cf add-plugin-repo bluemix-cf-staging

Install BlueMix Plugins:

cf install-plugin active-deploy -r bluemix-cf-staging
cf install-plugin bluemix-admin -r bluemix-cf-staging

Login to BlueMix via CloudFoundry:

cf login -a

Now, clone my BlueMix extra services repo:

git clone

In ./bluemix-extra-services you’ll find a script. Open it up and set the following variables:
APP_PREFIX : Whatever you set this to, Zeppelin will become available at
SERVER : The name of the SSH Host from the end of Step 2. (not the name you assigned but the ssh address!)
USERNAME : Username you entered when you created the server
PASSWORD : Password you entered when you created the server

Note on S3 notebook repositories

You’ll see some commented-out sections about S3 notebooks. BigInsights free clusters only persist for 14 days. When they expire, so do all of your notebooks, if not persisted. You can use AWS S3 to persist notebooks so they always pop up when you create a new cluster. If you have an AWS account, you can create a bucket and set S3_BUCKET to that value. In that bucket create a folder, and set S3_USERNAME to whatever that folder is called. In that folder, create a folder called notebook. There is a link, also commented out, that explains this further. A little ways down you will see a commented-out line z.setS3auth(S3_USERNAME, S3_BUCKET). Uncomment that line to have the script update the config files to use your S3 bucket as a notebook repository.
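For reference, what the script rewrites boils down to a few lines in Zeppelin’s config (a sketch based on Zeppelin’s documented S3 storage settings; the bucket and user names here are placeholders, not values from my repo):

```shell
# zeppelin-env: persist notebooks in S3 instead of the local filesystem.
# Bucket/user names below are placeholders -- use your S3_BUCKET / S3_USERNAME values.
export ZEPPELIN_NOTEBOOK_S3_BUCKET="my-zeppelin-bucket"
export ZEPPELIN_NOTEBOOK_S3_USER="my-username"
export ZEPPELIN_NOTEBOOK_STORAGE="org.apache.zeppelin.notebook.repo.S3NotebookRepo"
```

With these set, Zeppelin looks for notebooks under my-zeppelin-bucket/my-username/notebook/, which is why you create that folder structure in the bucket.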

Finally, get your AWS credentials. In the AWS console, click on your name at the top right; there will be a dropdown. Click on “Security and Credentials”. A window will pop up; click “Continue …”. In the page in the center, click on “Access Keys (Access Key ID and Secret Access Key)”. Click on the blue button that says “Create New Access Key”, then click “Download key file”. Save the file as /path/to/bluemix-extra-services/data/resources/aws/rootkey.csv.

These steps are required. That makes this demo a bit more tedious, but in general it’s good, because then all your Zeppelin notebooks follow you wherever you go. (In the future I’ll probably update the scripts so this isn’t required; I’ll update the post then.)

NOTE on S3 charges Using S3 will incur a cost… My notebooks cost me about 14 cents per month.

The important thing to do before running this script is to make sure your cluster is fully set up. To do this, in the browser tab where you created your cluster, click the little back arrow to get to the cluster list. You will have one cluster listed, named whatever you named it. It should have a little green circle and say “Active”. If it has a yellow circle and says “Pending”, wait a little longer or hit the refresh button at the top of the list. If it has a red circle and says “Failed” (this has happened to me about one time in 100), hover over the little gear to the right, click “Delete Cluster”, and create a new one. If that happens, be advised your SSH Host will have changed.

Once the cluster is active and you have updated and saved the Python script, run it. It will give you some standard out that should end in:

webapp should be available soon at

where APP_PREFIX is whatever you set it to.

This script is downloading various packages, uploading config files, and finally starting a simple python web app that establishes an SSH tunnel from the webapp to the webUI of the service on the BigInsights cloud cluster.

Link to template webapp.

More information on Bluemix Apps

Step 4. Run an absurdly large OLS algorithm

So you’ve made it this far, eh? Well done. Crack a beer; the rest is pretty easy. Apache Mahout is a library that allows you to quickly ‘roll your own’ algorithms based on matrix representations, and run them on your favorite distributed engine (assuming that engine is Apache Spark, Apache Flink, or H2O).

Now- we’re assuming you followed Step 3. to create a YARN cluster, and as such you have a Spark interpreter with all of the appropriate Mahout dependencies and settings. If you didn’t follow step 3, that’s ok- just make sure to create the Mahout interpreter following the steps found in this previous post.

The first paragraph you need to run when using the Mahout interpreter in Zeppelin imports Mahout and sets the distributed context.

Initial Mahout Imports


import org.apache.mahout.math._
import org.apache.mahout.math.scalabindings._
import org.apache.mahout.math.drm._
import org.apache.mahout.math.scalabindings.RLikeOps._
import org.apache.mahout.math.drm.RLikeDrmOps._
import org.apache.mahout.sparkbindings._

implicit val sdc: org.apache.mahout.sparkbindings.SparkDistributedContext = sc2sdc(sc)

The example we are going to do today very closely follows the Playing With the Mahout Shell post from the Mahout website, except with a twist. In our version we are going to use import org.apache.spark.mllib.util.LinearDataGenerator to create a very large data set to crunch.

Synthesize some Linear Data


import org.apache.spark.mllib.util.LinearDataGenerator
val n = 100000000
val features = 100
val eps = 0.1 // i'm guessing error term, poorly documented
val partitions = 5500
val intercept = 10.0

val synDataRDD = LinearDataGenerator.generateLinearRDD(sc, n, features, eps, partitions, intercept)

Now that we have a very large dataset, we need to convert it into a Mahout Distributed Row Matrix (think of it as the Mahout equivalent of an RDD). A good primer on Mahout can be found here.

Create DRM from RDD

val tempRDD = synDataRDD.zipWithIndex.map(lv => {
    val K = lv._2.toInt
    val x = new DenseVector(lv._1.features.toArray)
    //val x = sparkVec2mahoutVec(lv._1.features) // still doesn't serialize
    val y = lv._1.label
    (K, (y, x))
})

println("----------- Creating DRMs --------------")
// temp RDD to X and y
val drmRddX: DrmRdd[Int] = => (o._1, o._2._2))
val drmX = drmWrap(rdd = drmRddX)
val drmRddY: DrmRdd[Int] = => (o._1, new DenseVector(Array(o._2._1))))
val drmy = drmWrap(rdd = drmRddY)

Also note, the only reason we are using Spark instead of Flink here is that SparkML comes with this nice linear data generator. Assuming you were loading your data from some other source, the following code will run on Mahout on Spark OR Mahout on Flink.

For those a little rusty on the Ordinary Least Squares method of regression:
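In matrix form, the OLS estimator is the usual normal-equations solution, which the Mahout paragraph below mirrors term for term (X^t X, X^t y, then solve):

```latex
\hat{\beta} = \left(X^{\top}X\right)^{-1} X^{\top} y
```

The two products are computed distributedly; only the small k-by-k system is solved in core, and solve() uses a decomposition rather than an explicit inverse.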


Mahout OLS


val drmXtX = drmX.t %*% drmX
val drmXty = drmX.t %*% drmy
val beta = solve(drmXtX, drmXty)

And that’s it. An R-Like DSL for Scala that runs inside your Spark code and can be copy-pasted to Flink code.
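To see why those three lines are all you need, here is a tiny self-contained sketch (plain Scala, no Mahout or Spark; the data and all names are illustrative) that solves the same normal equations by hand for a two-parameter fit on points drawn from y = 10 + 2x:

```scala
// Four points lying exactly on y = 10 + 2x (same intercept as the generator above)
val xs = Array(0.0, 1.0, 2.0, 3.0)
val ys = => 10.0 + 2.0 * x)
val n  = xs.length.toDouble

// Entries of X^t X (2x2) and X^t y (2x1) for a design matrix with a bias column
val sx  = xs.sum
val sxx = => x * x).sum
val sy  = ys.sum
val sxy =, x) => y * x }.sum

// Solve the 2x2 system with Cramer's rule; Mahout's solve() does the
// general k x k case for you (via a decomposition, not an explicit inverse)
val det       = n * sxx - sx * sx
val intercept = (sxx * sy - sx * sxy) / det
val slope     = (n * sxy - sx * sy) / det
println(s"intercept=$intercept slope=$slope")  // recovers 10.0 and 2.0
```

Swap the hand-rolled sums for drmX.t %*% drmX and drmX.t %*% drmy and you have the distributed version above.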

Play with the paragraph that synthesizes data. This is a good exercise in how Spark partitioning strategies can affect performance. I played with it for an hour or so; 100 million rows by 100 columns was the largest I could get to run on the BigInsights Cloud cluster (approx. 240GB RAM and 48 processors).

In this post we spent a lot of time setting up an environment and not much time doing anything with it. The onus is now on you to go implement algorithms. In posts that follow, I intend to refer back to this post for setting up an environment that has the horsepower to calculate big jobs. Because if you’re not going to be distributed, then you have to ask yourself: why not just do this in R?


When things go wrong.

Two types of errors can happen in general:
– Zeppelin failed to start correctly
– The webapp failed to start

If you go to the address given and see a 404 error (make sure you typed it correctly), odds are the webapp failed. From the dashboard in BlueMix you should see your webapp, click on it and then click on logs. See if there are any clues.

If you go to the address and see a 502 error, Zeppelin didn’t start. Check the standard out from when you ran the program and look for errors.

If you go to the address and see a 503 error, the web app is tunneling correctly, but Zeppelin isn’t serving it data.

In a terminal, ssh into the cluster as follows:

ssh username@ssh-host

where username is the user name you picked in step 2, and ssh-host is the host given in SSH Host.

If you had a 502 error, from here you can manually start Zeppelin with

z*/bin/ start

Then try the website again.

The BigInsights cluster is restrictive to say the least. Use tail to see the end of the logs.

tail z*/logs/*out -n 50

Look for something about AWS credentials not being accepted. If that happens, STOP ZEPPELIN

z*/bin/ stop

Delete the zeppelin directory

rm -rf zeppelin-0.7.0-SNAPSHOT

Double check the rootkey.csv (maybe re-download it?) and run the python script again.

If you don’t see anything, at this point it is just standard Zeppelin troubleshooting.