Grokking Streams and Killing Zombies

TLDR: Streams are nothing more than a convenient way of handling callbacks, and they can cause a zombie outbreak if you are not careful, because callbacks ARE a type of reference to the creating object.

The Unnecessarily Long Introduction

Between RxJS clocking in at 15 million weekly (yes, seriously, weekly) downloads and RxDart exploding in popularity, it looks like streams, as a paradigm, are here to stay. But what the hell are streams? Are they a special language syntax? A magical data structure that just somehow “flows” data? Or are they just a convenient way of handling async callbacks, with a ton of functionality (and complexity) built on top of a very simple concept? Let’s take a look.

Initially, I started to learn about streams for a Bluetooth project I was writing in Flutter. In case you have been living under a rock, or simply don’t keep up with all the latest mobile development trends (Inconceivable!), Flutter is this really cool framework that Google “graciously” released a few years ago. In reality Google was tired of people releasing all the cool apps for iOS first, and they decided to make a cross platform toolkit so that at the very least, if they can’t get devs to code for Android first, they would get them to code for both platforms at the same time. Well, what can I say, it worked, they got me.

My current project is an app for a Bluetooth-enabled battery management system (BMS) for large scale lithium batteries. The app pulls data from the BMS about the state of the battery, and allows the end user to change settings on the BMS. Overall, it is a pretty simple 2–3 screen app, with the only complex bits residing in the BMS connection negotiation code. We needed to support iOS and Android out of the box, so this app was perfect for Flutter!

Flutter doesn’t support Bluetooth out of the box, but Buffalo Inc, the people who make portable hard drives and a bunch of other stuff, were gracious enough to release Flutter Blue, an open source Bluetooth library. The library is based around RxDart, and handles everything via streams. So, I had to roll up my sleeves and finally figure out streams properly.

Finally, Streams!!!

So, what are streams and why do you need them? Well, for that bit of info I’ll let you read or watch any number of explanations from countless sources on the internet. You see, you have to get properly confused and desperate before you’ll become inspired to continue reading this blog post ;) But, in short, streams are pitched as this magical alternative to callbacks. People analogize streams to data flows or to factory assembly lines. They are neither.

Streams are literally just another way to handle good old callbacks. It took me banging my head on my desk (literally) to arrive at that little bit of insight. Now I am going to try to convey that insight to you by reinventing the wheel and making our very own stream using nothing more magical than classes and callbacks. My hope is that by showing you what streams look like under the hood you will be able to think about them in a more accurate way, and avoid the zombie object issues I had to deal with ZombieLand style.

(What, you don’t think your project has zombies? Have you checked? Read on, you may find that you have a zombie infestation as well, and you just can’t see it.)

I know JS is taking over the world, but I’ll take this moment to also introduce you to Dart. Dart is the language used to program Flutter apps. It’s also a pretty cool little language in its own right, and it compiles to vanilla JS, as well as native code! It’s strongly typed, very strongly typed, not like that promiscuous TypeScript, and it has great out of the box support for Streams. If you are unfamiliar with Dart, just read the code examples below while omitting all of the type annotations and it will basically read like JS.

To run any of the code examples simply use DartPad and run it directly in the browser (remember, Dart compiles to JS, after all).

Normal Dart Streams

So, first, let’s take a look at a very simple example of using the streams built right into Dart. Read through the code quickly and see if you can predict what the output will be.
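Here is a minimal sketch of such a program, pieced together from the walkthrough that follows rather than copied from anywhere official. The names MyPrettyLittleZombie, NormalStream, ns, sub, streamController, counter and counterIncrement come from the text; the tick parameter, the maxTicks cap (so the program eventually ends), the use of Timer for the delays, and the wording of the print statements are my assumptions.

```dart
import 'dart:async';

void main() {
  // Nullable, so that the reference can be dropped later.
  MyPrettyLittleZombie? zombie = MyPrettyLittleZombie();

  // Roughly setTimeout: run the closure 10 seconds from now.
  Timer(const Duration(seconds: 10), () {
    zombie = null; // drop our only *named* reference to the zombie
    print('zombie is now $zombie');
  });
}

class MyPrettyLittleZombie {
  final NormalStream ns = NormalStream();
  late final StreamSubscription<int> sub;

  MyPrettyLittleZombie() {
    // Subscribe: print every value the stream emits.
    sub = ns.stream.listen((value) => print('zombie heard $value'));

    // Safety measure: unsubscribe after 15 seconds.
    Timer(const Duration(seconds: 15), () {
      sub.cancel();
      print('zombie unsubscribed');
    });
  }
}

class NormalStream {
  // Assumption: stop after 20 values so the sketch terminates.
  static const int maxTicks = 20;

  // Assumption: configurable interval, one second by default.
  final Duration tick;
  final StreamController<int> streamController =
      StreamController<int>.broadcast();
  int counter = 0;

  NormalStream({this.tick = const Duration(seconds: 1)}) {
    // Emit the first value one tick after construction.
    Timer(tick, counterIncrement);
  }

  Stream<int> get stream => streamController.stream;

  void counterIncrement() {
    if (counter >= maxTicks) return;
    counter++;
    streamController.sink.add(counter); // emit the new value
    // Schedule the next call. The event loop calls us again later,
    // so the call stack does not grow: not recursion.
    Timer(tick, counterIncrement);
  }
}
```

If the sketch matches the behaviour described in this post, the numbers should keep coming after the "zombie is now null" line and only stop once the safety measure fires.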

Ok, so what do you think? Do you know what the output will look like? Let’s take a look at the output, and then we will go quickly through the code section by section. Here is the output from above:

Did you get it right? Are you a little surprised that the stream kept on counting even after we set zombie = null? Like JS and many other garbage collected languages, Dart should have double tapped that zombie after we nulled its one and only reference. But was that really the only reference to our zombie? Or, perhaps, just maybe, another reference to our zombie is created when it runs the listen function? Is listen just a really shitty analogy to what we are actually doing? All questions will be answered, but first let's look closely at the code.

Dart starts running all code from the main function, just like C and many other languages. If you are more familiar with JS, just pretend that everything in main is at the top of the file.

In the above section we instantiate the MyPrettyLittleZombie class into a zombie object. (If this sentence is confusing, go and watch some YouTube videos on object-oriented programming. It's boring, and I am not covering it.)

The next bit is simple, though, through the pure genius of Google engineers, it’s made in the most complex way possible. Basically 10 seconds from when the code runs I want it to do something. Think setTimeout in JS.

The bit that I want it to do is to set the zombie object to null, which should (theoretically) cause it to get garbage collected. But it doesn't, as you can see in the output. Why? That's what we aim to find out. Let's take a look at what our zombie looks like.

Our zombie, like most zombies, is a pretty simple little creature. It only has 2 properties, one real function, and one safety measure (you know, just in case our garbage collector fails to double tap the bastard). At the top, we assign ns to a new instance of NormalStream. NormalStream is just an implementation of a stream using Dart's built-in Stream classes. Upon instantiation, NormalStream starts broadcasting an incrementing integer. Pretty simple.

After instantiating NormalStream we "subscribe" to updates from our instance of NormalStream by using the listen function of the stream. The analogy is that as soon as we hear the stream emit (send out) a new value, we will run some code on that value, in our case just print it to the console. Pretty simple. The sub variable stores a reference to our subscription. It's like a receipt: you'll need it to unsubscribe.
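To see the receipt in action on its own, here is a small sketch using Dart's built-in StreamController and StreamSubscription. The function name listenThenCancel is made up for illustration, and I am assuming a synchronous broadcast controller (sync: true) purely so that values arrive immediately and the example needs no waiting.

```dart
import 'dart:async';

/// Returns everything the listener heard, before and after cancelling.
List<int> listenThenCancel() {
  // sync: true delivers each value to listeners during add() itself.
  final controller = StreamController<int>.broadcast(sync: true);
  final heard = <int>[];

  // listen() registers the callback and hands back the "receipt".
  final StreamSubscription<int> sub = controller.stream.listen(heard.add);

  controller.add(1); // the callback runs, heard is now [1]
  sub.cancel(); // hand the receipt back: the callback is removed
  controller.add(2); // nobody is listening any more, so this is dropped

  controller.close();
  return heard;
}

void main() {
  print(listenThenCancel()); // prints [1]
}
```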

Finally, we add a safety that will unsubscribe our zombie from NormalStream updates after 15 seconds by using the sub object we talked about above.

Pretty simple stuff, no? Now let’s look at our final bit of code, the NormalStream class.

If you are familiar with streams, nothing here should be very surprising. Basically, we set up a StreamController and a Stream, and then add some logic to emit the incrementing integers once a second. The StreamController handles all of the additional helper logic around the Stream object.

To send a new value to the stream we run the streamController.sink.add(counter) function with counter as the integer we want to emit. Why is it called a sink? Because developers have a penchant for having really terrible analogies for simple concepts. In any case, by adding a new value to the sink, that value will be emitted on the stream.

Finally, the counterIncrement function calls itself over and over again. Yes, I know, it's tempting to sound smart by calling this recursion, but recursion it is not.

So that’s it! We looked at all the code. Do you now know why our zombie refuses to die after we null its reference? No? Ok, let's keep going. In the hopes of giving you the same "aha" moment I had earlier this month, let me now rebuild the same code with our own "Stream" class, instead.

Reinventing the Wheel, or rather, the Stream

To understand a bit better what’s happening under the hood of a Stream object let’s make our own, very simple, implementation. Yes, we will have Streams without any libraries or built-in stream classes. In fact, you could easily implement the code below using vanilla JS (again, Dart is compiled to vanilla JS). So, first, like before, let’s take a look at the code.
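What follows is a sketch of what such a hand-rolled stream could look like, reconstructed from the description in the rest of this section. ReinventedStream, ReinventedStreamSubscription, subCallBacks, subCounter, listen, unlisten and cancel are named in the text; the method name add, the tick parameter, the maxTicks cap and the print messages are assumptions of mine. The only import is dart:async, and only for the Timer that drives the counter.

```dart
import 'dart:async';

void main() {
  MyPrettyLittleZombie? zombie = MyPrettyLittleZombie();
  Timer(const Duration(seconds: 10), () {
    zombie = null;
    print('zombie is now $zombie');
  });
}

class MyPrettyLittleZombie {
  final ReinventedStream ns = ReinventedStream();
  late final ReinventedStreamSubscription sub;

  MyPrettyLittleZombie() {
    sub = ns.listen((value) => print('zombie heard $value'));
    // Safety measure: unsubscribe after 15 seconds.
    Timer(const Duration(seconds: 15), () {
      sub.cancel();
      print('zombie unsubscribed');
    });
  }
}

class ReinventedStream {
  static const int maxTicks = 20; // assumption: lets the sketch terminate
  final Duration tick; // assumption: configurable interval
  final Map<int, void Function(int)> subCallBacks = {};
  int subCounter = 0; // generates unique keys for subCallBacks
  int counter = 0;

  ReinventedStream({this.tick = const Duration(seconds: 1)}) {
    Timer(tick, counterIncrement);
  }

  /// "Subscribing" just stores the callback and returns a receipt.
  ReinventedStreamSubscription listen(void Function(int) callback) {
    final index = subCounter++;
    subCallBacks[index] = callback;
    return ReinventedStreamSubscription(this, index);
  }

  /// "Unsubscribing" just forgets the callback.
  void unlisten(int index) {
    subCallBacks.remove(index);
  }

  /// Emitting a value is nothing more than calling every stored callback.
  void add(int value) {
    // Iterate over a copy so a callback may unlisten while we loop.
    for (final callback in subCallBacks.values.toList()) {
      callback(value);
    }
  }

  void counterIncrement() {
    if (counter >= maxTicks) return;
    counter++;
    add(counter);
    Timer(tick, counterIncrement);
  }
}

class ReinventedStreamSubscription {
  final ReinventedStream stream;
  final int index;
  ReinventedStreamSubscription(this.stream, this.index);

  void cancel() {
    stream.unlisten(index);
  }
}
```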

Because this post is already getting ridiculously long, I’ll skip copy and pasting the console output here, but take my word for it that it looks identical to the output from the normal stream implementation above. Or, if you are not the trusting type, just run it in DartPad.

The first bit of this code is essentially identical to what we looked at above. In the MyPrettyLittleZombie class we use ns.listen instead of ns.stream.listen, but that's only because I was too lazy to split my Stream implementation into 15 different classes. But generally, the main function and MyPrettyLittleZombie class are identical. The differences are in the ReinventedStream and ReinventedStreamSubscription classes. So let's take a look at those. What do these magical streams actually look like under the hood?

First, let’s take a look at the ReinventedStream class. I'll skip over periodic number emitting logic, as it's identical to the normal implementation above, and will instead concentrate on the "stream" bits.

Here we set up 2 properties. The subCallBacks property is a Map that has an integer key index and stores a void Function(int) callback. If we were making a proper generic Stream object we would have used a generic type, of course, but for this example a hard coded int type will be sufficient.

The other property, subCounter, is used to generate unique index values for the subCallBacks map. (BTW, a Map is similar to a named array, a HashMap, or a named list in other languages. Every language, well, almost every language, has one, and every language has to invent a new name for the same damn concept.)

In this bit of code we define our listen and unlisten functions. Again, terrible names! They should be called addCallBack and removeCallBack, but I am using the nomenclature of the magical streams here.

The listen function stores the call back passed to it from the outside, in our case, from our zombie object, into the subCallBacks map. The unlisten function removes the callback at a given index. Again, pretty simple so far. The next bit is where the "magic" happens.

Yep, you made it! Now you should be having that “aha” moment I was talking about! This is the function that “sinks” or adds new values to the stream. Upon adding a new value it calls all of the callbacks one at a time, and sends to them the new value.

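As you may know, callbacks run in the context they are created in, NOT the context they are called in. The latter would make them pretty useless in anything other than globally scoped programs. So, to properly run the callback that we set up inside the zombie object, the instance of the ReinventedStream has to hold on to that callback, and the callback, in turn, holds on to whatever it captured from the zombie object. And, of course, since we instantiated the ReinventedStream inside the zombie object, the two are now cross linked. The cross link alone is not the problem: a tracing garbage collector like Dart's can clean up two objects that only point at each other. The problem is that the stream is still reachable from the outside, through the pending timer that drives the periodic number emitting logic. As long as that is the case, the callback stored in the subCallBacks map stays reachable too, and nothing it refers to will get garbage collected until we delete the callback by unsubscribing from the stream.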
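A tiny sketch makes the "callbacks are references" point concrete. The Zombie class, its name field and the groan method are all hypothetical, invented for this illustration. The closure reads name, so it captures the object it was created in, and that object is still very much alive after our variable is set to null.

```dart
class Zombie {
  final String name;
  Zombie(this.name);

  // The returned closure uses `name`, so it keeps a reference to `this`.
  String Function() groan() => () => '$name says braaains';
}

String groanAfterNulling() {
  Zombie? zombie = Zombie('Bob');

  // Someone else (think: a stream) stores the callback.
  final callbacks = <String Function()>[zombie.groan()];

  zombie = null; // our own reference is gone...

  // ...but the stored callback can still reach the object.
  return callbacks.first();
}

void main() {
  print(groanAfterNulling()); // prints: Bob says braaains
}
```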

As an aside, this is an implementation of a broadcast stream. A regular stream can only have one listener. A broadcast stream can have many listeners, but you can’t guarantee which listener’s callback will run first. You can see how you can easily use this code to create your own stream library with priority settings, where you can assign some callback to run first, and then have groups of callbacks. For instance, group A can run after the first callback, group B next, and so on. I can think of a few UI applications where that would be very useful, but I have digressed.
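For the curious, here is one way the priority idea could be sketched. Nothing here is an existing library: PriorityStream, PriorityListener, the priority parameter and the "lower number runs first" rule are all assumptions made for the sake of the example.

```dart
typedef IntCallback = void Function(int value);

class PriorityListener {
  final int priority; // lower numbers run first (an arbitrary choice)
  final int order; // registration order, used to break ties
  final IntCallback callback;
  PriorityListener(this.priority, this.order, this.callback);
}

class PriorityStream {
  final List<PriorityListener> _listeners = [];
  int _nextOrder = 0;

  PriorityListener listen(IntCallback callback, {int priority = 0}) {
    final listener = PriorityListener(priority, _nextOrder++, callback);
    _listeners.add(listener);
    // Keep the list ordered by priority, then by registration order.
    _listeners.sort((a, b) {
      final byPriority = a.priority.compareTo(b.priority);
      return byPriority != 0 ? byPriority : a.order.compareTo(b.order);
    });
    return listener;
  }

  void unlisten(PriorityListener listener) {
    _listeners.remove(listener);
  }

  void add(int value) {
    // Copy first, so a callback may unlisten while we iterate.
    for (final listener in List.of(_listeners)) {
      listener.callback(value);
    }
  }
}

List<String> priorityDemo() {
  final stream = PriorityStream();
  final log = <String>[];
  stream.listen((v) => log.add('B saw $v'), priority: 2);
  stream.listen((v) => log.add('first saw $v'), priority: 0);
  stream.listen((v) => log.add('A saw $v'), priority: 1);
  stream.add(42);
  return log;
}

void main() {
  print(priorityDemo()); // prints [first saw 42, A saw 42, B saw 42]
}
```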

Finally, let’s just take a quick look at the ReinventedStreamSubscription class, just for completeness, after all, we have figured out our object zombification issue now, no?

This class is instantiated by the listen function of the ReinventedStream class and returned to the subscriber so that the subscriber could cancel the subscription. It really should be called a subscription receipt. The code is pretty self-explanatory, the cancel function reaches out to the instance of its creator and runs unlisten on a given callback, as identified by its integer index.

Conclusion

Wow, you are still reading? Well, I am tired of typing, so let’s keep this short. Streams are awesome, but you MUST remember to unsubscribe from all the streams before disposing of an object or you will suffer from a zombie uprising of an unstoppable magnitude (or just a memory leak). Happy hunting!
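If you want a pattern to copy, something along these lines should do. It is only a sketch: TidyZombie, dispose and hasListenerBeforeAndAfterDispose are names I made up, and the only library calls used are StreamController.broadcast, hasListener, listen and cancel from dart:async.

```dart
import 'dart:async';

class TidyZombie {
  final List<int> heard = [];
  late final StreamSubscription<int> _sub;

  TidyZombie(Stream<int> source) {
    _sub = source.listen(heard.add);
  }

  /// Call this before dropping the last reference to the object.
  void dispose() {
    _sub.cancel();
  }
}

List<bool> hasListenerBeforeAndAfterDispose() {
  final controller = StreamController<int>.broadcast();
  final zombie = TidyZombie(controller.stream);

  final before = controller.hasListener; // the stream holds our callback
  zombie.dispose();
  final after = controller.hasListener; // the callback is gone

  controller.close();
  return [before, after];
}

void main() {
  print(hasListenerBeforeAndAfterDispose()); // prints [true, false]
}
```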

I am a prolific creator, curious hacker, dyslexic writer, licensed and yet apathetic attorney, but above all else, I am human, and these are my ideas.
