Thursday, July 2, 2015

Reactive Streams

It's been two weeks since my last post, during which time I have embarked on a major internal design change. Early on, I said that I wanted ElixirFBP to more closely follow the so-called "classical" Flow-Based Programming model - Paul Morrison is the source for this term. He discusses the differences between his implementations of FBP and that of NoFlo. One difference is that classical FBP components can exert back pressure on their upstream components. What this means is that a component can tell a component connected to one of its in ports: "Send no more data!" This ability is crucial when connected components run at different speeds: a fast component could overwhelm a slower one, leading to buffer overruns or out-of-memory errors.

This problem has become especially critical in the era of "Big Data" and the processing flows/streams that are being built to handle these massive amounts of data. The desire to have these data streams handled by asynchronous processes (for concurrency reasons) compounds the problem. Recently a group - Reactive Streams - was formed to study this problem. The result is a specification that describes the behavior of asynchronous processors that can apply non-blocking back pressure in the face of fast streams of data. The people at Akka contributed to the writing of the specification; they have a good description of how all this works in the context of their Akka Streams. I also recommend watching Roland Kuhn's presentation.

The Reactive Streams specification is straightforward and easy to understand. There are Publishers and Subscribers. Subscribers ask for a Subscription from a Publisher. The Subscriber then uses the Subscription to tell the Publisher how many packets of data it is currently able to receive. This limit can change during processing as the dynamics of the overall graph change.
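To make the idea of signalled demand concrete, here is a minimal sketch using plain Elixir processes. It is not the Reactive Streams API and not ElixirFBP code: the module name DemandSketch and the message shapes {:request, subscriber, n}, {:next, item} and :complete are all assumptions made for illustration. The publisher only ever sends as many items as the subscriber has asked for.

```elixir
defmodule DemandSketch do
  # Hypothetical publisher: holds a list and emits items only on demand.
  def start_publisher(items) when is_list(items) do
    spawn(fn -> publisher_loop(items) end)
  end

  defp publisher_loop(items) do
    receive do
      # The subscriber signals that it can accept n more items.
      {:request, subscriber, n} when is_integer(n) and n > 0 ->
        {to_send, rest} = Enum.split(items, n)
        Enum.each(to_send, fn item -> send(subscriber, {:next, item}) end)

        if rest == [] do
          # Nothing left: tell the subscriber and stop.
          send(subscriber, :complete)
        else
          publisher_loop(rest)
        end
    end
  end

  # Hypothetical subscriber: requests batch_size items at a time and
  # returns everything it received once the publisher completes.
  def consume(publisher, batch_size) when is_integer(batch_size) and batch_size > 0 do
    send(publisher, {:request, self(), batch_size})
    drain(publisher, batch_size, batch_size, [])
  end

  # All requested items have arrived: signal fresh demand.
  defp drain(publisher, 0, batch_size, acc) do
    send(publisher, {:request, self(), batch_size})
    drain(publisher, batch_size, batch_size, acc)
  end

  defp drain(publisher, outstanding, batch_size, acc) do
    receive do
      {:next, item} -> drain(publisher, outstanding - 1, batch_size, [item | acc])
      :complete -> Enum.reverse(acc)
    end
  end
end
```

For example, DemandSketch.start_publisher([1, 2, 3, 4, 5]) |> DemandSketch.consume(2) returns [1, 2, 3, 4, 5], with never more than two items in flight at once.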

I have begun changing ElixirFBP so that any two connected components no longer speak directly with each other (exchange messages); rather, they speak to a Subscription. In true Erlang/Elixir fashion, the Subscriptions are separate processes. The components need to be written a little differently now. Here, for example, is the new version of the Add component:
  defmodule Add do
    # Port declarations: port name => expected type.
    def inports, do: [addend: :integer, augend: :integer]
    def outports, do: [sum: :integer]

    # inports and outports are maps from port name to the latest value.
    def loop(inports, outports) do
      %{augend: augend, addend: addend} = inports

      receive do
        # A value arrived on an in port: remember it and keep waiting.
        {:addend, value} ->
          loop(%{inports | addend: value}, outports)

        {:augend, value} ->
          loop(%{inports | augend: value}, outports)

        # A Subscription asks for the sum; answer only once both inputs are numbers.
        {:sum, subscription} when is_number(addend) and is_number(augend) ->
          sum = addend + augend
          Component.send_ip(subscription, sum)
          loop(inports, %{outports | sum: sum})

        # Anything else (including a request that arrives too early) is ignored.
        _message ->
          loop(inports, outports)
      end
    end
  end
You don't see any indication of back pressure being applied here; it is all handled in the Subscription that is associated with each connected in and out port. Subscriptions are created when an edge is added to the network graph.
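The real Subscription code is not shown in this post, so what follows is only a guess at the general shape such a process could take, not the ElixirFBP implementation. The module name SubscriptionSketch, the {:request, n} and {:ip, value} messages, and the stand-in counter component are all hypothetical. The one thing borrowed from the Add component above is that the upstream side is pulled with a {port_name, subscription_pid} message.

```elixir
defmodule SubscriptionSketch do
  # Hypothetical message shapes (assumptions, not ElixirFBP's):
  #   {:request, n}        subscriber -> subscription: n more packets wanted
  #   {out_port, sub_pid}  subscription -> publisher: pull one value
  #   {:ip, value}         publisher -> subscription: the pulled value
  #   {in_port, value}     subscription -> subscriber: delivery

  def start(publisher, out_port, subscriber, in_port) do
    state = %{publisher: publisher, out_port: out_port,
              subscriber: subscriber, in_port: in_port, demand: 0}
    spawn(fn -> loop(state) end)
  end

  # Invariant: exactly one pull is in flight whenever demand > 0.
  defp loop(%{demand: demand} = state) do
    receive do
      {:request, n} when is_integer(n) and n > 0 ->
        # Start pulling only if we were idle; otherwise a pull is already pending.
        if demand == 0, do: pull(state)
        loop(%{state | demand: demand + n})

      {:ip, value} when demand > 0 ->
        send(state.subscriber, {state.in_port, value})
        # Keep pulling only while the subscriber still wants more.
        if demand > 1, do: pull(state)
        loop(%{state | demand: demand - 1})
    end
  end

  defp pull(state), do: send(state.publisher, {state.out_port, self()})

  # Stand-in upstream component: answers every pull with the next integer.
  def start_counter(out_port) do
    spawn(fn -> counter_loop(out_port, 1) end)
  end

  defp counter_loop(out_port, n) do
    receive do
      {^out_port, subscription} ->
        send(subscription, {:ip, n})
        counter_loop(out_port, n + 1)
    end
  end
end
```

In this sketch the upstream component is never asked for a value unless the downstream side has outstanding demand, so a fast producer simply sits idle until it is pulled again. That is the back pressure, and neither component has to know about it.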

Notice also that a downstream component must ask for a value (the sum) from this component rather than passively receiving it when the value has been computed. This is an example of "pulling" data rather than "pushing" data and thus exhibits "lazy" behavior. Elixir Streams operate this way and so does Apache Streams.
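The lazy, pull-driven behavior of Elixir Streams is easy to see with the standard library alone. In this small example (the variable name stream is arbitrary), building the pipeline computes nothing; values are produced only when a consumer asks for them.

```elixir
# An infinite stream of integers, doubled lazily.
stream =
  Stream.iterate(1, &(&1 + 1))
  |> Stream.map(fn n ->
    IO.puts("computing #{n}")
    n * 2
  end)

# Nothing has been printed yet: no element has been computed.
# Enum.take/2 pulls exactly three elements through the pipeline and stops.
first_three = Enum.take(stream, 3)
IO.inspect(first_three)
```

Only "computing 1", "computing 2" and "computing 3" are printed, followed by [2, 4, 6], even though the source stream is infinite.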

None of this code is available yet at my GitHub repository; I hope to push up all these changes next week.
