Wednesday, July 20, 2016

GenStage Example

Way back in December I wrote about a new Elixir project named GenRouter. It was a first attempt at defining a reactive streaming library for Elixir. It has come a long way since then! On July 14th, José Valim posted a blog entry announcing GenStage, the successor to GenRouter. I encourage you to read the post and watch the accompanying video as well. The "stage" in GenStage refers to the name given to the nodes in a flow; every stage is an Elixir/Erlang process, which provides the means to parallelize flows. Stages come in three types: producers, producer/consumers and consumers. The data that move through a flow are called events, and an event can be any Elixir term. There is much, much more to GenStage; take a look at the excellent online documentation as well.

Just over a year ago, I wrote about a graphical tool named Streamtools, developed by the New York Times R&D group (it does not appear to be in active development as of this date; here's a link to a description of the tool and the example). I took one of their examples and implemented it with my ElixirFBP library. The example accesses the data feed of a bike share program in New York City. Below is an implementation of that example using GenStage (the code and instructions for running this example are available at GitHub):

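# GenStage shipped under the Experimental namespace when this was written;
# later releases expose it as plain GenStage, so the alias is not needed there.
alias Experimental.GenStage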

defmodule GenstageExample do
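  # Producer: emits the configured URL once per unit of demand.
  # Inside GenstageExample, `Map` refers to this module, not Elixir's Map.
  defmodule Map do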
    use GenStage
    def init(url) do
      {:producer, url}
    end
    def handle_demand(demand, url) when demand > 0 do
      events = List.duplicate(url, demand)
      {:noreply, events, url}
    end
  end

  # Producer/consumer: fetches each URL and parses the JSON response body.
  defmodule GetHTTP do
    use GenStage
    def init(_) do
      {:producer_consumer, :ok}
    end
    def handle_events(events, _from, _state) do
      events = Enum.map(events, &retrieve_as_json/1)
      {:noreply, events, :ok}
    end
    defp retrieve_as_json(url) do
      {:ok, %HTTPoison.Response{body: body}} = HTTPoison.get(url)
      Poison.Parser.parse!(body)
    end
  end

  # Producer/consumer: extracts one element from each parsed document.
  defmodule Unpack do
    use GenStage
    def init(element) do
      {:producer_consumer, element}
    end
    def handle_events(events, _from, element) do
      events = Enum.map(events, & &1[element])
      {:noreply, events, element}
    end
  end

  # Producer/consumer: keeps only the stations whose field matches the given value.
  defmodule Filter do
    use GenStage
    def init({filter, filter_value}) do
      {:producer_consumer, {filter, filter_value}}
    end
    def handle_events(events, _from, {filter, filter_value} = state) do
      events = Enum.map(events,
                           fn stations ->
                             Enum.filter(stations, & &1[filter] == filter_value)
                           end)
      {:noreply, events, state}
    end
  end

  # Consumer: prints each batch of events, then sleeps, which paces the flow.
  defmodule Ticker do
    use GenStage
    def init(sleeping_time) do
      {:consumer, sleeping_time}
    end
    def handle_events(events, _from, sleeping_time) do
      IO.inspect(events)
      Process.sleep(sleeping_time)
      {:noreply, [], sleeping_time}
    end
  end

  # Start up each stage with initial values.
  {:ok, map}     = GenStage.start_link(Map, 
                                   "http://feeds.citibikenyc.com/stations/stations.json")
  {:ok, getHTTP} = GenStage.start_link(GetHTTP, :ok)
  {:ok, unpack}  = GenStage.start_link(Unpack, "stationBeanList")
  {:ok, filter}  = GenStage.start_link(Filter, {"stationName", "W 14 St & The High Line"})
  {:ok, ticker}  = GenStage.start_link(Ticker, 5_000)

  # Join (subscribe) stages and create the data (event) flow
  GenStage.sync_subscribe(ticker, to: filter)
  GenStage.sync_subscribe(filter, to: unpack)
  GenStage.sync_subscribe(unpack, to: getHTTP)
  GenStage.sync_subscribe(getHTTP, to: map, max_demand: 1)

  Process.sleep(:infinity)
end

There are five stages/nodes in this data flow:
  1. Map - supply a URL, but only when there is demand for it
  2. GetHTTP - retrieve web data, parsing the JSON body into an Elixir map
  3. Unpack - extract an element from the map
  4. Filter - find a particular bike station by its name
  5. Ticker - print the events it receives, then sleep for 5 seconds

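To make the Unpack and Filter steps concrete, here is a minimal sketch in plain Elixir, outside of GenStage, of what they do to a single parsed event. The StationSketch module, the sample feed, and the second station name are invented for illustration; the real feed has many more stations and fields.

```elixir
defmodule StationSketch do
  # Unpack: pull one element out of the parsed feed.
  def unpack(feed, element), do: feed[element]

  # Filter: keep only the stations whose `key` equals `value`.
  def filter(stations, key, value) do
    Enum.filter(stations, fn station -> station[key] == value end)
  end
end

# Invented sample data standing in for the parsed JSON feed.
feed = %{
  "stationBeanList" => [
    %{"stationName" => "W 14 St & The High Line", "availableBikes" => 7},
    %{"stationName" => "Hypothetical Ave & 1 St", "availableBikes" => 3}
  ]
}

feed
|> StationSketch.unpack("stationBeanList")
|> StationSketch.filter("stationName", "W 14 St & The High Line")
|> IO.inspect()
```

In the GenStage version, the same two transformations are applied with Enum.map to every event in the batch handed to handle_events.
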
My original implementation of this data flow operated in push mode: the first node of the flow sent a signal to the second node to get things started. GenStage flows operate in a pull, or demand-driven, mode, so the Ticker node is placed at the end of the flow. Its demand is passed upstream from stage to stage until it arrives at the only pure producer in the flow: Map. The max_demand: 1 option on GetHTTP's subscription means Map is asked for at most one event at a time. The three stages in between are producer/consumers: they respond to (consume) events by producing new ones.
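
The pull idea can be sketched without GenStage at all. The following is only an analogy, using Elixir's lazy Stream module: it runs in a single process and has none of GenStage's subscriptions or buffering, but it shows that an endless producer does no work beyond what the consumer at the end asks for. The PullSketch module, its function names, and the example URL are hypothetical.

```elixir
defmodule PullSketch do
  # "Producer": yields the same URL for as long as a consumer keeps asking.
  def urls(url), do: Stream.repeatedly(fn -> url end)

  # "Producer/consumer": transforms an event only when it is pulled.
  # The tuple stands in for the real HTTP fetch.
  def fetch(stream), do: Stream.map(stream, fn url -> {:fetched, url} end)

  # "Consumer": demands exactly `demand` events from the stages upstream.
  def consume(stream, demand), do: Enum.take(stream, demand)
end

"http://example.com/feed.json"
|> PullSketch.urls()
|> PullSketch.fetch()
|> PullSketch.consume(2)
|> IO.inspect()
```

Although urls/1 describes an infinite sequence, the pipeline finishes after two events because nothing upstream runs until consume/2 asks for it.
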