microPAM convert to wav#
The following converts the microPAM data to wav files
# setup environment
import os
import numpy as np
import numba as nb
from numba import jit
from datetime import datetime, timedelta
print("Date and time is:", datetime.now())
import tkinter as tk
from tkinter import filedialog
root = tk.Tk()
root.withdraw()
import wavio as wv
Date and time is: 2023-09-06 20:10:22.206703
def loadData(fileName):
xx = np.fromfile(fileName, dtype='uint32')
vers=xx[5]
SerNum=xx[6]
fs = xx[7] # sampling frequency
cp = xx[12] # data compress mode
sh = xx[13] # shift in bits
recTimeStamp=(xx[1:5].tobytes()).decode()
return xx,fs,cp,sh,vers,SerNum,recTimeStamp
#
#--------------------------------------------------------
@jit(nopython=True,cache=True)
def decodeBlock(out,inp,nd,nb,NX):
kk=0
nx=NX
for ii in range(nd):
nx -= nb
if nx>0:
out[ii] = inp[kk]>>nx
elif nx==0:
out[ii] = inp[kk]
kk += 1
nx=NX
elif nx <0:
out[ii] = inp[kk] << (-nx)
kk += 1
nx += NX
out[ii] |= (inp[kk]>>nx)
#
#-------------------- 16 bit decode -------------------------------------
@jit(nopython=True,cache=True)
def decodeData16(xx,idat,nd,nb,tmp0,tmp1,ifl):
ic1=np.shape(idat)[0]
nch=xx[16]
nsamp=128
NH=12 # header size
tmp2 = np.empty(nch*nsamp,dtype='uint16')
tmp3 = np.empty(ic1*nch*nsamp,dtype='int16')
tmp1[:]=0
tmp3[:]=0
ic=1 # we may have a continuation stored in attribute values
#
if ifl & ((nd[0]>>16)>0) & ((nd[1]>>16)==0):
i0=1
else:
i0=0
kk=0
for ii in range(i0,ic1):
nd1=nd[ii]&0x0000ffff
nd2=nd[ii]>>16
j1=2*idat[ii]+NH
j2=j1+nch
ndx=nd1-nch
#
if nd2==0:
tmp0=xx[j1:j2]
tmp1[:]=0
tmp1[:ndx]=xx[j2:j1+nd1]
ic=0
else:
if nd1==0:
tmp1[:]=0
ic=-1
elif nd1==nd2:
tmp0=xx[j1:j2]
tmp1[:]=0
tmp1[:ndx]=xx[j2:j2+ndx]
ic=0
else:
# block with continuation
if ifl: # should be first block
continue # have no continuation stored
elif ic==0: # should be first of two continued blocks
# store data
tmp1[:]=0
if nd1>nch:
tmp0 = xx[j1:j2]
tmp1[:ndx]=xx[j2:j2+ndx]
else:
tmp0[:nd1] = xx[j1:j1+nd1]
ic=1
else:
# have continuation
# add actual data
if nd1<nd2-nch:
tmp1[nd2-nch-nd1:nd2-nch]=xx[j1:j1+nd1]
else:
tmp0[nd2-nd1:]=xx[j1:j1+(nch+nd1-nd2)]
tmp1[:nd2-nch]=xx[j1+(nch+nd1-nd2):j1+nd1]
ic=0
#
if (ic==0):
nbx=nb[ii]&0xffff
nb1=1<<(nbx-1)
nb2=1<<nbx
msk=nb2-1
#
decodeBlock(tmp2,tmp1,nch*nsamp,nbx,16)
#
tmp4 = tmp2 & msk
#
sgn= (tmp4 & nb1) >0
tmp4i = tmp4.astype(np.int16)
tmp4i[sgn] -= nb2
tmp4i[:nch]=tmp0.astype(np.int16)
for jj in range(nch,nch*nsamp):
tmp4i[jj] += tmp4i[jj-nch]
#
tmp3[kk*nch*nsamp:(kk+1)*nch*nsamp] = tmp4i
kk += 1
ndat=kk*nsamp
tmp5 = tmp3[:ndat*nch].reshape(ndat,nch)
return tmp5,i0
#
def getData16(xx,tmp0,tmp1,ifl):
idat=np.squeeze(np.where(xx==0xA5A5A5A5))
nb=xx[idat+1]
nd=xx[idat+5]
yy=np.frombuffer(xx,np.uint16)
tmp5,i0 =decodeData16(yy,idat,nd,nb,tmp0,tmp1,ifl)
tdx=xx[idat+3]
iok=np.squeeze(np.where(tdx[1:]>tdx[:-1]))
if len(iok)==0:
td1=[]
td2=[]
else:
iok = iok[i0:]
td2=tdx[iok]
tdx=xx[idat+2]
td1=tdx[iok]
#
return tmp5,td1,td2
#
#--------------------- 32 bit decode ---------------------------
@jit(nopython=True,cache=True)
def decodeData32(xx,idat,nd,nb,tmp0,tmp1,ifl):
#
n1=np.shape(xx)[0]
ic1=np.shape(idat)[0]
nch=xx[8]
nsamp=128
NH=6 # header size
tmp2 = np.zeros(nch*nsamp,dtype='uint32')
tmp3 = np.zeros(ic1*nch*nsamp,dtype='int32')
tmp1[:]=0
tmp3[:]=0
ic=1 # we may have a continuation stored in attribute values
kk=0
# check if first block is continuation
if ifl & ((nd[0]>>16)>0) & ((nd[1]>>16)==0):
i0=1
else:
i0=0
for ii in range(ic1):
nd1=nd[ii]&0x0000ffff
nd2=nd[ii]>>16
j1=idat[ii]+NH
j2=j1+nch
ndx=nd1-nch
#
if (j1+nd1) > n1:
break
if nd2==0:
# normal block without continuation
tmp0=xx[j1:j2]
tmp1[:]=0
tmp1[:ndx]=xx[j2:j1+nd1]
ic=0
else:
# have continuation
if nd1==0: # empty block
tmp1[:]=0
ic=-1
elif nd1==nd2: # following an empty block
tmp0=xx[j1:j2]
tmp1[:]=0
tmp1[:ndx]=xx[j2:j2+ndx]
ic=0
else:
# block with continuation
if ifl==1: # should be overall first block
ifl=0
continue # have no continuation stored
else:
if ic==0: # should be first of two continued blocks
tmp1[:] = 0
# store data
if nd1>nch:
tmp0 = xx[j1:j2]
tmp1[:ndx] = xx[j2:j2+ndx]
else:
tmp0[:nd1] = xx[j1:j1+nd1]
ic=1
else:
# have continuation
# add actual data
if nd1<(nd2-nch):
tmp1[nd2-nch-nd1:nd2-nch]=xx[j1:j1+nd1]
else:
tmp0[nd2-nd1:]=xx[j1:j1+(nch+nd1-nd2)]
tmp1[:nd2-nch]=xx[j1+(nch+nd1-nd2):j1+nd1]
ic=0
#
if (ic==0):
nbx=nb[ii]&0xffff
nb1=1<<(nbx-1)
nb2=1<<nbx
msk=nb2-1
#
decodeBlock(tmp2,tmp1,nch*nsamp,nbx,32)
#
tmp4 = tmp2 & msk
#
sgn= (tmp4 & nb1) >0
tmp4i = tmp4.astype(np.int32)
tmp4i[sgn] -= nb2
tmp4i[:nch]=tmp0.astype(np.int32)
for jj in range(nch,nch*nsamp):
tmp4i[jj] += tmp4i[jj-nch]
#
tmp3[kk*nch*nsamp:(kk+1)*nch*nsamp] = tmp4i
kk += 1
ndat=kk*nsamp
tmp5 = tmp3[:ndat*nch].reshape(ndat,nch)
return tmp5,i0
#
def getData32(xx, tmp0,tmp1, ifl):
idat=np.squeeze(np.where(xx==0xA5A5A5A5))
nb=xx[idat+1]
nd=xx[idat+5]
yy=np.frombuffer(xx,np.uint32)
tmp5,i0 =decodeData32(yy,idat,nd,nb,tmp0,tmp1,ifl)
#
tdx=xx[idat+3]
iok=np.squeeze(np.where(tdx[1:]>tdx[:-1]))
if len(iok)==0:
td1=[]
td2=[]
else:
iok = iok[i0:]
td2=tdx[iok]
tdx=xx[idat+2]
td1=tdx[iok]
#
return tmp5,td1,td2
def timerInterp(td2,ns):
## unwrap and interpolate timing vector
tax=td2[0]+np.cumsum(np.append(0,np.append((td2[1:]-td2[:-1]),(td2[-1]-td2[-2]))))
ta=np.interp(np.arange(0,len(td2),1/ns),np.arange(len(tax)),tax)
return ta
def loadAcoustics(fileName,ifl):
xx,fs,cp,sh,vers,SerNum,recTimeStamp = loadData(fileName)
print(fs,cp,sh,vers,SerNum,recTimeStamp)
ts = datetime.strptime(recTimeStamp[:15], '%Y%m%d_%H%M%S')
to = datetime.timestamp(ts)
if (vers>=10) & (vers<20):
if cp==0:
dat=np.frombuffer(xx[128:],np.int16)
td1=[]
td2=[]
ta=np.arange(np.shape(dat)[0])*1000000/fs
elif cp==1:
if ifl:
nch=xx[16]
nsamp=128
loadAcoustics.tmp0 = np.empty(nch,dtype='uint16')
loadAcoustics.tmp1 = np.empty(nch*nsamp,dtype='uint16')
#
dat,td1,td2=getData16(xx,loadAcoustics.tmp0,loadAcoustics.tmp1,ifl)
ta=timerInterp(td2,128)
elif (vers<10) | (vers>=20):
if cp==0:
dat=np.frombuffer(xx[128:],np.int32)
td1=[]
td2=[]
ta=np.arange(np.shape(dat)[0])*1000000/fs
elif cp==1:
if ifl:
nch=xx[8]
nsamp=128
loadAcoustics.tmp0 = np.empty(nch,dtype='uint32')
loadAcoustics.tmp1 = np.empty(nch*nsamp,dtype='uint32')
#
dat,td1,td2 = getData32(xx,loadAcoustics.tmp0,loadAcoustics.tmp1,ifl)
## unwrap timer (keep in microseconds)
if len(td2)>0:
ta=timerInterp(td2,128)
else:
ta=[]
return dat,ta,to,td1,td2,fs,sh,SerNum
Conversion#
Open first one or more files, second select directory for converted wav files
# initialize
ifl=1
file_names = filedialog.askopenfilenames(initialdir="./data/")
#
if len(file_names)>0:
path=filedialog.askdirectory(initialdir="./wav-files/")
#
for ii in range(len(file_names)):
dat,ta,to,td1,td2,fs,sh,SerNum = loadAcoustics(file_names[ii],ifl)
ifl=0
#
fdate=datetime.fromtimestamp(to).strftime('%Y%m%d_%H%M%S')
fname="{0}{1}{2:08X}{3}{4}{5}".format(path,"/T_",SerNum,"_",fdate,".wav")
print(fname)
#
wv.write(fname, dat, fs, sampwidth=4)
44100 1 12 20 764542 20230407_174000
C:/Users/zimme/jupyter/microPAM/wav-files/T_000BAA7E_20230407_174000.wav