DEV Community

Lionel Marco


Elixir Cowboy Websocket - Querying IOT devices by latitude longitude proximity

In our problem statement, we have many devices with a known latitude and longitude, and they constantly report any change in their position.

Our objective is to send a message to the devices that are near a given location.
Another use is when one device needs to know which other devices are within a given radius.

Possible use cases

Sending messages to, or retrieving them from, nearby connected devices applies to many use cases:

Taxi cab fleet

Suppose that in a taxi cab fleet a new passenger requests a trip and gives us the pickup point. The system must offer the trip to the closest cabs, so it only needs to broadcast to the cabs in the surrounding area.

Sensor mesh

In a sensor mesh, one of the devices reads an abnormal value and wants to know the values of the nearby devices within a given radius.

Food delivery

One of a delivery company's motorcycles gets a flat tire and requests assistance from nearby partners, so they can deliver the food instead.


Dependencies

Showing the dependencies is the classic way to start an Elixir post.

  defp deps do
    [
      {:cowboy, "~> 2.4"},
      {:plug_cowboy, "~> 2.0"},
      {:jason, "~> 1.3"},
      {:geocoder, "~> 1.1"},
      {:geocalc, "~> 0.8"}
    ]
  end

The application

The application module is the core where all the processes involved are started.

defmodule IotDevicesApp do
  use Application
  def start(_type, _args) do

    children = [
      Plug.Cowboy.child_spec(
        scheme: :http,
        plug: SocketApp.HttpRouter,
        options: [dispatch: dispatch(), port: 5000]
      ),
      Registry.child_spec(
        keys: :duplicate,
        name: Registry.IotApp
      )
    ]

    opts = [strategy: :one_for_one, name: IotDevicesApp.Application]
    Supervisor.start_link(children, opts)
  end

  defp dispatch do
    [
      {:_,
        [
          {"/ws/[...]", IotDevicesApp.SocketHandlerLocation, []},
          {:_, Plug.Cowboy.Handler, {SocketApp.HttpRouter, []}}
        ]
      }
    ]
  end  
end


Spatial Zoning

In our app we have thousands of devices spread across a state, county or city, so measuring the distance to every device does not perform well. A coarse zoning step is needed first, followed by fine-grained filtering by distance.
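The coarse-then-fine idea can be sketched in plain Elixir. This is only an illustration under assumptions: the module name `ZoneFilterSketch`, the `{id, zone, {lon, lat}}` device shape and the 0.5 degree threshold are hypothetical, and the zone strings are taken as already computed.

```elixir
defmodule ZoneFilterSketch do
  # Hypothetical threshold in degrees; tune it for the real deployment.
  @max_degrees 0.5

  # devices: list of {id, zone, {lon, lat}} tuples (an assumed shape).
  def nearby(devices, zone, {lon, lat}) do
    devices
    # Coarse pass: keep only the devices registered in the same zone.
    |> Enum.filter(fn {_id, device_zone, _loc} -> device_zone == zone end)
    # Fine pass: measure the distance only for the survivors.
    |> Enum.filter(fn {_id, _zone, {dlon, dlat}} ->
      :math.sqrt(:math.pow(lon - dlon, 2) + :math.pow(lat - dlat, 2)) < @max_degrees
    end)
    |> Enum.map(fn {id, _zone, _loc} -> id end)
  end
end

devices = [
  {"cab-1", "New York, New York", {-73.9857, 40.7484}},
  {"cab-2", "New York, New York", {-73.2000, 40.7000}},
  {"cab-3", "New Jersey, Newark", {-74.1724, 40.7357}}
]

IO.inspect(ZoneFilterSketch.nearby(devices, "New York, New York", {-73.985130, 40.758896}))
```

Note that `cab-3` is skipped by the coarse pass even though it is close in degrees: a device just across a zone border is never measured, which is the trade-off of zoning by name.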

The first solution that comes to mind is to zone by city, using a reverse geocoding service such as Google Maps or OpenStreetMap.

The geocoder library (see its Hexdocs and GitHub pages) fits our needs:

def getZone({lon, lat}) do 
  {:ok, coord} = Geocoder.call({lat, lon}, provider: Geocoder.Providers.OpenStreetMaps)
  # OpenStreetMaps is for testing only; it is limited to 1 request per second.
  "#{coord.location.state}, #{coord.location.city}"
end


Testing the function:

times_square = {-73.985130, 40.758896}
getZone(times_square)
# Returns: "New York, New York"


Filtering by distance

A distance function must be implemented, and a few questions need answering first:

  • How far away are the neighbors? 1 km, 5 km, 10 km?
  • What level of accuracy do we need? 1 m, 100 m, 500 m?
  • How important is speed?

Two options are presented:
A) The fast way treats the earth as flat and computes the Euclidean distance in degrees. One degree at the equator is roughly 69 miles (about 111 km).

def distance({cx, cy}, {nx, ny}) do
  :math.sqrt(:math.pow(abs(cx - nx), 2) + :math.pow(abs(cy - ny), 2))
end
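One caveat of working in raw degrees is that a degree of longitude covers less ground the further you move from the equator. The sketch below is not part of the original handler; it shows a common flat-earth refinement (the equirectangular approximation) that scales the longitude difference by the cosine of the latitude and returns meters. The module name `FlatDistanceSketch` and the mean Earth radius of 6,371,000 m are assumptions.

```elixir
defmodule FlatDistanceSketch do
  # Mean Earth radius in meters (an assumed constant).
  @earth_radius_m 6_371_000

  # Points are {lon, lat} tuples in degrees, as in the rest of the post.
  def distance_m({lon1, lat1}, {lon2, lat2}) do
    mean_lat = deg_to_rad((lat1 + lat2) / 2)
    # East-west difference, shrunk by the cosine of the mean latitude.
    x = deg_to_rad(lon2 - lon1) * :math.cos(mean_lat)
    # North-south difference.
    y = deg_to_rad(lat2 - lat1)
    @earth_radius_m * :math.sqrt(x * x + y * y)
  end

  defp deg_to_rad(deg), do: deg * :math.pi() / 180
end

# One degree of longitude at the equator versus at the latitude of New York.
IO.inspect(FlatDistanceSketch.distance_m({0.0, 0.0}, {1.0, 0.0}), label: "equator")
IO.inspect(FlatDistanceSketch.distance_m({-74.0, 40.75}, {-73.0, 40.75}), label: "40.75 N")
```

The second value comes out at roughly three quarters of the first, which is the error a pure degrees-based threshold would carry at that latitude.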


B) The correct, though possibly slightly slower, way applies the haversine formula from Geocalc. It involves more calculations and returns the distance in meters.

def distance({cx, cy}, {nx, ny}) do
  # Geocalc reads a list as [lat, lon]; our tuples are {lon, lat}, so swap them
  Geocalc.distance_between([cy, cx], [ny, nx])
end
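To make the formula itself visible, here is a minimal pure-Elixir haversine sketch. It is not Geocalc's implementation; the module name `HaversineSketch` and the mean Earth radius of 6,371,000 m are assumptions, and points keep the `{lon, lat}` order used in this post.

```elixir
defmodule HaversineSketch do
  # Mean Earth radius in meters (an assumed constant).
  @earth_radius_m 6_371_000

  # Great-circle distance in meters between two {lon, lat} points in degrees.
  def distance_m({lon1, lat1}, {lon2, lat2}) do
    phi1 = deg_to_rad(lat1)
    phi2 = deg_to_rad(lat2)
    dphi = deg_to_rad(lat2 - lat1)
    dlambda = deg_to_rad(lon2 - lon1)

    # Haversine of the central angle between the two points.
    a =
      :math.pow(:math.sin(dphi / 2), 2) +
        :math.cos(phi1) * :math.cos(phi2) * :math.pow(:math.sin(dlambda / 2), 2)

    # Clamp to 1.0 so rounding noise can never push asin out of its domain.
    2 * @earth_radius_m * :math.asin(min(1.0, :math.sqrt(a)))
  end

  defp deg_to_rad(deg), do: deg * :math.pi() / 180
end

times_square = {-73.985130, 40.758896}
empire_state = {-73.9857, 40.7484}
IO.inspect(HaversineSketch.distance_m(times_square, empire_state), label: "meters")
```

Compared with option A, the result is in meters and does not depend on the latitude at which the two points sit, at the cost of a few more trigonometric calls per comparison.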


Devices Registry

We need to keep a record of all the connected devices, and the right place for that is a Registry working as a publisher-subscriber. The PID and the location of every new connection are stored in the Registry, as shown below in the socket handler.

The Elixir Registry is built on ETS (Erlang Term Storage).
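Before looking at the handler, the publisher-subscriber use of the Registry can be tried on its own. In this minimal sketch the names `RegistrySketch` and `BroadcastSketch` are hypothetical, and unlike the handler it also sends the message to the calling process, so the result can be observed in a single script.

```elixir
defmodule BroadcastSketch do
  # Sends `message` to every process registered under `zone`.
  def broadcast(registry, zone, message) do
    Registry.dispatch(registry, zone, fn entries ->
      for {pid, _location} <- entries, do: send(pid, message)
    end)
  end
end

# A :duplicate registry lets many processes share the same key (the zone).
{:ok, _} = Registry.start_link(keys: :duplicate, name: RegistrySketch)

zone = "New York, New York"

# The calling process registers itself; the value carries its {lon, lat} location.
{:ok, _owner} = Registry.register(RegistrySketch, zone, {-73.985130, 40.758896})

# lookup/2 returns every {pid, value} pair stored under the key.
IO.inspect(Registry.lookup(RegistrySketch, zone), label: "entries")

:ok = BroadcastSketch.broadcast(RegistrySketch, zone, "report_status")

receive do
  text -> IO.puts("received: " <> text)
after
  1_000 -> IO.puts("no message")
end
```

Because the entry belongs to the process that registered it, the Registry drops it automatically when that process exits, so a closed connection needs no explicit cleanup.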

The Socket Handler

The starting point for understanding the socket handler is the Cowboy WebSocket documentation.

defmodule IotDevicesApp.SocketHandlerLocation do
  @behaviour :cowboy_websocket
  @degrees  0.5
  @meters   1500

  # When a client opens the connection, it sends us its latitude and longitude
  # javascript: websocket = new WebSocket("ws://192.168.1.109:5000/ws?lon=-73.985130&lat=40.758896");
  def init(request, _state) do   

    vars = URI.query_decoder(request.qs) |> Map.new()    

    # Float.parse/1 accepts "11" as well as "11.0"; the defaults must be strings
    {lon_float, _} = Float.parse(Map.get(vars, "lon", "0.0"))
    {lat_float, _} = Float.parse(Map.get(vars, "lat", "0.0"))
    {:cowboy_websocket, request, {lon_float, lat_float}, %{idle_timeout: :infinity}}
  end


  # Called once the connection has been upgraded to Websocket.

  def websocket_init(location) do
    IO.puts("-------------websocket_init--------------")
    # This function runs in a different process (PID) than init(),
    # which is why the PID is registered here
    key = getZone(location)
    Registry.IotApp
    |> Registry.register(key, location)

    {:ok, {key, location}}
  end

  # websocket_handle is called when the client sends a frame, for example:
  # javascript: websocket.send(JSON.stringify({action:"updlocation", lon:11, lat:22}))
  # javascript: websocket.send(JSON.stringify({action:"status", ...status}))
  # javascript: websocket.send(JSON.stringify({action:"wakeupneighbors"}))

  def websocket_handle({:text, json}, state = {key, location}) do
    IO.puts("-------------websocket_handle--------------")

    IO.inspect(state, label: "state")

    payload = Jason.decode!(json)

    case payload["action"] do
      "status" -> {:reply, reportStatus(payload["status"]), state}
      "updlocation" -> updateLocation(self(), key, {payload["lon"], payload["lat"]})
      "wakeupneighbors" -> {:reply, wakeupNeighbors(key, location), state}
      _ -> {:ok, state}
    end
  end


  # This is where the broadcast is delivered.
  # websocket_info is called for every Erlang message received.
  # Triggered by wakeupNeighbors()
  def websocket_info(info, state) do
    {:reply, {:text, info}, state}
  end

  #----------- Utility functions follow ---------------------

  def reportStatus(status) do
    IO.puts("-------------reportStatus--------------")
    IO.inspect(status ,label: "status")
    # Here put your logic
    {:ok, str} = Jason.encode(%{"action"=>"ack"})
    {:text, str}
  end

  def updateLocation(pid, oldkey, newlocation) do
    IO.puts("-------------updateLocation--------------")
    IO.inspect({pid, self()}, label: "PID")
    # This function runs in the same process (PID) as websocket_init()
    # IO.inspect(newlocation ,label: "newlocation")
    newkey = getZone(newlocation)

    IO.inspect(Registry.values(Registry.IotApp, oldkey, self()), label: "old location")
    Registry.unregister(Registry.IotApp, oldkey)
    Registry.register(Registry.IotApp, newkey, newlocation)
    IO.inspect(Registry.values(Registry.IotApp, newkey, self()), label: "new location")
    {:ok, str} = Jason.encode(%{"action" => "ack"})
    {:reply, {:text, str}, {newkey, newlocation}}
  end


  def wakeupNeighbors(key, location) do
    # This function runs in the same process (PID) as websocket_init()
    IO.puts("-------------wakeupNeighbors--------------")
    {:ok, message} = Jason.encode(%{"action" => "report_status"})

    Registry.IotApp
    |> Registry.dispatch(key, fn entries ->
      for {pid, neighbor} <- entries do
        IO.inspect(neighbor, label: "neighbor location")

        if pid != self() && isNeighbour(location, neighbor) do
          IO.inspect(pid, label: "      send msg to")
          Process.send(pid, message, [])
        end
      end
    end)

    {:ok, str} = Jason.encode(%{"action" => "ack"})
    {:text, str}
  end

  def isNeighbour(location, neighbor) do
    # @degrees > distanceEuclidean(location,neighbor)
    @meters > distanceHaversine(location, neighbor)
  end

  def distanceEuclidean({cx, cy}, {nx, ny}) do
    :math.sqrt(:math.pow(abs(cx - nx), 2) + :math.pow(abs(cy - ny), 2))
  end

  def distanceHaversine({cx, cy}, {nx, ny}) do
    # Geocalc reads a list as [lat, lon]; our tuples are {lon, lat}, so swap them
    Geocalc.distance_between([cy, cx], [ny, nx])
  end

  def getZone({lon, lat}) do
    {:ok, coord} = Geocoder.call({lat, lon}, provider: Geocoder.Providers.OpenStreetMaps)

    # OpenStreetMaps is for testing only; it is limited to 1 request per second.

    key = "#{coord.location.state}, #{coord.location.city}"
    IO.inspect(key, label: "getZone")
    key
  end

end
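The query string arrives from the client, so it is worth validating before it becomes part of the handler state. The following is a minimal sketch, assuming a hypothetical helper module `QueryCoordsSketch` that is not part of the handler above; it returns `:error` for missing, non-numeric or out-of-range coordinates.

```elixir
defmodule QueryCoordsSketch do
  # Returns {:ok, {lon, lat}} or :error for a raw query string
  # such as "lon=-73.5&lat=40.25".
  def parse(query_string) do
    vars = query_string |> URI.query_decoder() |> Map.new()

    with {:ok, lon} <- to_float(Map.get(vars, "lon")),
         {:ok, lat} <- to_float(Map.get(vars, "lat")),
         true <- lon >= -180 and lon <= 180 and lat >= -90 and lat <= 90 do
      {:ok, {lon, lat}}
    else
      _ -> :error
    end
  end

  # A missing parameter is an error rather than a silent 0.0.
  defp to_float(nil), do: :error

  defp to_float(text) do
    # Float.parse/1 accepts "11" as well as "11.0"; reject trailing garbage.
    case Float.parse(text) do
      {value, ""} -> {:ok, value}
      _ -> :error
    end
  end
end

IO.inspect(QueryCoordsSketch.parse("lon=-73.5&lat=40.25"))
IO.inspect(QueryCoordsSketch.parse("lon=abc&lat=40.25"))
```

How the handler reacts to `:error` is a design choice left open here; one option would be to refuse the upgrade in `init/2` instead of registering a device at a default location.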


