Is there an easy way to forward data for training via on-device WiFi?

My setup: I have a small embedded device with minimal connectivity, just a UART transmit pin that’s streaming accelerometer data. Currently I’m able to pair that UART with a USB-Serial converter, and use that with the data forwarder to capture data. But since it is motion data, I’d like to be able to be a bit more untethered about it.

My thinking: I’d like to plug that UART into an ESP32 dev board, and have something like the Data Forwarder running on that board to stream the accelerometer data straight to Edge Impulse without the need for a computer. I’m looking through the documentation and I can’t seem to find anything that supports this use case. Is this something that should be possible? Do you sense that it would it be a fool’s errand to try to implement something like this?

Hello @joeycastillo,

From your ESP32 dev board, you can forward your payloads using our Ingestion Service using POST requests.
You will need to use our Data Acquisition Format but it should not be an issue as it is pretty simple for accelerometer data.

Feel free to share your code with the community once you have something ready. It might be useful for other members :wink:

Regards,

Louis

1 Like

Thanks, this was exactly the documentation I needed! I ended up coding something quick in CircuitPython that ingests 250 data points from the UART and sends it over to Edge Impulse; it works like a charm. Here’s the code, in case anyone else finds it useful; it assumes you have your wifi and API keys in a file called secrets.py:

import board
import digitalio
import neopixel
import busio
import time
import ipaddress
import ssl
import wifi
import socketpool
import adafruit_requests
import circuitpython_hmac as hmac
import adafruit_hashlib as hashlib
import json

watch = busio.UART(board.TX, board.RX, baudrate=115200)
button = digitalio.DigitalInOut(board.BUTTON)
button.pull = digitalio.Pull.UP
pixel = neopixel.NeoPixel(board.NEOPIXEL, 1, brightness=0.3, auto_write=True)

RED = (255, 0, 0)
YELLOW = (255, 150, 0)
GREEN = (0, 255, 0)
OFF = (0, 0, 0)

try:
    from secrets import secrets
except ImportError:
    print("WiFi secrets are kept in secrets.py, please add them there!")
    raise

print("Connecting to %s"%secrets["ssid"])
wifi.radio.connect(secrets["ssid"], secrets["password"])
print("Connected to %s!"%secrets["ssid"])
pool = socketpool.SocketPool(wifi.radio)
requests = adafruit_requests.Session(pool, ssl.create_default_context())

def log_data():
    line = ""
    data_points = []
    valid = False
    while True:
        data = watch.read(1)
        if data is not None:
            data_string = ''.join([chr(b) for b in data])
            line += data_string
            if data_string[0] == '\n':
                if valid:
                    axes = line.strip(' \n\x00').split(',')
                    try:
                        data_point = [(float(axis.strip()) * 9.8066) for axis in axes]
                    except:
                        print("glitch: bad format?")
                        data_points = [] # start over
                    if len(data_point) == 3:
                        data_points += [data_point]
                    else:
                        print("glitch: bad length?")
                        data_points = [] # start over
                else:
                    valid = True
                line = ""
        if len(data_points) >= 250:
            return data_points

def send_data(data_points):
    emptySignature = ''.join(['0'] * 64)
    data = {
        "protected": {
            "ver": "v1",
            "alg": "HS256",
            "iat": time.time()
        },
        "signature": emptySignature,
        "payload": {
            "device_name": "ac:87:a3:0a:2d:1b",
            "device_type": "SENSOR-WATCH-MOTION-V2",
            "interval_ms": 40,
            "sensors": [
                { "name": "accX", "units": "m/s2" },
                { "name": "accY", "units": "m/s2" },
                { "name": "accZ", "units": "m/s2" }
            ],
            "values": data_points
        }
    }

    # encode in JSON
    encoded = json.dumps(data)

    # sign message
    signature = hmac.new(bytes(secrets['hmac_key'], 'utf-8'), msg = encoded.encode('utf-8'), digestmod = hashlib.sha256).hexdigest()

    # set the signature again in the message, and encode again
    data['signature'] = signature
    encoded = json.dumps(data)
    # and upload the file
    res = requests.post(url='https://ingestion.edgeimpulse.com/api/training/data',
                        data=encoded,
                        headers={
                            'Content-Type': 'application/json',
                            'x-file-name': 'walking',
                            'x-api-key': secrets['api_key']
                        })
    if (res.status_code == 200):
        return True
    else:
        print('Failed with status', res.status_code, res.text)
        return False


pixel.fill(OFF)
while True:
    data = watch.read(1) # keep emptying the buffer
    if button.value == False:
        pixel.fill(YELLOW)  # Start moving here
        time.sleep(2.0)
        pixel.fill(GREEN)   # Green while logging data
        data_points = log_data()
        pixel.fill(YELLOW)  # Logging complete, yellow while sending to Edge Impulse
        if len(data_points) and send_data(data_points):
            pixel.fill(OFF) # Data sent successfully! 
        else:
            pixel.fill(RED) # There was a problem uploading the data; signal this error condition.

Hello @joeycastillo,

Great work, thank you for sharing!

Regards,

Louis