
EdgeAI: Google Coral with Coral Environmental Sensors and TPU With NiFi and MiNiFi (Updated EFM)

Timothy Spann (tspannhw) · Originally published at datainmotion.dev · 7 min read


*Building MiNiFi IoT Apps with the new Cloudera EFM*

It is very easy to build a drag-and-drop EdgeAI application with EFM and then push it to all your MiNiFi agents.

Cloudera Edge Management CEM-1.1.1

Download the newest CEM today!

https://www.cloudera.com/downloads/cdf/cem.html

https://docs.cloudera.com/cem/1.1.1/release-notes/topics/cem-whats-new.html

NiFi Flow Receiving From MiNiFi Java Agent

In my CDP-DC cluster, I consume the Kafka messages sent from my remote NiFi gateway, publish alerts to Kafka, and push records to Apache HBase and Apache Kudu. We filter our data with Streaming SQL.

*We can use SQL to route, create aggregates like averages, choose a subset of fields, and limit the data returned. Using the power of Apache Calcite, Streaming SQL in NiFi is a game changer against Record Data Types including CSV, XML, Avro, Parquet, JSON and Grokable text. Read and write different formats and convert when your SQL is done. Or just SELECT \* FROM FLOWFILE to get everything.*
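
As a rough illustration of the kind of query described above, the sketch below runs similar SQL over a few sensor records, using Python's built-in sqlite3 module as a stand-in. This is not how NiFi executes it: NiFi's QueryRecord processor uses Apache Calcite, whose dialect differs from SQLite's. The field names come from the example record in this post, while the sample values and the 100-degree threshold are assumptions made up for the example.

```python
import sqlite3

# Hypothetical sample records; field names follow the JSON record in this post.
records = [
    {"host": "coralenv", "tempf": 102.34, "humidity": 10.21, "label_1": "hair spray"},
    {"host": "coralenv", "tempf": 98.10, "humidity": 11.05, "label_1": "syringe"},
    {"host": "coralenv", "tempf": 104.00, "humidity": 9.80, "label_1": "hair spray"},
]

conn = sqlite3.connect(":memory:")
# The table is named FLOWFILE only to mirror the name used in NiFi SQL.
conn.execute(
    "CREATE TABLE FLOWFILE (host TEXT, tempf REAL, humidity REAL, label_1 TEXT)"
)
conn.executemany(
    "INSERT INTO FLOWFILE VALUES (:host, :tempf, :humidity, :label_1)", records
)

# Route: keep a subset of fields for records over an assumed threshold.
hot = conn.execute(
    "SELECT host, tempf FROM FLOWFILE WHERE tempf > 100 ORDER BY tempf"
).fetchall()

# Aggregate: average temperature per host.
averages = conn.execute(
    "SELECT host, AVG(tempf) FROM FLOWFILE GROUP BY host"
).fetchall()

print(hot)
print(averages)
```

In a real flow the same SELECT statements would be configured as properties on the processor, with each query result routed to its own relationship.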

We can see this flow in Atlas as we trace the data lineage and provenance from the Kafka topic.

We can search Atlas for Kafka Topics.

From coral Kafka topic to NiFi to Kudu.

Details on Coral Kafka Topic

Examining the Hive Metastore Data on the Coral Kudu Table

NiFi Flow Details in Atlas

Details on Alerts Topic


Statistics from Atlas

*See:* https://www.datainmotion.dev/2020/02/connecting-apache-nifi-to-apache-atlas.html

Example Web Camera Image

**Example JSON Record**

[{"cputemp":59,"id":"20200221190718_2632409e-f635-48e7-9f32-aa1333f3b8f9","temperature":"39.44","memory":91.1,"score_1":"0.29","starttime":"02/21/2020 14:07:13","label_1":"hair spray","tempf":"102.34","diskusage":"50373.5 MB","message":"Success","ambient_light":"329.92","host":"coralenv","cpu":34.1,"macaddress":"b8:27:eb:99:64:6b","pressure":"102.76","score_2":"0.14","ip":"127.0.1.1","te":"5.10","systemtime":"02/21/2020 14:07:18","label_2":"syringe","humidity":"10.21"}]
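
Note that several of the sensor readings in this record are JSON strings rather than numbers. A consumer that wants to compare or aggregate them would need to cast them first. The following is a minimal sketch of that step, assuming the record shape shown above; the `NUMERIC_FIELDS` list and the `to_numeric` helper are hypothetical names, not part of the original flow.

```python
import json

# The example record from this post, wrapped in a JSON array as shown.
raw = (
    '[{"cputemp":59,"id":"20200221190718_2632409e-f635-48e7-9f32-aa1333f3b8f9",'
    '"temperature":"39.44","memory":91.1,"score_1":"0.29",'
    '"starttime":"02/21/2020 14:07:13","label_1":"hair spray","tempf":"102.34",'
    '"diskusage":"50373.5 MB","message":"Success","ambient_light":"329.92",'
    '"host":"coralenv","cpu":34.1,"macaddress":"b8:27:eb:99:64:6b",'
    '"pressure":"102.76","score_2":"0.14","ip":"127.0.1.1","te":"5.10",'
    '"systemtime":"02/21/2020 14:07:18","label_2":"syringe","humidity":"10.21"}]'
)

# Fields that arrive as strings but hold numbers (an assumed list).
NUMERIC_FIELDS = [
    "temperature", "tempf", "humidity", "pressure",
    "ambient_light", "te", "score_1", "score_2",
]


def to_numeric(record):
    """Return a copy of record with the listed string fields cast to float."""
    out = dict(record)
    for field in NUMERIC_FIELDS:
        if field in out:
            out[field] = float(out[field])
    return out


record = to_numeric(json.loads(raw)[0])
print(record["label_1"], record["score_1"], record["tempf"])
```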

Querying Kudu results in Hue

Pushing Alerts to Slack from NiFi

I am running Apache NiFi 1.11.1 and wanted to point out a new feature, Download flow, which downloads the highlighted flow or process group as JSON.

Looking at NiFi counters to monitor progress:

We can see how easy it is to ingest IoT sensor data and run AI algorithms on Coral TPUs.

Shell (coralrun.sh)


    #!/bin/bash
    # Capture a webcam frame, then classify it and read the sensors.
    DATE=$(date +"%Y-%m-%d_%H%M%S")
    fswebcam -q -r 1280x720 /opt/demo/images/$DATE.jpg
    python3 -W ignore /opt/demo/test.py --image /opt/demo/images/$DATE.jpg 2>/dev/null

Kudu Table DDL

https://github.com/tspannhw/table-ddl

Python 3 (test.py)


    import time
    import sys
    import subprocess
    import os
    import base64
    import uuid
    import datetime
    import traceback
    import json
    from time import gmtime, strftime
    import math
    import random, string
    import psutil
    from getmac import get_mac_address
    from coral.enviro.board import EnviroBoard
    from luma.core.render import canvas
    from PIL import Image, ImageDraw, ImageFont
    import argparse
    from edgetpu.classification.engine import ClassificationEngine
    # Importing socket library
    import socket

    start = time.time()
    starttf = datetime.datetime.now().strftime('%m/%d/%Y %H:%M:%S')

    def ReadLabelFile(file_path):
        # Each line holds a class id followed by its label text.
        with open(file_path, 'r') as f:
            lines = f.readlines()
        ret = {}
        for line in lines:
            pair = line.strip().split(maxsplit=1)
            ret[int(pair[0])] = pair[1].strip()
        return ret

    # Google Example Code
    def update_display(display, msg):
        with canvas(display) as draw:
            draw.text((0, 0), msg, fill='white')

    def getCPUtemperature():
        res = os.popen('vcgencmd measure_temp').readline()
        return(res.replace("temp=","").replace("'C\n",""))

    # Get MAC address of a local interface
    def psutil_iface(iface):
        # type: (str) -> Optional[str]
        nics = psutil.net_if_addrs()
        if iface in nics:
            nic = nics[iface]
            for i in nic:
                if i.family == psutil.AF_LINK:
                    return i.address

    # /opt/demo/examples-camera/all_models
    row = { }

    try:
        parser = argparse.ArgumentParser()
        parser.add_argument('--image', help='File path of the image to be recognized.', required=True)
        args = parser.parse_args()

        # Prepare labels.
        labels = ReadLabelFile('/opt/demo/examples-camera/all_models/imagenet_labels.txt')

        # Initialize engine.
        engine = ClassificationEngine('/opt/demo/examples-camera/all_models/inception_v4_299_quant_edgetpu.tflite')

        # Run inference.
        img = Image.open(args.image)

        scores = {}
        kCount = 1

        # Iterate Inference Results
        for result in engine.ClassifyWithImage(img, top_k=5):
            scores['label_' + str(kCount)] = labels[result[0]]
            scores['score_' + str(kCount)] = "{:.2f}".format(result[1])
            kCount = kCount + 1

        enviro = EnviroBoard()
        host_name = socket.gethostname()
        host_ip = socket.gethostbyname(host_name)
        cpuTemp = int(float(getCPUtemperature()))
        uuid2 = '{0}_{1}'.format(strftime("%Y%m%d%H%M%S", gmtime()), uuid.uuid4())
        usage = psutil.disk_usage("/")
        end = time.time()

        row.update(scores)
        row['host'] = os.uname()[1]
        row['ip'] = host_ip
        row['macaddress'] = psutil_iface('wlan0')
        row['cputemp'] = round(cpuTemp, 2)
        row['te'] = "{0:.2f}".format((end - start))
        row['starttime'] = starttf
        row['systemtime'] = datetime.datetime.now().strftime('%m/%d/%Y %H:%M:%S')
        row['cpu'] = psutil.cpu_percent(interval=1)
        # Free space on the root filesystem, in MB.
        row['diskusage'] = "{:.1f} MB".format(float(usage.free) / 1024 / 1024)
        row['memory'] = psutil.virtual_memory().percent
        row['id'] = str(uuid2)
        row['message'] = "Success"
        row['temperature'] = '{0:.2f}'.format(enviro.temperature)
        row['humidity'] = '{0:.2f}'.format(enviro.humidity)
        row['tempf'] = '{0:.2f}'.format((enviro.temperature * 1.8) + 32)
        row['ambient_light'] = '{0}'.format(enviro.ambient_light)
        row['pressure'] = '{0:.2f}'.format(enviro.pressure)

        # Show a short status line on the board's display.
        msg = 'Temp: {0}'.format(row['temperature'])
        msg += ' IP: {0}'.format(row['ip'])
        update_display(enviro.display, msg)
    except:
        # Any failure (bad arguments, missing hardware, unreadable image) is reported in the record.
        row['message'] = "Error"

    print(json.dumps(row))
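
Two of the helpers in test.py can be tried without a Coral board attached. The sketch below re-implements the label parsing against a small temporary file and parses a string shaped like the output of `vcgencmd measure_temp`. The sample labels, the sample temperature string, and the names `read_label_file` and `parse_cpu_temp` are assumptions for illustration, not code from the repository.

```python
import os
import tempfile


def read_label_file(file_path):
    """Parse lines of the form '<class id> <label text>' into a dict."""
    labels = {}
    with open(file_path, "r") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue  # skip blank lines instead of raising IndexError
            class_id, label = line.split(maxsplit=1)
            labels[int(class_id)] = label.strip()
    return labels


def parse_cpu_temp(output):
    """Extract the number from a string shaped like temp=59.0'C."""
    return float(output.replace("temp=", "").replace("'C", "").strip())


# Hypothetical label lines in the same "<id> <text>" layout.
with tempfile.NamedTemporaryFile("w", suffix=".txt", delete=False) as tmp:
    tmp.write("0  background\n1  tench, Tinca tinca\n\n2  goldfish\n")
    path = tmp.name

labels = read_label_file(path)
os.remove(path)

cpu_temp = parse_cpu_temp("temp=59.0'C\n")
print(labels)
print(cpu_temp)
```

Splitting on the first run of whitespace only (`maxsplit=1`) is what lets a label such as "tench, Tinca tinca" keep its internal spaces.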

Source Code:

https://github.com/tspannhw/nifi-minifi-coral-env

Sensors / Devices / Hardware:

  • Humidity-HDC2010 humidity sensor
  • Light-OPT3002 ambient light sensor
  • Barometric-BMP280 barometric pressure sensor
  • PS3 Eye Camera and Microphone USB
  • Raspberry Pi 3B+
  • Google Coral Environmental Sensor Board
  • Google Coral USB Accelerator TPU

