Friday, July 17, 2015

The correct Subscription?

In the last post, I showed a possible implementation of a Subscription - the entity that sits between a Publisher and a Subscriber in a Reactive Stream. It wasn't a very good implementation; it was lacking in many respects, not the least of which was that it couldn't deal with an FBP Component executing in multiple Elixir/Erlang processes. ElixirFBP is meant to exploit the relatively low cost of spawning many processes in Erlang/Elixir. I could have used one Subscription per component process, but that would have placed the burden of determining which process to send a message to on the Component. I would like to keep the Components as simple as possible. Of course, there is still the possibility that a Subscription process will become a "choke" point in a flow.

Below is the heart of a new version of the Subscription module, the loop function:
    def loop(%Subscription{
                publisher_pids: publisher_pids,
                publisher_port: publisher_port,
                subscriber_pids: subscriber_pids,
                subscriber_pids_length: subscriber_pids_length,
                subscriber_port: subscriber_port} = subscription,
                subscriber_index) do
      receive do
        {:pull, n}  when n > 0 ->
          # A request for data from the subscriber
          # Send n data requests, cycling through the publisher processes
          Stream.cycle(publisher_pids)
            |> Stream.take(n)
            |> Enum.each(fn(publisher_pid) ->
              send(publisher_pid, {publisher_port, self()})
            end)
          loop(subscription, 0)
        {^publisher_port, value} ->
          # Publisher has sent data - pass it on to the next subscriber
          send(elem(subscriber_pids, subscriber_index), {subscriber_port, value})
          loop(subscription, rem(subscriber_index + 1, subscriber_pids_length))
        message ->
          Logger.info("Received unknown message: #{inspect message}")
          loop(subscription, subscriber_index)
      end
    end
Notice that when a pull request for n values is received, n data requests are sent out, distributed round-robin across the processes that are currently executing the Publisher component. As the responses begin arriving, each one is forwarded to one of the Subscriber component processes, again in round-robin fashion. Unless otherwise specified, the default number of processes spawned per component is one.
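The round-robin forwarding can be tried in isolation. This is a minimal sketch, not ElixirFBP code: the module RoundRobinDemo and its dispatch and subscriber functions are hypothetical, and plain spawned processes stand in for the Subscriber components. It uses the same tuple-plus-rem/2 indexing as the loop above.

```elixir
defmodule RoundRobinDemo do
  # Hypothetical helper (not part of ElixirFBP): sends each value to the next
  # pid in the tuple, wrapping around with rem/2 like the Subscription loop.
  def dispatch(values, subscriber_pids, port) do
    count = tuple_size(subscriber_pids)

    Enum.reduce(values, 0, fn value, index ->
      send(elem(subscriber_pids, index), {port, value})
      rem(index + 1, count)
    end)
  end

  # Stand-in for a Subscriber process: collects values arriving on `port`
  # and reports them to `parent` when it is told :done.
  def subscriber(parent, port, acc \\ []) do
    receive do
      {^port, value} -> subscriber(parent, port, [value | acc])
      :done -> send(parent, {self(), Enum.reverse(acc)})
    end
  end
end

parent = self()
pids = for _ <- 1..3, do: spawn(fn -> RoundRobinDemo.subscriber(parent, :in) end)
RoundRobinDemo.dispatch([10, 20, 30, 40, 50], List.to_tuple(pids), :in)

results =
  for pid <- pids do
    send(pid, :done)

    receive do
      {^pid, values} -> values
    end
  end

IO.inspect(results)
# => [[10, 40], [20, 50], [30]]
```

Because Erlang preserves message order between any pair of processes, each stand-in subscriber sees its values before :done, so the split is deterministic.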

There is still a problem with this design: the responses are not necessarily received in the same order in which the requests were sent out. In many flows the order of the Information Packets (IPs) is important. I will address this shortcoming a little later.
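One common way to restore ordering, offered here only as a possible direction and not as the fix ElixirFBP will adopt, is to tag each request with a sequence number and hold early responses in a reorder buffer until the missing ones arrive. The ReorderBuffer module below is hypothetical.

```elixir
defmodule ReorderBuffer do
  # Hypothetical sketch (not ElixirFBP code). `next` is the sequence number
  # we are waiting for; `pending` holds responses that arrived early.
  defstruct next: 0, pending: %{}

  # Store response `seq`, then return every value that can now be released
  # in order, together with the updated buffer.
  def put(%ReorderBuffer{pending: pending} = buffer, seq, value) do
    release(%{buffer | pending: Map.put(pending, seq, value)}, [])
  end

  defp release(%ReorderBuffer{next: next, pending: pending} = buffer, acc) do
    case Map.fetch(pending, next) do
      {:ok, value} ->
        release(%{buffer | next: next + 1, pending: Map.delete(pending, next)}, [value | acc])

      :error ->
        {Enum.reverse(acc), buffer}
    end
  end
end

# Responses to requests 0, 1 and 2 arrive in the order 2, 0, 1.
buffer = %ReorderBuffer{}
{out1, buffer} = ReorderBuffer.put(buffer, 2, :c)
{out2, buffer} = ReorderBuffer.put(buffer, 0, :a)
{out3, _buffer} = ReorderBuffer.put(buffer, 1, :b)

IO.inspect({out1, out2, out3})
# => {[], [:a], [:b, :c]}
```

The trade-off is memory: a slow publisher process can force the buffer to hold many early responses.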

How does this all work? At the lowest level, these are the steps required:
  1. Spawn one or more processes for every Component in the FBP Graph.
  2. Spawn a Subscription process for every edge between two Components.
  3. Send pull requests to the Subscription processes.
These steps are illustrated in the code below:
    iip1 = Component.start_with_iips(Trial.InitialInformationPacket, [value: 44])
    iip2 = Component.start_with_iips(Trial.InitialInformationPacket, [value: 33])
    add = Component.start(Trial.Add)
    show = Component.start(Trial.Show)
    sub1_pid = Subscription.start(iip1, :value, add, :addend)
    sub2_pid = Subscription.start(iip2, :value, add, :augend)
    sub3_pid = Subscription.start(add, :sum, show, :in)
    send(sub3_pid, {:pull, 1})
    send(sub2_pid, {:pull, 1})
    send(sub1_pid, {:pull, 1})
An InitialInformationPacket is a component that always responds with the same constant value. In the example, the Add component receives the constant 44 on its addend port and 33 on its augend port.
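Such a component can be very small. The sketch below is not Trial.InitialInformationPacket itself; the module name IIPSketch is hypothetical, and it assumes a request arrives as {port, requester_pid} and that the reply takes the {port, value} shape matched by the {^publisher_port, value} clause of the Subscription loop above.

```elixir
defmodule IIPSketch do
  # Hypothetical stand-in for an InitialInformationPacket component.
  # Assumes requests look like {port, requester_pid}; replies are {port, value}.
  def loop(port, value) do
    receive do
      {^port, requester_pid} ->
        # Every request gets the same constant.
        send(requester_pid, {port, value})
        loop(port, value)
    end
  end
end

iip = spawn(fn -> IIPSketch.loop(:value, 44) end)

answers =
  for _ <- 1..2 do
    send(iip, {:value, self()})

    receive do
      {:value, v} -> v
    after
      1_000 -> :timeout
    end
  end

IO.inspect(answers)
# => [44, 44]
```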

I now need to introduce Subscriptions to the existing code base and make it all work - as it once did - with the NoFlo Development Environment.

Wednesday, July 8, 2015

Reactive Streams, continued.

In the last post, I explained that I wanted ElixirFBP to exhibit Reactive Stream behavior, namely that a component could apply "back pressure" to its upstream component so that it wouldn't suffer from buffer overflows and similar errors. Data still flows downstream, but how that flow is initiated is different: a component no longer sends an Information Packet (IP) to its downstream connection as soon as that IP is ready (for instance, when a Sum component has received both of its inputs). Rather, it sends a value only when asked for it.

Recall that components no longer connect directly with each other. Instead, they are connected via a Subscription. The Subscription knows about a Publisher and a Subscriber and is responsible for moving IPs to where they belong and for controlling how many of these IPs are sent. Publishers and Subscribers are simply Components. Below is a possible implementation of a Subscription:
defmodule Subscription do
  require Logger

  defstruct [
    publisher_pid: nil, publisher_port: nil,
    subscriber_pid: nil, subscriber_port: nil
  ]
  def start(%Subscription{} = subscription) do
    spawn(fn -> loop(subscription, 0) end)
  end
  # count is the number of values the subscriber has asked for but not yet received
  def loop(%Subscription{
              publisher_pid: publisher_pid,
              publisher_port: publisher_port,
              subscriber_pid: subscriber_pid,
              subscriber_port: subscriber_port} = subscription, count) do
    receive do
      {:pull, n} when n > 0 ->
        # A request for data from the subscriber.
        # Ask the publisher for a data value unless a request is already outstanding
        if count == 0, do: send(publisher_pid, {publisher_port, subscription})
        loop(subscription, count + n)
      {^publisher_port, value} when count > 0 ->
        # Publisher has sent data - pass it on to the subscriber.
        # (^ pins the port; without it any 2-tuple, e.g. {:error, _}, would match here)
        send(subscriber_pid, {subscriber_port, value})
        # Ask the publisher for more data only if the subscriber still wants some
        if count > 1, do: send(publisher_pid, {publisher_port, subscription})
        loop(subscription, count - 1)
      {^publisher_port, _value} when count == 0 ->
        # Unrequested data from the publisher. Drop it?
        loop(subscription, 0)
      {:complete} ->
        # From the publisher TODO: Now what?
        loop(subscription, 0)
      {:error, message} ->
        Logger.error("Error received #{inspect message}")
        loop(subscription, 0)
      message ->
        Logger.info("Received unknown message: #{inspect message}")
        loop(subscription, count)
    end
  end
end
A subscription is started as a separate Elixir process and immediately begins listening for messages from its publisher and subscriber. Components need to be rewritten because they are no longer pushing data. Here's what a rewritten Add component would look like:
defmodule Add do
  def inports, do: [addend: :integer, augend: :integer]
  def outports, do: [sum: :integer]
  def loop(inports, outports) do
    %{:augend => augend, :addend => addend} = inports
    %{:sum => sum} = outports
    receive do
      {:addend, value} ->
        inports = %{inports | :addend => value}
        loop(inports, outports)
      {:augend, value} ->
        inports = %{inports | :augend => value}
        loop(inports, outports)
      {:sum, subscription} when is_number(addend) and is_number(augend) ->
        sum = addend + augend
        outports = %{outports | :sum => sum}
        Component.send_ip(subscription, sum)
        loop(inports, outports)
    end
  end
end
Notice that the component waits for a message that specifically requests its calculated value, and that it sends the value to the subscription rather than directly to a downstream component.

Typically, Reactive Streams are used in dataflows that are built via functional composition. Importantly, the composition is evaluated lazily; that is, data does not begin to flow until the last (terminal) function is executed. In Apache Spark these terminal functions are called actions. In effect, the actions pull the data through the flow. This is also exactly the behavior exhibited by Elixir Streams, where nothing is computed until an Enum function consumes the stream.
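The laziness is easy to observe with the standard Stream and Enum modules. In this small sketch an Agent merely counts how many elements were actually produced; composing the pipeline does no work, and Enum.take/2, playing the role of a Spark action, pulls only as much data as it needs.

```elixir
# Counter used only to observe how much work is done.
{:ok, counter} = Agent.start_link(fn -> 0 end)

pipeline =
  1..1_000_000
  |> Stream.map(fn x ->
    # Count every element that is actually produced.
    Agent.update(counter, &(&1 + 1))
    x * 2
  end)
  |> Stream.filter(&(rem(&1, 3) == 0))

# Composing the stream did no work yet.
before_action = Agent.get(counter, & &1)

# The terminal call pulls just enough data through the flow.
result = Enum.take(pipeline, 3)
after_action = Agent.get(counter, & &1)

IO.inspect({before_action, result, after_action})
# => {0, [6, 12, 18], 9}
```

Only the elements 1 through 9 are ever mapped, even though the source range has a million elements.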

Thursday, July 2, 2015

Reactive Streams

It's been two weeks since my last post, during which time I have embarked on a major internal design change. Early on, I said that I wanted ElixirFBP to follow more closely the so-called "classical" Flow Based Programming model (Paul Morrison is the source of this term). He discusses the differences between his implementations of FBP and NoFlo's. One difference is that classical FBP components can exert back pressure on their upstream components. This means that a component can tell an upstream component connected to one of its in ports: "Send no more data!" This ability is crucial when connected components run at different speeds; a fast component could overwhelm a slower one, leading to buffer overruns or out-of-memory errors.

This problem has become especially critical in the era of "Big Data" and the processing flows/streams that are being built to handle these massive amounts of data. The desire to have these data streams handled by asynchronous processes (for concurrency reasons) compounds the problem. Recently a group - Reactive Streams - was formed to study this problem. The result is a specification that describes the behavior of asynchronous processors that can apply non-blocking back pressure in the face of fast streams of data. The people at Akka contributed to the writing of the specification; they have a good description of how all this works in the context of their Akka Streams. I also recommend watching Roland Kuhn's presentation.

The Reactive Streams specification is very straightforward and easy to understand. There are Publishers and Subscribers. A Subscriber subscribes to a Publisher and receives a Subscription in return. The Subscriber uses the Subscription to tell the Publisher how many more packets of data it is prepared to receive (the specification's request(n) method). This demand can change during processing as the dynamics of the overall graph change.
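The demand rule can be sketched with plain Elixir processes. This is a hypothetical illustration, not ElixirFBP code and not the specification's Java interfaces: the messages {:request, n}, {:next, item} and :complete are assumed stand-ins for Subscription.request(n), onNext and onComplete, and the publisher never sends more :next messages than have been requested.

```elixir
defmodule DemandPublisher do
  # Hypothetical sketch of demand-driven publishing.
  def start(items, subscriber), do: spawn(fn -> loop(items, subscriber, 0) end)

  # Nothing left: signal completion (this does not require outstanding demand).
  defp loop([], subscriber, _demand), do: send(subscriber, :complete)

  # No outstanding demand: wait for the subscriber to ask for more.
  defp loop(items, subscriber, 0) do
    receive do
      {:request, n} when n > 0 -> loop(items, subscriber, n)
    end
  end

  # Outstanding demand: emit one item and decrement the demand.
  defp loop([item | rest], subscriber, demand) do
    send(subscriber, {:next, item})
    loop(rest, subscriber, demand - 1)
  end
end

defmodule Collector do
  # Gathers :next items until :complete or until 100 ms pass with no message.
  def collect(acc \\ []) do
    receive do
      {:next, item} -> collect([item | acc])
      :complete -> Enum.reverse([:complete | acc])
    after
      100 -> Enum.reverse(acc)
    end
  end
end

publisher = DemandPublisher.start([:a, :b, :c, :d, :e], self())

send(publisher, {:request, 2})
first_batch = Collector.collect()

send(publisher, {:request, 10})
second_batch = Collector.collect()

IO.inspect({first_batch, second_batch})
# => {[:a, :b], [:c, :d, :e, :complete]}
```

After the first request the publisher simply stops and waits; the back pressure is nothing more than the absence of demand.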

I have begun changing ElixirFBP so that two connected components no longer speak directly with each other (exchange messages); rather, each speaks to a Subscription. In true Erlang/Elixir fashion, the Subscriptions are separate processes. The components need to be written a little differently now. Here, for example, is the new version of the Add component:
  defmodule Add do
    def inports, do: [addend: :integer, augend: :integer]
    def outports, do: [sum: :integer]
    def loop(inports, outports) do
      %{:augend => augend, :addend => addend} = inports
      %{:sum => sum} = outports
      receive do
        {:addend, value} ->
          inports = %{inports | :addend => value}
          loop(inports, outports)
        {:augend, value} ->
          inports = %{inports | :augend => value}
          loop(inports, outports)
        {:sum, subscription} when is_number(addend) and is_number(augend) ->
          sum = addend + augend
          outports = %{outports | :sum => sum}
          Component.send_ip(subscription, sum)
          loop(inports, outports)
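        _message ->
          # Ignore anything else. Note that this clause also discards a
          # {:sum, _} request that arrives before both inputs have been set.
          loop(inports, outports)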
      end
    end
  end
You don't see any indication of back pressure being applied - this is all handled in the Subscription that is associated with each connected in and out port. Subscriptions are created when an edge is added to the network graph.

Notice also that a downstream component must ask this component for a value (the sum) rather than passively receiving it when the value has been computed. This is an example of "pulling" rather than "pushing" data, and it gives the flow "lazy" behavior. Elixir Streams operate this way, and so does Apache Spark.

None of this code is available in my GitHub repository yet. I hope to push up all these changes next week.