Big Data for n00bs: Gelly on Apache Flink

Big Data for n00bs is a new series I’m working on, targeted at absolute beginners like myself.  The goal is to make some confusing tasks more approachable.  The first few posts will be spin-offs of a recent talk I gave at Flink Forward 2016, Apache Zeppelin: A Friendlier Way To Flink (I will link the video when it is posted).

Graph databases are becoming an increasingly popular way to store and analyze data, especially when relationships can be expressed in terms of object-verb-object. For instance, social networks are usually represented in graphs such as

Jack likes Jills_picture

A full exposé on the uses and value of graph databases is beyond the scope of this blog post; however, the reader is encouraged to seek out a more in-depth discussion.


Gelly Library on Apache Flink

Gelly is the Flink Graph API: a library for creating, transforming, and analyzing graphs on top of Flink’s DataSet API, available in both Java and Scala.
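To give a taste of what we will be writing by the end of this post, here is a minimal, hedged sketch that builds the tiny “Jack likes Jills_picture” graph from the example above with Gelly’s Scala API. It assumes a Zeppelin Flink paragraph where the batch environment is exposed as benv (which is how we will use it later), plus the Gelly dependency we add below.

import org.apache.flink.api.scala._
import org.apache.flink.graph.scala.Graph
import org.apache.flink.graph.{Edge, Vertex}

// Two vertices and one edge: "Jack likes Jills_picture".
val vertices = benv.fromCollection(Seq(
  new Vertex(1L, "Jack"),
  new Vertex(2L, "Jills_picture")))

val edges = benv.fromCollection(Seq(
  new Edge(1L, 2L, "likes")))

// Assemble them into a Gelly graph and print the edge back out.
val graph = Graph.fromDataSet(vertices, edges, benv)
graph.getEdges.print()

The rest of this post builds exactly this kind of graph, just from real data.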

Using d3js and Apache Zeppelin to Visualize Graphs

First, download Apache Zeppelin (click the link and choose the binary package with all interpreters), then “install” Zeppelin by unzipping the downloaded file and running bin/zeppelin-daemon.sh start (Linux / OSX) or bin/zeppelin.cmd (Windows).  See the installation instructions here.

After you’ve started Zeppelin, open a browser and go to http://localhost:8080

You should see a “Welcome to Zeppelin” page.  We’re going to create a new notebook by clicking the “Notebook” drop-down and then “+Create new note”.


Call the notebook whatever you like.

Add dependencies to the Flink interpreter

Next we need to add two dependencies to our Flink interpreter.  To do this we go to the “Interpreters” page, find the “Flink” interpreter and add the following dependencies:

  • com.typesafe.play:play-json_2.10:2.4.8
    • used for parsing the JSON data
  • org.apache.flink:flink-gelly-scala_2.10:1.1.2
    • the Flink Gelly Scala API

We’re also going to exclude com.typesafe:config from the typesafe dependency.  This package tends to cause problems and is not necessary for what we are doing, so we exclude it.

The dependencies section of our new interpreter will look something like this:

(Screenshot: the Dependencies section of the Flink interpreter settings.)

Downloading some graph data

Go back to the notebook we’ve created.  In the first paragraph, add the following code:
%sh
mkdir tmp
wget https://raw.githubusercontent.com/d3/d3-plugins/master/graph/data/miserables.json -O tmp/miserables.json

It should look like this after you run the paragraph (by clicking the little “play” button in the top right corner of the paragraph):

(Screenshot: the paragraph after running the wget command.)

What we’ve done here is use the Linux command wget to download our data. You could also simply download the data with your browser, for example by right-clicking this link and choosing “Save As…”, but if you do that, you’ll need to edit the next paragraph to load the data from wherever you saved it.

Visualizing Data with d3js

d3js is a JavaScript library for making some really cool visualizations. I picked a fairly simple graph visualization to keep this example approachable; a good next step would be to try a more advanced one.

First we need to parse our json:

import scala.io.Source
import play.api.libs.json._
import org.apache.flink.graph.scala.Graph
import org.apache.flink.graph.Edge
import org.apache.flink.graph.Vertex

import collection.mutable._
import org.apache.flink.api.scala._

val dataJson = Source.fromFile("/home/guest/tmp/miserables.json").getLines.toList.mkString
val json: JsValue = Json.parse(dataJson)

 

(Screenshot: we’re going to have some output that looks like this.)

For this hack, we’re going to render our d3js visualization by creating a string that contains our data.
(This is very hacky, but super effective.)

%flinkGelly
println( s"""%html
<style>

.node {
  stroke: #000;
  stroke-width: 1.5px;
}

.link {
  fill: none;
  stroke: #bbb;
}

</style>
<div id="foo">
<!-- assumes d3 v3 (the d3.layout.force API used below); remove this include if d3 is already available on the page -->
<script src="https://d3js.org/d3.v3.min.js"></script>
<script>
var width = 960,
    height = 300

var svg = d3.select("#foo").append("svg")
    .attr("width", width)
    .attr("height", height);

var force = d3.layout.force()
    .gravity(.05)
    .distance(100)
    .charge(-100)
    .size([width, height]);

var plot = function(json) {

  force
      .nodes(json.nodes)
      .links(json.links)
      .start();

  var link = svg.selectAll(".link")
      .data(json.links)
    .enter().append("line")
      .attr("class", "link")
    .style("stroke-width", function(d) { return Math.sqrt(d.value); });

  var node = svg.selectAll(".node")
      .data(json.nodes)
    .enter().append("g")
      .attr("class", "node")
      .call(force.drag);

  node.append("circle")
      .attr("r","5");

  node.append("text")
      .attr("dx", 12)
      .attr("dy", ".35em")
      .text(function(d) { return d.name });

  force.on("tick", function() {
    link.attr("x1", function(d) { return d.source.x; })
        .attr("y1", function(d) { return d.source.y; })
        .attr("x2", function(d) { return d.target.x; })
        .attr("y2", function(d) { return d.target.y; });

    node.attr("transform", function(d) { return "translate(" + d.x + "," + d.y + ")"; });
  });
}

plot( $dataJson )


</script>
</div>
""")

Now, check out what we just did there: println(s"""%html ... $dataJson. We created a string that starts with the %html tag, which lets Zeppelin know this is an HTML paragraph and should be rendered as such, and then we passed the data directly in. If you were to inspect the page, you would see the entire JSON present in the HTML code.

(Screenshot: this is the (messy) graph we get.)

From here, everything is a trivial exercise.

Let’s load this graph data into a Gelly Graph:

val vertexDS = benv.fromCollection(
(json \ "nodes" \\ "name")
.map(_.toString).toArray.zipWithIndex
.map(o => new Vertex(o._2.toLong, o._1)).toList)

val edgeDS = benv.fromCollection(
((json \ "links" \\ "source")
.map(_.toString.toLong) zip (json \ "links" \\ "target")
.map(_.toString.toLong) zip (json \ "links" \\ "value")
.map(_.toString.toDouble))
.map(o => new Edge(o._1._1, o._1._2, o._2)).toList)

val graph = Graph.fromDataSet(vertexDS, edgeDS, benv)

Whoa, that looks spooky. But it really is not that bad. The original JSON contained a list called nodes, which held all of our vertices, and a list called links, which held all of our edges. We did a little hand-waving to parse these into the format Flink expects, creating a vertex DataSet and an edge DataSet respectively.

From here, we can do any number of graph operations on this data, and the reader is encouraged to try more (a few are sketched below). For illustration, I will perform the most trivial of tasks: filtering on edges whose value is greater than 2.

val filteredGraph = graph.filterOnEdges(edge => edge.getValue > 2.0)
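Filtering is only one of many operations Gelly offers. As a further illustration, here is a hedged sketch of a few others (method names as in the Gelly Scala API bundled with Flink 1.1.x; treat it as a starting point rather than a definitive list):

// Count what survived the filter.
println(s"vertices: ${filteredGraph.numberOfVertices()}, edges: ${filteredGraph.numberOfEdges()}")

// Make relationships symmetric by adding reverse edges.
val undirected = filteredGraph.getUndirected

// Filter on vertices instead of edges, e.g. keep only the longer character names.
val longNames = graph.filterOnVertices(v => v.getValue.length > 10)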

Now we convert our data back into a JSON string and use the same method to re-display the graph. This is probably the most complex operation in the entire post.

val jsonOutStr = """{"nodes": [ """.concat(filteredGraph.getVertices.collect().map(v => """{ "name": """ + v.getValue() + """ } """).mkString(","))
.concat(""" ], "links": [ """)
.concat(filteredGraph.getEdges.collect().map(e => s"""{"source": """ + e.getSource() + """, "target": """ + e.getTarget + """, "value": """ + e.getValue + """}""").mkString(","))
.concat("] }")

As you can see, we are creating a JSON string from the edges and vertices of the graph. We call filteredGraph.getVertices.collect() and then map those vertices into the format the JSON expects. In this case, our rendering code expects a list of dictionaries of the form { "name": <value> }. The edges follow a similar pattern. In sum, we are simply mapping a list of collected vertices/edges to string representations in a JSON format.

Finally, we repeat the procedure above to render this new JSON. An important thing to note: our code for mapping the graph to JSON will work no matter what operations we perform on the graph. That is to say, we spend a little time setting things up, translating our graphs to JSON and rendering that JSON with d3js, and then we can play as much as we want with our graphs.

println( s"""%html
<style>

.node {
  stroke: #000;
  stroke-width: 1.5px;
}

.link {
  fill: none;
  stroke: #bbb;
}

</style>
<div id="foo2">
<script>
var width = 960,
    height = 500

var svg = d3.select("#foo2").append("svg")
    .attr("width", width)
    .attr("height", height);

var force = d3.layout.force()
    .gravity(.05)
    .distance(100)
    .charge(-100)
    .size([width, height]);

var plot = function(json) {

  force
      .nodes(json.nodes)
      .links(json.links)
      .start();

  var link = svg.selectAll(".link")
      .data(json.links)
    .enter().append("line")
      .attr("class", "link")
    .style("stroke-width", function(d) { return Math.sqrt(d.value); });

  var node = svg.selectAll(".node")
      .data(json.nodes)
    .enter().append("g")
      .attr("class", "node")
      .call(force.drag);

  node.append("circle")
      .attr("r","5");

  node.append("text")
      .attr("dx", 12)
      .attr("dy", ".35em")
      .text(function(d) { return d.name });

  force.on("tick", function() {
    link.attr("x1", function(d) { return d.source.x; })
        .attr("y1", function(d) { return d.source.y; })
        .attr("x2", function(d) { return d.target.x; })
        .attr("y2", function(d) { return d.target.y; });

    node.attr("transform", function(d) { return "translate(" + d.x + "," + d.y + ")"; });
  });
}

plot( $jsonOutStr )


</script>
</div>
""")

Also note that we have changed $dataJson to $jsonOutStr, as our new graph is contained in this new string.

A final important callout is the d3.select("#foo2") and <div id="foo2"> in the HTML string. This creates a container for the element and then tells d3js where to render it. This was the hardest part for me; before I figured it out, the graphs kept rendering on the grey background behind the notebooks, which is cool if that’s what you’re going for (custom Zeppelin skins, anyone?), but very upsetting if it is not what you want.

(Screenshot: the new, filtered graph.)

Conclusions

Apache Zeppelin is a rare combination of easy and powerful.  Getting started with Apache Flink and the Gelly graph library is fairly simple, and yet we can still add powerful features such as d3js visualizations with relatively little work.


Why Flink is going to upend the Digital Advertising industry in 5 minutes.

So, I was on a call for the November i-Com Data Science Board meeting Thursday morning.  There were supposed to be 5-minute presentations with discussions on a few topics, however a couple of the presenters couldn’t make the call, including the presenter on Streaming Data Processing and Real Time Analytics.  So I think to myself, “Hey, real-time analytics and streaming data processing is kind of my wheelhouse. I could probably hip-shot a pretty solid 5-minute talk on the subject and how it relates to digital marketing.”  So I hacked out a quick outline and was all set, and then the discussion following the first presentation went until the end of the meeting.

Waste not, want not; I am recycling the outline into a quick blog post because most of the people on my distribution list (e.g. Twitter, Facebook, and LinkedIn friends) are somehow tied to digital marketing, and this is relevant to their interests.

To do so properly I have to start with a quick primer on Big Data and real-time stream processing, so that the reader understands where we were 5 years ago, where we are now, and why some up-and-coming Apache projects are about to change everything. Second, I’ll briefly compare the current real-time streaming technology and the two newcomers.  Finally, I’ll give a couple of quick hits on how this will turn the digital ad space on its head.

The history of Big Data / Streaming / Real Time Data processing in 120 seconds.

A stream. Consider a stream of data coming at you. The most familiar type of streaming data to the audience at large is probably Twitter data.  Consider a stream of all Tweets originating within a 100 kilometer radius of Chicago.

Distributed computing.  You might have heard this term floating around; more likely you have heard the tangent buzzwords, including but not limited to: Map-Reduce, Spark, Hadoop, Flink, Clusters, Cloud, etc.  In the mid-2000s Google figured out it was cheaper and wiser to make teams of cheap computers work together than to keep buying big, expensive servers.  People had been doing this for a while, but Google did it really well and addressed a lot of the problems associated with it (for example, machines randomly fail), and the best part was that in the late 2000s the approach made its way into the open-source (read: free) world; that’s when this whole mess started.

Apache Hadoop. So in a Big Data sense, real-time stream processing means having teams of computers working together to process streaming data. Streaming is the key here, because the original approach worked on large data sets that were already sitting there; think of a traditional database or even an Excel spreadsheet. Excel can handle 1 million rows. Hadoop (the open-source implementation of Google’s approach) can handle several trillion and beyond.

Apache Storm. Twitter has been dealing with streaming data for a while. Similar to Google, Twitter developed a really nice tool for handling streaming data that solved a lot of the common problems and was generalizable to many use cases.  They also released it to the world as open source in the early 2010s. This is the tool (or some hacked version of it) that many DSPs use today (for example, Rubicon).

Apache Spark. Apache Spark is the hot new thing in Big Data. It offers significant improvements over Hadoop and solves a lot of Hadoop’s problems; a full discussion is beyond the scope of this article. To be fair though, Spark is really like 5 separate projects all kind of lumped into one. One of those side projects is Spark Streaming.  The way Spark thinks of data is in batches, so Spark “Streaming” is really mini-batch processing. In the Twitter example, Spark creates ‘windows’: it takes all the records (tweets) that come in every 5 seconds, or every 100 records, as a mini-batch and then processes that batch.

Apache Flink. Flink came into existence a little more recently than Spark and is positioned to take the place of the aging Storm (Twitter quit supporting Storm and it has lost a lot of development momentum).  Flink, like Storm, was built to process streaming data. It can process batches like Spark, but it considers a batch to be a special case of a stream: one where you know the stream is only so long.
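For the technically curious, here is a rough, hedged sketch of what that difference looks like in code, using Flink’s Scala APIs (roughly the 1.x releases): the same word count written once over an unbounded stream and once over a bounded batch. Treat it as an illustration of the batch-versus-stream idea, not production code.

// Streaming word count: results keep updating as new records arrive.
object StreamingWordCount {
  import org.apache.flink.streaming.api.scala._

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.socketTextStream("localhost", 9999)      // an unbounded source, e.g. fed by `nc -lk 9999`
      .flatMap(_.toLowerCase.split("\\W+")).filter(_.nonEmpty)
      .map((_, 1))
      .keyBy(0)                                  // group by the word
      .sum(1)                                    // a running count, emitted continuously
      .print()
    env.execute("streaming word count")
  }
}

// Batch word count: the same logic over data that is "only so long".
object BatchWordCount {
  import org.apache.flink.api.scala._

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.fromElements("a batch is just a stream", "that you know ends")
      .flatMap(_.toLowerCase.split("\\W+")).filter(_.nonEmpty)
      .map((_, 1))
      .groupBy(0)
      .sum(1)
      .print()
  }
}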

Sidebar:  What is Apache?  The Apache Software Foundation is like Habitat for Humanity for software.  Professionals and companies volunteer time to improve the code of the open-source software they use, and the software is then free for everyone.  The ASF is a not-for-profit that coordinates and provides a set of standards for some (but not all) larger open-source projects.

Four key things when processing streaming data in 60 seconds.

There are four interrelated concepts when it comes to processing streaming data in a distributed (team of computers) way.

Throughput. How many records a given set of hardware (team of computers) can process in a given length of time, i.e. how many records (tweets) per second you can process.

Latency. A concept all gamers are familiar with: a record comes in now; how long until it is processed?  E.g. the time between a person clicking the ‘tweet’ button and the tweet showing up in your feed.

Functionality. What kinds of weird black-magic data-science stuff you can do on the records to analyze them. A simple function on tweets: counting how many came out of Chicago today.  A less simple function: a real-time language-processing algorithm that detects a terrorist tweeting they’re about to go do some terror and then alerts the authorities. The higher the functionality, the lower the throughput.

Statefulness. This is more of an under-the-hood concept, but an important one that I will point out because it significantly impacts the others.  Statefulness, and more precisely ‘exactly once’ statefulness, refers to the system’s ability to make sure the team of computers processes every record that comes in, and processes it only once. Somewhat trivial for most Twitter applications; VERY important for banking transactions.  Consider the team of computers: what if one computer crashes halfway through processing a record, how does the system recover?  Also consider someone sending a tweet while their phone was in airplane mode: they clicked send at noon, but their plane didn’t land until 5pm and only then was the tweet reported, so how does the system rectify this?

Comparison of the technologies and implications for digital marketers in 120 seconds.

Storm and Hadoop were based on internal designs released into the wild by benevolent companies. Flink and Spark were grad-student projects that took off; both are still in their early phases but already being used in production (in real life, not just as toys in the R&D department) at numerous firms. Spark is considerably more popular at the moment (it had a two-year head start).

          Functionality (good)    Latency (bad)    Throughput (good)
Spark     Mid                     High             High
Flink     Low                     Low              Ultra-High
Storm     Mid                     Ultra-Low        Low

I left Hadoop off this chart because it isn’t a viable option at all for streaming data. I also want to call out the throughput for Flink vs. Storm: a study by Data Artisans found that on a particular task and hardware set, Flink achieved average throughput of 690,000 records per second per core, compared to Storm’s 2,600 records per second on similar hardware. That is a mind-blowing increase in throughput, and it is the one thing you must remember, or nothing that follows will seem wondrous.

Herein lies the crux of why any of this matters to digital marketers:

A sea change is coming.  All of the RTB, targeting, digital hocus-pocus, etc. that you / your vendors currently do is about to see a hardware cost reduction to the tune of 1000-fold.  Another way to look at it: using current hardware, digital advertisers are about to have 1000-fold more computational power (for black-magic algorithms) at their disposal.

To further put this in perspective, consider Rubicon, a juggernaut in the industry, currently running on Apache Storm.  A few employees could go rogue and within a week or two set up a platform that could honestly outperform Rubicon, on a shoestring budget.  You as the buyer might think, “Oh, there is no way this fly-by-night operation could really outperform Rubicon”… As with any start-up I’m sure there will be hiccups, but yes, they can. Also, I use Rubicon only as an example. Anyone whose core business involves making high-velocity decisions at scale (read: anyone in the RTB space) is at risk.

For a short list of companies that could be upset, check out the companies using Storm. Paradoxically, these companies are actually in the best position to move forward, because there are guides and tools readily available for migrating from Storm to Flink.  Companies that have built custom in-house solutions will basically have to start from scratch, either rebuilding a new in-house solution or migrating to Flink.  A quick moral of the story: it behooves a firm to use open source whenever possible; you are outsourcing a non-core competency.

This is the horse-and-buggy versus high-speed trains.  In my 5-minute hip-shot talk, this is the big change that jumps off the page at me: the giants have no competitive advantage against the upstarts (and arguably a competitive disadvantage, given their sunk costs in defunct technologies and organizational resistance to change).

The second implication is a little more pleasant than the fall and rise of digital ad empires.  The Internet of Things promises to increase the amount of streaming data exponentially.  To date, there hasn’t been a great solution for processing all of this data, and more is not more unless there is something useful to do with all of the data being kicked off.  Unlocking the potential to cost-effectively process sensor data in real time frees up a bottleneck currently slowing the progression of the Internet of Things.

For example, my day job is working with offline CPG transaction data.  No one has ever merged the offline store-register data with online targeting (at scale), mainly because doing so would be an expensive pain in the butt. Well, it’s getting to the point where it would be a fairly cheap and trivial exercise. That’s going to give rise to entirely new sets of strategies, tactics, and KPIs.  It’s also going to unfold differently for each industry.  I can’t tell you what the future looks like, only that there is a storm coming.

You were probably already expecting to see significant gains in IoT and real-time streaming technologies over the next 5 years. Now you understand why it is happening, and as Virgil says, “Fortunate is he who understands the causes of things”.


How to get paid a gagillion dollars building crazy big data stuff on nights and weekends.

UPDATE 5-23: The world and ecosystem of Big Data evolves quickly.  Most of these tools have gone through multiple releases since I first penned this article.  I’ve tried to update accordingly. Good hunting.

That title is a lie, probably.

Spark is the hot new thing in big data. Flink will be the hot new thing in big data as internet-of-things, real-time analytics, and other buzzwords go from being starry-eyed promises made in sales pitches to things that actually happen. At the end of the day, most people don’t want to get their hands dirty monkeying around with the mechanics of this stuff; they just want a pretty web interface to use.

So to get paid a gagillion dollars, you basically just start tinkering with this and maybe contribute a bit here and there. Then in 2 years, when Flink and Spark are the new Microsoft Excel, you’re one of a couple thousand people in the world who have been working with this stuff for over a year. #!/bin/sh (pronounced ‘sha-bang’, more computer jokes, I digress) you’re getting paid a gagillion a year.

Let’s be real. Your proposed analytics stack could do some sci-fi black-magic analytics that perfectly predicts all of your KPIs, lottery numbers, and what color of ChuckTs you should rock with that awesome new dinosaur sweater you just got, but if it doesn’t have a sparkly, friendly, not-scary front-end you’re going to have a hard time getting any traction with it. (If you’re not doing this sort of thing day-to-day, then I’m sorry to say, this is the reality of things: people are inherently uncomfortable with submitting jobs via command line.)

Use Case #2: Having a pretty front end for your Spark / Flink like DataArtisans or DataBricks is nice, but for whatever reason you can’t put your data out on some cloud.

Because that one time that this happened…

So without further ado, I present a nice recipe for setting up Apache Flink, Apache Spark, and Apache Zeppelin (incubating) in big-boy mode. (Big-boy mode: Zeppelin comes pre-packaged with Flink and Spark, but you want to be pointing at full-blown clusters of both of these because, y’know, science.)

Ingredients:

Prep time: 2 hours.

Skill Level: Knows just enough to be dangerous

But really, this is a heavily documented (over-documented?) recipe. It assumes no familiarity with Linux and provides little blurbs and links about what each command is doing.  I am an autodidact when it comes to computers, and while blindly following recipes is nice for getting something going, it doesn’t teach you much, and if the slightest thing goes wrong you are totally lost. So proceed with no fear.

Step 0:

Because jebus-only-knows what kind of weirdo setup any given box could have, I present this recipe on a bare-bones (that is, minimal-install) Ubuntu Server 14.04.3 virtual machine.  For demo purposes, this minimizes instances where

  1. something weird is causing unique problems and
  2. I miss a dependency because I have it on my computer, but you may not.

There are lots of tutorials on how to install Ubuntu on a virtual machine, but to be honest, if this step scares you, you should plan on this being a full day project or more.  It’s really nothing more than a recipe, but you’re going to be doing a lot of learning along the way.

Make sure to set up the virtual machine to use a bridged network adapter. A good write-up on the differences can be found here. In short, in Oracle VirtualBox go into the machine’s Settings -> Network and select Bridged Adapter from the drop-down.

Select Bridged Adapter from the drop down menu.

A bridged adapter basically has the fewest restrictions on the virtual machine. We’re going to be installing a lot of things that will be accessing ports on the host machine, and this is just going to be a lot simpler. If you’re curious, go read more.

Gotchya

Throughout this tutorial I’ll point out things that seem trivial but can totally derail you if you deviate from the script even slightly. I do this because I am not one to carefully follow directions, and I provide this information for others like me. In this case, the ‘gotchya’ is: if the virtual machine is already running when you change this setting, you need to restart the machine for the changes to take effect.

Install Ubuntu 14.04.3

A good tutorial is here.

Gotchya

I only attest to this recipe working on a bare bones Ubuntu 14.04.3 Server installation (the link to exactly what I used is in the ingredients). If you decide to use another version of Ubuntu or flavor of Linux, you may have to tweak some things. Zeppelin, Flink, and Spark are all written in Java/Scala so theoretically this could be done on OSX or Windows, but you wouldn’t run a cluster on Windows or OSX boxes, and for a number of other reasons, I’ve chosen Ubuntu 14.04.3. Get this working then try to do something weird if you want.

Step 1- Prepping the Box

First we install some basic applications that will be required by the programs we are going to use. Software in Ubuntu is managed via apt. Generally speaking, there are three ways to get software:

  1. Download from the repository
  2. Download binaries directly
  3. Download and compile from source

In this episode, we’ll be doing all three. If you’re coming from Windows and are used to everything being pre-compiled for you with a nice pretty GUI installer… I don’t know what to tell you, other than ‘Welcome to Linux, this is life now.’

Any time you see sudo apt-get, we are telling the computer:

  1. sudo: literally, “super user do”
  2. apt-get install: use the package manager to install the requested software.

So we are using apt-get to install:

git

git is a program for software version control. We’ll be using it to download the latest source code for the programs we’re going to compile.

sudo apt-get install git

openssh-server

A basic Secure Shell server.

sudo apt-get install openssh-server

OpenJDK 7

The Java Development Kit version 7.

sudo apt-get install openjdk-7-jdk openjdk-7-doc openjdk-7-jre-lib

But Maven 3+ isn’t in the repository at the time of this writing. That is to say, if we use apt-get we will get a version of Maven that is too old for what we need. So for Maven, we are going to download a binary distribution and manually copy it into place.

Maven 3.1+

In the same way that apt-get is a neat way to manage software kept in a repository, Maven manages code libraries for us. Check out https://maven.apache.org/ for more info.

If you already have maven installed, we can use apt to remove software as well as install it.

sudo apt-get purge maven maven2

*Note: if you don’t have Maven installed, this command isn’t going to hurt anything; you’re just going to see an error message about how there was nothing to uninstall.

Installing maven 3.3.3 quick and dirty
Download the maven 3.3.3 binary

wget is a CLI (command line interface) downloader.

wget "http://www.us.apache.org/dist/maven/maven-3/3.3.3/binaries/apache-maven-3.3.3-bin.tar.gz"

Unzip the binary (tar -zxvf) and then move (sudo mv) it to /usr/local

tar -zxvf apache-maven-3.3.3-bin.tar.gz
sudo mv ./apache-maven-3.3.3 /usr/local

Finally, create a symbolic link (ln -s) from /usr/bin to the new version. /usr/bin is one of the places Ubuntu looks for programs by default.

sudo ln -s /usr/local/apache-maven-3.3.3/bin/mvn /usr/bin/mvn

Installing Zeppelin

At the time this article went to press, the main branch of Zeppelin didn’t have support for Flink 0.10 (the current stable release of Flink at the time). There is a discussion here, but the short version is you either need to hack Zeppelin yourself or use Till Rohrmann’s branch.  For parsimony, I present the second method and leave ‘how to hack on Zeppelin yourself’ as content for another post…

First, git clone Apache Zeppelin. This is the third method of getting software discussed earlier. We’re downloading source code to compile.

git clone https://github.com/tillrohrmann/incubator-zeppelin.git   # superseded, see the update below
git clone https://github.com/apache/incubator-zeppelin.git

UPDATE 5-23: Originally we wanted to get Flink v0.10, and Till’s branch had this; now the real Zeppelin has been updated (it builds against Flink 1.0), so we go to the (real) source.

change directory (cd) to incubator-zeppelin

cd incubator-zeppelin

A git repository can have multiple branches. A good overview is here. We want to check out the branch that Till made with the Zeppelin configuration for flink-0.10-SNAPSHOT (if you cloned the main Apache repository per the update above, skip this checkout and stay on master).

git checkout flink-0.10-SNAPSHOT

Now we instruct maven (mvn) to clean and package the source found in the directory (more on maven build life-cycles). Additionally, we pass flags to maven telling it which Spark version to build against and to skip the tests that check whether it compiled correctly.  See the Gotchya below.

UPDATE 5-23: This all seems to be working now.  We add the flags -Psparkr -Ppyspark -Pspark-1.6 to make Zeppelin build against Spark 1.6 (included since last time) and to add support for SparkR and pyspark.  At the time of writing -Dflink.version=1.0 isn’t necessary, but it will hopefully keep this working for a little while longer, especially after Flink v1.1 is released.

mvn clean package -DskipTests -Psparkr -Ppyspark -Pspark-1.6 -Dflink.version=1.0

Gotchya:

I explicitly didn’t use the -Pspark-1.5 flag. If I had, it would have built Zeppelin with an internal Spark interpreter at version 1.5. I was having all sorts of issues when doing this, and finally rolled back to make this a simple-as-possible case. If you want to try your hand at Spark 1.5, then add that flag and in the next section when you install Spark, checkout version 1.5 instead.

The maven build will take a little while (26 minutes for me). When it is done, you should see a message saying BUILD SUCCESS and some statistics.

And finally…drumroll… the moment you’ve all been waiting for … start the Zeppelin daemon.

sudo bin/zeppelin-daemon.sh start

Gotchya

You must use sudo when you start the Zeppelin daemon. The onus is on you to remember to do this. It is absolutely possible to start the daemon without sudo, and you will be able to run the Flink example listed below, however the Spark example won’t work. The Zeppelin internal Spark interpreter needs super-user privileges for creating databases and various other writes.

UPDATE 5-23: Don’t use sudo.  If you do it once, you’ll have to do it always, and having Zeppelin running as super user is unwise and unnecessary.

Test flight of our new Zeppelin…

First determine the local IP address of the machine hosting Zeppelin.

ifconfig

This is the output from my machine; your numbers will be different.

See that the IP of my machine is 192.168.1.109; yours will be different. In subsequent screenshots you will see this IP in the browser address bar, but if you are following along at home, you need to use your own IP address.

Open a browser and surf to http://yourip:8080, where yourip is the IP you found in the inet addr: field under the eth0 section. Port 8080 is the default port of the Zeppelin WebUI.

Guten Tag, Zeppelin.

Open the Tutorial Notebook by clicking on Notebook -> Zeppelin Tutorial

Do this.

When you open the tutorial notebook it will ask you to bind the interpreters; just do it by clicking Save. Now run all of the examples in the notebook to make sure they are working. You can do this by going to each cell and pressing Shift+Enter, or by clicking the little play button at the top of the note.


Now we are going to do a couple of simple examples in Flink and Spark. Zeppelin comes pre-built with its own Flink and Spark interpreters, and will use these until we have it pointed at our own cluster (which happens later). For now, we’re going to test some basic functionality of Zeppelin by running a Flink and a Spark word count example against the internal interpreters.

Flink Example

First, create a new notebook. Do this by clicking on Notebook -> Create New Notebook. Name this notebook “flink example”. Zeppelin doesn’t automatically open the notebook you’ve created; you’ll have to click on Notebook again, and the name you gave the new notebook will appear in the list.

You can find a Flink word count gist here. Copy and paste the code from the gist into the Zeppelin note and either hit Shift+Enter or click the play button to run the paragraph.
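If the gist is ever unavailable, a minimal word count along these lines should behave the same way (a sketch only; it assumes the interpreter exposes the batch environment as benv, while older Flink shells exposed it as env instead):

%flink
import org.apache.flink.api.scala._

val text = benv.fromElements(
  "to be or not to be",
  "that is the question")

val counts = text
  .flatMap(_.toLowerCase.split("\\W+"))
  .filter(_.nonEmpty)
  .map((_, 1))
  .groupBy(0)
  .sum(1)

counts.print()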

Hopefully you see something like this…
Spark Example

Create another new notebook, call this one “spark example”, and open it.

Copy and paste the gist from here.
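Again, if the gist is unavailable, a word count along these lines should do the job (a sketch; sc is the SparkContext that Zeppelin’s Spark interpreter provides):

%spark
val text = sc.parallelize(Seq(
  "to be or not to be",
  "that is the question"))

val counts = text
  .flatMap(_.toLowerCase.split("\\W+"))
  .filter(_.nonEmpty)
  .map((_, 1))
  .reduceByKey(_ + _)

counts.collect().foreach(println)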

Assuming your examples are working, go ahead and stop Zeppelin.

bin/zeppelin-daemon.sh stop

Installing Flink and Spark Clusters

There are lots of how-to guides for getting full-blown Flink and Spark clusters set up. For this example, we’re just going to install a standalone instance of each. The important thing in this tutorial is how to get Zeppelin aimed at Flink and Spark instances outside of the ones that come prepackaged. These external versions can be scaled/built/set up to suit your use case.

Download, checkout, and start Flink
Download

We change directory back to our home directory

cd $HOME

Then clone the Apache Flink repository

git clone https://github.com/apache/flink.git

Then check out release-1.0 (originally release-0.10; see the update below)

git checkout release-0.10   # superseded, see the update below
git checkout release-1.0

UPDATE 5-23: We’re on release 1.0 now.  Release 1.1 (which is what the master branch is on) has some cool new stuff like streaming in the shell, but it also breaks backwards compatibility (i.e. it won’t work here). I have a PR that makes it work, but I’ll save that for a future blog post.

And finally, build the package.

mvn clean package -DskipTests

Building Flink took 20 minutes on my virtual box.

Start Flink

Now start Flink with the following command:

build-target/bin/start-cluster.sh

Now go to http://yourip:8081 and check out the Flink web-ui.

Oh, hello there new friend.
Oh, hello there new friend.

Make sure there is something listed under the task manager. If there is nothing, stop and restart the Flink cluster like this:

build-target/bin/stop-cluster.sh
build-target/bin/start-cluster.sh

Download, checkout, and start Spark
Download

We change directory back to our home directory

cd $HOME

Then clone the Apache Spark repository

git clone https://github.com/apache/spark.git

Then check out branch-1.6 (originally branch-1.4; see the update below)

git checkout branch-1.4   # superseded, see the update below
git checkout branch-1.6

UPDATE 5-23: We’re on branch 1.6 now, and we want SparkR support (note that -Psparkr is a Maven build profile, not a git option; we already passed it to the Zeppelin build above).

And finally, build the package.

mvn clean package -DskipTests

Building Spark took 38 minutes on my virtual box.

Start Spark

In a cluster, you have a boss that is in charge of distributing the work and collecting the results, and workers that are in charge of actually doing the work. In Spark these are referred to as the master and the slaves, respectively.

In Flink we could start an entire standalone cluster with one line. In Spark, we must start each component individually. We start the master with the flag --webui-port 8082; by default the web UI port is 8080, which is already being used by Zeppelin.

sbin/start-master.sh --webui-port 8082

Now go check out the Spark master web-ui. It will be at http://yourip:8082.

spark://nameofhost:7077

Note the URL listed: spark://ubuntu:7077. My URL is ubuntu because that is the name of my host; the name of your host will be whatever you set it to during install. Write this URL down, because next we are starting the slave, and we have to tell the slave who its master is.

sbin/start-slave.sh spark://yourhostname:7077

The argument spark://yourhostname:7077 lets the slave know who its master is; it is literally the master’s URL. If you have another computer with the same version of Spark installed, you could run this line again there (substituting the master machine’s hostname or IP address) and add another computer to your cluster.

Gotchya

For those who are not reading carefully and are just copying and pasting: you probably won’t see this for a while anyway, but I want to say it again. Unless you just happen to have named your host ubuntu, you need to change that to whatever name you found for the Master URL in the Spark Web-UI…

Now go back to your master webui and you should see the slave listed under workers.

Start Zeppelin

Now everything is technically up and running. All we have left to do is start Zeppelin back up, tell it to run code against our clusters (instead of the internal interpreters), and check that our examples still work.

Start Zeppelin with the following

cd $HOME
incubator-zeppelin/bin/zeppelin-daemon.sh start

Now go back to the Zeppelin web-ui at http://yourip:8080 and this time click on Interpreters at the top of the screen.

In the Spark section, click the edit button in the top right corner to make the property values editable. The only field that needs to be edited in the Spark interpreter is the master field. Change this value from local[*] to the URL you used to start the slave, mine was spark://ubuntu:7077.

Edit the spark and flink interpreters.

Click ‘Save’, then scroll down to the Flink section. Click ‘edit’ and change the value of host from local to localhost. Click ‘Save’ again.

Now open the Flink notebook we made earlier.

Hit Shift+Enter or hit the play button at the top of the notebook to run the paragraphs. Hopefully the result is the same as before. Now in a new tab, go to the Flink Web-UI at http://yourip:8081. You should see the job has completed successfully on the cluster.

It’s beeeeeautiful!

Now open the spark example notebook from earlier and rerun this as well. After this notebook has run successfully go to the Spark Web-UI at http://yourip:8082 and see the job has run on this cluster.

Is great success.
Gotchya

If the Spark job seems to hang, go to the Spark Web-UI. If there is a job listed under running applications but there are no workers listed, the slave has died; go back to the command line and run

cd $HOME
spark/sbin/start-slave.sh spark://yourhostname:7077

where yourhostname is the hostname you have been using for the Master URL this whole time.

Necromancing the Spark slave.

Summary

Dude (or dudette), crack a beer. You just set up two of the most cutting-edge big data engines available today in cluster mode, with an up-and-coming, cutting-edge (and pretty) web interface.  Seriously, not a trivial task. Have you checked your LinkedIn inbox in the last hour? Because you probably have about 30 recruiters blowing you up.

Seriously though, it took me a couple of days smashing my head against the wall to make this rig work right and consistently. Seeing as I just saved you so much time, I think the least you could do is head over to, sign up for, and participate in the user mailing lists for these projects (Zeppelin, Flink, and Spark).

Open-source software is a beautiful thing, but it relies on a strong community. All of these projects (and many more) could use your help, but especially Zeppelin, which is still in incubator status.

(A thank you to the wonderful developers wouldn’t hurt either, they watch the mailing lists).

UPDATE 5-23:  I let this go without updates longer than I should have, and I’m sorry.  To be honest, I probably won’t do it again.  I’m older and wiser now, and the things listed here should remain valid for some time to come. The big changes are:

  • We use the actual Zeppelin branch (not Till’s)
  • We build against Flink 1.0
  • We build against Spark 1.6 with SparkR support.

Happy hacking,

tg