(SOLVED) Record audio with Arduino Nano 33 BLE Sense through terminal

Question/Issue: Hello, I am collecting the sound of grain weevils in a soundproof, controlled box using an Arduino Nano 33 BLE Sense. I was able to collect a few audio files using Edge Impulse Studio, but I have been experiencing write failures after 2 to 3, and sometimes 4, recordings of 10 seconds each. As a result, I have to disconnect and reconnect the device to continue with the data collection.

So, I decided to work on the PDM library in order to have audio signals and thereafter, I managed to create a Python script that will read audio data from the serial and save it into the directory I have specified. So far, I have generated audio files, but the quality is not as good as it is in Edge Impulse Studio. The purpose of this approach is to collect more audio data, which I could then upload to Edge Impulse Studio for model development.

I would like to seek your advice on my case and how I can improve the quality of my audio files.

Thank you.

Arduino Code:

#include <PDM.h>

// Number of output channels (1 = mono); passed to PDM.begin() in setup()
static const char channels = 1;

// PCM output sample rate in Hz; passed to PDM.begin() in setup()
static const int frequency = 16000;

// Capacity of sampleBuffer, counted in 16-bit samples
const int BUFFER_SIZE = 256;

// Buffer to read samples into, each sample is 16-bit.
// Filled by onPDMdata() and printed by loop().
int16_t sampleBuffer[BUFFER_SIZE];

// Number of audio samples currently held in sampleBuffer.
// Set by onPDMdata() and cleared by loop(), hence volatile.
volatile int samplesRead;

// One-time initialisation: open the serial link, register the PDM receive
// callback, start the microphone and set its gain.
void setup() {
  Serial.begin(9600);
  while (!Serial); // Wait for serial connection

  // Configure the data receive callback
  PDM.onReceive(onPDMdata);
  // NOTE: the gain is set once, after PDM.begin() below.

  // Initialize PDM with:
  // - `channels` output channels (1 = mono mode)
  // - a sample rate of `frequency` Hz (16000 Hz as declared above)
  if (!PDM.begin(channels, frequency)) {
    Serial.println("Failed to start PDM!");
    while (1); // Halt: nothing useful can be done without the microphone
  }

  // Adjust the gain as needed
  PDM.setGain(30);
}

void loop() {
  // Wait for samples to be read
  if (samplesRead) {
    // Send the audio data over the serial port as comma-separated values
    for (int i = 0; i < samplesRead; i++) {
      Serial.print(sampleBuffer[i]);
      if (i < samplesRead - 1) {
        Serial.print(",");
      }
    }
  Serial.print(",");
    // Clear the read count
    samplesRead = 0;
  }

  Serial.println();
}

/**
 * Callback function to process the data from the PDM microphone.
 * NOTE: This callback is executed as part of an ISR.
 * Therefore using `Serial` to print messages inside this function isn't supported.
 *
 * Copies the pending PDM bytes into sampleBuffer and publishes the number of
 * 16-bit samples in samplesRead for loop() to consume.
 */
void onPDMdata() {
  // Query the number of available bytes
  int bytesAvailable = PDM.available();

  // Never read more than sampleBuffer can hold
  const int capacityBytes = (int)sizeof(sampleBuffer);
  if (bytesAvailable > capacityBytes) {
    bytesAvailable = capacityBytes;
  }

  // Read into the sample buffer; trust the count actually returned
  const int bytesRead = PDM.read(sampleBuffer, bytesAvailable);

  // 16-bit, 2 bytes per sample (little-endian)
  samplesRead = bytesRead / (int)sizeof(sampleBuffer[0]);
}

Python Script:

import argparse
import wave
import struct
import os
import serial
import serial.tools.list_ports
import time
import numpy as np
from scipy.signal import butter, sosfilt, lfilter, freqz

# Default values
DEFAULT_BAUD = 9600
DEFAULT_LABEL = 'audio'
SAMPLE_RATE = 16000  # Sample rate (Hz) written to the WAV header; must match the sketch's `frequency`
CHUNK_DURATION = 10  # Duration of each audio chunk in seconds

# Bandpass filter parameters
LOWCUT = 3000  # Lower cutoff frequency (Hz)
HIGHCUT = 5000  # Higher cutoff frequency (Hz)
ORDER = 4  # Filter order (also used by notch_filter)

# Normalize cutoff frequencies to the Nyquist frequency (SAMPLE_RATE / 2)
LOWCUT_NORM = LOWCUT / (SAMPLE_RATE / 2)
HIGHCUT_NORM = HIGHCUT / (SAMPLE_RATE / 2)
# Butterworth band-pass in second-order-sections form, applied with sosfilt
sos = butter(ORDER, [LOWCUT_NORM, HIGHCUT_NORM], btype='band', output='sos')

# Command-line interface: serial port (required), baud rate, output directory
# and file label
parser = argparse.ArgumentParser(description="Serial Audio Data Collection")
parser.add_argument('-p', '--port', dest='port', type=str, required=True, help="Serial port to connect to")
parser.add_argument('-b', '--baud', dest='baud', type=int, default=DEFAULT_BAUD, help="Baud rate (default = " + str(DEFAULT_BAUD) + ")")
parser.add_argument('-d', '--directory', dest='directory', type=str, default=".", help="Output directory for files (default =.)")
parser.add_argument('-l', '--label', dest='label', type=str, default=DEFAULT_LABEL, help="Label for files (default = " + DEFAULT_LABEL + ")")

# Parse arguments (the script parses them a second time after listing the
# available ports)
args = parser.parse_args()
port = args.port
baud = args.baud
out_dir = args.directory
label = args.label

# Spectral subtraction parameters
OVERSUBTRACTION_FACTOR = 1.0  # Multiplier applied to the noise term; adjust as needed
NOISE_SAMPLE_LENGTH = 1000  # Number of leading samples used for noise spectrum estimation

# Notch filter parameters
# NOTE(review): the value below is 3 Hz, which lies far outside the
# 3000-5000 Hz band-pass above; a power-line notch would be 50 or 60 Hz.
NOTCH_FREQ = 3.0  # Frequency to be notched, in Hz
NOTCH_Q = 1.0  # Quality factor for the notch filter (centre frequency / bandwidth)

# Wiener filter parameters
WIENER_WINDOW_LENGTH = 256  # Window length, in samples, used by wiener_filter

def spectral_subtraction(signal, noise_mean, oversubtraction_factor=None):
    """
    Apply power spectral subtraction to a signal.

    The noise power (``noise_mean ** 2``), scaled by the over-subtraction
    factor, is removed from the signal's power spectrum; negative results are
    floored at zero and the original phase is kept for reconstruction.

    Args:
        signal (array-like): Real-valued input signal.
        noise_mean (float or array-like): Noise *magnitude* spectrum estimate.
            Either a scalar or an array broadcastable against the full FFT of
            ``signal``.
        oversubtraction_factor (float, optional): Multiplier applied to the
            noise power. Defaults to the module-level OVERSUBTRACTION_FACTOR.

    Returns:
        np.ndarray: Signal with noise reduced by spectral subtraction. An
        empty input yields an empty array.
    """
    signal = np.asarray(signal)
    if signal.size == 0:
        # np.fft.fft rejects zero-length input; there is nothing to enhance.
        return np.zeros(0)

    if oversubtraction_factor is None:
        oversubtraction_factor = OVERSUBTRACTION_FACTOR

    noise_magnitude = np.asarray(noise_mean)
    signal_spectrum = np.fft.fft(signal)

    # Subtract in the power domain and floor at zero before taking the root.
    residual_power = (
        np.abs(signal_spectrum) ** 2
        - oversubtraction_factor * noise_magnitude ** 2
    )
    subtracted_magnitude = np.sqrt(np.maximum(residual_power, 0))

    # Rebuild the spectrum with the original phase and return to time domain.
    phase = np.exp(1j * np.angle(signal_spectrum))
    return np.fft.ifft(subtracted_magnitude * phase).real

def notch_filter(signal, notch_freq, notch_q, sample_rate=SAMPLE_RATE):
    """
    Apply a band-stop (notch) filter around a specific frequency.

    The stop band is centred on ``notch_freq`` with a width of
    ``notch_freq / notch_q``. The Butterworth filter of order ORDER is built
    as second-order sections, because a narrow band-stop expressed as a single
    (b, a) polynomial pair is numerically ill-conditioned and can be unstable.

    Args:
        signal (np.ndarray): Input signal.
        notch_freq (float): Frequency to be notched (in Hz).
        notch_q (float): Quality factor for the notch filter.
        sample_rate (int): Sample rate of the signal (default: SAMPLE_RATE).

    Returns:
        np.ndarray: Signal with the notched frequency component removed.

    Raises:
        ValueError: If ``notch_q`` is not positive or the resulting stop band
            does not lie strictly between 0 Hz and the Nyquist frequency.
    """
    if notch_q <= 0:
        raise ValueError("notch_q must be positive")

    nyquist = sample_rate / 2
    half_bandwidth = (notch_freq / notch_q) / 2
    low_cutoff = (notch_freq - half_bandwidth) / nyquist
    high_cutoff = (notch_freq + half_bandwidth) / nyquist

    if not 0 < low_cutoff < high_cutoff < 1:
        raise ValueError(
            "notch band must lie strictly between 0 Hz and the Nyquist "
            "frequency; check notch_freq, notch_q and sample_rate"
        )

    sos_notch = butter(
        ORDER, [low_cutoff, high_cutoff],
        btype='bandstop', analog=False, output='sos',
    )
    return sosfilt(sos_notch, signal)

def wiener_filter(signal, window_length=WIENER_WINDOW_LENGTH):
    """
    Smooth the signal with a sliding-window mean.

    Despite its name, this routine is a moving average over a zero-padded
    copy of the signal, not an adaptive Wiener filter. The result is always
    floating point, so integer input is not truncated.

    Args:
        signal (array-like): One-dimensional input signal.
        window_length (int): Window length in samples (default:
            WIENER_WINDOW_LENGTH). Clamped to the signal length.

    Returns:
        np.ndarray: Smoothed signal, same length as the input.

    Raises:
        ValueError: If ``window_length`` is not positive for a non-empty
            signal.
    """
    samples = np.asarray(signal, dtype=float)
    n_samples = samples.size
    if n_samples == 0:
        return np.zeros(0)

    window_length = min(window_length, n_samples)
    if window_length <= 0:
        raise ValueError("window_length must be positive")

    # Zero-pad both ends so every output sample has a full window.
    half = window_length // 2
    padded = np.pad(samples, (half, half), mode='constant')

    # A uniform kernel in 'valid' mode gives the mean of padded[i:i + w];
    # an even window yields one extra value, which is trimmed.
    kernel = np.full(window_length, 1.0 / window_length)
    return np.convolve(padded, kernel, mode='valid')[:n_samples]

# Timestamp / unique-id helpers used when naming the output files
import datetime
import uuid


def _parse_samples(line):
    """Return the integer samples found in one comma-separated serial line.

    Empty fields and tokens that are not valid integers (for example a value
    cut in half by a dropped byte) are skipped instead of aborting the run.
    """
    samples = []
    for token in line.split(','):
        token = token.strip()
        if not token:
            continue
        try:
            samples.append(int(token))
        except ValueError:
            continue
    return samples


# Print out available serial ports
print()
print("Available serial ports:")
available_ports = serial.tools.list_ports.comports()
# Distinct loop names so the user-selected `port` is never shadowed.
for port_name, port_desc, port_hwid in sorted(available_ports):
    print(" {} : {} [{}]".format(port_name, port_desc, port_hwid))

# Parse arguments
args = parser.parse_args()
port = args.port
baud = args.baud
out_dir = args.directory
label = args.label

# Configure serial port
ser = serial.Serial()
ser.port = port
ser.baudrate = baud
# Bounded reads, so a silent device cannot block readline() forever.
ser.timeout = 1

# Attempt to connect to the serial port
try:
    ser.open()
except (OSError, ValueError) as e:
    print("ERROR:", e)
    raise SystemExit(1)

# Only report success once the port is really open.
print(f"Connected to: {port} successfully")

# Make output directory
os.makedirs(out_dir, exist_ok=True)

# Audio configuration
CHANNELS = 1
SAMPLE_WIDTH = 2  # 2 bytes per sample (16-bit)

try:
    while True:
        # Read data from the serial port for CHUNK_DURATION seconds
        data_list = []
        start_time = time.time()
        while time.time() - start_time < CHUNK_DURATION:
            raw_line = ser.readline()
            # Ignore undecodable bytes rather than crashing on line noise.
            data_list.extend(_parse_samples(raw_line.decode('utf-8', errors='ignore')))

        if not data_list:
            print("No audio samples received; retrying...")
            continue

        # The WAV header always states SAMPLE_RATE; warn when the link
        # delivered a different number of samples per second.
        effective_rate = len(data_list) / CHUNK_DURATION
        if abs(effective_rate - SAMPLE_RATE) > 0.01 * SAMPLE_RATE:
            print(f"WARNING: received {effective_rate:.0f} samples/s, expected {SAMPLE_RATE}")

        # Apply bandpass filter
        data_filtered = sosfilt(sos, np.asarray(data_list, dtype=np.float64))

        # Peak-normalize to [-1, 1]. Min-max scaling would map the audio to
        # [0, 1] (a large DC offset) and divide by zero on constant input.
        peak = np.max(np.abs(data_filtered))
        data_normalized = data_filtered / peak if peak > 0 else data_filtered

        # Estimate the noise magnitude from the first few samples. The mean
        # FFT power grows with the FFT length, so it is rescaled to the
        # length of the full chunk before taking the square root.
        noise_sample = data_normalized[:NOISE_SAMPLE_LENGTH]
        noise_power = np.mean(np.abs(np.fft.fft(noise_sample)) ** 2)
        noise_mean = np.sqrt(noise_power * len(data_normalized) / len(noise_sample))

        # Apply spectral subtraction
        data_enhanced = spectral_subtraction(data_normalized, noise_mean)

        # Apply notch filter
        data_enhanced = notch_filter(data_enhanced, NOTCH_FREQ, NOTCH_Q)

        # Clip to the valid range and convert to little-endian 16-bit PCM
        pcm = (np.clip(data_enhanced, -1, 1) * 32767).astype('<i2')

        # Create a unique, timestamped filename for the WAV file
        timestamp = datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
        uid = str(uuid.uuid4())[-12:]
        filename = f"{label}.{uid}.{timestamp}.wav"
        audio_path = os.path.join(out_dir, filename)

        # Open the WAV file and write the data
        with wave.open(audio_path, 'w') as wav_file:
            wav_file.setparams((CHANNELS, SAMPLE_WIDTH, SAMPLE_RATE, len(pcm), 'NONE', 'NONE'))
            wav_file.writeframes(pcm.tobytes())

        print(f'Audio data saved to {audio_path}')

except KeyboardInterrupt:
    pass

finally:
    # Close the serial port
    ser.close()

Hello,
I have managed to work with the PDM library and, after adjusting the code a little, I now get clear audio data from the Arduino Nano 33 BLE Sense. If you are reading this reply, I advise you to adapt the settings to your own requirements and to modify the code further for even better results. The idea came from a comment by plengqui on the Arduino Forum; I worked through the suggestion given there and now have the code working.

I hope this helps someone along the way.

Arduino Code Settings:
short transmitBuffer[4096];
PDM.setGain(80);
short sampleBuffer[16000];

Python Code Settings:
CHUNK_SIZE = 16000 # Number of samples to read at a time
RECORD_DURATION = 2 # Duration of each audio file in seconds (“I am recording 2 seconds Audio for each sample”)

Explanation:

  • Increasing the size of the transmitBuffer to 4096 should help accommodate more samples before sending them over the serial connection, reducing the likelihood of data loss or corruption.

  • Setting PDM.setGain(80) with a higher gain value of 80 can improve the sensitivity of the microphone, making it easier to capture quieter sounds like human voice.

  • Increasing the size of the sampleBuffer to 16000 provides a larger buffer for storing audio samples, which can be beneficial for capturing longer audio clips or reducing the risk of buffer overflows.

  • Setting CHUNK_SIZE = 16000 in the Python script matches the size of the sampleBuffer in the Arduino code, ensuring that the entire buffer is read and processed in each iteration.

Arduino Code (Full):

/* 
Code by: Ipyana Issah Mwaisekwa
*/

#include <PDM.h>

// Number of output channels (1 = mono); passed to PDM.begin() in setup()
static const char channels = 1;

// PCM output sample rate in Hz; passed to PDM.begin() in setup()
static const int frequency = 16000;

// Buffer to read samples into, each sample is 16-bits.
// Filled by onPDMdata() on every PDM callback.
short sampleBuffer[16000];

// Number of audio samples read by the most recent PDM callback
volatile int samplesRead;

// Buffer to accumulate samples before sending over serial
short transmitBuffer[4096];

// Number of samples currently queued in transmitBuffer. It is modified by
// the PDM ISR and by loop(), so it must be volatile to keep the compiler
// from caching it in a register.
volatile int transmitBufferIndex = 0;

// One-time initialisation: open the serial link, hook up the PDM callback,
// start the microphone and raise its gain.
void setup() {
  // Higher baud rate for faster data transfer
  Serial.begin(115200);

  // Samples are delivered through this callback
  PDM.onReceive(onPDMdata);

  // Start the microphone: `channels` channel(s) at `frequency` Hz
  // (mono, 16 kHz on the Arduino Nano 33 BLE Sense)
  const bool pdmStarted = PDM.begin(channels, frequency);
  if (!pdmStarted) {
    Serial.println("Failed to start PDM!");
    for (;;) {
      // Halt here: nothing can be recorded without the microphone
    }
  }

  // Raise the gain from the library default
  PDM.setGain(80);
}

void loop() {
  // Check if there are samples in the transmit buffer
  if (transmitBufferIndex > 0) {
    // Send the transmit buffer over the serial connection
    Serial.write((byte*)transmitBuffer, transmitBufferIndex * sizeof(short));

    // Clear the transmit buffer index
    transmitBufferIndex = 0;
  }
}

/**
 * Callback function to process the data from the PDM microphone.
 * NOTE: This callback is executed as part of an ISR.
 * Therefore using `Serial` inside this function isn't supported; the samples
 * are only queued in transmitBuffer here and sent by loop().
 */
void onPDMdata() {
  // Query the number of available bytes
  int bytesAvailable = PDM.available();

  // Never read more than sampleBuffer can hold
  const int maxBytes = (int)sizeof(sampleBuffer);
  if (bytesAvailable > maxBytes) {
    bytesAvailable = maxBytes;
  }

  // Read into the sample buffer; trust the count actually returned
  const int bytesRead = PDM.read(sampleBuffer, bytesAvailable);

  // 16-bit, 2 bytes per sample
  const int count = bytesRead / (int)sizeof(sampleBuffer[0]);
  samplesRead = count;

  // Queue the samples for loop(). If loop() has not drained the queue in
  // time, the newest samples are dropped rather than writing to Serial from
  // interrupt context or overrunning the buffer.
  const int capacity = sizeof(transmitBuffer) / sizeof(transmitBuffer[0]);
  for (int i = 0; i < count; i++) {
    if (transmitBufferIndex >= capacity) {
      break;
    }
    transmitBuffer[transmitBufferIndex++] = sampleBuffer[i];
  }
}

Python Code:


# Code by: Ipyana Issah Mwaisekwa

import serial
import numpy as np
import wave
import struct
import argparse
import serial.tools.list_ports
import os
import datetime
import uuid
from scipy.signal import butter, lfilter

# Default values
DEFAULT_BAUD = 115200  # Match the baud rate with the Arduino code
DEFAULT_LABEL = "audio"

# Command-line interface: serial port (required), baud rate, output directory
# and file label
parser = argparse.ArgumentParser(description="Serial Audio Data Collection")
parser.add_argument('-p', '--port', dest='port', type=str, required=True, help="Serial port to connect to")
parser.add_argument('-b', '--baud', dest='baud', type=int, default=DEFAULT_BAUD, help="Baud rate (default = " + str(DEFAULT_BAUD) + ")")
parser.add_argument('-d', '--directory', dest='directory', type=str, default=".", help="Output directory for files (default =.)")
parser.add_argument('-l', '--label', dest='label', type=str, default=DEFAULT_LABEL, help="Label for files (default = " + DEFAULT_LABEL + ")")

# Print out available serial ports
print()
print("Available serial ports:")
available_ports = serial.tools.list_ports.comports()
# Distinct loop names so the user-selected `port` is never shadowed.
for port_name, port_desc, port_hwid in sorted(available_ports):
    print(" {} : {} [{}]".format(port_name, port_desc, port_hwid))

# Parse arguments
args = parser.parse_args()
port = args.port
baud = args.baud
out_dir = args.directory
label = args.label

# Configure serial port
ser = serial.Serial()
ser.port = port
ser.baudrate = baud

# Attempt to connect to the serial port; exit with a failure status on error
try:
    ser.open()
except (OSError, ValueError) as e:
    print("ERROR:", e)
    raise SystemExit(1)

# Only report success once the port is really open.
print(f"Connected to: {port} successfully")

# Make output directory (no error if it already exists)
os.makedirs(out_dir, exist_ok=True)

# --- WAV / capture layout -------------------------------------------------
CHANNELS = 1  # Mono
SAMPLE_WIDTH = 2  # Bytes per sample (16-bit)
SAMPLE_RATE = 16000  # Samples per second
CHUNK_SIZE = 16000  # Samples requested from the serial port per read
RECORD_DURATION = 2  # Seconds of audio stored in each WAV file

# --- Butterworth filter settings -------------------------------------------
CUTOFF_LOW = 5000  # Low-pass cutoff (Hz)
ORDER_LOW = 4  # Low-pass order
CUTOFF_HIGH = 200  # High-pass cutoff (Hz)
ORDER_HIGH = 2  # High-pass order

# --- Noise gate --------------------------------------------------------------
NOISE_GATE_THRESHOLD = 300  # Filtered samples below this magnitude are zeroed

# --- Filter design -----------------------------------------------------------
# Cutoffs are expressed as a fraction of the Nyquist frequency.
nyquist_freq = 0.5 * SAMPLE_RATE
normalized_cutoff_low = CUTOFF_LOW / nyquist_freq
normalized_cutoff_high = CUTOFF_HIGH / nyquist_freq

# (b, a) coefficient pairs for the digital low-pass and high-pass stages
b_low, a_low = butter(
    ORDER_LOW,
    normalized_cutoff_low,
    btype='low',
    analog=False,
)
b_high, a_high = butter(
    ORDER_HIGH,
    normalized_cutoff_high,
    btype='high',
    analog=False,
)

def amplify(data, gain):
    """Return ``data`` scaled by ``gain``.

    This is a plain multiplication: no clipping or type conversion is
    applied to the result.
    """
    scaled = data * gain
    return scaled

def _save_wav(pcm):
    """Write int16 samples ``pcm`` to a uniquely named WAV file; return its path."""
    timestamp = datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
    uid = str(uuid.uuid4())[-12:]
    filename = f"{label}.{uid}.{timestamp}.wav"
    audio_path = os.path.join(out_dir, filename)

    with wave.open(audio_path, "w") as wavefile:
        wavefile.setparams((CHANNELS, SAMPLE_WIDTH, SAMPLE_RATE, len(pcm), "NONE", "NONE"))
        # WAV data is little-endian 16-bit PCM.
        wavefile.writeframes(pcm.astype('<i2').tobytes())

    return audio_path


def main():
    """Record audio from the serial port into consecutive WAV files.

    Raw little-endian 16-bit samples are read from ``ser`` in chunks,
    low-pass and high-pass filtered, noise-gated, amplified and collected
    until RECORD_DURATION seconds are available, which are then written to a
    new file in ``out_dir``. Runs until interrupted with Ctrl+C; the serial
    port is always closed on exit.
    """
    samples_per_file = SAMPLE_RATE * RECORD_DURATION * CHANNELS
    int16_limits = np.iinfo(np.int16)

    # Filter memories carried from one chunk to the next, so that chunk
    # boundaries do not restart the filters and produce a transient.
    state_low = np.zeros(max(len(a_low), len(b_low)) - 1)
    state_high = np.zeros(max(len(a_high), len(b_high)) - 1)

    chunks = []
    buffered = 0
    leftover = b""  # Trailing odd byte of a sample split across two reads

    try:
        while True:
            # Read a chunk of audio data from the serial port
            data = leftover + ser.read(CHUNK_SIZE * SAMPLE_WIDTH)

            # Keep only whole samples; carry any odd byte to the next read
            usable = len(data) - (len(data) % SAMPLE_WIDTH)
            data, leftover = data[:usable], data[usable:]
            if not data:
                continue

            # Convert the raw data to 16-bit samples
            samples = np.frombuffer(data, dtype='<i2')

            # Apply low-pass and high-pass filtering
            filtered_samples, state_low = lfilter(b_low, a_low, samples, zi=state_low)
            filtered_samples, state_high = lfilter(b_high, a_high, filtered_samples, zi=state_high)

            # Apply noise gating
            filtered_samples[np.abs(filtered_samples) < NOISE_GATE_THRESHOLD] = 0

            # Amplify the audio data
            amplified_samples = amplify(filtered_samples, 2)  # Adjust the gain as needed

            # Saturate instead of letting the int16 cast wrap around
            pcm = np.clip(amplified_samples, int16_limits.min, int16_limits.max).astype(np.int16)
            chunks.append(pcm)
            buffered += pcm.size

            # Check if we have enough samples for one audio file
            if buffered >= samples_per_file:
                audio_path = _save_wav(np.concatenate(chunks))
                print(f"Audio file '{audio_path}' saved.")
                chunks = []  # Clear the buffer after saving the audio file
                buffered = 0
    except KeyboardInterrupt:
        # Ctrl+C is the normal way to stop recording.
        pass
    finally:
        ser.close()


if __name__ == "__main__":
    main()

To run the python script, you will need to open your terminal and run the following command and provide information required:

python Filename.py -p <COM_PORT> -d /path/to/output/directory -l <labelName>

The audio files in WAV format will be saved in the directory path you have stated on the terminal.

In case of any issues, I am here to help more.

1 Like