Wednesday, July 8, 2015

Reactive Streams, continued.

In the last post, I explained that I wanted ElixirFBP to exhibit Reactive Streams behavior, namely that a component could apply "back pressure" to its upstream component so that it doesn't suffer from buffer overflows and similar errors. Data still flows downstream, but how that flow is initiated is different. A component no longer sends an Information Packet (IP) to its downstream connection as soon as that IP is ready (for instance, when a Sum component has received both of its inputs). Rather, it sends a value only when it is asked for one.
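The difference between pushing and pulling can be sketched with a plain Elixir process. `PullDemo` is a hypothetical module used only for illustration (it is not part of ElixirFBP): its producer holds a counter and sends the next value only after it receives a `{:request, from}` message, so nothing ever arrives unasked.

```elixir
defmodule PullDemo do
  # Hypothetical pull-only producer: it never pushes values on its own.
  def start(first \\ 1) do
    spawn(fn -> loop(first) end)
  end

  defp loop(next) do
    receive do
      {:request, from} ->
        # Reply only to an explicit request, then wait for the next one.
        send(from, {:value, next})
        loop(next + 1)
    end
  end

  # Ask the producer for one value and wait (up to one second) for the reply.
  def pull(producer) do
    send(producer, {:request, self()})

    receive do
      {:value, v} -> v
    after
      1_000 -> :timeout
    end
  end
end

producer = PullDemo.start()
IO.inspect(PullDemo.pull(producer))  # 1
IO.inspect(PullDemo.pull(producer))  # 2
```

The consumer controls the rate: the producer can never get ahead of it, which is the essence of back pressure.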

Recall that components no longer connect directly with each other. Instead, they are connected via a Subscription. The Subscription knows about a Publisher and a Subscriber, and it is responsible both for moving IPs to where they belong and for controlling how many of them are sent. Publishers and Subscribers are simply Components. Below is a possible implementation of a Subscription:
defmodule Subscription do
  require Logger

  defstruct publisher_pid: nil, publisher_port: nil,
            subscriber_pid: nil, subscriber_port: nil

  def start(%Subscription{} = subscription) do
    spawn(fn -> loop(subscription, 0) end)
  end

  def loop(%Subscription{
              publisher_pid: publisher_pid,
              publisher_port: publisher_port,
              subscriber_pid: subscriber_pid,
              subscriber_port: subscriber_port} = subscription, count) do
    receive do
      {:pull, n} when n > 0 ->
        # A request for n values from the subscriber.
        # Ask the publisher for a value unless a request is already outstanding;
        # self() is this subscription process, so the publisher knows where to reply.
        if count == 0, do: send(publisher_pid, {publisher_port, self()})
        loop(subscription, count + n)
      {^publisher_port, value} when count > 0 ->
        # The publisher has sent data: pass it on to the subscriber
        send(subscriber_pid, {subscriber_port, value})
        # Ask the publisher for more data only while demand remains
        if count > 1, do: send(publisher_pid, {publisher_port, self()})
        loop(subscription, count - 1)
      {^publisher_port, _value} when count == 0 ->
        # Unrequested data from the publisher. Drop it?
        loop(subscription, 0)
      {:complete} ->
        # From the publisher. TODO: Now what?
        loop(subscription, 0)
      {:error, message} ->
        Logger.error("Error received #{inspect message}")
        loop(subscription, 0)
      message ->
        Logger.warning("Received unknown message: #{inspect message}")
        loop(subscription, count)
    end
  end
end
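To see the demand counting in isolation, here is a self-contained sketch with a toy publisher that emits consecutive integers. The module `DemandDemo`, its functions, and the `:out`/`:in` port names are hypothetical. It is a simplified variant of the Subscription above: as a design choice of this sketch, it keeps at most one request outstanding and asks the publisher for another value only while unmet demand remains, so the publisher never produces a value nobody asked for.

```elixir
defmodule DemandDemo do
  # Hypothetical publisher: answers each {port, from} request with {port, value}.
  def publisher(port, next) do
    receive do
      {^port, from} ->
        send(from, {port, next})
        publisher(port, next + 1)
    end
  end

  # Simplified subscription: forwards at most `count` values to the subscriber.
  def subscription(pub, port, sub, sub_port, count) do
    receive do
      {:pull, n} when n > 0 ->
        # Only start a request if none is outstanding.
        if count == 0, do: send(pub, {port, self()})
        subscription(pub, port, sub, sub_port, count + n)

      {^port, value} when count > 0 ->
        send(sub, {sub_port, value})
        # Request the next value only if demand is left after this one.
        if count > 1, do: send(pub, {port, self()})
        subscription(pub, port, sub, sub_port, count - 1)

      {^port, _value} ->
        # Unrequested data: drop it.
        subscription(pub, port, sub, sub_port, 0)
    end
  end
end

me = self()
pub = spawn(fn -> DemandDemo.publisher(:out, 1) end)
sub = spawn(fn -> DemandDemo.subscription(pub, :out, me, :in, 0) end)

# The subscriber asks for three values.
send(sub, {:pull, 3})

received =
  for _ <- 1..3 do
    receive do
      {:in, value} -> value
    after
      1_000 -> :timeout
    end
  end

IO.inspect(received)  # [1, 2, 3]

# No fourth value arrives, because the demand has been used up.
extra =
  receive do
    {:in, value} -> value
  after
    100 -> :none
  end

IO.inspect(extra)  # :none
```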
A subscription is started as a separate Elixir process and immediately begins listening for messages from its publisher and subscriber. Components need to be rewritten because they are no longer pushing data. Here's what a rewritten Add component would look like:
defmodule Add do
  def inports, do: [addend: :integer, augend: :integer]
  def outports, do: [sum: :integer]

  def loop(inports, outports) do
    %{:augend => augend, :addend => addend} = inports
    receive do
      {:addend, value} ->
        inports = %{inports | :addend => value}
        loop(inports, outports)
      {:augend, value} ->
        inports = %{inports | :augend => value}
        loop(inports, outports)
      {:sum, subscription} when is_number(addend) and is_number(augend) ->
        # A request for the output: compute the sum and hand it to the subscription.
        # Component.send_ip/2 is part of ElixirFBP and is not shown here.
        sum = addend + augend
        outports = %{outports | :sum => sum}
        Component.send_ip(subscription, sum)
        loop(inports, outports)
    end
  end
end
Notice that the component waits for a message that specifically requests its calculated value, and that it sends this value to the subscription rather than directly to a downstream component.
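A runnable approximation of this behavior is sketched below. `AddDemo` is a hypothetical stand-in for the Add component: because the post does not show `Component.send_ip/2`, this sketch assumes the IP is simply delivered with `send/2` as a `{:sum, value}` message to whatever pid made the request.

```elixir
defmodule AddDemo do
  # Hypothetical pull-driven adder; send/2 stands in for Component.send_ip/2.
  def start do
    spawn(fn -> loop(%{addend: nil, augend: nil}) end)
  end

  defp loop(%{addend: addend, augend: augend} = inports) do
    receive do
      {:addend, value} ->
        loop(%{inports | addend: value})

      {:augend, value} ->
        loop(%{inports | augend: value})

      # Only answer a request once both inputs are numbers; otherwise the
      # request stays in the mailbox until they arrive.
      {:sum, requester} when is_number(addend) and is_number(augend) ->
        send(requester, {:sum, addend + augend})
        loop(inports)
    end
  end
end

adder = AddDemo.start()
send(adder, {:addend, 2})
send(adder, {:augend, 3})

# Nothing has been sent downstream yet; ask for the sum explicitly.
send(adder, {:sum, self()})

receive do
  {:sum, value} -> IO.puts("sum = #{value}")
end
```

Thanks to Erlang's selective receive, a request that arrives before both inputs is not lost: it waits in the mailbox and is matched once the guard can succeed.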

Typically, one sees Reactive Streams used in dataflows that are built via functional composition. Importantly, the composition is evaluated lazily: data does not begin to flow until the final function in the chain is executed. In Apache Spark, these terminal functions are called actions. In effect, the actions pull the data through the flow. This is also exactly the behavior exhibited by Elixir Streams.
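A small illustration with Elixir's standard `Stream` and `Enum` modules: building the pipeline does no work, and the terminal `Enum.take/2` call plays a role loosely analogous to a Spark action, pulling only as many elements through as it needs.

```elixir
# Building the pipeline runs nothing: Stream functions are lazy.
pipeline =
  1..1_000_000
  |> Stream.map(fn x ->
    IO.puts("producing #{x}")
    x * 2
  end)
  |> Stream.filter(fn y -> rem(y, 3) == 0 end)

# Enum.take/2 is the terminal call that pulls values through the pipeline.
# Only the first six elements of the range are ever mapped.
result = Enum.take(pipeline, 2)
IO.inspect(result)  # [6, 12]
```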
