Friday, July 17, 2015

The correct Subscription?

In the last post, I showed a possible implementation for a Subscription, the entity that sits between a Publisher and a Subscriber in a Reactive Stream. It wasn't a very good implementation: it was lacking in many respects, not the least of which was that it couldn't deal with an FBP Component executing in multiple Elixir/Erlang processes. ElixirFBP is meant to exploit the relatively cheap cost of spawning many processes in Erlang/Elixir. I could have used one Subscription per component process, but that would have placed on the Component the burden of determining which process to send a message to. I would like to keep the Components as simple as possible. Of course, there is still the possibility that a Subscription process will become a "choke" point in a flow.

Below is the heart of the new version of the Subscription module, the loop function:
    def loop(%Subscription{
                publisher_pids: publisher_pids,
                publisher_port: publisher_port,
                subscriber_pids: subscriber_pids,
                subscriber_pids_length: subscriber_pids_length,
                subscriber_port: subscriber_port} = subscription,
                subscriber_index) do
      receive do
        {:pull, n} when is_integer(n) and n > 0 ->
          # A request for data from the subscriber.
          # Send n requests, cycling through the publisher processes in turn.
          publisher_pids
          |> Stream.cycle()
          |> Stream.take(n)
          |> Enum.each(fn publisher_pid ->
            send(publisher_pid, {publisher_port, self()})
          end)
          loop(subscription, 0)
        {^publisher_port, value} ->
          # Publisher has sent data - pass it on to the next subscriber
          send(elem(subscriber_pids, subscriber_index), {subscriber_port, value})
          loop(subscription, rem(subscriber_index + 1, subscriber_pids_length))
        message ->
          Logger.info("Received unknown message: #{inspect message}")
          loop(subscription, subscriber_index)
      end
    end
Notice that when a pull request for n values is received, n requests are sent out, cycling through the processes that are currently executing the Publisher component; every publisher process receives a request only when n is at least the number of publisher processes. When the responses to these requests begin arriving back, each response is sent to one of the Subscriber component processes in a round-robin fashion. Unless otherwise specified, the default number of processes spawned per component is one.
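
To make the dispatch pattern concrete, here is a minimal sketch that isolates the two round-robin calculations used in the loop function, without any processes. The module name `RoundRobinSketch` and its two functions are hypothetical helpers written for illustration; they are not part of ElixirFBP. As in the loop function, the publishers are assumed to be a non-empty list and the subscribers a tuple.

```elixir
defmodule RoundRobinSketch do
  # Hypothetical helper: which publishers would be sent a request
  # for a pull of size n (same Stream.cycle/take idea as the loop function).
  def request_targets(publishers, n) when is_integer(n) and n > 0 do
    publishers
    |> Stream.cycle()
    |> Enum.take(n)
  end

  # Hypothetical helper: which subscriber would receive each of `count`
  # responses, starting at index `start` and advancing with rem/2.
  def delivery_targets(subscribers, count, start \\ 0) when is_tuple(subscribers) do
    size = tuple_size(subscribers)

    start
    |> Stream.iterate(&(&1 + 1))
    |> Enum.take(count)
    |> Enum.map(fn index -> elem(subscribers, rem(index, size)) end)
  end
end

# Five requests spread over two publishers:
RoundRobinSketch.request_targets([:p1, :p2], 5)
# => [:p1, :p2, :p1, :p2, :p1]

# Four responses spread over three subscribers:
RoundRobinSketch.delivery_targets({:s1, :s2, :s3}, 4)
# => [:s1, :s2, :s3, :s1]
```

With a single process per component (the default), both calculations collapse to always picking the same process.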

There is still a problem with this design: the responses are not necessarily received in the same order as the requests that were sent out. There are many examples where the order of the Information Packets is important. I will address this shortcoming a little later.
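
One common way to restore ordering, shown here only as a sketch and not as the approach ElixirFBP will take, is to tag each request with a sequence number and hold back responses until all earlier ones have arrived. This assumes the publishers would echo the tag back with their value, which the protocol shown above does not do. The module `ReorderSketch` and its functions are hypothetical.

```elixir
defmodule ReorderSketch do
  # State is {next sequence number to release, buffered seq => value}.
  def new, do: {0, %{}}

  # Accept one tagged response. Returns {values now releasable in order, new state}.
  def accept({next, buffer}, seq, value) do
    release(next, Map.put(buffer, seq, value), [])
  end

  # Release buffered values for as long as the next expected number is present.
  defp release(next, buffer, acc) do
    case Map.fetch(buffer, next) do
      {:ok, value} -> release(next + 1, Map.delete(buffer, next), [value | acc])
      :error -> {Enum.reverse(acc), {next, buffer}}
    end
  end
end

state = ReorderSketch.new()

# The response to request 1 arrives first, so it is held back.
{released_first, state} = ReorderSketch.accept(state, 1, :b)
# released_first == []

# The response to request 0 arrives, so both can be passed on in order.
{released_second, _state} = ReorderSketch.accept(state, 0, :a)
# released_second == [:a, :b]
```

The cost of this approach is that one slow publisher process delays every later value, so it trades throughput for ordering.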

How does this all work? At the lowest level, these are the steps required:
  1. Spawn one or more processes for every Component in the FBP Graph.
  2. Spawn a Subscription process for every edge between two Components.
  3. Send pull requests to the Subscription processes.
These steps are illustrated in the code below:
    iip1 = Component.start_with_iips(Trial.InitialInformationPacket, [value: 44])
    iip2 = Component.start_with_iips(Trial.InitialInformationPacket, [value: 33])
    add = Component.start(Trial.Add)
    show = Component.start(Trial.Show)
    sub1_pid = Subscription.start(iip1, :value, add, :addend)
    sub2_pid = Subscription.start(iip2, :value, add, :augend)
    sub3_pid = Subscription.start(add, :sum, show, :in)
    send(sub3_pid, {:pull, 1})
    send(sub2_pid, {:pull, 1})
    send(sub1_pid, {:pull, 1})
An InitialInformationPacket is a component that always responds with a constant value. In the example, the Add component is sent the constant values 44 (on its addend port) and 33 (on its augend port).
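
A constant-value component can be sketched as a process that answers every request on its port with the same value. The sketch below is not the actual Trial.InitialInformationPacket code; the module name `ConstantSketch` is hypothetical, and the message shapes are inferred from the loop function above, where the Subscription sends `{publisher_port, self()}` and expects `{publisher_port, value}` in return.

```elixir
defmodule ConstantSketch do
  # Spawn a process that replies with `value` to every request on `port`.
  def start(port, value) do
    spawn(fn -> loop(port, value) end)
  end

  defp loop(port, value) do
    receive do
      # Same request shape the Subscription loop sends to a publisher.
      {^port, requester} when is_pid(requester) ->
        send(requester, {port, value})
        loop(port, value)
    end
  end
end

# Ask the component for its value, as a Subscription would.
pid = ConstantSketch.start(:value, 44)
send(pid, {:value, self()})

reply =
  receive do
    {:value, value} -> value
  after
    1_000 -> :timeout
  end
# reply == 44
```

Because the process never changes state, it can answer any number of pull requests with the same value.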

I now need to introduce Subscriptions to the existing code base and make it all work - as it once did - with the NoFlo Development Environment.
