TLDR: Streams are nothing more than a convenient way of handling callbacks, and they can cause a zombie outbreak if you are not careful, because callbacks ARE a type of reference to the creating object.
The Unnecessarily Long Introduction
Between RxJS clocking in at 15 million weekly (yes, seriously, weekly) downloads and RxDart exploding in popularity, it looks like streams, as a paradigm, are here to stay. But what the hell are streams? Are they a special language syntax? A magical data structures that just somehow “flow” data? Or, are they just a convenient way of handling async callbacks with a ton of functionality (and complexity) built on top of a very simple concept? Let’s take a look.
Initially, I started to learn about streams for a Bluetooth project I was writing in Flutter. In case you have been living under a rock, or simply don’t keep up with all the latest mobile development trends (Inconceivable!), Flutter is this really cool framework that Google “graciously” released a few years ago. In reality Google was tired of people releasing all the cool apps for iOS first, and they decided to make a cross platform toolkit so that at the very least, if they can’t get devs to code for Android first, they would get them to code for both platforms at the same time. Well, what can I say, it worked, they got me.
My current project is an app for a bluetooth enabled battery management system (BMS) for large scale lithium batteries. The app pulls data from the BMS about the state of the battery, and allows the end user to set settings on the BMS. Overall, it is a pretty simple 2–3 screen app with only complex bits residing in the BMS connection negotiation code. We needed to support iOS and Android out of the box, so this app was perfect for Flutter!
Flutter doesn’t support Bluetooth out of the box, but Buffalo Inc, the people who make portable hard drives and a bunch of other stuff, were gracious enough to release Flutter Blue, an open source bluetooth library. The library is based around RxDart, and handles everything via streams. So, I had to roll up my sleeves and finally figure out streams properly.
So, what are streams and why do you need them? Well, for that bit of info I’ll let you read or watch any number of explanations from countless sources on the internet. You see, you have to get properly confused and desperate before you’ll become inspired to continue reading this blog post ;) But, in short, streams are pitched as this magical alternative to callbacks. People analogize streams to data flows or to factory assembly lines. They are neither.
Streams are literally just another way to handle good old callbacks. It took me banging my head on my desk (literally), to understand that little bit of insight. Now I am going to try to convey that insight to you by reinventing the wheel and making our very own stream using nothing more magical than classes and callbacks. My hope is that by showing you what streams look like under the hood you will be able to think about them in a more accurate way, and would avoid the zombie object issues I had to deal with ZombieLand style.
(What, you don’t think your project has zombies? Have you checked? Read on, you may find that you have a zombie infestation as well, and you just can’t see it.)
I know JS is taking over the world, but I’ll take this moment to also introduce you to Dart. Dart is the language used to program Flutter apps in. It’s also a pretty cool little language in it’s own right, and it compiles to vanilla JS, as well as native code! It’s strongly typed, very strongly typed, not like that promiscuous TypeScript, and it has great out of the box support for Streams. If you are unfamiliar with Dart, just read the code examples below while omitting all of the type annotations and it will basically read like JS.
To run any of the code examples simply use DartPad and run it directly in the browser (remember, dart compiles to JS, after all).
Normal Dart Streams
So, first, let’s take a look at a very simple example of using the streams built right into Dart. Read through the code quickly and see if you can predict what the output will be.
Ok, so what do you think? Do you know what the output will look like? Let’s take a look at the output, and then we will go quickly through the code section by section. Here is the output from above:
Did you get it right? Are you a little surprised that the stream capped on counting even after we set
zombie = null? Like JS and many other garbage collected languages, Dart should have double tapped that
zombie after we nulled its one and only reference. But was that really the only reference to our
zombie? Or, perhaps, just maybe, another reference to our
zombie is created when it runs the
listen function? Is
listen just a really shitty analogy to what we are actually doing? All questions will be answered, but first let's look closely at the code.
Dart starts running all code from the
main function, just like C and many other languages. If you are more familiar with JS, just pretend that everything in
main is at the top of the file.
In the above section we instantiate the MyPrettyLittleZombie class into a
zombie object. (If this sentence is confusing, go and watch some YouTube videos on object orientated programming. It's boring, and I am not covering it.)
The next bit is simple, though, through the pure genius of Google engineers, it’s made in the most complex way possible. Basically 10 seconds from when the code runs I want it to do something. Think
setTimeout in JS.
The bit that I want it to do is to set the
zombie object to null, which should (theoretically) cause it to get garbage collected. But it doesn't, as you can see in the output. Why? That's what we aim to find out. Let's take a look at what our
zombie looks like.
zombie, as most zombies, is a pretty simple little creature, it only has 2 properties, one real function, and one safety measure (you know, just in case our garbage collector fails to double tap the bastard). At the top, we assign
ns to a new instance of
NormalStream is just an implementation of a stream using Dart's built in Stream classes. Upon instantiation,
NormalStream starts broadcasting an incrementing integer, pretty simple.
NormalStream we "subscribe" to updates from our instance of
NormalStream by using the
listenfunction for the stream. The analogy is that as soon as we hear the stream emit (send out) a new value, we will run some code on that value, in our case just print it to the console. Pretty simple. The
sub variable stores a reference to our subscription. It's like a receipt, you'll need it to unsubscribe.
Finally, we add a safety that will unsubscribe our
NormalStream updates after 15 seconds by using the
subobject we talked about above.
Pretty simple stuff, no? Now let’s look at our final bit of code, the
If you are familiar with streams, nothing here should be very surprising. Basically, we setup a
StreamController and a
Stream, and then add some logic to emit the incrementing integers once a second. The
StreamController handles all of the additional helper logic around the
To send a new value to the stream we run the
streamController.sink.add(counter) function with
counter as the integer we want to emit. Why is it called a
sink? Because developers have a penchant for having really terrible analogies for simple concepts. In any case, by adding a new value to the sink, that value will be emitted on the stream.
counterIncrement function calls itself over and over again. Yes, I know, it's tempting to sound smart by calling this recursion, but recursion it is not.
So that’s it! We looked at all the code. Do you now know why our
zombie refuses to die after we
null its reference? No? Ok, let's keep going. In the hopes of giving you the same "aha" moment I had earlier this month, let me now rebuild the same code with our own "Stream" class, instead.
Reinventing the Wheel, or rather, the Stream
To understand a bit better what’s happening under the hood of a Stream object let’s make our own, very simple, implementation. Yes, we will have Streams without any libraries or built in classes. In fact, you could easily implement the code below using vanilla JS (again, dart is compiled to vanilla JS). So, first, like before, let’s take a look at the code.
Because this post is already getting ridiculously long, I’ll skip copy and pasting the console output here, but take my word for it that it looks identical to the output from the normal stream implementation above. Or, if you are not the trusting type, just run it in DartPad.
The first bit of this code is essentially identical to what we looked at above. In the
MyPrettyLittleZombie class we use
ns.listen instead of
ns.stream.listen, but that's only because I was too lazy to split my Stream Implementation into 15 different classes. But generally, the
main function and
MyPrettyLittleZombie class are identical. The difference are in the
ReinventedStreamSubscription classes. So let's take a look at those. What do these magical streams actually look like under the hood?
First, let’s take a look at the
ReinventedStream class. I'll skip over periodic number emitting logic, as it's identical to the normal implementation above, and will instead concentrate on the "stream" bits.
Here we set up 2 properties. The
subCallBacks property is a Map that has an integer key index and stores a
void Function(int) callback. If we were making a proper generic Stream object we would have used a generic type, of course, but for this example a hard coded
int type will be sufficient.
The other property,
subCounter, is used to generate unique index values for the
subCallBacks map. (BTW, a Map is similar to named array, HashMap, or a Named List, in other languages. Every language, well, almost every language, has one, and every language has to invent a new name for the same damn concept.)
In this bit of code we define our
unlisten functions. Again, terrible names! They should be called
removeCallBack, but I am using the nomenclature of the magical streams here.
The listen function stores the call back passed to it from the outside, in our case, from our
zombie object, into the
subCallBacks map. The
unlisten function removes the callback at a given index. Again, pretty simple so far. The next bit is where the "magic" happens.
Yep, you made it! Now you should be having that “aha” moment I was talking about! This is the function that “sinks” or adds new values to the stream. Upon adding a new value it calls all of the callbacks one at a time, and sends to them the new value.
As you may know, callbacks run in the context they are created in, NOT the context they are called in. The latter would make them pretty useless in anything other than globally scoped programs. So, to properly run the callback that we set up inside the
zombie object, the instance of the
ReinventedStream has to maintain a reference to the
zombie object! And, of course, since we instantiated the
ReinventedStream inside the
zombie object, they are now cross linked, and will not get garbage collected until we delete the callback stored in the
subCallBacks map of the
ReinventedStream instance by unsubscribing from the stream.
As an aside, this is an implementation of a broadcast stream. A regular stream can only have one listener. A broadcast stream can have many listeners, but you can’t guarantee which listener’s callback will run first. You can see how you can easily use this code to create your own stream library with priority settings, where you can assign some callback to run first, and then have groups of callbacks. For instance, group A can run after the first callback, group B next, and so on. I can think of a few UI applications where that would be very useful, but I have digressed.
Finally, let’s just take a quick look at the
ReinventedStreamSubscription class, just for completeness, after all, we have figured out our object zombification issue now, no?
This class is instantiated by the
listen function of the
ReinventedStream class and returned to the subscriber so that the subscriber could cancel the subscription. It really should be called a subscription receipt. The code is pretty self-explanatory, the
cancel function reaches out to the instance of its creator and runs
unlisten on a given callback, as identified by its integer index.
Wow, you are still reading? Well, I am tired of typing, so let’s keep this short. Streams are awesome, but you MUST remember to unsubscribe from all the streams before disposing of an object or you will suffer from a zombie uprising of an unstoppable magnitude (or just a memory leak). Happy hunting!