fork download
  1. # runnable at https://i...content-available-to-author-only...e.com/PEbM3m
  2. # gist: https://g...content-available-to-author-only...b.com/d-led/9a190290e05a8131619997bbde084c34
  3. # The demo is started via Main.run() at the end of the script.
  4. # Main is a simple Elixir module, grouping together functions.
  5. defmodule Main do
  6. @moduledoc """
  7. this sketch:
  8. * starts 3 asynchronously running "ticking" sensor modules that
  9. send votes to whoever is subscribed
  10. * starts a prototypical learning module, letting it know the PIDs
  11. (process references) of the 3 sensors
  12. * when the learning module starts, it lets itself known to the sensor modules
  13. (aka subscribes to their messages)
  14. * upon a message from any of the sensor modules, the learning module adds one vote,
  15. and trims it to a predefined maximum length, and keeps that in its state
  16. * the Main module then sleeps a couple of times and queries
  17. the asynchronously running learning module in an RPC/request-reply fashion
  18. what the current hypothesis is, printing it to the console
  19. """
  20. def run() do
  21. # create 3 ticking sensor processes that run concurrently
  22. tickers =
  23. 1..3
  24. |> Enum.map(fn _ ->
  25. # this is a pattern match, binding the ticker_pid variable
  26. # to the second item of the {:ok, pid} tuple from start_link()
  27. {:ok, ticker_pid} = TickerSensorModule.start_link()
  28. ticker_pid
  29. end)
  30.  
  31. # 1..3 |> Enum.map(cb) is equivalent to Enum.map(1..3, cb).
  32. # This syntax helps chaining functions
  33.  
  34. # at this point, the tickers started running asynchronously
  35.  
  36. # start the learning module process, letting it know the ticker PIDs
  37. {:ok, lm} = LearningModule.start_link(tickers)
  38.  
  39. # wait for some time and query the current "hypothesis" N times
  40. 1..5
  41. |> Enum.each(fn d ->
  42. delay = d * 100
  43. IO.puts("sleeping for #{delay}ms")
  44.  
  45. Process.sleep(delay)
  46.  
  47. # this is a request/response type of message, awaiting an answer from lm
  48. {:hypothesis, h} = lm |> LearningModule.current_hypothesis?()
  49. # :hypothesis is an "atom" (aka known string)
  50. # {:hypothesis, h} is a tuple, deconstructing the response from 'current_hypothesis?'
  51. # via pattern matching
  52.  
  53. IO.puts("current hypothesis: #{h}")
  54. end)
  55. end
  56. end
  57.  
  58. # LearningModule implements the GenServer behavior
  59. # the main difference to Erlang with that regard is that Elixir has default implementations
  60. # for callbacks, and you don't have to implement all of them
  61. defmodule LearningModule do
  62. use GenServer
  63.  
  64. # this is a module attribute, defined at compile time
  65. # what is, perhaps, exciting, is that one can perform calculations,
  66. # including file IO to defin such attributes
  67. @keep_max_votes 5
  68.  
  69. ## API
  70. def start_link(sensors),
  71. do: GenServer.start_link(__MODULE__, %{sensors: sensors, last_votes: []})
  72.  
  73. # this is a request/response API wrapper for the :get_hypothesis message
  74. def current_hypothesis?(pid), do: GenServer.call(pid, :get_hypothesis)
  75.  
  76. ## Callbacks (as required by the GenServer behavior)
  77. @impl true
  78. def init(%{sensors: sensors} = state) do
  79. # note the deconstructing pattern matching in function signatures as well
  80.  
  81. me = self()
  82. # asynchronously letting the sensor modules who to send updates to (self() == this process)
  83. sensors
  84. |> Enum.each(fn sensor_pid ->
  85. # here, we don't use the API but send a raw message to not coupled the modules by name
  86. GenServer.cast(sensor_pid, {:subscribe, me})
  87. end)
  88.  
  89. IO.puts("Started a LearningModule #{inspect(self())} with #{length(sensors)} sensor(s)")
  90.  
  91. {:ok, state}
  92. end
  93.  
  94. @impl true
  95. def handle_cast({:vote, vote}, state = %{last_votes: last_votes}) do
  96. # keep only @keep_max_votes votes
  97. # the | operator is appending an element to a list
  98. new_votes_trimmed = [vote | last_votes] |> Enum.take(@keep_max_votes)
  99.  
  100. # the | operator here updates a map
  101. # since data is immutable, we capture the result in a new variable
  102. new_state = %{state | last_votes: new_votes_trimmed}
  103.  
  104. {:noreply, new_state}
  105. end
  106.  
  107. # if no votes received, the hypothesis is 0
  108. @impl true
  109. def handle_call(:get_hypothesis, _from, %{last_votes: []} = state) do
  110. # the two neighboring handle_calls use pattern matching on the state of last_votes
  111. # acting equivalent to an if/else switch
  112. {:reply, {:hypothesis, 0}, state}
  113. end
  114.  
  115. # if there are some votes, send an everage of those
  116. @impl true
  117. def handle_call(:get_hypothesis, _from, %{last_votes: last_votes} = state) do
  118. average_hypothesis = Enum.sum(last_votes) / length(last_votes)
  119.  
  120. {:reply, {:hypothesis, average_hypothesis}, state}
  121. end
  122. end
  123.  
  124. defmodule TickerSensorModule do
  125. use GenServer
  126.  
  127. @default_delay_ms 100
  128.  
  129. ## API
  130. def start_link(), do: GenServer.start_link(__MODULE__, %{count: 0, subscriber: nil})
  131.  
  132. ## Callbacks
  133. @impl true
  134. def init(state) do
  135. schedule_tick()
  136.  
  137. IO.puts(
  138. # <> is a string concatenation
  139. "Started a TickerSensorModule #{inspect(self())} " <>
  140. "with an approx. delay of #{@default_delay_ms}ms"
  141. )
  142.  
  143. {:ok, state}
  144. end
  145.  
  146. # if there's no subscriber, just continue ticking
  147. @impl true
  148. def handle_info(:tick, %{count: count, subscriber: nil} = state) do
  149. schedule_tick()
  150.  
  151. # here, the state of the sensor changes, increasing the counter by 1
  152. {:noreply, %{state | count: count + 1}}
  153. end
  154.  
  155. # if there's a subscriber, send the current vote (count) and continue ticking
  156. @impl true
  157. def handle_info(:tick, %{count: count, subscriber: subscriber} = state) do
  158. # here, again, we send a raw message not to couple the modules.
  159. # For simplicity, the vote has a simple value of the sensor's counter
  160. # and not a message in the Cortical Messaging Protocol,
  161. # however, this doesn't have to look much different in a real implementation:
  162. # e.g. vote = %{confidence: confidendce, location: {27,42, 32}, morphological_features: {count: count}}
  163. GenServer.cast(subscriber, {:vote, count})
  164.  
  165. schedule_tick()
  166. {:noreply, %{state | count: count + 1}}
  167. end
  168.  
  169. @impl true
  170. def handle_cast({:subscribe, subscriber}, state) do
  171. IO.puts("Subscribing #{inspect(subscriber)} to #{inspect(self())}")
  172. new_state = %{state | subscriber: subscriber}
  173. {:noreply, new_state}
  174. end
  175.  
  176. defp schedule_tick() do
  177. delay = round(@default_delay_ms + 0.5 * :rand.uniform(@default_delay_ms))
  178. Process.send_after(self(), :tick, delay)
  179. end
  180. end
  181.  
  182. # after all modules are defined, we can now run the demo
  183. Main.run()
  184.  
Success #stdin #stdout 0.57s 36944KB
stdin
Standard input is empty
stdout
Started a TickerSensorModule #PID<0.101.0> with an approx. delay of 100ms
Started a TickerSensorModule #PID<0.102.0> with an approx. delay of 100ms
Started a TickerSensorModule #PID<0.103.0> with an approx. delay of 100ms
Started a LearningModule #PID<0.104.0> with 3 sensor(s)
Subscribing #PID<0.104.0> to #PID<0.101.0>
Subscribing #PID<0.104.0> to #PID<0.102.0>
Subscribing #PID<0.104.0> to #PID<0.103.0>
sleeping for 100ms
current hypothesis: 0.0
sleeping for 200ms
current hypothesis: 0.6
sleeping for 300ms
current hypothesis: 2.6
sleeping for 400ms
current hypothesis: 5.6
sleeping for 500ms
current hypothesis: 10.0