# runnable at https://i...content-available-to-author-only...e.com/PEbM3m
# gist: https://g...content-available-to-author-only...b.com/d-led/9a190290e05a8131619997bbde084c34

# The demo is started via Main.run() at the end of the script.

# Main is a simple Elixir module, grouping together functions.
defmodule Main do
  @moduledoc """
  This sketch:

  * starts 3 asynchronously running "ticking" sensor modules that send votes
    to whoever is subscribed
  * starts a prototypical learning module, letting it know the PIDs
    (process references) of the 3 sensors
  * when the learning module starts, it makes itself known to the sensor modules
    (aka subscribes to their messages)
  * upon a message from any of the sensor modules, the learning module adds one vote,
    trims the vote list to a predefined maximum length, and keeps that in its state
  * the Main module then sleeps a couple of times and queries the asynchronously
    running learning module in an RPC/request-reply fashion what the current
    hypothesis is, printing it to the console
  """

  @doc """
  Runs the demo: starts 3 `TickerSensorModule` processes and one `LearningModule`,
  then queries the learning module 5 times, sleeping 100ms, 200ms, ... 500ms
  before each query and printing the answer.

  Returns `:ok` (the result of `Enum.each/2`). Raises a `MatchError` if any of the
  processes fails to start, as the `{:ok, pid}` matches are assertive.
  """
  def run() do
    # create 3 ticking sensor processes that run concurrently
    tickers =
      1..3
      |> Enum.map(fn _ ->
        # this is a pattern match, binding the ticker_pid variable
        # to the second item of the {:ok, pid} tuple from start_link()
        {:ok, ticker_pid} = TickerSensorModule.start_link()
        ticker_pid
      end)

    # 1..3 |> Enum.map(cb) is equivalent to Enum.map(1..3, cb).
    # This syntax helps chaining functions

    # at this point, the tickers started running asynchronously

    # start the learning module process, letting it know the ticker PIDs
    {:ok, lm} = LearningModule.start_link(tickers)

    # wait for some time and query the current "hypothesis" N times
    1..5
    |> Enum.each(fn d ->
      delay = d * 100
      IO.puts("sleeping for #{delay}ms")
      Process.sleep(delay)

      # this is a request/response type of message, awaiting an answer from lm
      {:hypothesis, h} = lm |> LearningModule.current_hypothesis?()

      # :hypothesis is an "atom" (aka known string)
      # {:hypothesis, h} is a tuple, deconstructing the response from 'current_hypothesis?'
      # via pattern matching
      IO.puts("current hypothesis: #{h}")
    end)
  end
end

# LearningModule implements the GenServer behavior
# the main difference to Erlang with that regard is that Elixir has default implementations
# for callbacks, and you don't have to implement all of them
defmodule LearningModule do
  @moduledoc """
  A GenServer that subscribes to a list of sensor processes, keeps the most recent
  votes they cast, and answers with the average of those votes as its "hypothesis".

  State is a map with the keys `:sensors` (the PIDs passed to `start_link/1`) and
  `:last_votes` (newest vote first, at most 5 entries).
  """

  use GenServer

  # this is a module attribute, defined at compile time
  # what is, perhaps, exciting, is that one can perform calculations,
  # including file IO to define such attributes
  @keep_max_votes 5

  ## API

  @doc """
  Starts the learning module, linked to the caller.

  `sensors` is the list of sensor PIDs to subscribe to; each of them is sent a
  `{:subscribe, pid}` cast during `init/1`.
  """
  @spec start_link([pid()]) :: GenServer.on_start()
  def start_link(sensors),
    do: GenServer.start_link(__MODULE__, %{sensors: sensors, last_votes: []})

  @doc """
  Synchronously asks the learning module `pid` for its current hypothesis.

  Returns `{:hypothesis, 0}` while no votes have been received, otherwise
  `{:hypothesis, average}` where `average` is the (float) mean of the kept votes.
  """
  # this is a request/response API wrapper for the :get_hypothesis message
  @spec current_hypothesis?(GenServer.server()) :: {:hypothesis, number()}
  def current_hypothesis?(pid), do: GenServer.call(pid, :get_hypothesis)

  ## Callbacks (as required by the GenServer behavior)

  @impl true
  def init(%{sensors: sensors} = state) do
    # note the deconstructing pattern matching in function signatures as well
    me = self()

    # asynchronously letting the sensor modules know who to send updates to
    # (self() == this process)
    sensors
    |> Enum.each(fn sensor_pid ->
      # here, we don't use the API but send a raw message, so as not to couple
      # the modules by name
      GenServer.cast(sensor_pid, {:subscribe, me})
    end)

    IO.puts("Started a LearningModule #{inspect(self())} with #{length(sensors)} sensor(s)")
    {:ok, state}
  end

  @impl true
  def handle_cast({:vote, vote}, state = %{last_votes: last_votes}) do
    # keep only @keep_max_votes votes
    # [vote | last_votes] PREPENDS the new vote to the list (O(1)), so the list is
    # ordered newest-first and Enum.take/2 drops the oldest votes at the tail
    new_votes_trimmed = [vote | last_votes] |> Enum.take(@keep_max_votes)

    # the | operator here updates a map
    # since data is immutable, we capture the result in a new variable
    new_state = %{state | last_votes: new_votes_trimmed}
    {:noreply, new_state}
  end

  # if no votes received, the hypothesis is 0
  @impl true
  def handle_call(:get_hypothesis, _from, %{last_votes: []} = state) do
    # the two neighboring handle_calls use pattern matching on the state of last_votes
    # acting equivalent to an if/else switch
    {:reply, {:hypothesis, 0}, state}
  end

  # if there are some votes, send an average of those
  @impl true
  def handle_call(:get_hypothesis, _from, %{last_votes: last_votes} = state) do
    # the empty list is handled by the clause above, so there is no division by zero here
    average_hypothesis = Enum.sum(last_votes) / length(last_votes)
    {:reply, {:hypothesis, average_hypothesis}, state}
  end
end

defmodule TickerSensorModule do
  @moduledoc """
  A GenServer that "ticks" roughly every 100-150ms, incrementing a counter on each
  tick. Once a process has subscribed via a `{:subscribe, pid}` cast, every tick also
  casts `{:vote, count}` to that subscriber.

  State is a map with the keys `:count` and `:subscriber` (`nil` until subscribed).
  Only one subscriber is kept; a later subscription replaces the earlier one.
  """

  use GenServer

  @default_delay_ms 100

  ## API

  @doc """
  Starts a ticking sensor, linked to the caller, with a count of 0 and no subscriber.
  """
  @spec start_link() :: GenServer.on_start()
  def start_link(), do: GenServer.start_link(__MODULE__, %{count: 0, subscriber: nil})

  ## Callbacks

  @impl true
  def init(state) do
    schedule_tick()

    IO.puts(
      # <> is a string concatenation
      "Started a TickerSensorModule #{inspect(self())} " <>
        "with an approx. delay of #{@default_delay_ms}ms"
    )

    {:ok, state}
  end

  # if there's no subscriber, just continue ticking
  @impl true
  def handle_info(:tick, %{count: count, subscriber: nil} = state) do
    schedule_tick()
    # here, the state of the sensor changes, increasing the counter by 1
    {:noreply, %{state | count: count + 1}}
  end

  # if there's a subscriber, send the current vote (count) and continue ticking
  @impl true
  def handle_info(:tick, %{count: count, subscriber: subscriber} = state) do
    # here, again, we send a raw message not to couple the modules.
    # For simplicity, the vote has a simple value of the sensor's counter
    # and not a message in the Cortical Messaging Protocol,
    # however, this doesn't have to look much different in a real implementation:
    # e.g. vote = %{confidence: confidence, location: {27, 42, 32},
    #               morphological_features: %{count: count}}
    GenServer.cast(subscriber, {:vote, count})
    schedule_tick()
    {:noreply, %{state | count: count + 1}}
  end

  @impl true
  def handle_cast({:subscribe, subscriber}, state) do
    IO.puts("Subscribing #{inspect(subscriber)} to #{inspect(self())}")
    new_state = %{state | subscriber: subscriber}
    {:noreply, new_state}
  end

  # Sends :tick to this process after the base delay plus a random jitter of up to
  # half the base delay (:rand.uniform/1 returns an integer in 1..@default_delay_ms).
  defp schedule_tick() do
    delay = round(@default_delay_ms + 0.5 * :rand.uniform(@default_delay_ms))
    Process.send_after(self(), :tick, delay)
  end
end

# after all modules are defined, we can now run the demo
Main.run()