The problem: we have many devices, each with a known latitude and longitude, that constantly report any change in their position.
Our objective is to send a message to the devices that are near a given location.
Another use is for one device to find out which other devices are within a given radius.
Possible use cases
Sending messages to, or retrieving them from, nearby connected devices applies to many scenarios:
Taxi cab fleet
Suppose that a new passenger requests a trip from a taxi cab fleet and gives us the pickup point. The system must offer the trip to the closest cabs, so it only needs to broadcast to the cabs in the surrounding area.
Sensors mesh
In a sensor mesh, one of the devices reads an abnormal value and wants to know the values read by the nearby devices within a given radius.
Food delivery
One of a delivery company's motorcycles gets a flat tire and requests assistance from nearby partners, so that they can deliver the food instead.
Table of Contents
- Dependencies
- The application
- Space Zonification
- Filtering by distance
- Devices Registry
- Socket Handler
Dependencies
Showing the dependencies is the classic way to start an Elixir post.
defp deps do
  [
    {:cowboy, "~> 2.4"},
    {:plug_cowboy, "~> 2.0"},
    {:jason, "~> 1.3"},
    {:geocoder, "~> 1.1"},
    {:geocalc, "~> 0.8"}
  ]
end
The application
The application module is the core: it is where all the processes involved are started.
defmodule IotDevicesApp do
  use Application
  def start(_type, _args) do
    children = [
      Plug.Cowboy.child_spec(
        scheme: :http,
        plug: SocketApp.HttpRouter,
        options: [dispatch: dispatch(), port: 5000]
      ),
      Registry.child_spec(
        keys: :duplicate,
        name: Registry.IotApp
      )
    ]
    opts = [strategy: :one_for_one, name: IotDevicesApp.Application]
    Supervisor.start_link(children, opts)
  end
  defp dispatch do
    [
      {:_,
       [
         {"/ws/[...]", IotDevicesApp.SocketHandlerLocation, []},
         {:_, Plug.Cowboy.Handler, {SocketApp.HttpRouter, []}}
       ]}
    ]
  end
end
Space Zonification
Our app has thousands of devices spread across a state, county, or city, so measuring the distance to every single device does not scale. We first need a coarse zoning step, and then a fine-grained filter by distance within the zone.
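To make the two-stage idea concrete, here is a minimal sketch that works on an in-memory list rather than on live connections. The module name `ZoneFilterSketch`, the `{id, zone, {lon, lat}}` tuple shape, the device ids, and the zone strings are all assumptions made for illustration; they are not part of the application shown later.

```elixir
defmodule ZoneFilterSketch do
  # devices is a list of {id, zone, {lon, lat}} tuples (hypothetical shape).
  def nearby(devices, zone, origin, max_degrees) do
    devices
    # Coarse step: keep only the devices registered in the same zone.
    |> Enum.filter(fn {_id, device_zone, _location} -> device_zone == zone end)
    # Fine step: keep only the devices within max_degrees of the origin.
    |> Enum.filter(fn {_id, _zone, location} -> distance(origin, location) <= max_degrees end)
    |> Enum.map(fn {id, _zone, _location} -> id end)
  end

  # Flat "distance in degrees" between two {lon, lat} points.
  defp distance({x1, y1}, {x2, y2}) do
    :math.sqrt(:math.pow(x1 - x2, 2) + :math.pow(y1 - y2, 2))
  end
end

devices = [
  {"cab-1", "New York, New York", {-73.9851, 40.7589}},
  {"cab-2", "New York, New York", {-73.9442, 40.6782}},
  {"cab-3", "New Jersey, Newark", {-74.1724, 40.7357}}
]

# Only cab-1 is both in the zone and within 0.01 degrees of the origin.
ZoneFilterSketch.nearby(devices, "New York, New York", {-73.9855, 40.7580}, 0.01)
```

The cheap equality check on the zone discards most devices before any distance is computed, which is the whole point of the coarse step.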
The first solution that comes to mind is zoning by city, using a reverse geocoding service such as Google Maps or OpenStreetMap.
The geocoder library fits our needs:
Hexdocs
Github
def getZone({lon, lat}) do
  {:ok, coord} = Geocoder.call({lat, lon}, provider: Geocoder.Providers.OpenStreetMaps)
  # OpenStreetMaps is for testing only; it is limited to 1 request per second.
  "#{coord.location.state}, #{coord.location.city}"
end
Testing the function:
times_square = {-73.985130, 40.758896}
getZone(times_square)
Returns: "New York, New York"
Filtering by distance
A distance function must be implemented, and a few questions need answering first:
- How far away are the neighbors? 1 km, 5 km, 10 km?
- What level of accuracy do we need? 1 m, 100 m, 500 m?
- How important is speed?
Two options are presented:
A) The fast way treats the Earth as flat and computes the Euclidean distance in degrees. One degree at the equator is about 69 miles (111 km), but a degree of longitude shrinks toward the poles, so this is only a rough approximation.
def distance({cx, cy}, {nx, ny}) do
  :math.sqrt(:math.pow(abs(cx - nx), 2) + :math.pow(abs(cy - ny), 2))
end
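A distance in degrees is hard to compare with a radius expressed in meters. One common flat-earth refinement, shown here as a sketch rather than as part of the original application, scales the longitude difference by the cosine of the mean latitude and converts the result to meters. The module name `FlatDistanceSketch` and the mean Earth radius of 6,371,000 m are assumptions.

```elixir
defmodule FlatDistanceSketch do
  # Mean Earth radius in meters (assumed value).
  @earth_radius_m 6_371_000

  # Points are {lon, lat} tuples in degrees, as in the rest of the post.
  def meters({lon1, lat1}, {lon2, lat2}) do
    mean_lat = deg_to_rad((lat1 + lat2) / 2)
    # A degree of longitude shrinks by cos(latitude) away from the equator.
    x = deg_to_rad(lon2 - lon1) * :math.cos(mean_lat)
    y = deg_to_rad(lat2 - lat1)
    @earth_radius_m * :math.sqrt(x * x + y * y)
  end

  defp deg_to_rad(deg), do: deg * :math.pi() / 180
end

# One degree of latitude is roughly 111 km everywhere.
FlatDistanceSketch.meters({0.0, 0.0}, {0.0, 1.0})
# One degree of longitude at 60 degrees north is roughly half of that.
FlatDistanceSketch.meters({0.0, 60.0}, {1.0, 60.0})
```

For radii of a few kilometers this stays cheap while avoiding most of the distortion of a raw comparison in degrees.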
B) The correct way, which may be a little slower because more calculations are involved, applies the haversine formula via Geocalc.
def distance({lon1, lat1}, {lon2, lat2}) do
  # Geocalc expects [lat, lng] lists, while our tuples are {lon, lat}.
  Geocalc.distance_between([lat1, lon1], [lat2, lon2])
end
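For readers who want to see what the haversine formula computes, here is a dependency-free sketch. It is written independently of Geocalc, so its results may differ slightly from the library's; the module name `HaversineSketch` and the mean Earth radius of 6,371,000 m are assumptions.

```elixir
defmodule HaversineSketch do
  # Mean Earth radius in meters (assumed value).
  @earth_radius_m 6_371_000

  # Great-circle distance in meters between two {lon, lat} points in degrees.
  def meters({lon1, lat1}, {lon2, lat2}) do
    phi1 = deg_to_rad(lat1)
    phi2 = deg_to_rad(lat2)
    dphi = deg_to_rad(lat2 - lat1)
    dlambda = deg_to_rad(lon2 - lon1)

    # Haversine of the central angle between the two points.
    a =
      :math.pow(:math.sin(dphi / 2), 2) +
        :math.cos(phi1) * :math.cos(phi2) * :math.pow(:math.sin(dlambda / 2), 2)

    2 * @earth_radius_m * :math.asin(:math.sqrt(a))
  end

  defp deg_to_rad(deg), do: deg * :math.pi() / 180
end

# Distance between two points one degree of latitude apart.
HaversineSketch.meters({0.0, 0.0}, {0.0, 1.0})
```

Each call costs a handful of trigonometric functions, which is why the coarse zoning step matters: it keeps the number of calls small.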
Devices Registry
We need to keep a record of all the connected devices, and the right place for it is a Registry working as a publisher-subscriber. The PID and the location of every new connection are stored in the Registry, as shown below in the Socket Handler.
Elixir Registry
The Registry is based on ETS (Erlang Term Storage).
Erlang Term Storage
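As a standalone illustration of the publisher-subscriber pattern, the sketch below starts a duplicate-key Registry, registers two processes under the same zone key, and dispatches a message to every process except the caller. The registry name `RegistrySketch`, the coordinates, and the message text are assumptions chosen for the example.

```elixir
{:ok, _} = Registry.start_link(keys: :duplicate, name: RegistrySketch)

zone = "New York, New York"

# The current process registers under the zone key, with its location as the value.
{:ok, _} = Registry.register(RegistrySketch, zone, {-73.985130, 40.758896})

# A second process registers under the same key and waits for a message.
parent = self()

spawn(fn ->
  {:ok, _} = Registry.register(RegistrySketch, zone, {-73.9442, 40.6782})
  send(parent, :registered)

  receive do
    msg -> send(parent, {:got, msg})
  end

  # Stay alive so the registration is not removed when the process exits.
  Process.sleep(:infinity)
end)

receive do
  :registered -> :ok
end

# lookup/2 returns a list of {pid, value} pairs, one per registration.
entries = Registry.lookup(RegistrySketch, zone)
IO.inspect(length(entries), label: "registered devices")

# dispatch/3 hands all the entries for the key to the callback.
Registry.dispatch(RegistrySketch, zone, fn entries ->
  for {pid, _location} <- entries, pid != self(), do: send(pid, "report_status")
end)
```

Because a registration belongs to the process that made it, the Registry drops the entry automatically when that process exits, so closed connections do not need explicit cleanup.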
The Socket Handler
The starting point for understanding the socket handler is the documentation: Cowboy WebSocket
defmodule IotDevicesApp.SocketHandlerLocation do
  @behaviour :cowboy_websocket
  @degrees 0.5
  @meters 1500
  # When a client opens the connection, it sends us its longitude and latitude:
  # javascript: websocket = new WebSocket("ws://192.168.1.109:5000/ws?lon=-73.985130&lat=40.758896");
  def init(request, _state) do
    vars = URI.query_decoder(request.qs) |> Map.new()
    # Float.parse/1 accepts both "11" and "11.0"; the default must be a string.
    {lon_float, _rest} = Float.parse(Map.get(vars, "lon", "0.0"))
    {lat_float, _rest} = Float.parse(Map.get(vars, "lat", "0.0"))
    {:cowboy_websocket, request, {lon_float, lat_float}, %{idle_timeout: :infinity}}
  end
  # Called once the connection has been upgraded to WebSocket.
  def websocket_init(location) do
    IO.puts("-------------websocket_init--------------")
    # This function runs in a different process (PID) than init/2,
    # which is why the PID is registered here.
    key = getZone(location)
    Registry.register(Registry.IotApp, key, location)
    {:ok, {key, location}}
  end
  # websocket_handle/2 is called when the client sends a text frame, e.g.:
  # javascript: websocket.send(JSON.stringify({action:"updlocation", lon:11, lat:22}))
  # javascript: websocket.send(JSON.stringify({action:"status", ...status}))
  # javascript: websocket.send(JSON.stringify({action:"wakeupneighbors"}))
  def websocket_handle({:text, json}, state = {key, location}) do
    IO.puts("-------------websocket_handle--------------")
    IO.inspect(state, label: "state")
    payload = Jason.decode!(json)
    case payload["action"] do
      "status" -> {:reply, reportStatus(payload["status"]), state}
      "updlocation" -> updateLocation(self(), key, {payload["lon"], payload["lat"]})
      "wakeupneighbors" -> {:reply, wakeupNeighbors(key, location), state}
      _ -> {:ok, state}
    end
  end
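  # This is where the broadcast arrives.
  # websocket_info/2 is called for every Erlang message the process receives;
  # here the messages are sent by wakeupNeighbors/2.
  def websocket_info(info, state) when is_binary(info) do
    {:reply, {:text, info}, state}
  end
  # Ignore any other Erlang message instead of crashing on a non-binary frame.
  def websocket_info(_info, state) do
    {:ok, state}
  end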
  # ----------- Utility functions ---------------------
  def reportStatus(status) do
    IO.puts("-------------reportStatus--------------")
    IO.inspect(status, label: "status")
    # Put your own logic here.
    {:ok, str} = Jason.encode(%{"action" => "ack"})
    {:text, str}
  end
  def updateLocation(pid, oldkey, newlocation) do
    IO.puts("-------------updateLocation--------------")
    IO.inspect({pid, self()}, label: "PID")
    # This function runs in the same process (PID) as websocket_init/1.
    # IO.inspect(newlocation, label: "newlocation")
    newkey = getZone(newlocation)
    IO.inspect(Registry.values(Registry.IotApp, oldkey, self()), label: "old location")
    # Replace the old registration with one under the new zone key.
    Registry.unregister(Registry.IotApp, oldkey)
    Registry.register(Registry.IotApp, newkey, newlocation)
    IO.inspect(Registry.values(Registry.IotApp, newkey, self()), label: "new location")
    {:ok, str} = Jason.encode(%{"action" => "ack"})
    {:reply, {:text, str}, {newkey, newlocation}}
  end
  def wakeupNeighbors(key, location) do
    # This function runs in the same process (PID) as websocket_init/1.
    IO.puts("-------------wakeupNeighbors--------------")
    {:ok, message} = Jason.encode(%{"action" => "report_status"})
    Registry.dispatch(Registry.IotApp, key, fn entries ->
      for {pid, neighbor} <- entries do
        IO.inspect(neighbor, label: "neighbor location")
        # Skip ourselves and any device outside the radius.
        if pid != self() && isNeighbour(location, neighbor) do
          IO.inspect(pid, label: " send msg to")
          Process.send(pid, message, [])
        end
      end
    end)
    {:ok, str} = Jason.encode(%{"action" => "ack"})
    {:text, str}
  end
  def isNeighbour(location, neighbor) do
    # @degrees > distanceEuclidean(location, neighbor)
    @meters > distanceHaversine(location, neighbor)
  end
  def distanceEuclidean({cx, cy}, {nx, ny}) do
    :math.sqrt(:math.pow(abs(cx - nx), 2) + :math.pow(abs(cy - ny), 2))
  end
  def distanceHaversine({lon1, lat1}, {lon2, lat2}) do
    # Geocalc expects [lat, lng] lists, while our tuples are {lon, lat}.
    Geocalc.distance_between([lat1, lon1], [lat2, lon2])
  end
  def getZone({lon, lat}) do
    {:ok, coord} = Geocoder.call({lat, lon}, provider: Geocoder.Providers.OpenStreetMaps)
    # OpenStreetMaps is for testing only; it is limited to 1 request per second.
    key = "#{coord.location.state}, #{coord.location.city}"
    IO.inspect(key, label: "getZone")
    key
  end
end
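The handler's init/2 trusts whatever arrives in the query string. As a possible hardening step, the sketch below validates the coordinates before they are used. The module name `CoordParamsSketch` and the `:error` return convention are assumptions, and how the handler should react to `:error` (for example by rejecting the upgrade) is left open.

```elixir
defmodule CoordParamsSketch do
  # Returns {:ok, {lon, lat}} for a query string such as "lon=-73.98&lat=40.75",
  # or :error when a value is missing, not numeric, or out of range.
  def parse(query_string) do
    params = URI.decode_query(query_string)

    with {:ok, lon} <- parse_number(Map.get(params, "lon")),
         {:ok, lat} <- parse_number(Map.get(params, "lat")),
         true <- lon >= -180 and lon <= 180 and lat >= -90 and lat <= 90 do
      {:ok, {lon, lat}}
    else
      _ -> :error
    end
  end

  defp parse_number(nil), do: :error

  defp parse_number(text) do
    # Float.parse/1 accepts "11" as well as "11.0"; reject trailing garbage.
    case Float.parse(text) do
      {value, ""} -> {:ok, value}
      _ -> :error
    end
  end
end

CoordParamsSketch.parse("lon=-73.985130&lat=40.758896")
CoordParamsSketch.parse("lon=abc&lat=40.7")
```

Rejecting bad input here also avoids spending a rate-limited geocoding request on coordinates that cannot be valid.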