Thanks, this was exactly the documentation I needed! I ended up coding something quick in CircuitPython that ingests 250 data points from the UART and sends them over to Edge Impulse; it works like a charm. Here’s the code, in case anyone else finds it useful; it assumes you have your Wi-Fi credentials and API keys in a file called secrets.py:
import board
import digitalio
import neopixel
import busio
import time
import ipaddress
import ssl
import wifi
import socketpool
import adafruit_requests
import circuitpython_hmac as hmac
import adafruit_hashlib as hashlib
import json
# Colours shown on the status NeoPixel, as (red, green, blue) tuples.
OFF = (0, 0, 0)
RED = (255, 0, 0)
YELLOW = (255, 150, 0)
GREEN = (0, 255, 0)

# Serial link to the watch, on the board's TX/RX pins at 115200 baud.
watch = busio.UART(board.TX, board.RX, baudrate=115200)

# On-board button with the internal pull-up enabled.
button = digitalio.DigitalInOut(board.BUTTON)
button.pull = digitalio.Pull.UP

# Single on-board NeoPixel used as a status indicator.
pixel = neopixel.NeoPixel(board.NEOPIXEL, 1, brightness=0.3, auto_write=True)
# Wi-Fi credentials and API keys are read from secrets.py; print a hint and
# re-raise when that file cannot be imported.
try:
    from secrets import secrets
except ImportError:
    print("WiFi secrets are kept in secrets.py, please add them there!")
    raise

# Join the configured network, reporting progress on the console.
ssid = secrets["ssid"]
print("Connecting to %s" % ssid)
wifi.radio.connect(ssid, secrets["password"])
print("Connected to %s!" % ssid)

# HTTP session (with an SSL context) built on the Wi-Fi radio's socket pool.
pool = socketpool.SocketPool(wifi.radio)
requests = adafruit_requests.Session(pool, ssl.create_default_context())
# Factor applied to every parsed value before it is stored.
# NOTE(review): presumably the watch reports acceleration in g and this
# converts to m/s^2 -- confirm against the watch firmware.
_ACCEL_SCALE = 9.8066


def _parse_data_point(line):
    """Parse one "x,y,z" text line into a list of three scaled floats.

    Returns the scaled data point, or None (after printing a "glitch"
    message) when a field is not a number or the line does not contain
    exactly three fields.
    """
    axes = line.strip(' \n\x00').split(',')
    try:
        data_point = [float(axis.strip()) * _ACCEL_SCALE for axis in axes]
    except ValueError:
        print("glitch: bad format?")
        return None
    if len(data_point) != 3:
        print("glitch: bad length?")
        return None
    return data_point


def log_data(num_points=250, uart=None):
    """Collect consecutive three-axis samples from the UART.

    Bytes are read one at a time and assembled into newline-terminated
    lines of comma-separated values. The first line seen is always thrown
    away because reading may have started in the middle of it. Whenever a
    line fails to parse, everything collected so far is discarded and
    collection starts over, so the returned samples always come from
    consecutive valid lines.

    Args:
        num_points: Number of samples to collect before returning.
        uart: Object with a ``read(n)`` method returning bytes (or None
            when nothing was received). Defaults to the module-level
            ``watch`` UART.

    Returns:
        A list of ``num_points`` samples, each a list of three floats.
        This call blocks until that many valid samples have arrived.
    """
    if uart is None:
        uart = watch
    line = ""
    data_points = []
    synced = False  # becomes True once the first (possibly partial) line ended
    while len(data_points) < num_points:
        data = uart.read(1)
        if not data:
            # Nothing received (None or empty bytes); keep polling.
            continue
        char = chr(data[0])
        if char != '\n':
            line += char
            continue
        # A full line has been received; reset the buffer for the next one.
        finished, line = line, ""
        if not synced:
            synced = True
            continue
        data_point = _parse_data_point(finished)
        if data_point is None:
            data_points = []  # start over
        else:
            data_points.append(data_point)
    return data_points
def send_data(data_points, label='walking', interval_ms=40,
              device_name="ac:87:a3:0a:2d:1b"):
    """Sign a batch of accelerometer samples and upload it to Edge Impulse.

    The message is first JSON-encoded with a placeholder signature of 64
    zeros, an HMAC-SHA256 over that encoding is computed with
    ``secrets['hmac_key']``, and the message is then re-encoded with the
    real signature and POSTed to the Edge Impulse ingestion endpoint.

    Args:
        data_points: List of [x, y, z] samples, sent as the payload values.
        label: Value of the ``x-file-name`` request header.
        interval_ms: Value of the payload's ``interval_ms`` field.
        device_name: Value of the payload's ``device_name`` field.

    Returns:
        True when the server answers with HTTP 200; False when it answers
        with another status or the request itself fails.
    """
    data = {
        "protected": {
            "ver": "v1",
            "alg": "HS256",
            "iat": time.time(),
        },
        # Placeholder; replaced by the real signature once it is computed.
        "signature": '0' * 64,
        "payload": {
            "device_name": device_name,
            "device_type": "SENSOR-WATCH-MOTION-V2",
            "interval_ms": interval_ms,
            "sensors": [
                {"name": "accX", "units": "m/s2"},
                {"name": "accY", "units": "m/s2"},
                {"name": "accZ", "units": "m/s2"},
            ],
            "values": data_points,
        },
    }
    # Encode with the placeholder signature and sign that encoding.
    encoded = json.dumps(data)
    signature = hmac.new(
        bytes(secrets['hmac_key'], 'utf-8'),
        msg=encoded.encode('utf-8'),
        digestmod=hashlib.sha256,
    ).hexdigest()
    # Put the real signature into the message and encode it again.
    data['signature'] = signature
    encoded = json.dumps(data)
    # Upload; a transport failure is reported as False rather than raised so
    # the caller can signal the error instead of crashing.
    try:
        res = requests.post(
            url='https://ingestion.edgeimpulse.com/api/training/data',
            data=encoded,
            headers={
                'Content-Type': 'application/json',
                'x-file-name': label,
                'x-api-key': secrets['api_key'],
            },
        )
    except (OSError, RuntimeError) as err:
        print('Failed to upload:', err)
        return False
    try:
        if res.status_code == 200:
            return True
        print('Failed with status', res.status_code, res.text)
        return False
    finally:
        # Always release the response so its socket can be reused.
        res.close()
# Main loop: wait for a button press, record a batch of samples, upload it,
# and show each stage on the status pixel.
pixel.fill(OFF)
while True:
    watch.read(1)  # keep emptying the buffer
    if button.value:
        # The button reads False while pressed; nothing to do until then.
        continue

    pixel.fill(YELLOW)  # Start moving here
    time.sleep(2.0)

    pixel.fill(GREEN)  # Green while logging data
    samples = log_data()

    pixel.fill(YELLOW)  # Logging complete, yellow while sending to Edge Impulse
    uploaded = bool(samples) and send_data(samples)

    # Off when the data was sent successfully; red signals an upload problem.
    pixel.fill(OFF if uploaded else RED)