Behind the Scenes of “RT’s HoRT IoT-A, An AIoT-P MVP Demo”: Part 2- The Streaming Engine

Welcome back from Part 1. In our last episode we did an introduction, spoke briefly about the overall structure of the program, discussed our demo data source, and did some foreshadowing of this week. This week we’ll do a bit of what we foreshadowed.

The Streaming Engine

Or How to Do Weird Stuff with Apache Flink

Let’s start off by looking at some pictures. Let me show you the DAG of the Flink Streaming Job to provide some motivation, and then we can go through piece by piece and look at the implementations.

Stream DAG

From the “architecture diagram” in Episode 1, this is what the Flink Job is doing.

  1. We pick up the data from MQTT
  2. We apply a sliding window doing some simple analytics on the data (Count, Min, Max, Average, Sum Squared Deviations, Standard Deviation)
  3. We join each record to the last analytics emitted from the sliding window
  4. We update the list of REST endpoints which serve the models every 30 seconds
  5. We send each Record + Last Emitted Analytics pair as a REST request to each model, and then sink the Record, Analytics, and Model Result (for each model) to Elasticsearch.
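For step 2, the arithmetic behind the window analytics is simple enough to restate outside of Flink. Here’s a minimal Python sketch (not the actual Flink window function, just the math it emits):

```python
import math

def window_analytics(values):
    """Compute the simple analytics the sliding window emits:
    count, min, max, average, sum of squared deviations, stddev."""
    count = len(values)
    mean = sum(values) / count
    ssd = sum((v - mean) ** 2 for v in values)   # sum of squared deviations
    stddev = math.sqrt(ssd / count)              # population standard deviation
    return {
        "count": count,
        "min": min(values),
        "max": max(values),
        "avg": mean,
        "sum_sq_dev": ssd,
        "stddev": stddev,
    }

print(window_analytics([4.0, 8.0, 6.0, 2.0]))
```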

Pretty easy peasy. If you’re not interested in Apache Flink, you can basically stop here, knowing that each record from a Divvy bike station, plus its analytics and model results, is sunk (written) into Elasticsearch, and that by continuously polling an endpoint that returns other active endpoints we can dynamically add/delete/update the models being run on this stream of data.

Getting Data from MQTT

Initially I was using a copy of luckyyuyong’s flink-mqtt-connector in my implementation, which required some hacks and updates.

The most important call out is that the MQTT broker will disconnect clients with the same clientID because it thinks they are old and stale, so we have to make the clientID random. This was a particularly nasty and inconsistent bug to track down, but it turns out many others have had this problem. The solution here was just to append the milliseconds since the epoch. You’d probably need something more robust for production, but this is an MVP.

String clientBase = String.format("a:%s:%s", Org_ID, App_Id).substring(0, 10);
String clientID = clientBase + System.currentTimeMillis();
mqttProperties.setProperty(MQTTSource.CLIENT_ID, clientID);

Aside from that little quirk, this all works basically like the Apache Kafka connector or any other connector.

Apply the Sliding Window

This was a bit of a trick because I didn’t want to hard-code the data structure into the engine. I would like to eventually auto-determine the schema of the JSON, but because of time constraints I set it up to be passed as a command line argument (though I ended up hard coding what the CLI argument would be – see here).

This is important because we don’t want the window trying to compute analytics on text fields, and the data coming in from MQTT is all going to look like a string.

If you look at the code in ProcessWindowAnalyticsFunction you can see that the function expects a schema to come in with the records, and we attempt to compute analytics on any field that schema lists as ‘Numeric’. Admittedly, we are trading off performance here for a single engine that can handle any data source.
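The schema-driven selection is easy to picture in Python (a hedged sketch with a hypothetical schema shape; the real schema comes in through the Flink job’s CLI argument):

```python
def numeric_fields(schema, record):
    """Keep only the fields the schema marks 'Numeric', parsing their
    string values to floats -- everything arriving off MQTT looks like
    a string."""
    out = {}
    for field, ftype in schema.items():
        if ftype == "Numeric" and field in record:
            try:
                out[field] = float(record[field])
            except ValueError:
                pass  # malformed value: skip rather than crash the window
    return out

schema = {"available_bikes": "Numeric", "station_name": "Text"}
record = {"available_bikes": "7", "station_name": "Clark & Lake"}
print(numeric_fields(schema, record))   # {'available_bikes': 7.0}
```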

Joining the Records and Last Reported Analytics

At this point I had been doing A LOT of Java, which, to be candid, I really don’t care for- so I switched over to Scala. It’s not a particularly interesting function: it simply joins each record with the last reported analytics from the sliding window. You’re free to check it out here and leave comments if you have questions. I realize there’s a lot of magic hand-waving and me telling the reader to “go look for yourself” in this blog / tutorial, but I am assuming you are fairly familiar with all of the tools I am using, and I’m trying to give you a high-level view of how this all fits together. If you have specific questions, please ask in the comments or email me (you can find me around the interwebs pretty easily).
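In Python terms, the join logic is just “remember the latest analytics, pair each record with it” (the actual function is Scala on Flink streams, so this is only the shape of the logic):

```python
class LatestAnalyticsJoiner:
    """Pairs each incoming record with the most recently seen analytics
    snapshot -- the rough shape of the Scala join, not a Flink operator."""
    def __init__(self):
        self.latest = None      # no analytics emitted yet

    def on_analytics(self, analytics):
        self.latest = analytics # window fired: remember its output

    def on_record(self, record):
        return (record, self.latest)

j = LatestAnalyticsJoiner()
j.on_analytics({"avg": 5.0})
print(j.on_record({"bikes": 7}))   # ({'bikes': 7}, {'avg': 5.0})
```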

The “Other” Stream

Now let’s shift and consider our “other” stream: the stream that simply polls an endpoint which serves us other endpoints every thirty seconds. This is accomplished by applying a time window to all of the events coming from the records source, throwing them all away, and then once every 30 seconds (though you could configure that), sending an async REST request to a preordained URL that holds the REST endpoints for the models.

You can see this in my github, lines approx 140-159. The endpoint that this system is hitting is being served from Apache OpenWhisk (which I absolutely love, if you haven’t been able to glean that from my other blog posts- it’s like AWS / Google Cloud Functions, except not proprietary vendor lock-in garbage).

You can see the response this gives here. Obviously, a “next step” would be for this to hit some sort of database where you could add/delete entries. (If you’re too lazy to click, it basically just returns a json of { endpoints: [ { modelOneName: "http://the-url" }, …] }.)
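Flattening that response into something iterable is one loop. A sketch (the second entry and its name/URL are hypothetical, added just to show the shape; the real ones come from the OpenWhisk action):

```python
import json

# Shaped like the response described above; modelTwoName is made up.
payload = json.loads(
    '{"endpoints": [{"modelOneName": "http://the-url"},'
    ' {"modelTwoName": "http://other-url"}]}'
)

def to_model_list(payload):
    """Flatten {"endpoints": [{name: url}, ...]} into (name, url) pairs."""
    pairs = []
    for entry in payload["endpoints"]:
        for name, url in entry.items():
            pairs.append((name, url))
    return pairs

print(to_model_list(payload))
```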

Merging it All Together and Making Async Calls

Now we bring everything together. From one stream we have Device Event Records and analytics on those records; from the other we have a list of URLs which will serve models. It’s worth pointing out here that, while not implemented, an easy addition for a “real” version would be another field alongside the model name specifying which devices it applies to- since each model expects certain input fields and different devices will have different fields. Again- a MINIMUM viable product is presented.

The rest is pretty simple conceptually: each record/analytics pair goes through the list of all applicable URLs (in this case, all of the URLs) and pings each with the record and analytics as the payload. The code is here and may be more illuminating. The magic happens in the main program right here.

Pinging the models asynchronously is nice because as different requests come back at different speeds, they don’t hold up the rest of the stream. A bug/feature can be introduced, though, if you don’t want the entire program to go flying off the rails when a single REST request hangs. To handle that you must set the “timeout” of the Async function; my choice was to “ignore” failures, but you could in theory re-request, allow up to X fails in a Y time, etc.
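The timeout-and-ignore behavior can be sketched with plain asyncio (no Flink, no HTTP here- the “requests” are just sleeps, so the slow one trips the timeout and gets dropped):

```python
import asyncio

async def call_model(name, delay, timeout=0.05):
    """Stand-in for one async REST call: a 'model' that takes longer
    than the timeout is simply ignored rather than derailing the stream."""
    try:
        return await asyncio.wait_for(
            asyncio.sleep(delay, result=f"{name}: ok"), timeout)
    except asyncio.TimeoutError:
        return None  # the "ignore on timeout" choice described above

async def fan_out():
    # Ping all "models" concurrently; slow ones come back as None.
    results = await asyncio.gather(call_model("fast", 0.0),
                                   call_model("slow", 1.0))
    return [r for r in results if r is not None]

print(asyncio.run(fan_out()))   # ['fast: ok']
```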


I want to state one more time- this was a lot of wave-my-hands magic and “go look for yourself”dom. I probably could have made a 5-part blog out of this post alone- but 1. I’m trying to write a book on something else already, and 2. the point of this blog series is an overview of how I built a “full stack” IoT Analytics solution from scratch, part time, in a couple of weeks.

Next Time

We’ll follow our hero into his mis-adventures in React, especially with the Dread Design System: Carbon.

See you, Space Cowboy.

Behind the Scenes of “Rawkintrevo’s House of Real Time IoT Analytics, An AIoT platform MVP Demo”

Woo, that’s a title- amiright!?

It’s got everything- buzzwords, a corresponding YouTube video, a Twitter handle conjugated as a proper noun.


Just go watch the video– I’m not trying to push traffic to YouTube, but it’s a sort of complicated thing and I don’t do a horrible job of explaining it in the video. You know what, I’m just gonna put it in line.

Ok, so now you’ve seen that. And you’re wondering: How in the heck?! Well, good news- because you’ve stumbled onto the behind-the-scenes portion, where I explain how the magic happened.

There’s a lot of magic going on in there, and some you probably already know and some you’ve got no idea. But this is the story of my journey to becoming a full stack programmer.  As it is said in the Tao of Programming:

There once was a Master Programmer who wrote unstructured programs. A novice programmer, seeking to imitate him, also began to write unstructured programs. When the novice asked the Master to evaluate his progress, the Master criticized him for writing unstructured programs, saying, “What is appropriate for the Master is not appropriate for the novice. You must understand Tao before transcending structure.”

I’m not sure if I’m the Master or the Novice- but this program is definitely unstructured AF. So here is a companion guide that maybe you can learn a thing or two / Fork my repo and tell your boss you did all of this yourself.

Table of Contents

Here’s my rough outline of how I’m going to proceed through the various silliness of this project and the code contained in my github repo.

  1. YOU ARE HERE. A sarcastic introduction, including my dataset, WatsonIoT Platform (MQTT). Also we’ll talk about our data source- and how we shimmed it to push into MQTT, but obviously could (should?) do the same thing with Apache Kafka (instead). I’ll also introduce the chart- we might use that as a map as we move along.
  2. In the second post, I’ll talk about my Apache Flink streaming engine- how it picks up a list of REST endpoints and then hits each one of them.  In the comments of this section you will find people telling me why my way was wrong and what I should have done instead.
  3. In this post I’ll talk about my meandering adventures with React.js, and how little I like the Carbon Design System. In my hack-a-thon submission,  I just iFramed up the Flink WebUI and Kibana, but here’s where I would talk about all the cool things I would have made if I had more time / Carbon-React was a usable system.
  4. In the last post I’ll push this all onto IBM’s K8s. I work for IBM, and this was a work thing. I don’t have enough experience on anyone else’s K8s (aside from microK8s, which doesn’t really count) to bad-mouth IBM. They do pay me to tell people I work there, so anything too rude in the comments about them will most likely get moderated out. F.u.

Data Source

See and scroll down to Data Source. I’m happy with that description.

As the program currently stands, right about here the schema is passed as a string. My plan was to make that an argument so you could submit jobs from the UI. Suffice it to say, if you have some other interesting data source, either update that to be a command line parameter (PRs are accepted) or just change the string to match your data. I was also going to do something with schema inference, but my Scala is rusty, I never was great at Java, and tick-tock.

Watson IoT Platform

I work for IBM, specifically Watson IoT, so I can’t say anything bad about WatsonIoT.  It is basically based on MQTT, which is a pub-sub thing IBM wrote in 1999 (which was before Kafka by about 10 years, to be fair).

If you want to see my hack to push data from the Divvy API into Watson IoT Platform, you can see it here. You will probably notice a couple of oddities- most notably, that only 3 stations are picked up to transmit data. This is because the Free account gets shut down after 200MB of data and you have to upgrade to a $2500/mo plan, because IBM doesn’t really understand linear scaling. /shrug. Obviously this could be easily hacked to just use Kafka instead and update the Flink Source here.
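The shim itself boils down to “filter to a whitelist of stations, wrap as JSON, publish.” A hedged Python sketch- the field names are illustrative rather than the actual Divvy schema, and the `{"d": ...}` wrapper follows the Watson IoT Platform event payload convention (publishing itself is left as a comment):

```python
import json

def shim_payloads(stations, station_ids):
    """Keep only the whitelisted stations (the post uses 3, to stay under
    the free-tier data cap) and wrap each as a JSON event body."""
    events = []
    for s in stations:
        if s["station_id"] in station_ids:
            # Watson IoT event bodies conventionally nest data under "d"
            events.append(json.dumps({"d": s}))
    return events
    # each event would then go out via an MQTT publish to the platform

stations = [{"station_id": i, "available_bikes": i * 2} for i in range(10)]
print(shim_payloads(stations, {1, 2, 3}))
```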

The Architecture Chart

That’s also in the github, so I’m going to say: just look at it there.

Coming Up Next:

Well, this was just about the easiest blog post I’ve ever written. Up next, I may do some real work and get to talking about my Flink program, which picks up a list of API endpoints every 30 seconds, does some sliding-window analytics, and then sends each record and the most recent analytics to each of the endpoints that were picked up- and how, in its way, this gives us dynamic model calling. Also- I’ll talk about the other cool things that could/should be done there that I just didn’t get to. /shrug.

See you, Space Cowboy.




Building a License Plate Recognizer For Bike Lane Uprising

UPDATE 6/20/2019: Christina of Bike Lane Uprising doesn’t work in software sales, and is therefore terrified of “forward-selling”; she wants to make sure it’s perfectly clear that automatic plate recognition, while on the road map, will not be implemented for some time to come.

I was recently at the Apache Roadshow Chicago, where I met the founder of Bike Lane Uprising, a project whose goal is to make bicycling safer by reducing the occurrence of bike lane obstruction.

I actually met Christina when I was running for Alderman last fall (which is why I’ve been radio-silent on the blogging for the last 8 months- before you go Google it, I’ll let you know I dropped out of the race).  As a biker, I identified with what she was doing.  Chicago is one of the bike-y-est cities in North America, but it can still be a very… exciting… adventure commuting via bike.  It also seemed like there was probably something I could do to help out.   I promised after the election was done I’d be in touch.

Of course, I forgot.

Actually, I didn’t forget- I dropped out of the election because I got a book deal with O’Reilly Media to write on Kubeflow AND because I had already committed to producing the Apache Roadshow Chicago.  So I “punted” and promised to help out after the Roadshow.

Christina was still nice enough to come out and speak at the roadshow, and was very well received.  We were talking there, and I, remembering my oath to help out, finally got a hold of her a week or two later.

Her plan was to do license plate recognition. The current model of Bike Lane Uprising uses user-submitted photos, which are manually tagged and entered into a database with things like: License Plate Number, Company, City/State, Vehicles, etc. She had found an open source tool called OpenALPR and wondered if BLU could use it somehow.

Now obviously this is going to have to be served somewhere, and if you hadn’t heard, I have fallen deeply in love with Apache OpenWhisk-incubating over the last 18 months.  And I’m not saying that with my IBM hat on, I genuinely think it is an amazing and horribly underrated product. Also it’s really cheap to run (note- I am running it as IBM Cloud Functions, which is just a very thin veil over OpenWhisk).

OK, so OpenALPR has a Python API. Good news and bad news- good news because this project will take 20 minutes and I’ll be done, bad news because that’s too quick and easy to make a blog post out of. Considering you’re reading the blog post- obviously that didn’t work. If you look at OpenALPR, you’ll see it’s been over a year since any work has gone on with it. It’s basically a derelict, but a functional one…ish. The Python bindings are broken- some people said they could make the Python work if they built from scratch- I could not. Gonna have to CLI this one.

As a spoiler- here’s the code.

Well, that’s exciting, because I’ve never built a Docker function before (only Python ones, and some pretty abusive Python tricks that use os.system(...) to build the environment at run time).

For this trick, however, we do a multi-stage build. This is because the Alpine Linux repos are unstable as hell, and if you want to version-lock something you basically have to build from source.

OpenALPR depends on OpenCV and Tesseract. OpenWhisk expects its Docker functions to start off from the dockerSkelaton image.** So if you go into the docker/ directory- I’d like to first direct your attention to opencv/, which uses openwhisk/dockerSkelaton as a base image and builds OpenCV. Was kind of a trick. Not horrible. Then we have the tesseract/ folder, which builds an image using rawkintrevo/opencv-whisk as a base. Finally, openalpr/ builds an image using rawkintrevo/tesseract-whisk as a base. Now we have an (extremely overweight, bc I was lazy about my liposuction) environment with openalpr installed. Great.

Finally, let me direct your attention to where the magic happens. plate-recog-server-whisk/ has a number of interesting files. First there is an executable called exec- a silly little bash file with only one job: to call

python3 /action/ ${1}

You see- the dockerSkelaton has a bunch of plumbing, but OpenWhisk expects there to be a file /action/exec that it will execute, and the last line of stdout from that executable must be a json (which OpenWhisk will return).

So let’s look at the code- elegant in its simplicity. This is a program that takes a single command line arg, a json (that’s how OpenWhisk passes in parameters). That json may have two keys: a required image url, and an optional top-n license plates. It calls alpr with the image and the top plates, and prints the response (in json form, which is what the -j flag is for). And that’s it. So simple, so elegant.
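The glue is small enough to sketch from that description. This is the rough shape, not the repo’s actual file- the alpr flags assumed here are `-j` (JSON output) and `-n` (top-n candidate plates), and the subprocess call is left as a comment since it needs alpr installed:

```python
import json

def build_alpr_cmd(image_path, top_n=10):
    """Assemble the alpr CLI invocation the glue code shells out to."""
    return ["alpr", "-j", "-n", str(top_n), image_path]

def parse_alpr_output(stdout):
    """OpenWhisk returns the last line of stdout, which must be JSON."""
    return json.loads(stdout.strip().splitlines()[-1])

print(build_alpr_cmd("/tmp/car.jpg", 3))
# In the action itself you'd run something like:
#   out = subprocess.check_output(build_alpr_cmd(local_image, n))
#   print(json.dumps(parse_alpr_output(out.decode())))
canned = '{"results": [{"plate": "ABC1234", "confidence": 91.5}]}'
print(parse_alpr_output(canned)["results"][0]["plate"])   # ABC1234
```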

I’m trying to follow this design pattern more and more as I get older- there usually exists some open source package that does what I want, and I just need a little Python glue code on top. In this case I wanted:

  • License Plate Recognition
  • As-a-Service (e.g. done via API call and scalable).

OpenALPR + Apache OpenWhisk-incubating.


I’m hoping to write more now that life has slowed down a bit. Stop back by soon.

** UPDATE 2: The Apache OpenWhisk folks wanted me to provide some clarity around this statement. Specifically, they said about that sentence that it:

 isn’t quite right. You can use any image as your base as long as you implement the lifecycle/protocol. What you get from the skeleton is a built in solution. You could have started with our python image or otherwise.


Photo Credit: Ash Kyd


Pavlov’s Sandman pt 3: Hacking a Shock Collar and the Benefits of Proper Cargo Culting

In the last post I laid out a super simple algorithm for detecting snores. You might have read that and thought, “dude, this barely counts as data science”. I would agree with you. I did that in January; the talk was in May. I figured I’d go hack the shock collar, and then work on fine-tuning the algorithm with some more advanced magic. Turns out I had no idea what I was doing with Bluetooth hacking in Python, and it wasn’t until early March that I was able, by luck, to make any communication with the shock collars / begin testing.

I had done some device hacking before (see Cylons).  I thought this would be easy…

_Author makes 1000-yard stare out the window of the restaurant where he is writing this blog_

… it wasn’t.

The idea was that I would be able to see what messages my phone was sending the device, then I would send the same messages and voila! I would have control. So step 1 is to snoop the communication between the phone and the device.

To do this, open your Android (if you’re using an iPhone, you’ll first need to smash it with a hammer and get a real cell phone), go to Settings, open Developer Options (varies by phone; look up a tutorial for your model), tap “Enable HCI bluetooth snoop log”, and then go into the app to pair with the collar / send commands.

Screen Shot 2018-05-22 at 2.37.58 PM

Too easy! The next step is to pull the log off your phone. For this step you’ll need Linux (actually you can theoretically do this without Linux, but you will need Linux for the Python program that controls the collar, or you’ll have to significantly refactor it to run on your trash OS)- if you’re not using Linux, you’ll need to format your hard drive and install a grownup’s operating system. You may also need to install Android Debug Bridge (adb). Open the terminal and type in:

adb pull /sdcard/btsnoop_hci.log

Now here’s a picture.

Screen Shot 2018-05-23 at 3.24.01 PM
A screen shot of a screen shot, but you get the idea.

Ok, that’s going to pull the snoop file onto your computer. Next step: open up Wireshark (or whatever your favorite log analyzer is) and read through.

Screen Shot 2018-05-23 at 3.26.02 PM.png

What we see here is what (I think) was a “buzz” command from my phone to the collar.

Cool- so now we can see the commands my phone was sending to my device (shock collar); should be easy to mimic, right? Well, no, it wasn’t. Not because it was intrinsically difficult, but because I had no idea what I was doing. What happened next in my story was about two months of me (off and on) trying to make contact from my computer to my device. The first problem was that there was no way to pair my device with the computer; some of the Linux bluetooth utils would see the device, others would not.

I was stabbing in the dark. Do you remember being a child and seeing your parents order pizza? And then one day you want to order pizza, and you’ve seen your parents do it plenty of times and it seems easy enough, so you walk to the phone, press buttons, say “pizza please”, and then wait forty-five minutes for pizza to show up. You only know if you did it right because pizza either shows up or it doesn’t. That was basically my experience trying to hack this S.O.B.

I was cargo culting, and doing so with more concern as the deadline approached. Finally, the cargo gods smiled on me. I learned about Bluetooth Low Energy devices (which my device is). I found a somewhat obscure Python library for dealing with them. But I still wasn’t getting signals through (that is to say, the device was not reacting to commands I sent). From there, I literally cargo culted backwards: there are a lot of commands the phone sends to the device, and I started going backwards through them all. In not much time- SUCCESS! The collar buzzed. It turns out there is a command which I roughly translate to mean “Hello device, I am a controller that will be sending you commands now, please do whatever you hear from me”, and after that, it does.
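Stripped of the actual BLE plumbing, the control flow looks roughly like this. The byte strings below are placeholders (the real payloads came out of the btsnoop log), and the write function is injected so you could plug in whatever BLE library you’re using underneath:

```python
class CollarController:
    """Sketch of the flow: send the 'hello, I'm your controller'
    handshake first, then commands. Payload bytes are placeholders,
    not the real collar protocol."""
    HELLO = bytes.fromhex("0102")   # placeholder handshake payload
    BUZZ  = bytes.fromhex("0a0b")   # placeholder buzz command

    def __init__(self, write_characteristic):
        # write_characteristic: a callable that writes bytes over BLE
        self.write = write_characteristic
        self.ready = False

    def connect(self):
        self.write(self.HELLO)      # without this, the collar ignores you
        self.ready = True

    def buzz(self):
        if not self.ready:
            self.connect()
        self.write(self.BUZZ)

sent = []                            # fake BLE transport for demonstration
c = CollarController(sent.append)
c.buzz()
print(sent == [CollarController.HELLO, CollarController.BUZZ])  # True
```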


The code that does all of this is so simple and straightforward it’s hardly worth giving a line-by-line.

You can find the controller class here.

Pavlov’s Sandman Pt 2: Background and Detecting Snores

Welcome to the long-anticipated follow-on post to Pavlov’s Sandman: Pt 1. In that post I admitted that I had a problem with snoring, and laid out a basic strategy and app to stop snoring. They say that admitting you have a problem is half of the battle. In this case that is untrue- hacking the shock collar was half of the battle, and the other half was detecting snores.

I had a deadline for completing this project: ODSC East in Boston, at the beginning of May. What the talk ended up being about (and in turn what this blog post will now be about) is how in “data science” we usually start with a problem that, based on some passing knowledge, seems very solvable- and then the million little compromises we make with ourselves on our way to solving the problem / completing a product.

This blog post and the one or two that follow will be an exposition / journal of my run-and-gun data science, as I start by assuming a problem is easy, have some trouble but assume I have plenty of time, realize time is running out, panic, get sloppy with experiments, make a critical breakthrough in the 11th hour, and then deliver an OK product / talk / blog post series.

A brief history of the author as a snorer.

As best as I can tell, I began snoring in Afghanistan. This isn’t surprising; the air in Kabul was so bad the army gave me a note saying that if I ever had respiratory issues, it was probably their fault (in spite of the fact that everyone was smoking a pack per day of Luckys / Camel Fulls / Marlboro Reds). This is to say nothing of the burn pits I sat next to to keep warm while on gate duty from December until March, or the five thousand years of sheep-poo-turned-to-moon-dust always blowing around in the countryside west of Kabul City.

As a brief aside, do you know what’s really fun about trying to “research” when you started snoring? Making a list of ex’s and then contacting each of them out of the blue with a “hey, weird question but…”, it’s like a much more fun version of the “Who gave me the STD?” game.

After Afghanistan, girls I was sleeping with would occasionally complain about my snoring. This came to a head with my ex, though. She would complain, elbow me, sleep on the couch, etc. But I became more concerned when the issue came up with my new girlfriend (either she’s a very light sleeper, or I’ve gotten much worse about snoring). This was especially concerning because I had tried every snoring “remedy” on the internet and had no success.

Break throughs on other fronts.

I have a puppy named Apache, and we were at the trainer’s. They convinced me that I should start using a wireless collar (a shock collar). The guy who trained me taught me that you don’t want to hurt the dog; you want to deliver the lightest shock they can feel and just keep tapping them with that until they stop doing what they shouldn’t be doing. The shock should be uncomfortable, not painful.

One of the “remedies” I had tried for my snoring before was an app called Sleep as Android. In this app there was an “anti-snoring” function where the phone would buzz or make a noise when you were snoring- this had no effect, but I had always wished I could rig it up to a shock collar.

Finally, in November of last year, I discovered that you can buy a Bluetooth-controlled shock collar on Amazon for about $50 (PetSafe).

I had done some device hacking before; I figured I could figure out the shock collar easily enough. Detecting snores also seemed easy enough. So I wrote a paper proposal for ODSC and started working on the issue (and wrote the last blog post, which recorded me snoring).

Snore Detection and the Data Science Process of the Author

Traditionally when I start on a project, I try to come up with an exceptionally simple, explain-it-to-a-5-year-old type of algorithm, just so I have a baseline to test against. The benefits of this are 1) having a baseline to train against, and 2) becoming “familiar with the data”.

My first attempt at a “simple snore detector” was to fit a sine curve to the volume of the recorded noises. This got me used to working with Python audio controls and sound files. I also learned right away this wasn’t going to work, because the “loud” part of the snore happens and then there is a much longer quiet portion. That is to say, we don’t breathe evenly. I don’t have sleep apnea (that is to say, I don’t stop breathing), so the snores are relatively evenly spaced, but there are also “other noises”, and for various other reasons the sine-curve fitting just wasn’t ideal.

At this point I went back and read some academic literature on snore detection. There isn’t a lot, but there is a bit.

Automatic Detection of Snoring Events Using Gaussian Mixture Models, by Dafna et al.

Automatic detection, segmentation and assessment of snoring from ambient acoustic data, by Duckitt et al.

An efficient method for snore/nonsnore classification of sleep sounds, by Cavusoglu et al.

My Synopsis

Dafna reconstructed when the patient was snoring by looking at the entire night of data and how volume compared to the average. Following his method but converting it to “real-time” detection, however, was going to be problematic.

Duckitt created a Hidden Markov Model (mid-2000s speak for LSTM) (yes, I know they’re not the same) with the states snoring, not-snoring, other-noises, breathing, and duvet-noise. An interesting idea- one I might revisit for a “real version”.

Cavusoglu looked at subband energy distributions, inter- and intra-individual spectral energy distributions, some principal component analysis- in other words, MATH. I liked this guy’s approach and decided to mimic it next.


pyAudioAnalysis is a package created and maintained(ish) by Theodoros Giannakopoulos. It will break audio files down into 32 features, including the ones used by Cavusoglu. From there I tried some simple logistic regression, random forest classification, and k-nearest-neighbors classification.

The results weren’t bad, but I was VERY opposed to false positives (e.g. getting shocked when I didn’t snore). The numbers I was getting on validation just didn’t inspire me (though looking back, I think I probably could have been ok).

Screen Shot 2018-05-22 at 1.59.18 PM

Back to Basics

A quick note on the “equipment” I am using to record: it is a laptop mic, which is basically trash- lots of background noise. At this point, I had been playing with audio files for a while. I decided to see if I could isolate the frequency bands of my snoring.

In short, I found that I normally snore at 1500-2000Hz, 5-7kHz, and occasionally at 7-15kHz. I decided to revisit the original loud-noises idea, but this time I would filter the original recording (of the last 5 seconds) into the 1500-2000Hz and 5-7kHz bands. If there was a period where the intensity (volume in that frequency band) was over the clip’s average + 1 standard deviation, lasting longer than 0.4 seconds but less than 1.2 seconds; then a pause in which the intensity stayed below a threshold (mean + 1.5 stdevs) for 2.2-4.4 seconds; and then another period where the intensity was above the threshold for 0.4 to 1.2 seconds- then we would be in a state of snoring.
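That burst-gap-burst rule is easier to see in code. A toy Python restatement, simplified to a single threshold per band (the real rule used mean + 1σ for the bursts and mean + 1.5σ for the gap), working on frames of band intensity:

```python
def run_lengths(above):
    """Collapse a boolean per-frame series into [state, length] runs."""
    runs = []
    for a in above:
        if runs and runs[-1][0] == a:
            runs[-1][1] += 1
        else:
            runs.append([a, 1])
    return runs

def is_snoring(intensity, threshold, fps=10):
    """Loud burst (0.4-1.2 s), quiet gap (2.2-4.4 s), loud burst again.
    fps = frames of intensity per second."""
    loud = lambda n: 0.4 * fps <= n <= 1.2 * fps
    quiet = lambda n: 2.2 * fps <= n <= 4.4 * fps
    runs = run_lengths([v > threshold for v in intensity])
    for i in range(len(runs) - 2):
        (a, na), (b, nb), (c, nc) = runs[i], runs[i + 1], runs[i + 2]
        if a and loud(na) and not b and quiet(nb) and c and loud(nc):
            return True
    return False

# 0.8s burst, 3s gap, 0.8s burst at 10 frames/sec -> snore
clip = [0]*5 + [9]*8 + [0]*30 + [9]*8 + [0]*5
print(is_snoring(clip, threshold=5))   # True
```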

This worked exceptionally well- except that when I deployed it, I accidentally set the bands wrong, which missed a lot of snores. I will be updating shortly.

Screen Shot 2018-05-22 at 2.13.08 PM.png

In the upper image above, we see the original audio file’s intensity (blue) over time; in orange is the intensity in the 5-7kHz band. The black line is the threshold. This would have been classified as a snore (except it wouldn’t have been, because I wasn’t watching 5-7kHz, by accident).


So that is the basics of detecting snores in real time: record a tape of the last 5 seconds of audio, analyze it, and if the thresholds are surpassed, we have a “snore”- fire the shock. But oh, firing the shock and hacking the shock collar- that was a whole other adventure. To be continued…


Pavlov’s Sandman pt. 1

I snore. I’m perfect in most ways, but this is the one major defect I am aware of. Like any good engineer, upon learning of a defect I set out to correct or at least patch it. The name of this little project stems from Ivan Pavlov’s experiments with conditioning, as well as Operation Sandman, a CIA program for sleep deprivation torture of detainees at Gitmo.

The naming convention becomes evident when one looks at the strategy I am taking to correct my unpleasant snoring habit. I have an app on my phone (Sleep as Android) that tracks my sleep, is a cool alarm, and, among other things, tracks my snoring. From this app I know:

  1. It is possible to “detect” snoring.
  2. I don’t rip logs all night, but in bursts.

I have tried a number of things to correct this throughout the last year, including a mouthpiece, a jaw strap, a shirt with a tennis ball sewn in the back, video recording myself sleeping to see if I could detect a position where snoring occurs, essential oils / other alternative medicines, etc. Failing all of these, I now begrudgingly turn to “Data Science”, the form of mysticism reserved for the exceptionally desperate.

The plan of attack on this endeavor is as follows:

  1. Create a program that detects loud noises (preprocessing).
  2. Differentiate between snoring and other nighttime noises (dog, furnace, coughs, etc).
  3. When snoring is detected administer a small shock via a Bluetooth controlled shock collar for dogs which I will be wearing as an arm band.
  4. Video record results of me electrocuting myself while trying to sleep and post to YouTube, elevating me to stardom.
  5. Possibly train myself to stop snoring.

The title of this project should now be apparent, as I am hoping to “train” myself not to snore- and if I were developing this commercially, I’m fairly confident that any beta testing I did would (rightly) classify me as a war criminal.

In part one of this series (I promise those all the time and have a bad habit of not following through), I present the code and methods I have developed for detecting loud noises / building my dataset.

In future parts, I hope to do some cool things with respect to signal processing and Bluetooth device hacking with Python.

GitHub Repo

Loud Noises

The first step is to record sounds. The code presented is fairly elegant and easy to follow (for now).

We have a class AudioHandler which contains some variables and a few methods.

In addition to the alsaaudio handler, we have:

  • rawData, a list for caching the audio recorded
  • volume, which will be used to create a CSV of timestamp, volume data
  • and various thresholds to prevent getting multiple shocks in a row, a warm-up period, a volume threshold for recording, etc.

The methods are:


This method listens to the mic for a number of seconds and attempts to dynamically set the threshold. It’s not great- for my first night of use I ended up setting it manually by observation: laying in bed, making some breathing / fake snoring sounds, and seeing where the volume landed.
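As an illustration of the idea (hypothetical names- the real method lives in the repo), auto-calibration like this boils down to sampling ambient volume and setting the threshold a couple of standard deviations above it:

```python
import statistics

def calibrate_threshold(ambient_volumes, k=2.0):
    """Given volume readings sampled while the room is quiet,
    set the trigger threshold k standard deviations above the mean."""
    mean = statistics.mean(ambient_volumes)
    spread = statistics.pstdev(ambient_volumes)
    return mean + k * spread

# A quiet room hovering around 10 "volume units":
threshold = calibrate_threshold([9, 10, 11, 10, 10, 12, 9])
```

In practice, breathing and fake-snoring at the mic, as described above, is a perfectly reasonable way to sanity-check whatever number this spits out.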


This method writes the CSV and audio to disk.


This is a placeholder; later it will be used to trigger the “shock”.


This is where all the fun happens. In short, it:

  1. Attempts to set a baseline threshold considering mic sensitivity, background noise, etc.
  2. In a while loop it then
    1. Listens to the mic
    2. If the volume is above the threshold:
      1. Set a recordingActive flag to True if it wasn’t already
      2. If it wasn’t already, timestamp when this recording started
      3. Determine how long the current recording has been going on.
      4. If it has been going on for some duration, call the executeAction method and dump the recording to disk.
    3. If recordingActive is true, add the raw audio and volume levels to rawData and volume

And that’s about it. Again, look at the code for specifics, but all in all pretty straight forward.
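The loop above can be sketched in a few lines (hypothetical names and simplified bookkeeping- see the repo for the real thing):

```python
def listen_loop(readings, threshold, min_duration):
    """readings: iterable of (timestamp, volume) pairs.
    Returns timestamps where executeAction would fire: once a burst
    stays above threshold for at least min_duration seconds."""
    recording_active = False
    started_at = None
    actions = []
    for ts, volume in readings:
        if volume > threshold:
            if not recording_active:
                recording_active = True
                started_at = ts           # timestamp when this recording started
            elif ts - started_at >= min_duration:
                actions.append(ts)        # executeAction + dump recording to disk
                recording_active = False
        else:
            recording_active = False      # burst over; stop recording
    return actions

# A burst above threshold 5 lasting 2+ seconds triggers one action:
fired = listen_loop([(0, 1), (1, 9), (2, 9), (3, 9), (4, 1)],
                    threshold=5, min_duration=2)
```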

Last night I recorded.

Building a Training Set

As a priest of the mystic art of data science, the first part of any ceremonial ritual is to create a training / testing data set.

This was a very tedious part of my day.  I went through the recordings and separated them into two folders, “snore” and “non-snore”.  Well, I did this for about 30 minutes, and got approximately 80 samples of each. Then I moved the rest into an “unlabeled” folder… you know, for testing purposes, not because I was super bored.  Perhaps if I had an intern, this would have been a more robust set.

Finally, I wrote a little Python script that copies the corresponding CSVs over to sit alongside the WAV files you sorted into the proper directories.
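A sketch of such a script (assuming each recording produced foo.wav plus a matching foo.csv, and that the sorted folders contain only the WAVs):

```python
import shutil
from pathlib import Path

def copy_matching_csvs(raw_dir, sorted_dir):
    """For every WAV sorted into snore/non-snore/unlabeled folders,
    copy the matching CSV from the raw recordings directory next to it."""
    raw = Path(raw_dir)
    for wav in Path(sorted_dir).rglob("*.wav"):
        csv = raw / (wav.stem + ".csv")   # foo.wav -> foo.csv
        if csv.exists():
            shutil.copy(csv, wav.with_suffix(".csv"))
```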

Stay tuned for part 2, where we’ll do some signal processing to differentiate the snores from the noises!


Borg System Architecture

or “how I accidentally enslaved humanity to the Machine Overlords”.

The Borg are a fictional group from the Sci-Fi classic, Star Trek, who among other things have a collective consciousness.  This creates a number of problems for the poor humans (and other species) that attempt to resist the Borg, as they are extremely adaptive. When a single Borg drone learns something, its knowledge is very quickly propagated through the collective, presumably subject to network connectivity issues and latency.

Here we create a system for an arbitrary number of edge devices to report sensor data, a central processor to use the data to understand the environment the edge devices are participating in, and finally to make decisions / give instructions back to the edge devices.  This is in essence what the Borg are doing.  Yes, there are some interesting biological / cybernetic integrations; however, as far as the “hive mind” aspect is concerned, these are the basic principles in play.

I originally built this toy to illustrate that “A.I.” has three principal components: real-time data goes into the system, an understanding of the environment is reached, and a decision is made. (1) Real-time artificial intelligence, like the “actual” intelligence it supposedly mimics, does not make E.O.D. batch decisions. (2) In real time the system is aware of what is happening around it- understanding its environment- and then uses that understanding to (3) make some sort of decision about how to manipulate that environment. Read up on definitions of intelligence, a murky subject itself.

Another sweet bonus: I wanted to show that sophisticated A.I. can be produced with off-the-shelf components and a little creativity, despite what vendors want to tell you. Vendors have their place. It’s one thing to make something cool, another to productionize it- and maybe you just don’t care enough. However, since you’re reading this- I hope you at least care a little.

Artificial Intelligence is by no means synonymous with Deep Learning, though Deep Learning can be a very useful tool for building A.I. systems.  This case does real time image recognition, and you’ll note does not invoke Deep Learning or even the less buzz-worthy “neural nets” at any point.  Those can be easily introduced to the solution, but you don’t need them.

Like the Great and Powerful Oz, once you pull back the curtain on A.I. you realize its just some old man who got lost and creatively used resources he had lying around to create a couple of interesting magic tricks.


System Architecture

OpenCV is the Occipital Lobe, this is where faces are identified in the video stream.

Apache Kafka is the nervous system, how messages are passed around the collective. (If we later need to defeat the Borg, this is probably the best place to attack- presuming of course we aren’t able to make the drones self-aware.)


Apache Flink is the collective consciousness of our Borg Collective, where thoughts of the Hive Mind are achieved.  This is probably intuitive if you are familiar with Apache Flink.

Apache Solr is the store of the “memories” of the collective consciousness.

The Apache Mahout library is the “higher order brain functions” for understanding. It is an ideal choice as it is well integrated with Apache Flink and Apache Spark.

Apache Spark with Apache Mahout gives our creation a sense of context, e.g. how do I recognize faces? It quickly allows us to bootstrap millions of years of evolutionary biological processes.

A Walk Through

(1) Spark + Mahout used to calculate eigenfaces (see previous blog post).

(2) Flink is started, it loads the calculated eigenfaces from (1)

(3) A video feed is processed with OpenCV.

(4) OpenCV uses Haar Cascade Filters to detect faces.

(5) Detected faces are turned into Java BufferedImages, greyscaled, scaled to the size used for the eigenface calculations, and binarized (inefficiently). The binary arrays are passed as messages to Kafka.

(6) Flink picks up the images and converts them back to buffered images. Each buffered image is then decomposed into a linear combination of the eigenfaces calculated in (1).

(7) Solr is queried for matching linear combinations. Names associated with best N matches are assigned to each face. I.e. face is “identified”… poorly. See next comments.

(8) If the face is of a “new” person, the linear combinations are written to Solr as a new potential match for future queries.

(9) Instructions for the edge device are written back to the Kafka messaging queue as appropriate.


A major problem we instantly encountered was that sometimes OpenCV will “see” faces that do not exist- in patterns in clothing, shadows, etc. To overcome this we use Flink’s sliding time window and Mahout’s Canopy clustering.  Intuitively, faces will not momentarily appear and disappear within a frame; cases where this happens are likely errors on the part of OpenCV. We create a short sliding time window and cluster all faces in the window based on their X, Y coordinates.  Canopy clustering is used because it is able to cluster all faces in one pass, reducing the amount of introduced latency.  This step happens between steps (6) and (7).

In the resulting clusters there are either lots of faces (a true face) or very few faces (a ghost or shadow, which we do not want).  Images belonging to the former are further processed for matches in step (7).
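A toy version of the idea in Python (not the actual Mahout Canopy implementation- just the one-pass grouping logic): cluster the window’s detections by (x, y), then keep only the well-populated clusters.

```python
import math

def canopy_clusters(points, t1=50.0):
    """One-pass canopy-style grouping: each detection joins the first
    cluster whose seed point is within t1 pixels, else it seeds a new one."""
    clusters = []  # list of (seed_point, members)
    for p in points:
        for seed, members in clusters:
            if math.dist(p, seed) < t1:
                members.append(p)
                break
        else:
            clusters.append((p, [p]))
    return clusters

def real_faces(points, t1=50.0, min_hits=3):
    """Drop 'ghost' detections: clusters with few members are likely
    OpenCV seeing faces in shadows or shirt patterns."""
    keep = []
    for _, members in canopy_clusters(points, t1):
        if len(members) >= min_hits:
            keep.extend(members)
    return keep

# Four detections near (10, 10) survive; a one-off at (300, 300) is dropped:
detections = [(10, 10), (12, 11), (9, 10), (11, 12), (300, 300)]
faces = real_faces(detections)
```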

Another challenge is certain frames of a face may look like someone else, even though we have been correctly identifying the face in question in nearby frames.  We use our clusters generated in the previous hack, and decide that people do not spontaneously become other people for an instant and then return. We take our queries from step 7 and determine who the person is based on the cluster, not the individual frames.

Finally, as our Solr index of faces grows, our searches in Solr will become less and less efficient.  Hierarchical clustering should speed up these queries, and is arguably akin to how people actually recognize each other.  In the naive form, each Solr query will scan the entire index of faces looking for a match.  However, we can cluster the eigenface combinations such that each query first scans only the cluster centroids, and then considers only the eigenfaces in the best-matching cluster. This can potentially speed up results greatly.
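A sketch of the two-stage lookup (plain Python standing in for Solr, with made-up toy vectors): compare the query against cluster centroids first, then scan only the winning cluster.

```python
import math

def nearest(vectors, query):
    """Brute-force nearest neighbour by Euclidean distance."""
    return min(vectors, key=lambda v: math.dist(v, query))

def two_stage_search(clusters, query):
    """clusters: {centroid: [member eigenface-weight vectors]}.
    Instead of scanning the whole index, scan the centroids, then only
    the members of the nearest centroid's cluster."""
    centroid = nearest(list(clusters), query)
    return nearest(clusters[centroid], query)

index = {
    (0.0, 0.0): [(0.1, 0.0), (0.0, 0.2)],       # a cluster of faces near the origin
    (10.0, 10.0): [(9.9, 10.0), (10.2, 10.1)],  # a distinct cluster far away
}
match = two_stage_search(index, (0.05, 0.05))
```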



This is how the Borg were able to recognize Locutus of Borg.


This type of system also was imperative for Cylon Raiders and Centurions to recognize (and subsequently not inadvertently kill) the Final Five.

Shorter Term

This toy was originally designed to work with the Petrone Battle Drones; however, as we see the rise of Sophia and Atlas, this technology could be employed to help multiple subjects with similar tasks learn and adapt more quickly.  Additionally, there are numerous applications in security (think networks of CCTV cameras, remote locks, alarms, fire control, etc.).

Do you want Cylons? Because that’s how you get Cylons.

Alas, there is no great and powerful Oz. Or- there is, and …




Flink Forward, Berlin 2017
Slides Video (warning I was sick this day. Not my best work).

Lucene Revolution, Las Vegas 2017
Slides Video

My Git Repo

PR Donating this to Apache Mahout
If you’re interested in contributing, please start here.

Using JNIs (like OpenCV) in Flink

For a bigger project which I hope to blog about soon, I needed to get the OpenCV Java Native Library (JNI) running in a Flink stream. It was a pain in the ass, so I’m putting this here to help the next person.

First thing I tried… Just doing it.

For OpenCV, you need to statically initialize (I’m probably saying this wrong) the library, so I tried something like this

val stream = env
  .map(record => {
    // first attempt: load the native library inside the map function
    System.loadLibrary(Core.NATIVE_LIBRARY_NAME)
    // ... OpenCV calls ...
  })

Well, that kicks an error that looks like:

Exception in thread "Thread-143" java.lang.UnsatisfiedLinkError:
Native Library /usr/lib/jni/ already loaded in
another classloader

Ok, that’s fair. Multiple task managers, this is getting loaded all over the place. I get it. I tried moving this around a bunch. No luck.

Second thing I tried… Monkey see- monkey do: The RocksDB way.

In the Flink-RocksDB connector- and other people have given this advice too- the idea is to include the JNI library in the resources / fat-jar, then write out a tmp copy and load that.

This, for me, resulted in seemingly one tmp copy being generated for each record processed.


/**
 * This is an example of an extremely stupid way (and consequently the way Flink does RocksDB) to handle the JNI problem.
 * Basically we include the .so in src/main/resources, write it out as a tmp file, and load that.
 * I deleted it from resources, so this will fail. Only try it for academic purposes. E.g. to see what stupid looks like.
 */
object NativeUtils {
   // heavily based on the Flink RocksDB state backend's approach
    def loadOpenCVLibFromJar() = {

        val temp = File.createTempFile("libopencv_java330", ".so")

        // resource name assumed to match the temp-file prefix
        val inputStream = getClass().getResourceAsStream("/libopencv_java330.so")

        val os = new FileOutputStream(temp)
        var readBytes: Int = 0
        val buffer = new Array[Byte](1024)

        try {
           while ({ readBytes = inputStream.read(buffer); readBytes != -1 }) {
              os.write(buffer, 0, readBytes)
           }
        } finally {
        // If read/write fails, close streams safely before throwing an exception
           os.close()
           inputStream.close()
        }

        System.load(temp.getAbsolutePath)
    }
}



Third way: Sanity.

There were more accurately like 300 ways I tried to make this S.O.B. work; I’m really just giving you the waypoints- the major strategies I tried in my journey. This is the solution. This is the ‘Tomcat solution’ I’d seen referenced throughout my journey but didn’t understand. Hence why I’m writing this blog post.

I created an entirely new module. I called it org.rawkintrevo.cylon.opencv. In that module there is one class.

package org.rawkintrevo.cylon.opencv;

import org.opencv.core.Core;

public class LoadNative {

   static {
      // load the native library exactly once, when the class is first loaded
      System.loadLibrary(Core.NATIVE_LIBRARY_NAME);
   }

   native void loadNative();
}

I compiled that as a fat jar and dropped it in flink/lib

Then, where I would have run System.loadLibrary(Core.NATIVE_LIBRARY_NAME), I now put Class.forName("org.rawkintrevo.cylon.opencv.LoadNative").

   val stream = env
      .map(record => {
         // System.loadLibrary(Core.NATIVE_LIBRARY_NAME)
         Class.forName("org.rawkintrevo.cylon.opencv.LoadNative")
         // ... OpenCV calls ...
      })

Further, I copy the OpenCV Java wrapper (opencv/build/bin/opencv-330.jar) and the native library (from opencv/build/lib/) into flink/lib.

Then I have great success and profit.

Good hunting,


Watching YouTube Activity with Apache Streams

We’re finishing up our series of blogs on providers with YouTube. After this we’ll get down to business on building our own social-monitoring quasi-app!

Getting YouTube Credentials

Before accessing any API, we of course need to get our credentials.

A good first step in an enterprise like this would be to read the docs, or at least thumb through them, regarding registering an application with the YouTube API.

But if you wanted to read lengthy docs, you probably wouldn’t be checking out this blog- so here’s the short version:

Open the Google Developers Dashboard, where we’re going to “Create a Project”.

Screenshot from 2016-12-06 13-25-05.png

I’m going to call my application “Apache Streams Demo”

Screenshot from 2016-12-06 13-26-13.png

The next dialog is where things can go awry.  First, right click “Enable APIs you plan to use” and open that link in a new tab, then click the drop down arrow next to “Create Credentials” and then select “Create Service Account Key” (we’ll come back to API enablement in a minute).

Screenshot from 2016-12-06 13-30-46.png

In the next screen choose ‘New Service Account’ and select P12 download type (not JSON which is the default).  A dialogue will pop up with some key information. I set permissions to owner, but that probably isn’t necessary. After you hit create the key will be generated and downloaded.

Screenshot from 2016-12-06 13-47-00.png

Screenshot from 2016-12-06 13-50-32.png

Set that .p12 file aside somewhere safe. We’ll need it soon.

Finally, we need to enable the services we want with this key.  Go to the library tab (or open the ‘new tab’ you opened when you were creating the service account).

You’ll see a long list of all the APIs you can grant access to.  We want to focus, for the moment on the YouTube services.

Screenshot from 2016-12-08 13-47-13.png

Go through each API you want (the YouTubes for this demo) and click “Enable” at the top.

Screenshot from 2016-12-08 14-20-27.png


Full Credit to Steve Blockmon on this well done notebook.

Import Setup.json

Import YouTube.json


Steve and I are working toward a suite of notebooks aimed at creating a Social Monitoring Quasi-App.  In the first paragraph of setup.json we see the following.


We’re using the Zeppelin Dependency Loader here (you’ll likely see a message how this is deprecated). At the time of writing, there seems to be an issue with adding repositories via the GUI (e.g. it doesn’t pick up the new repository URL).

z.addRepo(... does add the new repository- in this case the Apache SNAPSHOT repository. The remaining lines load various Apache Streams providers.

In the next paragraph of the notebook Steve defines a couple of UDFs for dealing with punctuation and spaces in tweets and descriptions. These will come in handy later and are worth taking a moment to briefly grok.


The first paragraph isn’t too exciting- some simple imports:

import org.apache.streams.config._
import org.apache.streams.core._

import com.typesafe.config._

import java.util.Iterator

The second paragraph is where things start to get interesting. Here we build a HOCON configuration string. We are also leveraging Zeppelin Dynamic Forms.

val apiKey = z.input("apiKey", "")
val serviceAccountEmailAddress = z.input("serviceAccountEmailAddress", "")
val pathToP12KeyFile = z.input("pathToP12KeyFile", "")

val credentials_hocon = s"""
youtube {
  apiKey = $apiKey
  oauth {
    serviceAccountEmailAddress = "$serviceAccountEmailAddress"
    pathToP12KeyFile = "$pathToP12KeyFile"
  }
}
"""

This paragraph will yield:


To find your API Key and Service account (assuming you didn’t write them down earlier- tsk tsk), go back to your Google Developers Console, on the Credentials Tab, and click “manage service accounts” on the right.  On this page you will see your service account and API Key. If you have multiple- make sure to use the one that corresponds to the .p12 file you downloaded.


After you enter your credentials and the path to the .p12 file- make sure to click the “Play” button again to re-run the paragraph and update with your credentials.

The next configuration specifies who we want to follow. It is a list of UserIDs.

val accounts_hocon = s"""
youtube.youtubeUsers = [
# Apache Software Foundation - Topic
{ userId = "UCegQNPmRCAJvCq6JfHUKZ9A"},
# Apache Software Foundation
{ userId = "TheApacheFoundation"},
# Apache Spark
{ userId = "TheApacheSpark" },
# Apache Spark - Topic
{ userId = "UCwhtqOdWyCuqOboj-E1bpFQ"},
# Apache Flink Berlin
{ userId = "UCY8_lgiZLZErZPF47a2hXMA"},
# Apache Syncope
{ userId = "UCkrSQVb5Qzb13crS1kCOiQQ"},
# Apache Accumulo
{ userId = "apacheaccumulo"},
# Apache Hive - Topic
{ userId = "UCIjbkZAX5VlvSKoSzNUHIoQ"},
# Apache HBase - Topic
{ userId = "UCcGNHRiO9bi6BeH5OdhY2Kw"},
# Apache Cassandra - Topic
{ userId = "UC6nsS04n_wBpCDXqSAkFM-w"},
# Apache Hadoop - Topic
{ userId = "UCgRu3LbCjczooTVI9VSvstg"},
# Apache Avro - Topic
{ userId = "UCzHCk8Gl5eP85xz0HwXjkzw"},
# Apache Maven - Topic
{ userId = "UCBS2s2cwx-MW9rVeKwee_VA"},
# Apache Oozie - Topic
{ userId = "UCRyBrviuu3qMNliolYXvC0g"},
]
"""

I would recommend you stick with this list if you don’t have any activity on YouTube, or you’re just getting started and want to make sure some data is coming in.  But in the spirit of creating our own little social-monitoring quasi-app, consider replacing the preceding paragraph with this:

val myChannel = z.input("my YouTube Channel", "")
val myId = z.input("my ID", "")

val accounts_hocon = s"""
youtube.youtubeUsers = [
{ userId = "${myId}"},
{ userId = "${myChannel}"}
]
"""

Which is going to give you a dynamic form into which a user can insert their own YouTube channel ID.

Screen Shot 2016-12-09 at 3.32.42 PM.png

You’ll note here I have put in both my user ID and channel ID; the only difference between adJyExT6JcrA0kvKNHrXZw and UCadJyExT6JcrA0kvKNHrXZw is the preceding UC in the latter, denoting that it is a channel. Further, if you look at the code, you submit both the user ID and the channel ID as userId=.... This may be helpful because, as we will see, the channel and user APIs give us slightly different information.
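That relationship is easy to capture in a helper (for illustration only- it holds for the IDs above, though I wouldn’t promise it for every account):

```python
def to_channel_id(user_id):
    """A channel ID is (at least here) the user ID prefixed with 'UC'."""
    return user_id if user_id.startswith("UC") else "UC" + user_id

channel = to_channel_id("adJyExT6JcrA0kvKNHrXZw")
```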

To find channel IDs, just surf around YouTube and go to channel pages. You’ll see the address ends in something like UCGsMOj1pUwabpurXSKzfIfg (which is Astronomia Nova)- that part is the channel ID.

Getting at that data

First we’re going to set up a ConfigFactory to load the config strings we just wrote. This is going to create our YouTube configuration.

val reference = ConfigFactory.load()
val credentials = ConfigFactory.parseString(credentials_hocon)
val accounts = ConfigFactory.parseString(accounts_hocon)
val typesafe = accounts.withFallback(credentials).withFallback(reference).resolve()
val youtubeConfiguration = new ComponentConfigurator(classOf[YoutubeConfiguration]).detectConfiguration(typesafe, "youtube");
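The withFallback chain resolves each key from the first configuration that defines it- the same behavior as Python’s ChainMap, which makes a quick mental model (the toy keys below are made up, not real Apache Streams config):

```python
from collections import ChainMap

accounts = {"youtube.youtubeUsers": ["TheApacheSpark"]}
credentials = {"youtube.apiKey": "XYZ", "youtube.youtubeUsers": []}
reference = {"youtube.apiKey": "", "youtube.retryMax": 3}  # hypothetical defaults

# accounts.withFallback(credentials).withFallback(reference):
# earlier maps shadow later ones, key by key.
typesafe = ChainMap(accounts, credentials, reference)
```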

Next we will pull information about the channel(s) we noted in our accounts_hocon config string- the Apache accounts, or our personal one. We will create an ArrayBuffer to store all of the responses we get. This should be old hat by now- you’ll notice the API is very similar to accessing the Facebook and Twitter APIs from previous posts.

import com.typesafe.config._
import org.apache.streams.config._
import org.apache.streams.core._
import java.util.Iterator

val youtubeChannelProvider = new YoutubeChannelProvider(youtubeConfiguration);

val channel_buf = scala.collection.mutable.ArrayBuffer.empty[Object]
while( youtubeChannelProvider.isRunning()) {
  val resultSet = youtubeChannelProvider.readCurrent()
  val iterator = resultSet.iterator();
  while(iterator.hasNext()) {
    val datum = iterator.next();
    channel_buf += datum.getDocument
  }
}

The important thing to watch for- especially if using something other than the Apache userID set- is that the output of the previous paragraph doesn’t end with something like res45: Int = 0, because if that is the case your buffer is empty and you will get errors when you try to make a table. If your buffer is empty, try different user IDs.

Next we’ll get the user activity results.


val buf = scala.collection.mutable.ArrayBuffer.empty[Object]

val provider = new YoutubeUserActivityProvider(youtubeConfiguration);
while(provider.isRunning()) {
  val resultSet = provider.readCurrent()
  val iterator = resultSet.iterator();
  while(iterator.hasNext()) {
    val datum = iterator.next();
    buf += datum.getDocument
  }
}

Basically the same idea here- and same as last time, make sure that buffer size is greater than 0!

Processing data into something we can play with

Our goal here is to get our data into a format we can play with. The easiest route to this end is to leverage Spark DataFrames / SQL Context and Zeppelin’s table display.

At the end of the last session on Apache Streams, we left as an exercise to the reader a way to untangle all of the messy characters and line feeds. Now we provide the solution: a couple of User Defined Functions (UDFs) to strip all of that bad noise out.

import org.apache.spark.sql.functions._
import org.apache.spark.sql.UserDefinedFunction
import java.util.regex.Pattern

val toLowerCase = udf {
  (text: String) => text.toLowerCase
}

val removeLineBreaks = udf {
  (text: String) =>
    val regex = "[\\n\\r]"
    val pattern = Pattern.compile(regex)
    val matcher = pattern.matcher(text)

    // Remove all matches, split at whitespace (repeated whitespace is allowed) then join again.
    val cleanedText = matcher.replaceAll(" ").split("[ ]+").mkString(" ")
    cleanedText
}


val removePunctuationAndSpecialChar = udf {
  (text: String) =>
    // tail of the regex reconstructed- the original was cut off mid-pattern
    val regex = "[\\.\\,\\:\\-\\!\\?\\n\\t,\\%\\#\\*\\|\\=\\(\\)\\\"\\>\\<]"
    val pattern = Pattern.compile(regex)
    val matcher = pattern.matcher(text)
    val cleanedText = matcher.replaceAll(" ").split("[ ]+").mkString(" ")
    cleanedText
}

import org.apache.streams.core.StreamsDatum
import scala.collection.JavaConversions._
//Normalize activities -> posts(s)
val YoutubeTypeConverter = new YoutubeTypeConverter()

val pages_datums = channel_buf.flatMap(x => YoutubeTypeConverter.process(new StreamsDatum(x)))

Now we create our DataFrame and register it as a temp table. Notice we use our “cleaner function”, aka removePunctuationAndSpecialChar, to clean the column summary. This gives us a nice clean summary while preserving the original actor.summary. If actor.summary doesn’t make any sense as a column name, see below when we get to querying the table- it has to do with how Spark queries the nested fields of a JSON.

import org.apache.streams.jackson.StreamsJacksonMapper;
import sqlContext._
import sqlContext.implicits._

val mapper = StreamsJacksonMapper.getInstance();
val pages_jsons = pages_datums.map(o => mapper.writeValueAsString(o.getDocument))
val pagesRDD = sc.parallelize(pages_jsons)

val pagesDF = sqlContext.read.json(pagesRDD)
pagesDF.printSchema
pagesDF.registerTempTable("youtube_pages")

val cleanDF = pagesDF.withColumn("summary", removePunctuationAndSpecialChar(pagesDF("actor.summary")))

In the line where we create pages_jsons we map over the pages_datums we created just previously (which was of type scala.collection.mutable.ArrayBuffer[org.apache.streams.core.StreamsDatum], if you’re into that sort of thing), changing each StreamsDatum into a JSON string. We then parallelize that array and do the standard Spark things to create a DataFrame from the RDD. Hocus-pocus, and our channels have now been processed into a temp table called youtube_pages. The process is repeated for our users’ posts.

import org.apache.streams.core.StreamsDatum
import scala.collection.JavaConversions._
//Normalize activities -> posts(s)
val YoutubeTypeConverter = new YoutubeTypeConverter()

val useractivity_posts = buf.flatMap(x => YoutubeTypeConverter.process(new StreamsDatum(x)))

And then…

import org.apache.streams.jackson.StreamsJacksonMapper;
import sqlContext._
import sqlContext.implicits._

val mapper = StreamsJacksonMapper.getInstance();
val jsons = useractivity_posts.map(o => mapper.writeValueAsString(o.getDocument))
val activitiesRDD = sc.parallelize(jsons)

val activitiesDF = sqlContext.read.json(activitiesRDD)
activitiesDF.printSchema
activitiesDF.registerTempTable("youtube_posts")

val cleanDF = activitiesDF.withColumn("content", removePunctuationAndSpecialChar(activitiesDF("content")))

Same idea, now we have a table called youtube_posts.

Let’s play.

At the end of the second paragraph for pages and posts above we called the .printSchema method on our DataFrame. This is helpful because it shows us … the schema of the DataFrame, which in a non-flat format such as JSON can be very useful. In general, we use field.subfield.subsubfield to traverse this schema. Consider the following SQL statement:

select id
, actor.displayName
, summary
, actor.extensions.followers
, actor.extensions.posts
from youtube_pages


select id
, published
, actor.displayName
, content
, title from youtube_posts

Screen Shot 2016-12-09 at 4.19.19 PM.png

Here we see I’m not such a popular guy on YouTube: 1 follower, 23 views, 1 post (a recent Apache Mahout talk). My chance at being one of the great YouTube bloggers has passed me by; such is life.


The story arc of this series on Apache Streams has reached its zenith. If you have been following along, you’ll notice there isn’t really much new information about Apache Streams anymore- the posts are more dedicated to ‘how to get your credentials for provider X’. I was thinking about doing one last one on Google+, since the credentialing is essentially the same and you only need to change a couple of lines of code to say Google instead of YouTube.

In the dramatic conclusion to the Apache Streams mini-saga, we’re going to combine all of these together into a ‘quasi-app’ in which you collect all of your data from various providers and start merging it together using Spark SQL, visualizing with Angular, Zeppelin, and D3js, and a little machine learning to see if we can’t learn a little something about ourselves and the social world/bubble we (personally) live in.

Deep Magic, Volume 3: Eigenfaces

This week we’re going to really show off how easy it is to “roll our own” algorithms in Apache Mahout by looking at eigenfaces. This algorithm is really easy and fun in Mahout because Mahout comes with a first-class distributed stochastic singular value decomposition, Mahout SSVD.

This is going to be a big job, and assumes you have HDFS and a small cluster to work with. You may be able to run this on your laptop, but I would recommend following the last post on setting up a small cloud-based Hadoop cluster with Apache Zeppelin.

Eigenfaces are an image equivalent(ish) of eigenvectors, if you recall your high school linear algebra classes. If you don’t recall, read Wikipedia; otherwise: it is a set of ‘faces’ that, by linear combination, can be used to represent other faces.
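The whole trick fits in a few lines of NumPy, with random data standing in for face images (a sketch of the math, not the distributed Mahout job):

```python
import numpy as np

rng = np.random.default_rng(42)
faces = rng.random((10, 25))      # ten tiny 5x5 "faces", one per row

mean_face = faces.mean(axis=0)
centered = faces - mean_face      # subtract the column means first

# Rows of Vt are the eigenfaces; U holds each face's weights.
U, s, Vt = np.linalg.svd(centered, full_matrices=False)
eigenfaces = Vt
```

Every centered face is then exactly a weighted sum of these eigenfaces, and truncating to the top few rows of Vt gives the compressed representation we’re after.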

Step 1. Get Data

The first thing we’re going to do is collect a set of 13,232 face images (250×250 pixels) from the Labeled Faces in the Wild data set.

Because some of these shell commands take a little while to run, it is worth taking a moment to set shell.command.timeout.millisecs in the sh interpreter to 600000.

My first paragraph, I’m going to make a directory for my final eigenface images that Zeppelin’s HTML loader can use. Then I’m going to download and untar the dataset. Finally, I’ll put the dataset into the tmp folder in HDFS.


mkdir zeppelin-0.7.0-SNAPSHOT/webapps/webapp/eigenfaces

wget http://vis-www.cs.umass.edu/lfw/lfw-deepfunneled.tgz
tar -xzf lfw-deepfunneled.tgz
hdfs dfs -put /home/guest/lfw-deepfunneled /tmp/lfw-deepfunneled

Step 2. Add dependency JARs

We use the Scala package scrimage to do our image processing, so we’ll want to add those JARs. Also, there is a bug we’re working on in Mahout where broadcasting vectors doesn’t work so well in shell / Zeppelin mode. To get around this, I have added the transformer I need to a custom jar of algorithms I use.

MAHOUT-1892: Broadcasting Vectors in Mahout-Shell

If that issue is still open, you’ll need to clone and build my branch (or make your own jar).

git clone
cd mahout
git checkout mahout-1856
mvn clean package -DskipTests

Finally, assuming you are using Big Insights on Hadoop Cloud, you’ll need to copy the jar to the cluster.

scp algos/target/mahout-algos_2.10-0.12.3-SNAPSHOT.jar

Change the username to your username and the host to your server address.

Back in Zeppelin, load the dependencies:



// add EXPERIMENTAL mahout algos

Step 3. Setup the Mahout Context

This step imports the Mahout packages and sets up the Mahout Distributed Context


import org.apache.mahout.math._
import org.apache.mahout.math.scalabindings._
import org.apache.mahout.math.drm._
import org.apache.mahout.math.scalabindings.RLikeOps._
import org.apache.mahout.math.drm.RLikeDrmOps._
import org.apache.mahout.sparkbindings._

@transient implicit val sdc: org.apache.mahout.sparkbindings.SparkDistributedContext = sc2sdc(sc)

Step 4. Create a DRM of Vectorized Images

When doing image processing, we want a matrix where each image is represented as a vector of numbers, where the numbers correspond to each pixel.


import com.sksamuel.scrimage._
import com.sksamuel.scrimage.filter.GrayscaleFilter

val imagesRDD:DrmRdd[Int] = sc.binaryFiles("/tmp/lfw-deepfunneled/*/*", 500)
       // greyscale + pixel steps reconstructed from the description below
       .map(o => new DenseVector( Image.apply(o._2.toArray)
           .filter(GrayscaleFilter)
           .pixels
           .map(p => p.toInt.toDouble / 10000000)) )
       .zipWithIndex
       .map(o => (o._2.toInt, o._1))

val imagesDRM = drmWrap(rdd= imagesRDD).par(min = 500).checkpoint()

println(s"Dataset: ${imagesDRM.nrow} images, ${imagesDRM.ncol} pixels per image")

sc.binaryFiles is the Spark way to read in our images. We set our partitioning to 500 and map each image into a scrimage object, which drops the color information (we don’t need it for feature mapping) and converts the image to an array of large integers. We scale the integers down and use the array to create a Mahout DenseVector. Finally we zipWithIndex and then use drmWrap to transform our RDD into a DRM.
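For intuition, here is the same in-processing for a single image with NumPy instead of scrimage (greyscale by channel-averaging; the real job scales packed pixel integers, here we just scale the grey values):

```python
import numpy as np

def image_to_vector(rgb):
    """rgb: an (h, w, 3) array of 0-255 values. Greyscale it, flatten it
    to one row, and scale the values down as in the DRM pipeline."""
    grey = rgb.mean(axis=2)             # drop the color information
    return grey.flatten() / 10000000.0  # shrink the magnitudes

img = np.full((2, 2, 3), 255, dtype=np.uint8)   # a tiny all-white image
vec = image_to_vector(img)
```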

Step 5. Subtract Means

If vector broadcasting in the shell were working (it may be by the time you read this), this is how we would subtract our means:


// How you would do this without help
// Doesn't work re BUG: MAHOUT-1892
val colMeansV = imagesDRM.colMeans
val bcastV = drmBroadcast(colMeansV)

val smImages = imagesDRM.mapBlock(imagesDRM.ncol) {
  case (keys, block) =>
      val copy: Matrix = block.cloned
      copy.foreach(row => row -= bcastV.value)
      (keys, copy)
}

We use the .colMeans method to get the column means, and then do a mapBlock on the images matrix, subtracting the mean from each image.

Since at the moment the above does not work, here is the hack to do it.


import org.apache.mahout.algos.transformer.SubtractMean

// Subtract Mean transforms each row by subtracting the column mean
val smTransformer = new SubtractMean() // calculates the column mean
val smImages = smTransformer.transform(imagesDRM) // return new DRM of subtracted means


Again, this is only needed as a work around. If you’re interested in how easy it is to package your own functions into a jar- check out the SubtractMean source code

Step 6. Distributed Stochastic Singular Value Decomposition

Based primarily on Nathan Halko’s dissertation, most of the hard part here is done for us.


import org.apache.mahout.math._
import decompositions._
import drm._

val(drmU, drmV, s) = dssvd(smImages, k= 20, p= 15, q = 0)

Here k is the rank of the output, e.g. the number of eigenfaces we want out; p is the oversampling parameter, and q is the number of additional power iterations. Read Nathan’s paper or see the ssvd docs for more information.

The above will take some time- about 35 minutes on a 5-node Big Insights Cloud Cluster.

drmV will contain our eigenfaces (transposed).

drmU tells us the composition of each face we fed into the algorithm. For example:


drmU.collect(0 until 1, ::)


{0:-5.728272924185402E-4,1:0.005311020699576641,2:-0.009218182156949998,3:0.008125182744744356,4:0.001847134204087927,5:-0.01391318137456792,6:0.0038760898878500913,7:0.0022845256274037842,8:0.007046521884887152,9:0.004835772814429175,10:0.00338488781174816,11:0.009318311263249005,12:-0.005181665861179919,13:-0.004665157429436422,14:0.003072181956470255,15:-0.01285733757511248,16:0.005066140593688097,17:-0.016895601017982726,18:-0.012156252679821318,19:-0.008044144986630029 ... }

Which implies the first image ≈ -0.00057 * eigenFace0 + 0.0053 * eigenFace1 + -0.0092 * eigenFace2 + …
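Strictly speaking, the singular values enter the picture too: row i of the mean-subtracted matrix is U[i,:] · diag(s) · Vᵀ. Here is a small NumPy sketch of that reconstruction (toy shapes and synthetic data, purely for illustration):

```python
import numpy as np

rng = np.random.default_rng(2)
# Toy exact decomposition: 10 "images", 30 "pixels", rank 5.
U = np.linalg.qr(rng.standard_normal((10, 5)))[0]   # orthonormal columns
Vt = np.linalg.qr(rng.standard_normal((30, 5)))[0].T
s = np.array([5., 4., 3., 2., 1.])

A = U @ np.diag(s) @ Vt            # the mean-subtracted image matrix

# The first image as a weighted sum of eigenfaces (rows of Vt):
first = sum(U[0, k] * s[k] * Vt[k, :] for k in range(5))
print(np.allclose(first, A[0]))    # True
```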

Step 7. Write the Eigenfaces to disk

We want to SEE our eigenfaces! We use scrimage again to help us reconstruct our calculated eigenfaces back into images.


import java.io.File
import javax.imageio.ImageIO
import com.sksamuel.scrimage.{Image, Pixel}

val sampleImagePath = "/home/guest/lfw-deepfunneled/Aaron_Eckhart/Aaron_Eckhart_0001.jpg"
val sampleImage = ImageIO.read(new File(sampleImagePath))
val w = sampleImage.getWidth
val h = sampleImage.getHeight

val eigenFaces = drmV.t.collect(::, ::)
val colMeans = smImages.colMeans

for (i <- 0 until 20) {
  // Undo the in-processing: add the column means back, then rescale
  val v = (eigenFaces(i, ::) + colMeans) * 10000000
  val output = new Array[Pixel](v.size)
  for (j <- 0 until v.size) {
    output(j) = Pixel(v.get(j).toInt)
  }
  val image = Image(w, h, output)
  image.output(new File(s"/home/guest/zeppelin-0.7.0-SNAPSHOT/webapps/webapp/eigenfaces/${i}.png"))
}

First we load a sample image so we can get the height and width. I could have just opened it up in an image viewer, but y’know, code. drmV has our eigenfaces as columns, so we transpose it. Also recall we subtracted the column means when we in-processed the images, so we’ll need to add them back.

We do a simple for loop over the eigenfaces, basically undoing the in-processing we did. We build an array of pixels by iterating through the vector, then create the image with the width and height and save it to the directory we set up. ZEPPELIN_HOME/webapps/webapp/ is the root directory for the %html interpreter, so we save our images in a directory there (we created this directory at the beginning).
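The * 10000000 above is a magic rescale tuned by eye. An alternative, sketched here in NumPy (to_pixels is a hypothetical helper, not part of scrimage or Mahout), is to min-max normalize each eigenface into the 0-255 grayscale range:

```python
import numpy as np

def to_pixels(v):
    """Min-max normalize a float vector into 0-255 grayscale ints."""
    v = np.asarray(v, dtype=np.float64)
    lo, hi = v.min(), v.max()
    scaled = np.zeros_like(v) if hi == lo else (v - lo) / (hi - lo)
    return (scaled * 255).round().astype(np.uint8)

# Coefficients on the scale of the drmU output above:
px = to_pixels([-0.013, 0.0, 0.005, 0.009])
print(px)  # darkest value maps to 0, brightest to 255
```

This guarantees the full dynamic range is used regardless of the eigenface’s magnitude.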

Step 8. Display the Images

Scala is fun, but for simple string and list manipulation, I love Python.


r = 4
c = 5
print '%html\n<table style="width:100%">' + "".join(["<tr>" + "".join(['<td><img src="eigenfaces/%i.png"></td>' % (i + j) for j in range(0, c)]) + "</tr>" for i in range(0, r * c, c)]) + '</table>'

All that does is create an html table of our images.


And that’s it!

I hope you enjoyed this one as much as I did, please leave comments and let me know how it went for you.