microPAM analysis#
This notebook contains a program to read and plot the microPAM data.
Python environment#
# setup environment
import os
import glob
import numpy as np
import matplotlib as mpl
import matplotlib.pyplot as plt
import matplotlib.dates as dates
import scipy.signal as signal
import sounddevice as sd
from numba import jit
from datetime import datetime, timedelta
print("Date and time is:", datetime.now())
plt.rc('font', size=15)
mpl.rcParams['figure.figsize'] = (15,5)
Date and time is: 2024-03-18 12:53:05.469541
General utilities#
@jit(nopython=True,cache=True)
def quadInt(uu,imx,nd1,nc):
ux=np.zeros((nc,2))
for ii in range(nc):
uo=uu[imx[ii],ii]
um=uu[imx[ii]-1,ii]
up=uu[imx[ii]+1,ii]
b=(up+um-2*uo)/2;
xo=(um-up)/(4*b);
yo=uo-b*xo**2;
xo += imx[ii]-nd1;
ux[ii,0]=xo
ux[ii,1]=yo
return ux
@jit(nopython=True,cache=True)
def TKO(x):
y=0*x
y[1:-1,:]= x[1:-1,:]*x[1:-1,:]-x[:-2,:]*x[2:,:]
return y
def mcmap():
# https://stackoverflow.com/questions/49367144/modify-matplotlib-colormap
# create a colormap that consists of
# - 1/5 : custom colormap, ranging from white to the first color of the colormap
# - 4/5 : existing colormap
# set upper part: 4 * 256/4 entries
upper = mpl.cm.jet(np.arange(256))
# set lower part: 1 * 256/4 entries
# - initialize all entries to 1 to make sure that the alpha channel (4th column) is 1
lower = np.ones((int(256/4),4))
# - modify the first three columns (RGB):
# range linearly between white (1,1,1) and the first color of the upper colormap
for i in range(3):
lower[:,i] = np.linspace(1, upper[0,i], lower.shape[0])
# combine parts of colormap
cmap = np.vstack(( lower, upper ))
# convert to matplotlib colormap
cmap = mpl.colors.ListedColormap(cmap, name='myColorMap', N=cmap.shape[0])
return cmap
microPAM support functions#
def mDir(root,hh,f):
# find file in path
return glob.glob(root+hh+"_"+str(f[0])+'%02d'%f[1]+'%02d'%f[2]+
'\\%02d'%f[3]+
'\\F_%02d'%f[3]+'%02d*.bin'%f[4])
def loadData(fileName):
xx = np.fromfile(fileName, dtype='uint32')
vers=xx[5]
SerNum=xx[6]
fs = xx[7] # sampling frequency
cp = xx[12] # data compress mode
sh = xx[13] # shift in bits
recTimeStamp=(xx[1:5].tobytes()).decode()
return xx,fs,cp,sh,vers,SerNum,recTimeStamp
#
#--------------------------------------------------------
@jit(nopython=True,cache=True)
def decodeBlock(out,inp,nd,nb,NX):
kk=0
nx=NX
for ii in range(nd):
nx -= nb
if nx>0:
out[ii] = inp[kk]>>nx
elif nx==0:
out[ii] = inp[kk]
kk += 1
nx=NX
elif nx <0:
out[ii] = inp[kk] << (-nx)
kk += 1
nx += NX
out[ii] |= (inp[kk]>>nx)
#
#-------------------- 16 bit decode -------------------------------------
@jit(nopython=True,cache=True)
def decodeData16(xx,idat,nd,nb):
ic1=np.shape(idat)[0]
nch=xx[16]
nsamp=128
NH=12 # header size
tmp0 = np.empty(nch,dtype='uint16')
tmp1 = np.empty(nch*nsamp,dtype='uint16')
tmp2 = np.empty(nch*nsamp,dtype='uint16')
tmp3 = np.empty(ic1*nch*nsamp,dtype='int16')
tmp1[:]=0
tmp3[:]=0
# note:
# nd1 contains raw data if nd2==0 or if first bolock
# nd2 contains raw data
kk=0
# check if first block is continuation
nd1=nd[:3]&0xffff
nd2=nd[:3]>>16
for ii in range(1,3):
if nd1[ii]+nd1[ii-1]== nd2[ii]: break
i0=ii-1
print(i0)
ic=0
for ii in range(i0,ic1):
nd1=nd[ii]&0x0000ffff
nd2=nd[ii]>>16
j1=2*idat[ii]+NH
j2=j1+nch
ndx=nd1-nch
#
if nd2==0:
tmp0=xx[j1:j2]
tmp1[:]=0
tmp1[:ndx]=xx[j2:j1+nd1]
ic=0
else:
if nd1==0:
tmp1[:]=0
ic=-1
elif nd1==nd2:
tmp0=xx[j1:j2]
tmp1[:]=0
tmp1[:ndx]=xx[j2:j2+ndx]
ic=0
else:
if ic==0:
tmp0=xx[j1:j2]
tmp1[:]=0
tmp1[:ndx]=xx[j2:j2+ndx]
ic=1
else:
# have continuation
tmp1[nd2-nch-nd1:nd2-nch]=xx[j1:j1+nd1]
ic=0
#
if (ic==0):
nbx=nb[ii]&0xffff
nb1=1<<(nbx-1)
nb2=1<<nbx
msk=nb2-1
#
decodeBlock(tmp2,tmp1,nch*nsamp,nbx,16)
#
tmp4 = tmp2 & msk
#
sgn= (tmp4 & nb1) >0
tmp4i = tmp4.astype(np.int16)
tmp4i[sgn] -= nb2
tmp4i[:nch]=tmp0.astype(np.int16)
for jj in range(nch,nch*nsamp):
tmp4i[jj] += tmp4i[jj-nch]
#
tmp3[kk*nch*nsamp:(kk+1)*nch*nsamp] = tmp4i
kk += 1
ndat=kk*nsamp
tmp5 = tmp3[:ndat*nch].reshape(ndat,nch)
return tmp5
#
def getData16(xx):
idat=np.squeeze(np.where(xx==0xA5A5A5A5))
nb=xx[idat+1]
nd=xx[idat+5]
yy=np.frombuffer(xx,np.uint16)
tmp5 =decodeData16(yy,idat,nd,nb)
tdx=xx[idat+3]
iok=np.squeeze(np.where(tdx[1:]>tdx[:-1]))
if nd[0]>0: iok = iok[1:]
td2=tdx[iok]
tdx=xx[idat+2]
td1=tdx[iok]
#
return tmp5,td1,td2
#
#--------------------- 32 bit decode ---------------------------
@jit(nopython=True,cache=True)
def decodeData32(xx,idat,nd,nb):
n1=np.shape(xx)[0]
ic1=np.shape(idat)[0]
nch=xx[8]
nsamp=128
NH=6 # header size
tmp0 = np.empty(nch,dtype='uint32')
tmp1 = np.empty(nch*nsamp,dtype='uint32')
tmp2 = np.empty(nch*nsamp,dtype='uint32')
tmp3 = np.empty(ic1*nch*nsamp,dtype='int32')
tmp1[:]=0
tmp3[:]=0
# note:
# nd1 contains raw data if nd2==0 or if first bolock
# nd2 contains raw data
kk=0
# check if first block is continuation
nd1=nd[:3]&0xffff
nd2=nd[:3]>>16
for ii in range(1,3):
if nd1[ii]+nd1[ii-1]== nd2[ii]: break
i0=ii-1
print(i0)
ic=0
for ii in range(i0,ic1):
nd1=nd[ii]&0x0000ffff
nd2=nd[ii]>>16
j1=idat[ii]+NH
j2=j1+nch
ndx=nd1-nch
#
if j1+nd1 >n1: break
if nd2==0:
tmp0=xx[j1:j2]
tmp1[:]=0
tmp1[:ndx]=xx[j2:j1+nd1]
ic=0
else:
if nd1==0:
tmp1[:]=0
ic=-1
elif nd1==nd2:
tmp0=xx[j1:j2]
tmp1[:]=0
tmp1[:ndx]=xx[j2:j2+ndx]
ic=0
else:
if ic==0:
tmp0=xx[j1:j2]
tmp1[:]=0
tmp1[:ndx]=xx[j2:j2+ndx]
ic=1
else:
# have continuation
tmp1[nd2-nch-nd1:nd2-nch]=xx[j1:j1+nd1]
ic=0
#
if (ic==0):
nbx=nb[ii]&0xffff
nb1=1<<(nbx-1)
nb2=1<<nbx
msk=nb2-1
#
decodeBlock(tmp2,tmp1,nch*nsamp,nbx,32)
#
tmp4 = tmp2 & msk
#
sgn= (tmp4 & nb1) >0
tmp4i = tmp4.astype(np.int32)
tmp4i[sgn] -= nb2
tmp4i[:nch]=tmp0.astype(np.int32)
for jj in range(nch,nch*nsamp):
tmp4i[jj] += tmp4i[jj-nch]
#
tmp3[kk*nch*nsamp:(kk+1)*nch*nsamp] = tmp4i
kk += 1
ndat=kk*nsamp
tmp5 = tmp3[:ndat*nch].reshape(ndat,nch)
return tmp5,i0
#
def getData32(xx):
idat=np.squeeze(np.where(xx==0xA5A5A5A5))
nb=xx[idat+1]
nd=xx[idat+5]
yy=np.frombuffer(xx,np.uint32)
tmp5,i0 =decodeData32(yy,idat,nd,nb)
tdx=xx[idat+3]
iok=np.squeeze(np.where(tdx[1:]>tdx[:-1]))
iok = iok[i0:]
td2=tdx[iok]
tdx=xx[idat+2]
td1=tdx[iok]
#
return tmp5,td1,td2
Example to load and visualize the microPAM data#
Load data#
root=".\\data\\" # root directory of data
test=1
if test==0:
HH="D1382ae" # prefix of directory ("D") + Serial number of recorder in HEX ("1382ae")
TSEL=[2023,3,8,12,39]; # time of first file (year,month,day,hour,minute)
elif test==1:
HH="D0baa7e" # prefix of directory ("D") + Serial number of recorder in HEX ("0baa7e")
TSEL=[2023,4,7,17,40]; # time of first file (year,month,day,hour,minute)
N_num=1 # number of files/minutes starting from TSEL
isel=TSEL.copy()
for jn in range(N_num):
fileNames=mDir(root,HH,isel)
for fileName in fileNames:
print('File name: ',fileName)
if os.path.getsize(fileName)==0: continue
xx,fs,cp,sh,vers,SerNum,recTimeStamp = loadData(fileName)
print(fs,cp,sh,vers,SerNum,recTimeStamp)
if (vers>=10) & (vers<20):
if cp==0:
datx=np.frombuffer(xx[128:],np.int16)
elif cp==1:
datx,td1,td2=getData16(xx)
elif vers==20:
if cp==0:
datx=np.frombuffer(xx[128:],np.int32)
elif cp==1:
datx,td1,td2=getData32(xx)
# correct for bias
bias=np.mean(datx)
datx = datx - bias
# time axis
tt=np.arange(len(datx))/fs
if 1:
plt.plot(tt,datx)
plt.grid(True)
plt.show()
f,Pxxf = signal.welch(datx,fs,nperseg=1024,axis=0)
fig=plt.figure('figure.figsize',[15, 5])
plt.plot(f,10*np.log10(Pxxf));
plt.show()
#update time vector for next minue
t1=datetime(isel[0],isel[1],isel[2],isel[3],isel[4],0)+ timedelta(minutes=1)
isel=np.array(t1.timetuple())[:5]
File name: .\data\D0baa7e_20230407\17\F_174000.bin 44100 1 12 20 764542 20230407_174000
1
Spectrogram#
xx,fs,cp,sh,vers,SerNum,recTimeStamp = loadData(fileName)
print(fs,cp,sh,vers,SerNum,recTimeStamp)
if (vers>=10) & (vers<20):
scl=2**15
if cp==0:
datx=np.frombuffer(xx[128:],np.int16)
elif cp==1:
datx,td1,td2=getData16(xx)
elif vers==20:
scl=2**31
if cp==0:
datx=np.frombuffer(xx[128:],np.int32)
elif cp==1:
datx,td1,td2=getData32(xx)
# correct for bias
bias=np.mean(datx)
datx = datx - bias
# time axis
tt=np.arange(len(datx))/fs
nw=256
nfft=1024
yy=datx/scl
f, t, Zxx = signal.stft(yy, fs,axis=0, nperseg=nw, nfft=nfft)
zz=20*np.log10(np.abs(np.squeeze(Zxx)))
zm=np.max(zz)
[nf,nt]=np.shape(zz)
cm1=plt.cm.jet
cm2=plt.cm.viridis
cm3=mcmap()
cmx=cm3
# prepare plot
fig = plt.figure("figure.figsize",[15,10])
# for time series
ax1 = plt.axes([0.15, 0.71, 0.72, 0.24])
# for spectrogram
ax2 = plt.axes([0.15, 0.09, 0.9, 0.55], sharex=ax1)
# plot time series
p1 = ax1.plot(tt,yy,"k")
ax1.set_xlim(tt[0],tt[-1])
ax1.set_ylabel("Amplitude")
ax1.grid(True)
# plot spectrogram
im = ax2.imshow(zz,
extent=(0,(nt-1)/fs*nw/2,0,fs/2000),
origin='lower', cmap=cmx, clim=(zm-70,zm),
interpolation='nearest', aspect='auto')
ext = im.get_extent()
cbar = plt.colorbar(im, orientation='vertical',ax=ax2)
ax2.set_ylabel("Spectrum (kHz)")
ax2.set_xlabel("Time (s)");
plt.show();
44100 1 12 20 764542 20230407_1740001
Playing audio data#
# remove # from next line to play sound
#sd.play(yy, 44100)