Behind the Scenes of “RT’s HoRT IoT-A, An AIoT-P MVP Demo”: Part 2: The Streaming Engine

Welcome back from Part 1. In our last episode we gave an introduction, spoke briefly about the overall structure of the program, discussed our demo data source, and did some foreshadowing of this week. This week we’ll do a bit of what we foreshadowed.

The Streaming Engine

Or How to Do Weird Stuff with Apache Flink

Let’s start off by looking at some pictures. Let me show you the DAG of the Flink Streaming Job to provide some motivation, and then we can go through piece by piece and look at the implementations.

Stream DAG

From the “architecture diagram” in Episode 1, this is what the Flink Job is doing.

  1. We pick up the data from MQTT
  2. We apply a sliding window doing some simple analytics on the data (Count, Min, Max, Average, Sum of Squared Deviations, Standard Deviation)
  3. We join each record to the last analytics emitted from the sliding window
  4. Every 30 seconds, we update the list of REST endpoints that serve the models
  5. We send each Record + Last Emitted Analytics pair as a REST request to each model, and then sink the Record, Analytics, and Model Result (for each model) to Elasticsearch.

Pretty easy peasy. If you’re not interested in Apache Flink, you can basically stop here and know/understand that each Record from a Divvy bike station + Analytics + Model Results are sunk (written) into Elasticsearch, and that by continuously polling an endpoint that returns other active endpoints we can dynamically add/delete/update the models being run on this stream of data.

Getting Data from MQTT

Initially I was using a copy of luckyyuyong’s flink-mqtt-connector in my implementation, which required some hacks and updates.

The most important callout is that the MQTT broker will disconnect clients with the same clientID because it thinks they are old and stale, so we have to make the clientID random. This was a particularly nasty and inconsistent bug to track down, but it turns out many others have had this problem. The solution here was just to append the milliseconds since the epoch. It would probably need something more robust for production, but this is an MVP.

// Base the clientID on the org and app, then append the current time in millis
// so every run gets a unique ID and the broker doesn't kick us off as a duplicate.
String clientBase = String.format("a:%s:%s", Org_ID, App_Id).substring(0, 10);
String clientID = clientBase + System.currentTimeMillis();
mqttProperties.setProperty(MQTTSource.CLIENT_ID, clientID);

Aside from that little quirk, this all works basically like the Apache Kafka connector or any other Flink connector.
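
For context, here’s roughly what wiring that source into the job looks like. This is a sketch, not a paste from the repo: aside from CLIENT_ID (shown above), the MQTTSource constructor and the URL/TOPIC property keys are my illustrative guesses at the connector’s API, so check the flink-mqtt-connector fork for the real names.

import java.util.Properties
import org.apache.flink.streaming.api.scala._

// Sketch only. MQTTSource comes from the flink-mqtt-connector fork; the URL/TOPIC
// keys and the constructor below are hypothetical stand-ins for the real API.
val mqttProperties = new Properties()
mqttProperties.setProperty(MQTTSource.URL, "tcp://your-broker:1883")                 // hypothetical key
mqttProperties.setProperty(MQTTSource.TOPIC, "iot-2/type/+/id/+/evt/+/fmt/json")     // hypothetical key
mqttProperties.setProperty(MQTTSource.CLIENT_ID, "a:myorg:myapp" + System.currentTimeMillis())

val env = StreamExecutionEnvironment.getExecutionEnvironment
val records: DataStream[String] = env.addSource(new MQTTSource(mqttProperties))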

Apply the Sliding Window

This was a bit of a trick because I didn’t want to hard-code the data structure into the engine. I would like to eventually auto-determine the schema of the JSON, but because of time constraints I set it up to be passed as a command line argument (but ended up hard-coding what the CLI argument would be – see here).

This is important because we don’t want the window trying to compute analytics on text fields, and the data coming in from MQTT is all going to look like a string.

If you look at the code in ProcessWindowAnalyticsFunction you can see that the function expects a schema to come in with the records, and we attempt to compute analytics on any field that schema lists as ‘Numeric’. Admittedly, we are trading off performance for a single engine that can handle any data source.
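
To make that concrete, here’s a stripped-down sketch of the idea. This is not the actual ProcessWindowAnalyticsFunction: I’m assuming each record has already been parsed into a Map[String, String] of field -> value, and the class, key, and stat names are my own placeholders.

import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Sketch of schema-driven window analytics; the real ProcessWindowAnalyticsFunction lives in the repo.
// `schema` maps field name -> type, e.g. Map("availableBikes" -> "Numeric", "stationName" -> "Text").
class WindowAnalyticsSketch(schema: Map[String, String])
    extends ProcessWindowFunction[Map[String, String], Map[String, Double], String, TimeWindow] {

  override def process(key: String,
                       context: Context,
                       elements: Iterable[Map[String, String]],
                       out: Collector[Map[String, Double]]): Unit = {
    // Only fields the schema marks as 'Numeric' get analytics; everything else is ignored.
    val numericFields = schema.collect { case (field, "Numeric") => field }
    val stats = numericFields.flatMap { field =>
      val values = elements.flatMap(_.get(field)).flatMap(v => scala.util.Try(v.toDouble).toOption)
      if (values.isEmpty) Map.empty[String, Double]
      else {
        val n = values.size.toDouble
        val mean = values.sum / n
        val ssd = values.map(v => math.pow(v - mean, 2)).sum // sum of squared deviations
        Map(
          s"$field.count"  -> n,
          s"$field.min"    -> values.min,
          s"$field.max"    -> values.max,
          s"$field.avg"    -> mean,
          s"$field.ssd"    -> ssd,
          s"$field.stddev" -> math.sqrt(ssd / n)
        )
      }
    }.toMap
    out.collect(stats)
  }
}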

Joining the Records and Last Reported Analytics

At this point I had been doing A LOT of Java, which, to be candid, I really don’t care for- so I switched over to Scala. It’s really not a particularly interesting function. It simply joins all records with the last reported analytics from the sliding window. You’re free to check it out here and make comments if you have questions. I realize there’s a lot of magic hand-waving and me telling the reader to “go look for yourself” in this blog / tutorial, but I am assuming you are fairly familiar with all of the tools I am using, and I’m trying to give you a high-level view of how this all fits together. If you have specific questions, please ask in the comments or email me (you can find me around the interwebs pretty easily).
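
If you want the gist without clicking through: the general Flink pattern for “pair every record with the most recently emitted analytics” is a connected stream plus a little keyed state. Here is a generic sketch of that pattern, not the repo’s code; the Map types are stand-ins for the real record/analytics classes.

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
import org.apache.flink.util.Collector

// Stream 1 carries records, stream 2 carries window analytics.
// Each record is emitted together with whatever analytics we saw last (per key).
class JoinWithLastAnalytics
    extends RichCoFlatMapFunction[Map[String, String], Map[String, Double],
                                  (Map[String, String], Map[String, Double])] {

  private var lastAnalytics: ValueState[Map[String, Double]] = _

  override def open(parameters: Configuration): Unit = {
    lastAnalytics = getRuntimeContext.getState(
      new ValueStateDescriptor("lastAnalytics", classOf[Map[String, Double]]))
  }

  override def flatMap1(record: Map[String, String],
                        out: Collector[(Map[String, String], Map[String, Double])]): Unit = {
    // Emit the record paired with the last analytics we have seen (may be empty early on).
    out.collect((record, Option(lastAnalytics.value()).getOrElse(Map.empty)))
  }

  override def flatMap2(analytics: Map[String, Double],
                        out: Collector[(Map[String, String], Map[String, Double])]): Unit = {
    // Just remember the latest analytics; nothing is emitted from this side.
    lastAnalytics.update(analytics)
  }
}

Wired up as something like records.keyBy(...).connect(analytics.keyBy(...)).flatMap(new JoinWithLastAnalytics); broadcasting the analytics side would work just as well.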

The “Other” Stream

Now let’s shift and consider our “other” stream: the stream that simply polls an endpoint which serves us other endpoints every thirty seconds. This is accomplished by putting a time window on all of the events coming from the records source, throwing them all away, and then once every 30 seconds (you could configure that), sending an async REST request to a preordained URL that holds the REST endpoints for the models.

You can see this in my github, lines approx 140-159. The endpoint that this system is hitting is being served by Apache OpenWhisk (which I absolutely love, if you haven’t been able to glean that from my other blog posts- it’s like AWS Lambda / Google Cloud Functions, except not proprietary vendor lock-in garbage).

You can see the response this gives here. Obviously, a “next step” would be for this to hit some sort of database where you could add/delete entries. (If you’re too lazy to click, it basically just returns a JSON of { "endpoints": [ { "modelOneName": "http://the-url" }, … ] }.)
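
In sketch form, the trick looks something like the snippet below, where endpointsUrl stands for the OpenWhisk URL above and the function name is a placeholder of mine. A blocking fromURL keeps the example short, whereas the real job sends the request asynchronously.

import scala.io.Source
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.AllWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Sketch of the "piggyback on the record stream to poll every 30 seconds" trick.
def modelEndpointPoller(records: DataStream[String], endpointsUrl: String): DataStream[String] =
  records
    .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(30)))
    .apply(new AllWindowFunction[String, String, TimeWindow] {
      override def apply(window: TimeWindow, events: Iterable[String], out: Collector[String]): Unit = {
        // The events themselves are thrown away; the window is just a 30-second clock.
        out.collect(Source.fromURL(endpointsUrl).mkString)
      }
    })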

Merging it All Together and Making Async Calls

Now we bring everything together. From one stream we have device event records and analytics on those records; from the other we have a list of URLs which will serve models. It’s worth pointing out here that, while not implemented, an easy addition for a “real” version would be another field alongside each model name that specifies which devices it applies to- since each model expects certain input fields, and different devices will have different fields. Again- a MINIMUM viable product is presented.

The rest is pretty simple conceptually: each record/analytics item goes through the list of all applicable URLs (in this case, all of the URLs) and pings each with the record and analytics as the payload. The code is here and may be more illuminating. The magic happens in the main program right here.

The async ping of the models is nice because requests that come back at different speeds don’t hold up the rest of the stream. It can turn into a bug rather than a feature, though, if you don’t want the entire program to go flying off the rails when a single REST request fails or hangs. To guard against that you must set the “timeout” of the async function; my choice was to “ignore”, but you could in theory re-request, allow up to X failures in Y time, etc.
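
Here’s a generic sketch of that async-with-timeout pattern. PingModelsSketch is not the repo’s code: the HTTP call is a stand-in (a plain GET instead of a proper POST of the payload), the types are simplified to strings, and the timeout override is where the “ignore” choice lives.

import java.util.concurrent.TimeUnit
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.io.Source
import scala.util.{Failure, Success}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture}

// Sketch: for each record+analytics payload, hit every model URL and emit the responses.
class PingModelsSketch(modelUrls: Seq[String]) extends AsyncFunction[String, String] {

  override def asyncInvoke(payload: String, resultFuture: ResultFuture[String]): Unit =
    Future {
      // Stand-in for a real async HTTP client POSTing `payload` to each model endpoint.
      modelUrls.map(url => Source.fromURL(url).mkString)
    }.onComplete {
      case Success(responses) => resultFuture.complete(responses)
      case Failure(_)         => resultFuture.complete(Seq.empty) // swallow failures in the sketch
    }

  // Without overriding this, one slow endpoint can fail the whole job. "Ignore" = emit nothing.
  override def timeout(payload: String, resultFuture: ResultFuture[String]): Unit =
    resultFuture.complete(Seq.empty)
}

// `enriched` = the record + last-analytics stream from the join step, as JSON strings here.
def attachModelCalls(enriched: DataStream[String], modelUrls: Seq[String]): DataStream[String] =
  AsyncDataStream.unorderedWait(enriched, new PingModelsSketch(modelUrls), 10, TimeUnit.SECONDS, 100)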

Conclusion

I want to state one more time that this was a lot of wave-my-hands magic and “go look for yourself”-dom. I probably could have made a 5-part blog just out of this post, but 1. I’m trying to write a book on something else already, and 2. the point of this blog series is an overview of how I built a “full stack” IoT Analytics solution from scratch, part time, in a couple of weeks.

Next Time

We’ll follow our hero into his mis-adventures in React, especially with the Dread Design System: Carbon.

See you, Space Cowboy.

Behind the Scenes of “Rawkintrevo’s House of Real Time IoT Analytics, An AIoT platform MVP Demo”

Woo, that’s a title- amiright!?

It’s got everything- buzzwords, a corresponding YouTube video, a Twitter handle conjugated as a proper noun.

Introduction

Just go watch the video– I’m not trying to push traffic to YouTube, but it’s a sort of complicated thing and I don’t do a horrible job of explaining it in the video. You know what, I’m just gonna put it inline.

OK, so now you’ve seen that. And you’re wondering: how in the heck?! Well, good news- because you’ve stumbled onto the behind-the-scenes portion, where I explain how the magic happened.

There’s a lot of magic going on in there- some of it you probably already know, and some you’ve got no idea about. But this is the story of my journey to becoming a full stack programmer. As it is said in the Tao of Programming:

There once was a Master Programmer who wrote unstructured programs. A novice programmer, seeking to imitate him, also began to write unstructured programs. When the novice asked the Master to evaluate his progress, the Master criticized him for writing unstructured programs, saying, “What is appropriate for the Master is not appropriate for the novice. You must understand Tao before transcending structure.”

I’m not sure if I’m the Master or the Novice- but this program is definitely unstructured AF. So here is a companion guide; maybe you can learn a thing or two / fork my repo and tell your boss you did all of this yourself.

Table of Contents

Here’s my rough outline of how I’m going to proceed through the various silliness of this project and the code contained in my github repo.

  1. YOU ARE HERE. A sarcastic introduction, including my dataset and the Watson IoT Platform (MQTT). We’ll also talk about our data source- and how we shimmed it to push into MQTT, though you obviously could (should?) do the same thing with Apache Kafka instead. I’ll also introduce the chart- we might use that as a map as we move along.
  2. In the second post, I’ll talk about my Apache Flink streaming engine- how it picks up a list of REST endpoints and then hits each one of them.  In the comments of this section you will find people telling me why my way was wrong and what I should have done instead.
  3. In this post I’ll talk about my meandering adventures with React.js, and how little I like the Carbon Design System. In my hack-a-thon submission I just iFramed up the Flink WebUI and Kibana, but here’s where I would talk about all the cool things I would have made if I’d had more time / if Carbon-React were a usable system.
  4. In the last post I’ll push this all onto IBM’s K8s. I work for IBM, and this was a work thing. I don’t have enough experience on anyone else’s K8s (aside from microK8s, which doesn’t really count) to bad-mouth IBM. They do pay me to tell people I work there, so anything too rude in the comments about them will most likely get moderated out. F.u.

Data Source

See README.md and scroll down to Data Source. I’m happy with that description.

As the program currently stands, right about here the schema is passed as a string. My plan was to make that an argument so you could submit jobs from the UI. Suffice it to say, if you have some other interesting data source, either update that to be a command line parameter (PRs are accepted) or just change the string to match your data. I was also going to do something with schema inference, but my Scala is rusty, I never was great at Java, and tick-tock.
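
If you do want to turn it into a command line parameter, Flink’s ParameterTool makes that about a two-line change. A sketch: the --schema argument name, the object name, and the example field names are made up for illustration, and the exact schema format in the repo may differ.

import org.apache.flink.api.java.utils.ParameterTool

object StreamingJob { // hypothetical job object name
  def main(args: Array[String]): Unit = {
    val params = ParameterTool.fromArgs(args)
    // Fall back to the hard-coded schema string if --schema isn't passed on the command line.
    val schemaJson = params.get("schema", """{"availableBikes":"Numeric","stationName":"Text"}""")
    // ... hand schemaJson to the window analytics function instead of the literal ...
  }
}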

Watson IoT Platform

I work for IBM, specifically Watson IoT, so I can’t say anything bad about Watson IoT. It is basically based on MQTT, which is a pub-sub protocol IBM wrote in 1999 (which was before Kafka by about 10 years, to be fair).

If you want to see my hack to push data from the Divvy API into Watson IoT Platform, you can see it here. You will probably notice a couple of oddities. Most notably, that only 3 stations are picked up to transmit data. This is because the free account gets shut down after 200MB of data and you have to upgrade to a $2500/mo plan because IBM doesn’t really understand linear scaling. /shrug. Obviously this could be easily hacked to just use Kafka and update the Flink source here.
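
For the Kafka route, the swap is small with Flink’s Kafka connector. A rough sketch, where the broker address and topic name are placeholders of mine:

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

val env = StreamExecutionEnvironment.getExecutionEnvironment

val kafkaProps = new Properties()
kafkaProps.setProperty("bootstrap.servers", "localhost:9092")
kafkaProps.setProperty("group.id", "divvy-stations")

// Same JSON payloads the MQTT shim would have produced, just read off a Kafka topic instead.
val records: DataStream[String] =
  env.addSource(new FlinkKafkaConsumer[String]("divvy-stations", new SimpleStringSchema(), kafkaProps))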

The Architecture Chart

That’s also in the github, so I’m going to say just look at it on README.md.

Coming Up Next:

Well, this was just about the easiest blog post I’ve ever written. Up next, I may do some real work and get to talking about my Flink program, which picks up a list of API endpoints every 30 seconds, does some sliding window analytics, and then sends each record and the most recent analytics to each of the endpoints that were picked up- and how, in its way, this gives us dynamic model calling. Also, I’ll talk about the other cool things that could/should be done there that I just didn’t get to. /shrug.

See you, Space Cowboy.


Building a License Plate Recognizer For Bike Lane Uprising

UPDATE 6/20/2019: Christina of Bike Lane Uprising doesn’t work in software sales, and is therefore terrified of “forward-selling”, and wants to make sure it’s perfectly clear that automatic plate recognition, while on the roadmap, will not be implemented for some time to come.

I was recently at the Apache Roadshow Chicago, where I met the founder of Bike Lane Uprising, a project whose goal is to make bicycling safer by reducing the occurrence of bike lane obstruction.

I actually met Christina when I was running for Alderman last fall (which is why I’ve been radio-silent on the blogging for the last 8 months- before you go Google it, I’ll let you know I dropped out of the race).  As a biker, I identified with what she was doing.  Chicago is one of the bike-y-est cities in North America, but it can still be a very… exciting… adventure commuting via bike.  It also seemed like there was probably something I could do to help out.   I promised after the election was done I’d be in touch.

Of course, I forgot.

Actually, I didn’t forget- I dropped out of the election because I got a book deal with O’Reilly Media to write on Kubeflow AND because I had already committed to producing the Apache Roadshow Chicago.  So I “punted” and promised to help out after the Roadshow.

Christina was still nice enough to come out and speak at the roadshow, and was very well received.  We were talking there, and I, remembering my oath to help out, finally got a hold of her a week or two later.

Her plan was to do license plate recognition. The current model of Bike Lane Uprising uses user-submitted photos, which are manually tagged and entered into a database with things like: License Plate Number, Company, City/State Vehicles, etc. She had found an open source tool called OpenALPR and wondered if BLU could use it somehow.

Now obviously this is going to have to be served somewhere, and if you hadn’t heard, I have fallen deeply in love with Apache OpenWhisk-incubating over the last 18 months.  And I’m not saying that with my IBM hat on, I genuinely think it is an amazing and horribly underrated product. Also it’s really cheap to run (note- I am running it as IBM Cloud Functions, which is just a very thin veil over OpenWhisk).

OK, so OpenALPR has a Python API. Good news and bad news- good news because this project will take 20 minutes and I’ll be done, bad news because it’s too quick and easy to make a blog post out of. Considering you’re reading the blog post- obviously that didn’t work. If you look at OpenALPR, you’ll see it’s been over a year since any work has gone on with it. It’s basically a derelict, but a functional one…ish. The Python is broken- some people said they could make the Python work if they built it from source- I could not. Gonna have to CLI this one.

As a spoiler- here’s the code.

Well, that’s exciting, because I’ve never built a Docker function before (only Python ones, and some pretty abusive Python tricks that use os.system(...) to build the environment at run time).

For this trick however, we do a multi-stage build. This is because the Alpine Linux repos are unstable as hell, and if you want to version-lock something you basically have to build from source.

OpenALPR depends on OpenCV and Tesseract. OpenWhisk expects its Docker functions to start off from the dockerSkelaton image.** So if you go into the docker/ directory- I’d like to first direct your attention to opencv/, which uses openwhisk/dockerSkelaton for a base image and builds OpenCV. Was kind of a trick. Not horrible. Then we have the tesseract/ folder, which builds an image using rawkintrevo/opencv-whisk as a base. Finally, openalpr/, which builds an image using rawkintrevo/tesseract-whisk as a base. Now we have an (extremely overweight, because I was lazy about my liposuction) environment with openalpr installed. Great.

Finally, let me direct your attention to where the magic happens. plate-recog-server-whisk/ has a number of interesting files. First there is an executable called exec, a silly little bash file with only one job- to call

python3 /action/openalpr-wrapper.py ${1}

You see- the dockerSkelaton image has a bunch of plumbing, but OpenWhisk expects there to be a file /action/exec that it will execute, and the last line of stdout from that executable to be a JSON (which OpenWhisk will return).

So let’s look at the code of openalpr-wrapper.py, elegant in its simplicity. This is a program that takes a single command line arg, a JSON (that’s how OpenWhisk passes in parameters). That JSON may have two keys: a required image URL, and an optional top-n license plates. subprocess.call( calls alpr with the image and the top plates, and prints the response (in JSON form, which is what the -j flag is for). And that’s it. So simple, so elegant.

I’m trying to follow this design pattern more and more as I get older- there usually exists some open source package that does what I want, and I just need a little Python glue code up top. In this case I wanted:

  • License Plate Recognition
  • As-a-Service (e.g. done via API call and scalable).

OpenALPR + Apache OpenWhisk-incubating.

 

I’m hoping to write more now that life has slowed down a bit. Stop back by soon.

** UPDATE 2: The Apache OpenWhisk folks wanted me to provide some clarity around this statement. Specifically, they said about this sentence that it:

 isn’t quite right. You can use any image as your base as long as you implement the lifecycle/protocol. What you get from the skeleton is a built in solution. You could have started with our python image or otherwise.

 

Photo Credit: Ash Kyd