
Stream Processing With Coroutines in OCaml

Rizo Isrof speaking at London Functional Programmers in June, 2017

About this talk

Coroutines are a powerful control flow mechanism for cooperative multitasking. See how they compare to functions and how they work, and finally implement a small library in OCaml for working with coroutines.


Yes, I'm gonna talk about coroutines, and we are gonna implement a small library in OCaml to work with coroutines. Before we actually start, how many of you know what coroutines are? Okay, that's good. Have you ever used them extensively? Cool. So Donald Knuth says that subroutines, and by that he means functions, are special cases of coroutines. Is that helpful? Is it useful for understanding what coroutines are? I mean, coroutines do what our functions do, that is clear. But what makes coroutines different from functions, and what makes coroutines different from other abstractions, like processes, for example, or threads? So I'm gonna talk about how functions compare to coroutines, and I'm gonna explain what you already know, which is how functions work. Functions usually have this hierarchical relation, where there's a caller and there's a callee. In this case, we see that F is calling G, and G does some work, and then G terminates, potentially returns something, and goes back to F. So in this case, it's really clear that F is in charge of G, essentially. And whenever F decides to call G again, the whole process starts all over again, and there's no connection between the previous execution and the next one. But coroutines work really differently. Coroutines allow functions to restore their state. In this case, F is calling G, but maybe G was running before, and maybe it has some previously stored state that it can recover and use for the next execution. So in this case, G just continues some of its previous work, and then yields control back to F, and F does exactly the same thing: it performs another computation and yields control back to G. So one immediate thing that we can conclude here is that no one is in charge, and I think that's clear in this diagram. F and G, in the case of coroutines, coexist at the same level: no one is in charge, and at the same time, everyone is in charge. 
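To make "G continues from its previously stored state" concrete, here is a minimal sketch, not taken from the talk, that models a suspended computation as a value holding the last result plus a closure that resumes it. The names `coroutine`, `Done`, `Suspended`, `g`, and `f` are illustrative assumptions. Note that this is asymmetric: `f` still drives the loop, but unlike a plain function call, each resumption of `g` picks up exactly where it left off.

```ocaml
(* A suspended computation: either finished, or it has produced a value
   and can be resumed later from where it stopped. *)
type 'a coroutine =
  | Done
  | Suspended of 'a * (unit -> 'a coroutine)

(* G: the closure captures the state [i], so each resumption continues
   from the previous one instead of starting over. *)
let rec g i stop =
  if i >= stop then Done else Suspended (i, fun () -> g (i + 1) stop)

(* F: gets control back from G, does its own work on the value,
   then hands control to G again by calling [resume]. *)
let rec f co acc =
  match co with
  | Done -> List.rev acc
  | Suspended (v, resume) -> f (resume ()) ((v * 10) :: acc)
```

For example, `f (g 0 3) []` returns `[0; 10; 20]`.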
So that's the main difference between functions and coroutines. Now, if we compare coroutines with the other units of work and computation abstractions listed here, such as processes, threads, and green threads, we can see that they all have very different properties. Some of them are managed by the operating system, some by a runtime, and some allocate a lot of memory just to start their work. Processes, for example, have dedicated stacks, and threads have their own stacks, one per thread. Some of them use preemptive scheduling, which essentially means that they are controlled by some runtime, or maybe the operating system, and the operating system or the runtime decides who is running and when it's time to stop. In the case of coroutines, they are the ones in charge. So it really means that whenever a coroutine is running, it will only stop when it decides to. So that's the main difference. To describe coroutines in a more general sense, they are essentially suspendable computations that can persist their state and do cooperative scheduling. So they work together, and are not managed by something else. Most of you are probably familiar with the notion of generators, which is present in many languages, such as Python, or even JavaScript, which, I think, now has generators. Generators are a special case of coroutines. In this case, we define two different generators. The syntax for the definition is almost identical to that of functions, which means that they share a lot with functions, as Donald Knuth mentioned in his quote, but the way they behave is quite different. We don't see any return statements, we only see yield, and yield means that control is passed back to the caller. So whenever we call count, it will do the initial computation and then yield a result; in this case, it will be the initial value of n. 
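The Python `count` generator from the slides is not reproduced in this transcript. As a rough OCaml analogue, the standard library's lazy `Seq.t` type can express the same "yield n, then continue from n + 1" behaviour. This is a sketch under that assumption, not the library built in the talk; `first` is a hypothetical helper standing in for repeated `next()` calls.

```ocaml
(* A generator in the spirit of the Python [count] example: each element
   is only computed when the consumer asks for it. *)
let rec count n : int Seq.t = fun () -> Seq.Cons (n, count (n + 1))

(* Hypothetical helper: pull the first [k] values, the way repeated
   next() calls would in Python. *)
let rec first k (s : 'a Seq.t) =
  if k <= 0 then []
  else
    match s () with
    | Seq.Nil -> []
    | Seq.Cons (x, rest) -> x :: first (k - 1) rest
```

Here `first 3 (count 5)` gives `[5; 6; 7]`; the sequence is infinite, but only the requested prefix is ever computed.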
Whenever we call count again, asking for the next value, it will keep producing the incremented values of n. So, again, coroutines are functions that can suspend, that can store some state, and they work together; they are not managed by any runtime. Now, another interesting example of coroutines is pipes and their implementation in the shell. Commands like cat, grep, and awk can all be seen as coroutines, because when we compose them they define this huge pipeline, and each member of this pipeline can be seen as a single coroutine that works with the other commands in the pipeline. At any point in time, we only have one running command, and whenever it's done it passes control to the following one, or maybe the previous one. To study the behaviour of pipes, let's try to understand how they work and what kind of properties they have. The main one is that they may be waiting for some input. When we call something like head, we are expecting it to have some input, and, of course, pipes have to be prepared to get that input. They can also produce output, of course, and that output can be used as an input for another coroutine. At any point in time, they can just decide to stop and say, "I'm done, I'm ready." Or maybe there's some failure, and they just terminate. So we have these three ways in which pipes interact with the external world. Of course, one pipe by itself is not useful, so we compose different pipes: we have one pipe producing a value of type A, for example, and another pipe consumes that as an input, and the same happens for the following pipe, defining this huge and complex pipeline. And again, at any point in time, pipe1 can decide just to terminate and stop the entire execution of the pipeline. Usually when we speak about pipelines, we speak about upstream and downstream pipes. Upstream pipes are usually the producing pipes, and downstream pipes are the ones consuming the input. 
So let's try to define the type for pipes. How would we represent this type in a programming language? I think it's really interesting how the language that I'm using really helps me define these models, as was discussed in the previous presentation, because I'm using OCaml, which is like a brother to F#, and the definition is really simple. In this case we can see that the producing part is defined by a Yield constructor that produces a string together with a continuation pipe, which is the next state of this particular pipe. Or maybe the pipe is waiting for something: maybe it needs a string to produce its next state. And finally, of course, it can be just ready, or done, and the result is there for us to get. Now, in this previous example I just used strings and ints, but usually, of course, we want to work with generic types. In OCaml we define generic types by providing type parameters, A, B and R, to the main type. The first constructor, Yield, holds a pair of a B and the following pipe, and the second one holds a function that expects an A and will produce the new pipe. Now, a real implementation would need to be careful about things like infinite streams, and that's why we need to be careful about the way we allocate the structures; for that we usually use the lazy type. In this presentation, I'm gonna ignore that completely, just for simplicity, and implement the strict version. Based on our core type, we can implement a few other types, such as producers, consumers, and pipelines. Producers are pipes that just yield things, consumers only await, and pipelines are happy, because they don't need anything as an input and they are not producing anything as an output, so we can just get the result value from them. Now let's talk about monads, or sequencing operators. 
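The generic type described above can be sketched as follows. The constructor names `Yield`, `Await`, and `Ready` follow the talk's wording; the uninhabited `void` type used to say "no input" or "no output" is an assumption of this sketch (it needs OCaml 4.07 or later), not necessarily how the talk's library spells it. As in the talk, this is the strict version, with no laziness.

```ocaml
(* Core pipe type: 'a is the input, 'b the output, 'r the final result. *)
type ('a, 'b, 'r) pipe =
  | Yield of 'b * ('a, 'b, 'r) pipe    (* emit a value, then continue *)
  | Await of ('a -> ('a, 'b, 'r) pipe) (* wait for an input value *)
  | Ready of 'r                        (* finished, with a result *)

(* Assumption: an empty type, so nothing can ever flow through it. *)
type void = |

(* Derived types: producers never need input, consumers never produce
   output, and pipelines do neither. *)
type ('b, 'r) producer = (void, 'b, 'r) pipe
type ('a, 'r) consumer = ('a, void, 'r) pipe
type 'r pipeline = (void, void, 'r) pipe

(* A producer that yields 1 and 2, then finishes. *)
let one_two : (int, unit) producer = Yield (1, Yield (2, Ready ()))

(* A consumer that awaits two ints and finishes with their sum. *)
let sum2 : (int, int) consumer =
  Await (fun x -> Await (fun y -> Ready (x + y)))

(* A pipeline has nothing to exchange; it only holds a result. *)
let answer : int pipeline = Ready 42
```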
We can combine multiple pipes by sequencing them, or in this case, we can extract a value from a pipe with a bind operator and apply a function to its result, and these functions will be useful for us to define the rest of the library. We can also define a few helper functions such as empty, yield, and await, which, combined with the monad operations, allow us to write things like these examples. So we can yield multiple values, we can await some value and then increment it, or we can await multiple times, get two items, and then produce their sum. I'm sure you remember this example from Python. With what I just defined in two slides, we can already implement the equivalent in OCaml. It's not using any special language constructs at all, it's just the functions that we just defined. So yield is a regular function, and the composition operator is also defined by us. In this case, we are using recursion instead of explicit mutable state, but everything else is exactly the same. And of course, the type for these two functions is producer, because they produce values. We can also implement a few other producers, such as list producers. The first function takes a list and yields all the elements from that list, returning empty when the list is empty. The file producer is also very interesting, because it opens the channel, yields all the lines, and once it reaches the end of the file, it closes the channel. Now, how do we consume things? We defined a few producers, so we are able to put stuff into these pipelines, but how do we get it out? For that we can implement the next function that we saw in the Python example. next essentially pattern matches on the structure of the pipe and decides what to do. If it's ready, then we don't have anything to produce. If it's yielding something, we can wrap that value in an option type and also return the intermediate state, which will be useful in the following examples. 
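The following is a minimal sketch of the sequencing operators, helpers, producers, and `next` described above, written against the strict pipe type. Names such as `bind`, `>>=`, `empty`, `yield`, `await`, `of_list`, `of_file`, and `next` follow the talk's wording but are assumptions about the exact signatures; `add_two` and `range` are hypothetical examples. Because the type is strict, `range` is a finite stand-in for `count`: an infinite producer would need the lazy tail the talk mentions.

```ocaml
type ('a, 'b, 'r) pipe =
  | Yield of 'b * ('a, 'b, 'r) pipe
  | Await of ('a -> ('a, 'b, 'r) pipe)
  | Ready of 'r

(* Monadic bind: run [p]; once it is ready with [r], continue with [f r]. *)
let rec bind p f =
  match p with
  | Ready r -> f r
  | Yield (b, rest) -> Yield (b, bind rest f)
  | Await k -> Await (fun a -> bind (k a) f)

let ( >>= ) = bind

(* Helpers: finish immediately, emit one value, or wait for one input. *)
let empty = Ready ()
let yield b = Yield (b, empty)
let await () = Await (fun a -> Ready a)

(* Await two inputs, then finish with their sum. *)
let add_two () = await () >>= fun x -> await () >>= fun y -> Ready (x + y)

(* Finite counterpart of [count]: recursion replaces mutable state. *)
let rec range n stop =
  if n >= stop then empty else (yield n >>= fun () -> range (n + 1) stop)

(* Yield every element of a list, then finish. *)
let rec of_list = function
  | [] -> empty
  | x :: xs -> yield x >>= fun () -> of_list xs

(* Yield every line of a file, closing the channel at end of file. *)
let of_file path =
  let ic = open_in path in
  let rec loop () =
    match input_line ic with
    | line -> yield line >>= loop
    | exception End_of_file -> close_in ic; empty
  in
  loop ()

(* Pull one value: [None] once the producer is done, otherwise the value
   together with the remaining pipe. An awaiting pipe cannot produce. *)
let next = function
  | Ready _ -> None
  | Yield (b, rest) -> Some (b, rest)
  | Await _ -> failwith "next: pipe is awaiting input"
```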
And if our pipe is awaiting, there's nothing to get out of it, so we just fail. Using that function we can implement something as simple as length, to get the number of items that our pipe can yield. Another interesting pair of consumers is fold and collect. Fold processes the pipe cumulatively: it gets the values out and applies a reducing function to each of them and the accumulator that was passed initially, and this way produces the final result. Collect can be implemented directly: we get elements out of the pipe and construct a list with this syntax here, ::, which stands for cons, each time we get a new element. And, of course, we can also implement it just with fold. Now, we can also apply some transformations and filters. We have producers producing values and consumers expecting some values to produce a final result, but in between we may want to apply some transformations and filtering. Map, for example, can be implemented with simple, well-defined logic, defined by this part here: it awaits something, gets that value, and applies the function before yielding. Filter is a bit more complex, but it's essentially the same. Take is quite different from filter and map, because it can terminate the entire pipeline by returning empty: once n reaches zero, it stops. We have producers, we have consumers, we have transformers, but how do we actually connect these things together? How do we run them? For that we need the composition operator. This function defines the composition of two different pipes, and it works by inspecting their structure: it checks their current state and decides what will happen next. In this case, we have the upstream and the downstream pipes, so it really means that we are slicing into our pipeline and deciding how a specific pair of pipes will be combined to produce a new one. 
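Here is a sketch of the consumers and transformers just described, again on the strict pipe type. Following the talk, `length` and `fold` are built on `next`, and `collect` is shown both directly with `::` and via `fold`; `collect'` is a hypothetical name for the second variant. The transformers are pipes that await input and yield output; `take` finishes with `Ready ()`, which is what lets it stop a whole pipeline once composed.

```ocaml
type ('a, 'b, 'r) pipe =
  | Yield of 'b * ('a, 'b, 'r) pipe
  | Await of ('a -> ('a, 'b, 'r) pipe)
  | Ready of 'r

let rec of_list = function
  | [] -> Ready ()
  | x :: xs -> Yield (x, of_list xs)

let next = function
  | Ready _ -> None
  | Yield (b, rest) -> Some (b, rest)
  | Await _ -> failwith "next: pipe is awaiting input"

(* length: count how many values a producer yields. *)
let length p =
  let rec go n p = match next p with None -> n | Some (_, rest) -> go (n + 1) rest in
  go 0 p

(* fold: pull values one by one and combine each with the accumulator. *)
let rec fold f acc p =
  match next p with
  | None -> acc
  | Some (x, rest) -> fold f (f acc x) rest

(* collect, written directly with next and :: (cons)... *)
let rec collect p =
  match next p with
  | None -> []
  | Some (x, rest) -> x :: collect rest

(* ...and the same thing expressed with fold. *)
let collect' p = List.rev (fold (fun acc x -> x :: acc) [] p)

(* Transformers: await a value, then (maybe) yield something downstream. *)
let rec map f = Await (fun a -> Yield (f a, map f))

let rec filter pred =
  Await (fun a -> if pred a then Yield (a, filter pred) else filter pred)

(* take finishes with Ready () after n values. *)
let rec take n =
  if n <= 0 then Ready () else Await (fun a -> Yield (a, take (n - 1)))
```

Without a composition operator, a transformer can only be exercised by feeding its `Await` continuations by hand; connecting it to a producer is the job of `compose`.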
If our downstream pipe is ready, it really means that there's nothing more we can do, so we just return ready. If our downstream pipe is yielding values, it means that there must be some consumer further down that is expecting those values, so we just keep yielding, and compose with the rest of it. If our downstream pipe is awaiting some input, and our upstream pipe is producing some output, then it's a perfect match: we just pass the output value from the upstream pipe to the downstream one, and compose the result, again closing the loop and producing one single pipe. Then if our upstream pipe is awaiting something, it means that the entire pipeline is blocked. We can't satisfy the consumers while the pipeline is blocked by some upstream pipe, so we just keep awaiting and compose the result, and once we get the input value for the upstream pipe, the computation will continue. And finally, if the upstream pipe is ready, it also means that we are ready. There's literally nothing more that we can do. We define a few helper operators to work with these models. We can implement a fairly well-known shell command called cat, which is just an identity pipe: it does nothing but forward its input. And also, an interesting property of this particular compose function is that it satisfies some laws. We can see composition as a sum, for example, and cat as our zero for that sum: composing cat on either side of a pipe doesn't change the computation. And the compose operator is associative, which means the way we group a chain of compositions doesn't matter either. This is helpful for reasoning about the programmes and the systems that we build. Now, to give you a more practical example of what we can do with this compose function and the transformers, producers, and consumers, I will implement a small example that parses log files. 
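The five cases of composition described above translate into a single pattern match. This sketch assumes the strict pipe type and a `>->` operator name for composition (the operator's spelling in the talk is not in the transcript). `cat` is written as a function of `()` here only to keep it simply polymorphic; `of_list`, `map`, `take`, and `collect` are repeated so the block stands alone.

```ocaml
type ('a, 'b, 'r) pipe =
  | Yield of 'b * ('a, 'b, 'r) pipe
  | Await of ('a -> ('a, 'b, 'r) pipe)
  | Ready of 'r

(* compose up down: the outputs of [up] become the inputs of [down]. *)
let rec compose up down =
  match up, down with
  (* Downstream is ready: the whole composition is ready. *)
  | _, Ready r -> Ready r
  (* Downstream yields: pass its output on and keep composing. *)
  | _, Yield (c, down') -> Yield (c, compose up down')
  (* Downstream awaits and upstream yields: hand the value over. *)
  | Yield (b, up'), Await k -> compose up' (k b)
  (* Both await: the composition awaits input for the upstream pipe. *)
  | Await k, Await _ -> Await (fun a -> compose (k a) down)
  (* Upstream is ready while downstream still waits: we are ready too. *)
  | Ready r, Await _ -> Ready r

let ( >-> ) = compose

(* cat: the identity pipe, forwarding every input unchanged. *)
let rec cat () = Await (fun a -> Yield (a, cat ()))

let rec of_list = function [] -> Ready () | x :: xs -> Yield (x, of_list xs)
let rec map f = Await (fun a -> Yield (f a, map f))
let rec take n = if n <= 0 then Ready () else Await (fun a -> Yield (a, take (n - 1)))

(* Drain a composed producer into a list. *)
let rec collect = function
  | Ready _ -> []
  | Yield (x, rest) -> x :: collect rest
  | Await _ -> failwith "collect: pipeline is awaiting input"
```

With these definitions, `collect (of_list [1; 2; 3] >-> map succ >-> take 2)` gives `[2; 3]`, and inserting `cat ()` on either side of a pipe, or regrouping the compositions, gives the same list.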
In this slide, you can see the log file from a web server that was used by NASA. Let's assume that we want to compute something like the total bytes that were sent to a particular host in successful responses until a given date. So in this log file, we are interested in this particular last value, which is the number of bytes that were sent to that host. Here's the status code, and the date is the 4th field. Using what we just defined, a few other functions such as take_while, and the standard library, we can implement this programme, which is actually runnable: we can just copy-paste this code and run it. We open the file, we transform the rows into lists of strings, we filter the rows to keep the host we need, we check the status code, we make sure we only take rows while the date is the one we need, we transform the rows into integers, and then, finally, we just sum the result. So this is the complete implementation for this problem. Again, we can just compile it and run it, and it will be fairly efficient. And yeah, that's it. I showed you the implementation of a very simple cooperative concurrency model in OCaml. It's simple, it's extensible, it's quite efficient, and it's implemented with just first-class citizens of the language, like functions and types. There's nothing special in it. We don't need any special support built into the language implementation. It's completely backend-agnostic: we can use sockets, files, or database handles. It supports partial finalisation of resources: in some cases, it's able to close file handles and database handles. But it's also possible to implement even safer abstractions on top of it. That's it. - [Student] What's the memory consumption? - The question is, "What's the memory consumption for this implementation?" It's linear by definition. 
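The programme from the slide is not in the transcript, so the following is only a sketch of the same steps: split rows into fields, filter by host and status 200, `take_while` the date matches, map to byte counts, and sum. To stay self-contained it reads a list of made-up lines in the Common Log Format used by the NASA logs instead of a file, and it assumes no spaces inside the request path (so each row has 10 fields). `take_while`, `total_bytes`, and the field helpers are hypothetical names; a byte count of `-` is treated as 0.

```ocaml
type ('a, 'b, 'r) pipe =
  | Yield of 'b * ('a, 'b, 'r) pipe
  | Await of ('a -> ('a, 'b, 'r) pipe)
  | Ready of 'r

let rec compose up down =
  match up, down with
  | _, Ready r -> Ready r
  | _, Yield (c, down') -> Yield (c, compose up down')
  | Yield (b, up'), Await k -> compose up' (k b)
  | Await k, Await _ -> Await (fun a -> compose (k a) down)
  | Ready r, Await _ -> Ready r

let ( >-> ) = compose

let rec of_list = function [] -> Ready () | x :: xs -> Yield (x, of_list xs)
let rec map f = Await (fun a -> Yield (f a, map f))
let rec filter p = Await (fun a -> if p a then Yield (a, filter p) else filter p)

(* Forward values while [p] holds; stop the whole pipeline at the first miss. *)
let rec take_while p =
  Await (fun a -> if p a then Yield (a, take_while p) else Ready ())

let rec fold f acc = function
  | Ready _ -> acc
  | Yield (x, rest) -> fold f (f acc x) rest
  | Await _ -> failwith "fold: pipeline is awaiting input"

(* Fields after splitting on spaces: 0 = host, 3 = "[dd/Mon/yyyy:hh:mm:ss",
   8 = status code, 9 = bytes sent ("-" when nothing was sent). *)
let host row = List.nth row 0
let day row = String.sub (List.nth row 3) 1 11
let status row = List.nth row 8
let sent_bytes row = Option.value ~default:0 (int_of_string_opt (List.nth row 9))

let total_bytes ~target ~date lines =
  of_list lines
  >-> map (String.split_on_char ' ')
  >-> filter (fun row -> List.length row = 10)
  >-> filter (fun row -> host row = target)
  >-> filter (fun row -> status row = "200")
  >-> take_while (fun row -> day row = date)
  >-> map sent_bytes
  |> fold ( + ) 0

(* Made-up sample lines in the same format as the NASA log. *)
let sample = [
  "host-a - - [01/Jul/1995:00:00:01 -0400] \"GET /a.html HTTP/1.0\" 200 6245";
  "host-b - - [01/Jul/1995:00:00:06 -0400] \"GET /b.html HTTP/1.0\" 200 3985";
  "host-a - - [01/Jul/1995:00:00:09 -0400] \"GET /c.gif HTTP/1.0\" 304 0";
  "host-a - - [01/Jul/1995:00:00:11 -0400] \"GET /d.gif HTTP/1.0\" 200 4085";
  "host-a - - [02/Jul/1995:00:00:12 -0400] \"GET /e.gif HTTP/1.0\" 200 1000";
]
```

For these sample lines, `total_bytes ~target:"host-a" ~date:"01/Jul/1995" sample` is `6245 + 4085 = 10330`: the 304 response is filtered out, and the first row dated 02/Jul stops the pipeline.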
At any point in time, we only have the values allocated in this function; everything else is defined by the pipes themselves. So if there's a misbehaving pipe that allocates whatever it gets, then, yes, we will have some problematic cases. But it's really up to the pipes themselves to decide what kind of allocations they want to do. The allocations needed by these models are only used for the composition. More questions? Yes, please. - [Student] I quite like writing programs in this style, but what I don't like is having to debug them, 'cause I quite often find that in a debugger, the way control jumps between frames isn't a good match for things like step over and step into. In OCaml, do you have any good tools that help with that? - I never actually had any problem debugging these kinds of models. One easy way to debug them would be just to define a specific pipe and connect it with the rest of your programme to see what's happening in the middle, so you can always slice into the middle of your computation and try to figure it out. The debugger itself is quite flexible, so I guess you could do that, but I'm not sure. I've never done it. - [Student] So this seems quite similar to the sequence module you have in F#, which is built in. Does this have any superior functionality, or? - I never used F#'s implementation, but I know that sequences are not entirely built in: they use the computation expression notation underneath, so they desugar into, essentially, the kind of monadic code that I presented here. I think that in F#, it's just nicer: you get a nicer syntax for the same functionality. But I assume you could implement exactly the same thing in F# too. Okay? Thank you.
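The debugging tip from the Q&A, defining a pipe and connecting it in the middle of a pipeline to see what flows through, can be sketched as an identity pipe that also calls a logging function. `tap` is a hypothetical name, and the strict pipe type and `compose` from the earlier sketches are repeated so this block stands alone.

```ocaml
type ('a, 'b, 'r) pipe =
  | Yield of 'b * ('a, 'b, 'r) pipe
  | Await of ('a -> ('a, 'b, 'r) pipe)
  | Ready of 'r

let rec compose up down =
  match up, down with
  | _, Ready r -> Ready r
  | _, Yield (c, down') -> Yield (c, compose up down')
  | Yield (b, up'), Await k -> compose up' (k b)
  | Await k, Await _ -> Await (fun a -> compose (k a) down)
  | Ready r, Await _ -> Ready r

let ( >-> ) = compose

let rec of_list = function [] -> Ready () | x :: xs -> Yield (x, of_list xs)
let rec map f = Await (fun a -> Yield (f a, map f))

(* Hypothetical debugging pipe: forwards each value unchanged,
   calling [log] on it first. *)
let rec tap log = Await (fun a -> log a; Yield (a, tap log))

let rec collect = function
  | Ready _ -> []
  | Yield (x, rest) -> x :: collect rest
  | Await _ -> failwith "collect: pipeline is awaiting input"
```

For example, `of_list [1; 2; 3] >-> tap (fun x -> Printf.printf "saw %d\n" x) >-> map succ` prints each value before it reaches `map`, without changing the result.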