# runnable at https://i...content-available-to-author-only...e.com/PEbM3m
# gist: https://g...content-available-to-author-only...b.com/d-led/9a190290e05a8131619997bbde084c34
# The demo is started via Main.run() at the end of the script.
# Main is a simple Elixir module, grouping together functions.
defmodule Main do
  @moduledoc """
  Entry point of the sketch. `run/0`:

    * starts 3 concurrently running "ticking" sensor processes that send
      votes to whoever is subscribed
    * starts a prototypical learning module process, handing it the PIDs
      (process references) of the 3 sensors
    * then sleeps a few times and, after each sleep, asks the learning module
      in a request/reply (RPC) fashion for its current hypothesis and prints
      it to the console

  On start-up the learning module subscribes itself to the sensors; every vote
  it receives is added to a list that is capped at a predefined maximum length
  and kept in the learning module's state.
  """

  def run() do
    # Start the 3 sensor processes; each one begins ticking on its own as soon
    # as it is started. `{:ok, ticker_pid} = ...` is a pattern match that binds
    # ticker_pid to the second element of the tuple returned by start_link().
    # A `for` comprehension collects the result of each iteration into a list.
    tickers =
      for _ <- 1..3 do
        {:ok, ticker_pid} = TickerSensorModule.start_link()
        ticker_pid
      end

    # Start the learning module process, letting it know the ticker PIDs.
    {:ok, lm} = LearningModule.start_link(tickers)

    # Wait for a growing amount of time and query the current "hypothesis"
    # after each wait.
    Enum.each(1..5, fn step ->
      delay = step * 100
      IO.puts("sleeping for #{delay}ms")
      Process.sleep(delay)

      # Request/response: blocks until lm answers.
      # :hypothesis is an atom (a named constant); the {:hypothesis, h} tuple
      # on the left takes the reply apart via pattern matching.
      {:hypothesis, h} = LearningModule.current_hypothesis?(lm)
      IO.puts("current hypothesis: #{h}")
    end)
  end
end
# LearningModule implements the GenServer behaviour.
# The main difference from Erlang in that regard is that Elixir provides default
# implementations for the callbacks, so you don't have to implement all of them.
defmodule LearningModule do
  @moduledoc """
  A GenServer that subscribes to a list of sensor processes, remembers the
  most recent votes they send, and reports the average of those votes as
  its current "hypothesis".
  """
  use GenServer

  # A module attribute is evaluated at compile time. Any expression may be
  # used to compute one, including file IO.
  @keep_max_votes 5

  ## API

  def start_link(sensors) do
    GenServer.start_link(__MODULE__, %{sensors: sensors, last_votes: []})
  end

  # Request/response wrapper around the :get_hypothesis message.
  # Returns {:hypothesis, value}.
  def current_hypothesis?(pid), do: GenServer.call(pid, :get_hypothesis)

  ## Callbacks (as required by the GenServer behaviour)

  @impl true
  def init(%{sensors: sensors} = state) do
    # Function heads can destructure their arguments via pattern matching, too.
    me = self()

    # Asynchronously tell every sensor whom to send updates to (this process).
    # A raw message is cast instead of calling a sensor API function, so that
    # the two modules are not coupled by name.
    Enum.each(sensors, &GenServer.cast(&1, {:subscribe, me}))

    IO.puts("Started a LearningModule #{inspect(self())} with #{length(sensors)} sensor(s)")
    {:ok, state}
  end

  @impl true
  def handle_cast({:vote, vote}, %{last_votes: last_votes} = state) do
    # [head | tail] prepends an element to a list; afterwards only the
    # @keep_max_votes most recent votes are kept.
    recent_votes = Enum.take([vote | last_votes], @keep_max_votes)

    # %{map | key: value} builds an updated copy of the map. Data is
    # immutable, so the updated copy is what gets returned as the new state.
    {:noreply, %{state | last_votes: recent_votes}}
  end

  @impl true
  def handle_call(:get_hypothesis, _from, %{last_votes: last_votes} = state) do
    {:reply, {:hypothesis, hypothesis(last_votes)}, state}
  end

  # The two clauses pattern match on the list of votes, acting like an
  # if/else: without votes the hypothesis is 0, otherwise it is the average.
  defp hypothesis([]), do: 0
  defp hypothesis(votes), do: Enum.sum(votes) / length(votes)
end
defmodule TickerSensorModule do
  @moduledoc """
  A GenServer that "ticks" on a randomised timer and counts its ticks.

  The state is a map with a `:count` (starting at 0) and a `:subscriber`
  (initially `nil`). A process subscribes by casting `{:subscribe, pid}` to
  the sensor; from then on every tick casts `{:vote, count}` to that
  subscriber. Only one subscriber is kept: a later subscription replaces
  the earlier one.
  """
  use GenServer

  @default_delay_ms 100

  ## API

  def start_link(), do: GenServer.start_link(__MODULE__, %{count: 0, subscriber: nil})

  ## Callbacks

  @impl true
  def init(state) do
    schedule_tick()

    # <> is string concatenation
    IO.puts(
      "Started a TickerSensorModule #{inspect(self())} " <>
        "with an approx. delay of #{@default_delay_ms}ms"
    )

    {:ok, state}
  end

  # If there's no subscriber, just continue ticking.
  @impl true
  def handle_info(:tick, %{count: count, subscriber: nil} = state) do
    schedule_tick()
    # Here the state of the sensor changes: the counter increases by 1.
    {:noreply, %{state | count: count + 1}}
  end

  # If there's a subscriber, send the current vote (count) and continue ticking.
  @impl true
  def handle_info(:tick, %{count: count, subscriber: subscriber} = state) do
    # Here, again, a raw message is sent in order not to couple the modules.
    # For simplicity, the vote is just the sensor's counter value and not a
    # message in the Cortical Messaging Protocol; however, this doesn't have
    # to look much different in a real implementation, e.g.:
    # vote = %{confidence: confidence, location: {27, 42, 32},
    #          morphological_features: %{count: count}}
    GenServer.cast(subscriber, {:vote, count})
    schedule_tick()
    {:noreply, %{state | count: count + 1}}
  end

  @impl true
  def handle_cast({:subscribe, subscriber}, state) do
    IO.puts("Subscribing #{inspect(subscriber)} to #{inspect(self())}")
    new_state = %{state | subscriber: subscriber}
    {:noreply, new_state}
  end

  # Schedules the next :tick message to this process. The delay is the default
  # delay plus a random extra of up to half of it, rounded to whole milliseconds.
  defp schedule_tick() do
    delay = round(@default_delay_ms + 0.5 * :rand.uniform(@default_delay_ms))
    Process.send_after(self(), :tick, delay)
  end
end
# Script entry point: this call comes after the module definitions above,
# so that Main, LearningModule and TickerSensorModule exist when the demo runs.
Main.run()