microPAM convert to wav

Contents

microPAM convert to wav#

The following converts the microPAM data to wav files

# setup environment
import os
import numpy as np

import numba as nb
from numba import jit

from datetime import datetime, timedelta

print("Date and time is:", datetime.now())

import tkinter as tk
from tkinter import filedialog

root = tk.Tk()
root.withdraw()

import wavio as wv
Date and time is: 2023-09-06 20:10:22.206703
def loadData(fileName):
    xx = np.fromfile(fileName, dtype='uint32')
    vers=xx[5]
    SerNum=xx[6]
    fs = xx[7] # sampling frequency
    cp = xx[12] # data compress mode
    sh = xx[13] # shift in bits
    recTimeStamp=(xx[1:5].tobytes()).decode()
    return xx,fs,cp,sh,vers,SerNum,recTimeStamp
#
#--------------------------------------------------------
@jit(nopython=True,cache=True)
def decodeBlock(out,inp,nd,nb,NX):
    kk=0
    nx=NX
    for ii in range(nd):
        nx -= nb
        if nx>0:
            out[ii] = inp[kk]>>nx
        elif nx==0:
            out[ii] = inp[kk]
            kk += 1
            nx=NX
        elif nx <0:
            out[ii] = inp[kk] << (-nx)
            kk += 1
            nx += NX
            out[ii] |= (inp[kk]>>nx)
#
#-------------------- 16 bit decode -------------------------------------
@jit(nopython=True,cache=True)
def decodeData16(xx,idat,nd,nb,tmp0,tmp1,ifl):
    ic1=np.shape(idat)[0]
    nch=xx[16]
    nsamp=128
    NH=12        # header size

    tmp2 = np.empty(nch*nsamp,dtype='uint16')
    tmp3 = np.empty(ic1*nch*nsamp,dtype='int16')

    tmp1[:]=0
    tmp3[:]=0

    ic=1    # we may have a continuation stored in attribute values 
    #
    if ifl & ((nd[0]>>16)>0) & ((nd[1]>>16)==0): 
        i0=1
    else:
        i0=0

    kk=0
    for ii in range(i0,ic1):
        nd1=nd[ii]&0x0000ffff
        nd2=nd[ii]>>16
        j1=2*idat[ii]+NH
        j2=j1+nch
        ndx=nd1-nch
        #
        if nd2==0:
            tmp0=xx[j1:j2]
            tmp1[:]=0
            tmp1[:ndx]=xx[j2:j1+nd1]
            ic=0
        else:
            if nd1==0:
                tmp1[:]=0
                ic=-1
            elif nd1==nd2:
                tmp0=xx[j1:j2]
                tmp1[:]=0
                tmp1[:ndx]=xx[j2:j2+ndx]
                ic=0
            else:
                # block with continuation
                if ifl: # should be first block
                    continue    # have no continuation stored
                elif ic==0:   # should be first of two continued blocks
                    # store data
                    tmp1[:]=0
                    if nd1>nch:
                        tmp0 = xx[j1:j2]
                        tmp1[:ndx]=xx[j2:j2+ndx]
                    else:
                        tmp0[:nd1] = xx[j1:j1+nd1]
                    ic=1
                else:
                    # have continuation
                    # add actual data
                    if nd1<nd2-nch:
                        tmp1[nd2-nch-nd1:nd2-nch]=xx[j1:j1+nd1]
                    else:
                        tmp0[nd2-nd1:]=xx[j1:j1+(nch+nd1-nd2)]
                        tmp1[:nd2-nch]=xx[j1+(nch+nd1-nd2):j1+nd1]
                    ic=0
        #
        if (ic==0):
            nbx=nb[ii]&0xffff
            nb1=1<<(nbx-1)
            nb2=1<<nbx
            msk=nb2-1
            #
            decodeBlock(tmp2,tmp1,nch*nsamp,nbx,16)
            #
            tmp4 = tmp2 & msk
            #
            sgn= (tmp4 & nb1) >0
            tmp4i = tmp4.astype(np.int16)
            tmp4i[sgn] -= nb2
            tmp4i[:nch]=tmp0.astype(np.int16)
            for jj in range(nch,nch*nsamp):
                tmp4i[jj] += tmp4i[jj-nch]
            #
            tmp3[kk*nch*nsamp:(kk+1)*nch*nsamp] = tmp4i
            kk += 1

    ndat=kk*nsamp
    tmp5 = tmp3[:ndat*nch].reshape(ndat,nch)
    return tmp5,i0
#
def getData16(xx,tmp0,tmp1,ifl):
    idat=np.squeeze(np.where(xx==0xA5A5A5A5))

    nb=xx[idat+1]
    nd=xx[idat+5]
    yy=np.frombuffer(xx,np.uint16)
    tmp5,i0 =decodeData16(yy,idat,nd,nb,tmp0,tmp1,ifl)
    
    tdx=xx[idat+3]
    iok=np.squeeze(np.where(tdx[1:]>tdx[:-1]))
    if len(iok)==0:
        td1=[]
        td2=[]
    else:
        iok = iok[i0:]
        td2=tdx[iok]

        tdx=xx[idat+2]
        td1=tdx[iok]
    #
    return tmp5,td1,td2
#
#--------------------- 32 bit decode ---------------------------
@jit(nopython=True,cache=True)
def decodeData32(xx,idat,nd,nb,tmp0,tmp1,ifl):
    #
    n1=np.shape(xx)[0]
    ic1=np.shape(idat)[0]
    nch=xx[8]
    nsamp=128
    NH=6        # header size 

    tmp2 = np.zeros(nch*nsamp,dtype='uint32')
    tmp3 = np.zeros(ic1*nch*nsamp,dtype='int32')

    tmp1[:]=0
    tmp3[:]=0

    ic=1    # we may have a continuation stored in attribute values 
    kk=0
    # check if first block is continuation
    if ifl & ((nd[0]>>16)>0) & ((nd[1]>>16)==0): 
        i0=1
    else:
        i0=0

    for ii in range(ic1):
        nd1=nd[ii]&0x0000ffff
        nd2=nd[ii]>>16
        j1=idat[ii]+NH
        j2=j1+nch
        ndx=nd1-nch
        #
        if (j1+nd1) > n1: 
            break
        if nd2==0:
            # normal block without continuation
            tmp0=xx[j1:j2]
            tmp1[:]=0
            tmp1[:ndx]=xx[j2:j1+nd1]
            ic=0
        else:
            # have continuation
            if nd1==0:  # empty block
                tmp1[:]=0
                ic=-1
            elif nd1==nd2:  # following an empty block
                tmp0=xx[j1:j2]
                tmp1[:]=0
                tmp1[:ndx]=xx[j2:j2+ndx]
                ic=0
            else:
                # block with continuation
                if ifl==1: # should be overall first block
                    ifl=0
                    continue    # have no continuation stored
                else:
                    if ic==0:   # should be first of two continued blocks
                        tmp1[:] = 0
                        # store data

                        if nd1>nch:
                            tmp0 = xx[j1:j2]
                            tmp1[:ndx] = xx[j2:j2+ndx]
                        else:
                            tmp0[:nd1] = xx[j1:j1+nd1]
                        ic=1
                    else:
                        # have continuation
                        # add actual data
                        if nd1<(nd2-nch):
                            tmp1[nd2-nch-nd1:nd2-nch]=xx[j1:j1+nd1]
                        else:
                            tmp0[nd2-nd1:]=xx[j1:j1+(nch+nd1-nd2)]
                            tmp1[:nd2-nch]=xx[j1+(nch+nd1-nd2):j1+nd1]
                        ic=0
            #
        if (ic==0):
            nbx=nb[ii]&0xffff
            nb1=1<<(nbx-1)
            nb2=1<<nbx
            msk=nb2-1
            #
            decodeBlock(tmp2,tmp1,nch*nsamp,nbx,32)
            #
            tmp4 = tmp2 & msk
            #
            sgn= (tmp4 & nb1) >0
            tmp4i = tmp4.astype(np.int32)
            tmp4i[sgn] -= nb2
            tmp4i[:nch]=tmp0.astype(np.int32)
            for jj in range(nch,nch*nsamp):
                tmp4i[jj] += tmp4i[jj-nch]
            #
            tmp3[kk*nch*nsamp:(kk+1)*nch*nsamp] = tmp4i
            kk += 1
    ndat=kk*nsamp
    tmp5 = tmp3[:ndat*nch].reshape(ndat,nch)
    return tmp5,i0
#   
def getData32(xx, tmp0,tmp1, ifl):
    idat=np.squeeze(np.where(xx==0xA5A5A5A5))
        
    nb=xx[idat+1]
    nd=xx[idat+5]
    yy=np.frombuffer(xx,np.uint32)
    tmp5,i0 =decodeData32(yy,idat,nd,nb,tmp0,tmp1,ifl)
    #
    tdx=xx[idat+3]
    iok=np.squeeze(np.where(tdx[1:]>tdx[:-1]))
    if len(iok)==0:
        td1=[]
        td2=[]
    else:
        iok = iok[i0:]
        td2=tdx[iok]

        tdx=xx[idat+2]
        td1=tdx[iok]
    #
    return tmp5,td1,td2


def timerInterp(td2,ns):
    ## unwrap and interpolate timing vector
    tax=td2[0]+np.cumsum(np.append(0,np.append((td2[1:]-td2[:-1]),(td2[-1]-td2[-2]))))
    ta=np.interp(np.arange(0,len(td2),1/ns),np.arange(len(tax)),tax)
    return ta

def loadAcoustics(fileName,ifl):
    xx,fs,cp,sh,vers,SerNum,recTimeStamp = loadData(fileName)
    print(fs,cp,sh,vers,SerNum,recTimeStamp)

    ts = datetime.strptime(recTimeStamp[:15], '%Y%m%d_%H%M%S')
    to = datetime.timestamp(ts)
    
    if (vers>=10) & (vers<20):
        if cp==0:
            dat=np.frombuffer(xx[128:],np.int16)
            td1=[]
            td2=[]
            ta=np.arange(np.shape(dat)[0])*1000000/fs
        elif cp==1:
            if ifl:
                nch=xx[16]
                nsamp=128
                loadAcoustics.tmp0 = np.empty(nch,dtype='uint16')
                loadAcoustics.tmp1 = np.empty(nch*nsamp,dtype='uint16')
            #
            dat,td1,td2=getData16(xx,loadAcoustics.tmp0,loadAcoustics.tmp1,ifl)
            ta=timerInterp(td2,128)
    elif (vers<10) | (vers>=20):
        if cp==0:
            dat=np.frombuffer(xx[128:],np.int32)
            td1=[]
            td2=[]
            ta=np.arange(np.shape(dat)[0])*1000000/fs
        elif cp==1:
            if ifl:
                nch=xx[8]
                nsamp=128
                loadAcoustics.tmp0 = np.empty(nch,dtype='uint32')
                loadAcoustics.tmp1 = np.empty(nch*nsamp,dtype='uint32')
            #
            dat,td1,td2 = getData32(xx,loadAcoustics.tmp0,loadAcoustics.tmp1,ifl)
            ## unwrap timer (keep in microseconds)
            if len(td2)>0:
                ta=timerInterp(td2,128)
            else:
                ta=[]

    return dat,ta,to,td1,td2,fs,sh,SerNum

Conversion#

Open first one or more files, second select directory for converted wav files

# initialize 
ifl=1

file_names = filedialog.askopenfilenames(initialdir="./data/")
#
if len(file_names)>0:
    path=filedialog.askdirectory(initialdir="./wav-files/")
    #
    for ii in range(len(file_names)):
        dat,ta,to,td1,td2,fs,sh,SerNum = loadAcoustics(file_names[ii],ifl)
        ifl=0
        #
        fdate=datetime.fromtimestamp(to).strftime('%Y%m%d_%H%M%S')
        fname="{0}{1}{2:08X}{3}{4}{5}".format(path,"/T_",SerNum,"_",fdate,".wav")
        print(fname)
        #
        wv.write(fname, dat, fs, sampwidth=4)
44100 1 12 20 764542 20230407_174000
C:/Users/zimme/jupyter/microPAM/wav-files/T_000BAA7E_20230407_174000.wav