microPAM analysis#

This notebook contains a program to read and plot the microPAM data.

Python environment#

# setup environment
import os
import glob

import numpy as np
import matplotlib as mpl
import matplotlib.pyplot as plt
import matplotlib.dates as dates

import scipy.signal as signal

import sounddevice as sd

from numba import jit

from datetime import datetime, timedelta

print("Date and time is:", datetime.now())

plt.rc('font', size=15)
mpl.rcParams['figure.figsize'] = (15,5)
Date and time is: 2024-03-18 12:53:05.469541

General utilities#

@jit(nopython=True,cache=True)
def quadInt(uu,imx,nd1,nc):
    ux=np.zeros((nc,2))
    for ii in range(nc):
        uo=uu[imx[ii],ii]
        um=uu[imx[ii]-1,ii]
        up=uu[imx[ii]+1,ii]

        b=(up+um-2*uo)/2;

        xo=(um-up)/(4*b);
        yo=uo-b*xo**2;

        xo += imx[ii]-nd1;
        ux[ii,0]=xo
        ux[ii,1]=yo
    return ux

@jit(nopython=True,cache=True)
def TKO(x):
    y=0*x
    y[1:-1,:]= x[1:-1,:]*x[1:-1,:]-x[:-2,:]*x[2:,:]
    return y


def mcmap():
    # https://stackoverflow.com/questions/49367144/modify-matplotlib-colormap
    # create a colormap that consists of
    # - 1/5 : custom colormap, ranging from white to the first color of the colormap
    # - 4/5 : existing colormap

    # set upper part: 4 * 256/4 entries
    upper = mpl.cm.jet(np.arange(256))

    # set lower part: 1 * 256/4 entries
    # - initialize all entries to 1 to make sure that the alpha channel (4th column) is 1
    lower = np.ones((int(256/4),4))
    # - modify the first three columns (RGB):
    #   range linearly between white (1,1,1) and the first color of the upper colormap
    for i in range(3):
        lower[:,i] = np.linspace(1, upper[0,i], lower.shape[0])

    # combine parts of colormap
    cmap = np.vstack(( lower, upper ))

    # convert to matplotlib colormap
    cmap = mpl.colors.ListedColormap(cmap, name='myColorMap', N=cmap.shape[0])
    return cmap

microPAM support functions#

def mDir(root,hh,f):
    # find file in path
    return glob.glob(root+hh+"_"+str(f[0])+'%02d'%f[1]+'%02d'%f[2]+
                     '\\%02d'%f[3]+
                     '\\F_%02d'%f[3]+'%02d*.bin'%f[4])

def loadData(fileName):
    xx = np.fromfile(fileName, dtype='uint32')
    vers=xx[5]
    SerNum=xx[6]
    fs = xx[7] # sampling frequency
    cp = xx[12] # data compress mode
    sh = xx[13] # shift in bits
    recTimeStamp=(xx[1:5].tobytes()).decode()
    return xx,fs,cp,sh,vers,SerNum,recTimeStamp
#
#--------------------------------------------------------
@jit(nopython=True,cache=True)
def decodeBlock(out,inp,nd,nb,NX):
    kk=0
    nx=NX
    for ii in range(nd):
        nx -= nb
        if nx>0:
            out[ii] = inp[kk]>>nx
        elif nx==0:
            out[ii] = inp[kk]
            kk += 1
            nx=NX
        elif nx <0:
            out[ii] = inp[kk] << (-nx)
            kk += 1
            nx += NX
            out[ii] |= (inp[kk]>>nx)
#
#-------------------- 16 bit decode -------------------------------------
@jit(nopython=True,cache=True)
def decodeData16(xx,idat,nd,nb):
    ic1=np.shape(idat)[0]
    nch=xx[16]
    nsamp=128
    NH=12        # header size

    tmp0 = np.empty(nch,dtype='uint16')
    tmp1 = np.empty(nch*nsamp,dtype='uint16')
    tmp2 = np.empty(nch*nsamp,dtype='uint16')

    tmp3 = np.empty(ic1*nch*nsamp,dtype='int16')

    tmp1[:]=0
    tmp3[:]=0

    # note:
    # nd1 contains raw data if nd2==0 or if first bolock
    # nd2 contains raw data
    kk=0
    # check if first block is continuation
    nd1=nd[:3]&0xffff
    nd2=nd[:3]>>16

    for ii in range(1,3):
        if nd1[ii]+nd1[ii-1]== nd2[ii]: break
    i0=ii-1
    print(i0)
    ic=0 
    for ii in range(i0,ic1):
        nd1=nd[ii]&0x0000ffff
        nd2=nd[ii]>>16
        j1=2*idat[ii]+NH
        j2=j1+nch
        ndx=nd1-nch
        #
        if nd2==0:
            tmp0=xx[j1:j2]
            tmp1[:]=0
            tmp1[:ndx]=xx[j2:j1+nd1]
            ic=0
        else:
            if nd1==0:
                tmp1[:]=0
                ic=-1
            elif nd1==nd2:
                tmp0=xx[j1:j2]
                tmp1[:]=0
                tmp1[:ndx]=xx[j2:j2+ndx]
                ic=0
            else:
                if ic==0:
                    tmp0=xx[j1:j2]
                    tmp1[:]=0
                    tmp1[:ndx]=xx[j2:j2+ndx]
                    ic=1
                else:
                    # have continuation
                    tmp1[nd2-nch-nd1:nd2-nch]=xx[j1:j1+nd1]
                    ic=0
        #
        if (ic==0):
            nbx=nb[ii]&0xffff
            nb1=1<<(nbx-1)
            nb2=1<<nbx
            msk=nb2-1
            #
            decodeBlock(tmp2,tmp1,nch*nsamp,nbx,16)
            #
            tmp4 = tmp2 & msk
            #
            sgn= (tmp4 & nb1) >0
            tmp4i = tmp4.astype(np.int16)
            tmp4i[sgn] -= nb2
            tmp4i[:nch]=tmp0.astype(np.int16)
            for jj in range(nch,nch*nsamp):
                tmp4i[jj] += tmp4i[jj-nch]
            #
            tmp3[kk*nch*nsamp:(kk+1)*nch*nsamp] = tmp4i
            kk += 1

    ndat=kk*nsamp
    tmp5 = tmp3[:ndat*nch].reshape(ndat,nch)
    return tmp5
#
def getData16(xx):
    idat=np.squeeze(np.where(xx==0xA5A5A5A5))
        
    nb=xx[idat+1]
    nd=xx[idat+5]
    yy=np.frombuffer(xx,np.uint16)
    tmp5 =decodeData16(yy,idat,nd,nb)
    
    tdx=xx[idat+3]
    iok=np.squeeze(np.where(tdx[1:]>tdx[:-1]))
    if nd[0]>0: iok = iok[1:]
    td2=tdx[iok]

    tdx=xx[idat+2]
    td1=tdx[iok]

    #
    return tmp5,td1,td2
#
#--------------------- 32 bit decode ---------------------------
@jit(nopython=True,cache=True)
def decodeData32(xx,idat,nd,nb):
    n1=np.shape(xx)[0]
    ic1=np.shape(idat)[0]
    nch=xx[8]
    nsamp=128
    NH=6        # header size 

    tmp0 = np.empty(nch,dtype='uint32')
    tmp1 = np.empty(nch*nsamp,dtype='uint32')
    tmp2 = np.empty(nch*nsamp,dtype='uint32')

    tmp3 = np.empty(ic1*nch*nsamp,dtype='int32')

    tmp1[:]=0
    tmp3[:]=0

    # note:
    # nd1 contains raw data if nd2==0 or if first bolock
    # nd2 contains raw data
    kk=0
    # check if first block is continuation
    nd1=nd[:3]&0xffff
    nd2=nd[:3]>>16

    for ii in range(1,3):
        if nd1[ii]+nd1[ii-1]== nd2[ii]: break
    i0=ii-1
    print(i0)
    ic=0 
    for ii in range(i0,ic1):
        nd1=nd[ii]&0x0000ffff
        nd2=nd[ii]>>16
        j1=idat[ii]+NH
        j2=j1+nch
        ndx=nd1-nch
        #
        if j1+nd1 >n1: break
        if nd2==0:
            tmp0=xx[j1:j2]
            tmp1[:]=0
            tmp1[:ndx]=xx[j2:j1+nd1]
            ic=0
        else:
            if nd1==0:
                tmp1[:]=0
                ic=-1
            elif nd1==nd2:
                tmp0=xx[j1:j2]
                tmp1[:]=0
                tmp1[:ndx]=xx[j2:j2+ndx]
                ic=0
            else:
                if ic==0:
                    tmp0=xx[j1:j2]
                    tmp1[:]=0
                    tmp1[:ndx]=xx[j2:j2+ndx]
                    ic=1
                else:
                    # have continuation
                    tmp1[nd2-nch-nd1:nd2-nch]=xx[j1:j1+nd1]
                    ic=0
        #
        if (ic==0):
            nbx=nb[ii]&0xffff
            nb1=1<<(nbx-1)
            nb2=1<<nbx
            msk=nb2-1
            #
            decodeBlock(tmp2,tmp1,nch*nsamp,nbx,32)
            #
            tmp4 = tmp2 & msk
            #
            sgn= (tmp4 & nb1) >0
            tmp4i = tmp4.astype(np.int32)
            tmp4i[sgn] -= nb2
            tmp4i[:nch]=tmp0.astype(np.int32)
            for jj in range(nch,nch*nsamp):
                tmp4i[jj] += tmp4i[jj-nch]
            #
            tmp3[kk*nch*nsamp:(kk+1)*nch*nsamp] = tmp4i
            kk += 1

    ndat=kk*nsamp
    tmp5 = tmp3[:ndat*nch].reshape(ndat,nch)
    return tmp5,i0
#   
def getData32(xx):
    idat=np.squeeze(np.where(xx==0xA5A5A5A5))
        
    nb=xx[idat+1]
    nd=xx[idat+5]
    yy=np.frombuffer(xx,np.uint32)
    tmp5,i0 =decodeData32(yy,idat,nd,nb)
    
    tdx=xx[idat+3]
    iok=np.squeeze(np.where(tdx[1:]>tdx[:-1]))
    iok = iok[i0:]
    td2=tdx[iok]

    tdx=xx[idat+2]
    td1=tdx[iok]

    #
    return tmp5,td1,td2

Example to load and visualize the microPAM data#

Load data#

root=".\\data\\"        # root directory of data
test=1

if test==0:
    HH="D1382ae"            # prefix of directory ("D") + Serial number of recorder in HEX ("1382ae")
    TSEL=[2023,3,8,12,39];  # time of first file (year,month,day,hour,minute)
elif test==1:
    HH="D0baa7e"            # prefix of directory ("D") + Serial number of recorder in HEX ("0baa7e")
    TSEL=[2023,4,7,17,40];  # time of first file (year,month,day,hour,minute)

N_num=1                 # number of files/minutes starting from TSEL

isel=TSEL.copy()
for jn in range(N_num):
    fileNames=mDir(root,HH,isel)
    for fileName in fileNames:
        print('File name: ',fileName)
        if os.path.getsize(fileName)==0: continue
        xx,fs,cp,sh,vers,SerNum,recTimeStamp = loadData(fileName)
        print(fs,cp,sh,vers,SerNum,recTimeStamp)

        if (vers>=10) & (vers<20):
            if cp==0:
                datx=np.frombuffer(xx[128:],np.int16)
            elif cp==1:
                datx,td1,td2=getData16(xx)
        elif vers==20:
            if cp==0:
                datx=np.frombuffer(xx[128:],np.int32)
            elif cp==1:
                datx,td1,td2=getData32(xx)

        # correct for bias
        bias=np.mean(datx)
        datx = datx - bias
        # time axis
        tt=np.arange(len(datx))/fs
        if 1:
            plt.plot(tt,datx)
            plt.grid(True)
            plt.show()

            f,Pxxf = signal.welch(datx,fs,nperseg=1024,axis=0)
            fig=plt.figure('figure.figsize',[15, 5])
            plt.plot(f,10*np.log10(Pxxf));
            plt.show()

    #update time vector for next minue
    t1=datetime(isel[0],isel[1],isel[2],isel[3],isel[4],0)+ timedelta(minutes=1)
    isel=np.array(t1.timetuple())[:5]
File name:  .\data\D0baa7e_20230407\17\F_174000.bin
44100 1 12 20 764542 20230407_174000
1
_images/1142258d7aebc9d688f188f2e76cf2835542ff025e5523f4bc8e2347d218ba04.png _images/0cfa38e4ec308ab5de9bf918d7f59f4c9439a7213bf03e07e2e5a419b700d5f2.png

Spectrogram#

xx,fs,cp,sh,vers,SerNum,recTimeStamp = loadData(fileName)
print(fs,cp,sh,vers,SerNum,recTimeStamp)

if (vers>=10) & (vers<20):
    scl=2**15
    if cp==0:
        datx=np.frombuffer(xx[128:],np.int16)
    elif cp==1:
        datx,td1,td2=getData16(xx)
elif vers==20:
    scl=2**31
    if cp==0:
        datx=np.frombuffer(xx[128:],np.int32)
    elif cp==1:
        datx,td1,td2=getData32(xx)

# correct for bias
bias=np.mean(datx)
datx = datx - bias
# time axis
tt=np.arange(len(datx))/fs

nw=256
nfft=1024
yy=datx/scl
f, t, Zxx = signal.stft(yy, fs,axis=0, nperseg=nw, nfft=nfft)
zz=20*np.log10(np.abs(np.squeeze(Zxx))) 
zm=np.max(zz)
[nf,nt]=np.shape(zz)

cm1=plt.cm.jet
cm2=plt.cm.viridis
cm3=mcmap()
cmx=cm3

# prepare plot
fig = plt.figure("figure.figsize",[15,10])
# for time series
ax1 = plt.axes([0.15, 0.71, 0.72, 0.24])
# for spectrogram
ax2 = plt.axes([0.15, 0.09, 0.9, 0.55], sharex=ax1)

# plot time series
p1 = ax1.plot(tt,yy,"k")
ax1.set_xlim(tt[0],tt[-1])
ax1.set_ylabel("Amplitude")
ax1.grid(True)

# plot spectrogram
im = ax2.imshow(zz, 
                extent=(0,(nt-1)/fs*nw/2,0,fs/2000),
                origin='lower', cmap=cmx, clim=(zm-70,zm),
                interpolation='nearest', aspect='auto')
ext = im.get_extent()
cbar = plt.colorbar(im, orientation='vertical',ax=ax2)
ax2.set_ylabel("Spectrum (kHz)")
ax2.set_xlabel("Time (s)");
plt.show();
44100 1 12 20 764542 20230407_1740001
_images/f07fc5e7b51cd43064386b4abcff691abe552fe2710d0005542adbf47ffdef37.png

Playing audio data#

# remove # from next line to play sound
#sd.play(yy, 44100)