import numpy as np
import scipy.signal as sig
from autodeer.classes import Parameter
from autodeer.dataset import create_dataset_from_axes, create_dataset_from_sequence
from autodeer.pulses import Pulse
from scipy.integrate import cumulative_trapezoid
from deerlab import correctphase
[docs]
def uwb_load(matfile: np.ndarray, options: dict = dict(), verbosity=0,
compress: bool = True, mask=None, sequence=None):
"""uwb_load This function is based upon the uwb_eval matlab function
developed by Andrin Doll. This is mostly a Python version of this software,
for loading data generated by Doll style spectrometers.
Parameters
----------
matfile : np.ndarray
_description_
options : dict, optional
_description_, by default None
compress : bool, optional
Compresses all the scans into a single scan.
"""
# Extract Data
estr = matfile[matfile['expname']]
conf = matfile['conf']
def extract_data(matfile):
if "dta" in matfile.keys():
nAvgs = matfile["nAvgs"]
dta = [matfile["dta"]]
elif "dta_001" in matfile.keys():
dta = []
for ii in range(1, estr["avgs"]+1):
actname = 'dta_%03u' % ii
if actname in matfile.keys():
dta.append(matfile[actname])
# Only keep it if the average is complete, unless it is
# the first
if sum(dta[ii-1][:, -1]) == 0 and ii > 1:
dta = dta[:-1]
elif sum(dta[ii-1][:, -1]) == 0 and ii == 1:
nAvgs = 0
else:
nAvgs = ii
if compress:
dta_sum = np.zeros(dta[0].shape)
for avg in dta:
dta_sum += avg
dta = [dta_sum]
else:
raise ValueError('The file has no attached data.')
return [dta, nAvgs]
dta, nAvgs = extract_data(matfile)
if mask is not None:
dta = [dta[0][:, mask]]
# Eliminate Phase cycles
if "postsigns" not in estr.keys():
print("TODO: check uwb_eval")
raise RuntimeError("This is not implemented yet")
if np.isscalar(estr["postsigns"]["signs"]):
estr["postsigns"]["signs"] = [estr["postsigns"]["signs"]]
if type(estr["parvars"]) is dict:
estr["parvars"] = [estr["parvars"]]
cycled = np.array(list(map(np.size, estr["postsigns"]["signs"]))) > 1
# decide on wheteher the phase cycle should be eliminated or not
if any(cycled == 0):
elim_pcyc = 1 # if there is any non-phasecycling parvar
else:
elim_pcyc = 0 # if all parvars cycle phases, do not reduce them
if "elim_pcyc" in options.keys():
elim_pcyc = options["elim_pcyc"]
# Get the cycles out
if elim_pcyc:
for ii in range(0, len(cycled)):
if cycled[ii]:
if ii > 1:
n_skip = np.prod(estr["postsigns"]["dims"][0:ii-1])
else:
n_skip = 1
plus_idx = np.where(estr["postsigns"]["signs"][ii] == 1)[0]
minus_idx = np.where(estr["postsigns"]["signs"][ii] == -1)[0]
plus_mask = np.arange(0, n_skip) + (plus_idx - 1) * n_skip
minus_mask = np.arange(0, n_skip) + (minus_idx - 1) * n_skip
n_rep = np.size(dta[0], 1) // \
(n_skip * estr["postsigns"]["dims"][ii])
for kk in range(0, len(dta)):
# re-allocate
tmp = dta[kk]
dta[kk] = np.zeros((np.size(tmp, 0), n_rep*n_skip))
# subtract out
for jj in range(0, n_rep):
curr_offset = (jj) * n_skip
full_offset = (jj) * n_skip * \
estr["postsigns"]["dims"][ii]
dta[kk][:, np.arange(0, n_skip)+curr_offset] = \
tmp[:, plus_mask+full_offset] - \
tmp[:, minus_mask+full_offset]
# Find all the axes
dta_x = []
ii_dtax = 0
relevant_parvars = []
for ii in range(0, len(cycled)):
estr["postsigns"]["ids"] = \
np.array(estr["postsigns"]["ids"]).reshape(-1)
if not (elim_pcyc and cycled[ii]):
if type(estr["parvars"]) is list:
vecs = estr["parvars"][estr["postsigns"]["ids"][ii]-1]["axis"]
if np.ndim(vecs) == 1:
dta_x.append(vecs.astype(np.float32))
elif np.ndim(vecs) == 2:
unique_axes = np.unique(vecs, axis=1)
dta_x.append(unique_axes.astype(np.float32))
else:
dta_x.append(estr["parvars"]["axis"].astype(np.float32))
relevant_parvars.append(estr["postsigns"]["ids"][ii]-1)
ii_dtax += 1
exp_dim = ii_dtax
if ii_dtax == 0:
raise RuntimeError("Your dataset does not have any swept dimensions."
"Uwb_eval does not work for experiments without"
"any parvars")
elif ii_dtax > 2:
raise RuntimeError("Uwb_eval cannot handle more than two dimensions")
# mask dta_x if necessary
if mask is not None:
for ii in range(0, len(dta_x)):
dta_x[ii] = dta_x[ii][mask]
det_frq = estr["events"][estr["det_event"]-1]["det_frq"]
det_frq_dim = 0
fsmp = conf["std"]["dig_rate"]
# Check for any frequency changes as well as any fixed downconversion
# frequencies
if "det_frq" in options.keys():
det_frq = options["det_frq"]
else:
det_frq_dim = 0
for ii in range(0, len(relevant_parvars)):
act_par = estr["parvars"][relevant_parvars[ii]]
frq_change = np.zeros((len(act_par["variables"]), 1))
for jj in range(0, len(act_par["variables"])):
if not any('nu_' in word for word
in estr["parvars"][0]["variables"]):
frq_change[jj] = 1
if any(frq_change): # was there a frequency change
# is the frequency change relevant
if "det_frq_id" in estr["events"][estr["det_event"]-1]:
frq_pulse = estr["events"][estr["det_event"]-1]["det_frq_id"]
nu_init_change = 0
nu_final_change = 0
for jj in range(0, np.size(act_par["variables"])):
if ('events{' + str(frq_pulse+1) + "}.pulsedef.nu_i") in \
act_par["variables"][jj]:
nu_init_change = jj
if ('events{' + str(frq_pulse+1) + "}.pulsedef.nu_f") in \
act_par["variables"][jj]:
nu_final_change = jj
if any([nu_init_change, nu_final_change]):
# There is a frequency change on the frequency encoding
# pulse
# dimension that will determine the detection frequency
det_frq_dim = ii
if "nu_final" != estr["events"][frq_pulse]['pulsedef']:
# rectangular pulse
if nu_init_change == 0:
print(
"uwb_eval has no idea how to guess your "
"detection frequency. You were setting a "
"rectangular pulse in event" + str(frq_pulse)
+ ", but are now increasing its end frequency"
". You may obtain unexpected results.")
# get the frequencies either from
# the vectorial definition
if "vec" in act_par.keys():
det_frq = act_par["vec"][:, nu_init_change]
else:
# or from the parametric definition
nu_init = estr["events"][frq_pulse]["pulsedef"][
"nu_init"]
if np.isnan(act_par["strt"][nu_init_change]):
det_frq = np.arange(0, act_par["dim"]) * \
act_par["inc"][nu_init_change] + nu_init
else:
det_frq = np.arange(0, act_par["dim"]) * \
act_par["inc"][nu_init_change] + \
act_par["strt"][nu_init_change]
else:
# chirp pulse, both nu_init and nu_final need to be
# considered
nu_init = estr["events"][frq_pulse]["pulsedef"][
"nu_init"]
nu_final = estr["events"][frq_pulse]["pulsedef"][
"nu_final"]
# get the frequencies either from the
# vectorial definition
if "vec" in act_par.keys():
if nu_init_change != 0:
nu_init = act_par["vec"][:, nu_init_change]
if nu_final_change != 0:
nu_final = act_par["vec"][:, nu_final_change]
else:
# or from the parametric definition
if nu_init_change != 0:
if np.isnan(act_par["strt"][nu_init_change]):
nu_init = np.arange(0, act_par["dim"]) * \
act_par["inc"][nu_init_change] + \
nu_init
else:
nu_init = np.arange(0, act_par["dim"]) * \
act_par["inc"][nu_init_change] + \
act_par["strt"][nu_init_change]
if nu_final_change != 0:
if np.isnan(act_par["strt"][nu_init_change]):
nu_final = np.arange(0, act_par["dim"]) * \
act_par["inc"][nu_final_change] + \
nu_final
else:
nu_final = np.arange(0, act_par["dim"]) * \
act_par["inc"][nu_final_change] + \
act_par["strt"][nu_final]
det_frq = (nu_init + nu_final) / 2
else:
# we can only land here, if there was no det_frq_id given, but
# det_frq was explicitly provided in the experiment. This could
# be intentional, but could even so be a mistake of the user.
print("uwb_eval has no idea how to guess your detection"
"frequency. You were changing some pulse frequencies, "
"but did not provide det_frq_id for your detection event"
". I will use det_frq, as you provided it in the "
"experiment.")
# ****Check digitizer level
parvar_pts = np.zeros(np.size(estr["parvars"]))
for ii in range(0, len(estr["parvars"])):
if "vec" in estr["parvars"][ii]:
parvar_pts[ii] = np.size(estr["parvars"][ii]["vec"], 0)
else:
parvar_pts[ii] = estr["parvars"][ii]["dim"]
# the number of data points entering one echo transient (due to reduction
# during acquisition or reduction of phasecycles just above)
n_traces = np.prod(parvar_pts) / np.prod(list(map(len, dta_x)))
if "dig_max" in conf.keys():
trace_maxlev = n_traces * estr["shots"] * conf["dig_max"]
else:
trace_maxlev = n_traces * estr["shots"] * 2**11
# ***** Extract all the echoes
echopos = estr["events"][estr["det_event"]-1]["det_len"]/2 - estr[
"events"][estr["det_event"]-1]["det_pos"] * fsmp
dist = min([echopos, estr["events"][estr["det_event"]-1]["det_len"] -
echopos])
ran_echomax = np.arange(echopos - dist, echopos + dist, dtype=np.int64)
# get the downconversion to LO
t_ax_full = np.arange(0, len(ran_echomax)) / fsmp
if not np.isscalar(det_frq):
tf = np.matmul(t_ax_full[:, None], det_frq[None, :])
LO = np.exp(-2 * np.pi * 1j * tf)
else:
LO = np.exp(-2 * np.pi * 1j * t_ax_full * det_frq)
flipback = 0
# 1D or 2D
if exp_dim == 2:
dta_ev = np.zeros((len(dta_x[0]), len(dta_x[1])), dtype=np.complex128)
dta_avg = np.zeros(
(len(ran_echomax), len(dta_x[0]), len(dta_x[1])),
dtype=np.complex128)
perm_order = [0, 1, 2]
if det_frq_dim == 1:
perm_order = [0, 2, 1]
flipback = 1
dta_ev = np.transpose(dta_ev)
dta_avg = np.transpose(dta_avg, perm_order)
elif exp_dim == 1:
dta_ev = np.zeros((np.size(dta_x[0],axis=0)), dtype=np.complex128)
dta_avg = np.zeros((len(ran_echomax), np.size(dta_x[0],axis=0)),
dtype=np.complex128)
dta_scans = np.zeros((len(dta),) + dta_ev.shape, dtype=np.complex128)
for ii in range(0, len(dta)):
dta_c = dta[ii][ran_echomax, :]
dta_c = np.conj(np.apply_along_axis(sig.hilbert, 0, dta_c))
# reshape the 2D data
if exp_dim == 2:
dta_resort = np.reshape(dta_c, (len(ran_echomax), len(dta_x[0]),
len(dta_x[1]))
)
dta_resort = np.transpose(dta_resort, perm_order)
else:
dta_resort = dta_c
# downconvert
dta_dc = (dta_resort.T * LO.T).T
# refine the echo position for further evaluation,
# based on the first average
if ii == 0:
# put a symetric window to mask the expected echo position
window = sig.windows.chebwin(np.size(dta_dc, 0), 100)
dta_win = np.transpose(np.transpose(dta_dc) * window)
# absolute part of integral, since phase is not yet determined
absofsum = np.squeeze(np.abs(np.sum(dta_win, 0)))
# get the strongest echo of that series
ref_echo = np.argmax(absofsum.flatten('F'))
# use this echo to inform about the digitizer scale
max_amp = np.amax(dta[ii][ran_echomax, ref_echo], 0)
dig_level = max_amp / trace_maxlev
if "IFgain_levels" in conf["std"]:
# Check about eventual improvemnets by changing IF levels
possible_levels = dig_level * conf["std"]["IFgain_levels"] / \
conf["std"]["IFgain_levels"][estr["IFgain"]]
possible_levels[possible_levels > 0.75] = 0
best_lev = np.amax(possible_levels)
best_idx = np.argmax(possible_levels)
if (best_idx != estr["IFgain"]) & (verbosity > 0):
print(
f"You are currently using {dig_level} of the maximum"
f"possible level of the digitizer at an IFgain setting"
f"of {estr['IFgain']} \n It may be advantageous to use"
f"an IFgain setting of {best_idx} , where the maximum "
f"level will be on the order of {best_lev}.")
# for 2D data, only a certain slice may be requested
if "ref_echo_2D_idx" in options.keys():
if "ref_echo_2D_dim" not in options.keys():
options["ref_echo_2D_dim"] = 1
if flipback:
if options["ref_echo_2D_dim"] == 1:
options["ref_echo_2D_dim"] = 2
else:
options["ref_echo_2D_dim"] = 1
if options["ref_echo_2D_idx"] == "end":
options["ref_echo_2D_idx"] = np.size(
absofsum, options["ref_echo_2D_dim"]-1)
if options["ref_echo_2D_dim"] == 1:
ii_ref = options["ref_echo_2D_idx"] - 1
jj_ref = np.argmax(absofsum[ii_ref, :])
else:
jj_ref = options["ref_echo_2D_idx"] - 1
ii_ref = np.argmax(absofsum[jj_ref, :])
# convert the ii_ref,jj_ref to a linear index, as this is how
# the convolution is done a few lines below
ref_echo = np.ravel_multi_index(
[ii_ref, jj_ref], absofsum.shape)
if "ref_echo" in options.keys():
if "end" == options["ref_echo"]:
ref_echo = len(absofsum[:])
else:
ref_echo = options["ref_echo"]
# look for zerotime by crosscorrelation with
# echo-like window (chebwin..),
# use conv istead of xcorr, because of the
# life-easy-making 'same' option
# TODO turn this into a matched filter
# this is where one could use a matched echo shape
convshape = sig.windows.chebwin(min([100, len(ran_echomax)]), 100)
if exp_dim == 2:
ref_echo_unravel = np.unravel_index(ref_echo, absofsum.shape)
e_idx = np.argmax(sig.convolve(
np.abs(dta_dc[:, ref_echo_unravel[0],
ref_echo_unravel[1]]), convshape, mode="same"))
else:
e_idx = np.argmax(sig.convolve(
np.abs(dta_dc[:, ref_echo]), convshape, mode="same"))
# now get the final echo window, which is centered around the
# maximum position just found
dist = min([e_idx, np.size(dta_dc, 0)-e_idx])
evlen = 2 * dist
if "evlen" in options.keys():
evlen = options["evlen"]
if "find_echo" in options.keys():
e_idx = np.floor(np.size(dta_dc, 0)/2)
# here the final range...
ran_echo = np.arange(e_idx-evlen/2, e_idx+evlen/2, dtype=np.int16)
# ... and a check wether this is applicable
if not (ran_echo[0] >= 0 and ran_echo[-1] <= np.size(dta_dc, 0)):
raise RuntimeError(
f"Echo position at {e_idx} with evaluation length of "
f"{evlen} is not valid, since the dataset has only "
f"{np.size(dta_dc,0)} points.")
# here the final time axis of the dataset
t_ax = np.arange(-evlen/2, evlen/2) / fsmp
# get also indices of reference echo in case of 2D data
if absofsum.ndim == 2:
[ii_ref, jj_ref] = np.unravel_index(
ref_echo, absofsum.shape, order='F')
# window the echo
if exp_dim == 2:
dta_win = np.multiply(dta_dc[ran_echo, :, :].T,
sig.windows.chebwin(evlen, 100)).T
else:
dta_win = np.multiply(dta_dc[ran_echo, :].T,
sig.windows.chebwin(evlen, 100)).T
# get all the phases and use reference echo for normalization
dta_ang = np.angle(np.sum(dta_win, 0))
# for frequency changes, we phase each frequency
if det_frq_dim != 0:
if exp_dim == 2:
corr_phase = dta_ang[..., jj_ref]
else:
corr_phase = dta_ang
else:
corr_phase = dta_ang[ref_echo]
# check if a fixed phase was provided
if "corr_phase" in options.keys():
corr_phase = options["corr_phase"]
# check if any datapoint needs to be phased individually
if "phase_all" in options.keys() and options["phase_all"] == 1:
corr_phase = dta_ang
bfunc = lambda x: x * np.exp(-1j * corr_phase)
# dta_pha = np.multiply(dta_win, np.exp(-1j * corr_phase))
dta_pha = np.apply_along_axis(bfunc, 1, dta_win)
dta_this_scan = np.squeeze(np.sum(dta_pha, 0)) / \
sum(sig.windows.chebwin(evlen, 100))
dta_ev = dta_ev + dta_this_scan
dta_scans[ii, :] = dta_this_scan # This will not work for 2D
if exp_dim == 2:
dta_avg[0:evlen, :, :] = dta_avg[0:evlen, :, :] + \
np.apply_along_axis(bfunc, 1, dta_win)
# np.multiply(dta_dc[ran_echo, :, :], np.exp(-1j * corr_phase))
else:
dta_avg[0:evlen, :] = dta_avg[0:evlen, :] + \
np.multiply(dta_dc[ran_echo, :], np.exp(-1j * corr_phase))
dta_avg = dta_avg[0:evlen, ...]
# keyboard
# flip back 2D Data
if flipback:
dta_avg = np.transpose(dta_avg, perm_order)
dta_ev = np.transpose(dta_ev)
if sequence is None:
params = {'nAvgs': nAvgs, 'LO': estr['LO']+1.5, 'B': estr['B'],
'reptime': estr['reptime'], 'shots': estr['shots']}
axis = [];
for i in range(exp_dim):
if (dta_x[i].ndim == 1) or (dta_x[i].shape[1] == 1):
axis.append(dta_x[i])
else:
axis.append(dta_x[i][:, 0])
output = create_dataset_from_axes(dta_ev, axis, params)
else:
params = {'nAvgs': nAvgs}
output = create_dataset_from_sequence(dta_ev, sequence,params)
return output
# ---------------------------------------------------------------------------
# uwb_eval rewritten to use a matched filter
# ---------------------------------------------------------------------------
[docs]
def uwb_eval_match(matfile, sequence=None, scans=None, mask=None,filter_pulse=None,filter_type='match',filter_width=None,verbosity=0,corr_phase = False, **kwargs):
"""
Parameters
----------
matfile : np.ndarray
The data file to be loaded.
sequence : ad.Sequence, optional
The sequence used to generate the data.
scans : list, optional
The scans to be loaded.
mask : list, optional
The mask to be used.
filter_pulse : ad.Pulse, optional
The pulse to be used as a matched filter. If None, the maximum pulse width will be used. This is only used if filter_type is 'match'
filter_type : str, optional
The type of filter to be used. Options are 'match', 'cheby2' and 'butter. Default is 'match'
filter_width : float, optional
The width of the filter to be used. This is only used if filter_type is 'cheby2' or 'butter'
verbosity : int, optional
The verbosity of the function. Default is 0.
"""
# imports Andrin Doll AWG datafiles using a matched filter
estr = matfile[matfile['expname']]
conf = matfile['conf']
def extract_data(matfile,scans):
if "dta" in matfile.keys():
nAvgs = matfile["nAvgs"]
dta = [matfile["dta"]]
elif "dta_001" in matfile.keys():
dta = []
if scans is None:
for ii in range(1, estr["avgs"]+1):
actname = 'dta_%03u' % ii
if actname in matfile.keys():
dta.append(matfile[actname])
# Only keep it if the average is complete, unless it is
# the first
if sum(dta[ii-1][:, -1]) == 0 and ii > 1:
dta = dta[:-1]
elif sum(dta[ii-1][:, -1]) == 0 and ii == 1:
nAvgs = 0
else:
nAvgs = ii
else:
for i,ii in enumerate(scans):
actname = 'dta_%03u' % ii
scans = 0
if actname in matfile.keys():
dta.append(matfile[actname])
# Only keep it if the average is complete, unless it is
# the first
if sum(dta[i-1][:, -1]) == 0 and ii > 1:
dta = dta[:-1]
nAvgs = len(dta)
elif sum(dta[i-1][:, -1]) == 0 and ii == 1:
nAvgs = 0
else:
nAvgs = len(dta)
dta_sum = np.zeros(dta[0].shape)
for avg in dta:
dta_sum += avg
dta = [dta_sum]
else:
print('The file has no attached data.')
raise ValueError('The file has no attached data.')
return [dta, nAvgs]
dta, nAvgs = extract_data(matfile,scans)
if mask is not None:
dta = [dta[0][:, mask]]
# Eliminate Phase cycles
if "postsigns" not in estr.keys():
print("TODO: check uwb_eval")
raise RuntimeError("This is not implemented yet")
if np.isscalar(estr["postsigns"]["signs"]):
estr["postsigns"]["signs"] = [estr["postsigns"]["signs"]]
if type(estr["parvars"]) is dict:
estr["parvars"] = [estr["parvars"]]
cycled = np.array(list(map(np.size, estr["postsigns"]["signs"]))) > 1
# decide on wheteher the phase cycle should be eliminated or not
if any(cycled == 0):
elim_pcyc = 1 # if there is any non-phasecycling parvar
else:
elim_pcyc = 0 # if all parvars cycle phases, do not reduce them
# Get the cycles out
if elim_pcyc:
for ii in range(0, len(cycled)):
if cycled[ii]:
if ii > 1:
n_skip = np.prod(estr["postsigns"]["dims"][0:ii-1])
else:
n_skip = 1
plus_idx = np.where(estr["postsigns"]["signs"][ii] == 1)[0]
minus_idx = np.where(estr["postsigns"]["signs"][ii] == -1)[0]
plus_mask = np.arange(0, n_skip) + (plus_idx - 1) * n_skip
minus_mask = np.arange(0, n_skip) + (minus_idx - 1) * n_skip
n_rep = np.size(dta[0], 1) // \
(n_skip * estr["postsigns"]["dims"][ii])
for kk in range(0, len(dta)):
# re-allocate
tmp = dta[kk]
dta[kk] = np.zeros((np.size(tmp, 0), n_rep*n_skip))
# subtract out
for jj in range(0, n_rep):
curr_offset = (jj) * n_skip
full_offset = (jj) * n_skip * \
estr["postsigns"]["dims"][ii]
dta[kk][:, np.arange(0, n_skip)+curr_offset] = \
tmp[:, plus_mask+full_offset] - \
tmp[:, minus_mask+full_offset]
# Find all the axes
dta_x = []
ii_dtax = 0
relevant_parvars = []
for ii in range(0, len(cycled)):
estr["postsigns"]["ids"] = \
np.array(estr["postsigns"]["ids"]).reshape(-1)
if not (elim_pcyc and cycled[ii]):
if type(estr["parvars"]) is list:
vecs = estr["parvars"][estr["postsigns"]["ids"][ii]-1]["axis"]
if np.ndim(vecs) == 1:
dta_x.append(vecs.astype(np.float32))
elif np.ndim(vecs) == 2:
unique_axes = np.unique(vecs, axis=1)
dta_x.append(unique_axes.astype(np.float32))
else:
dta_x.append(estr["parvars"]["axis"].astype(np.float32))
relevant_parvars.append(estr["postsigns"]["ids"][ii]-1)
ii_dtax += 1
exp_dim = ii_dtax
if ii_dtax == 0:
raise RuntimeError("Your dataset does not have any swept dimensions."
"Uwb_eval does not work for experiments without"
"any parvars")
elif ii_dtax > 2:
raise RuntimeError("Uwb_eval cannot handle more than two dimensions")
det_frq = estr["events"][estr["det_event"]-1]["det_frq"]
det_frq_dim = 0
fsmp = conf["std"]["dig_rate"]
for ii in range(0, len(relevant_parvars)):
act_par = estr["parvars"][relevant_parvars[ii]]
frq_change = np.zeros((len(act_par["variables"]), 1))
for jj in range(0, len(act_par["variables"])):
if not any('nu_' in word for word
in estr["parvars"][0]["variables"]):
frq_change[jj] = 1
if any(frq_change): # was there a frequency change
# is the frequency change relevant
if "det_frq_id" in estr["events"][estr["det_event"]-1]:
frq_pulse = estr["events"][estr["det_event"]-1]["det_frq_id"]
nu_init_change = 0
nu_final_change = 0
for jj in range(0, np.size(act_par["variables"])):
if ('events{' + str(frq_pulse+1) + "}.pulsedef.nu_i") in \
act_par["variables"][jj]:
nu_init_change = jj
if ('events{' + str(frq_pulse+1) + "}.pulsedef.nu_f") in \
act_par["variables"][jj]:
nu_final_change = jj
if any([nu_init_change, nu_final_change]):
# There is a frequency change on the frequency encoding
# pulse
# dimension that will determine the detection frequency
det_frq_dim = ii
if "nu_final" != estr["events"][frq_pulse]['pulsedef']:
# rectangular pulse
if nu_init_change == 0:
print(
"uwb_eval has no idea how to guess your "
"detection frequency. You were setting a "
"rectangular pulse in event" + str(frq_pulse)
+ ", but are now increasing its end frequency"
". You may obtain unexpected results.")
# get the frequencies either from
# the vectorial definition
if "vec" in act_par.keys():
det_frq = act_par["vec"][:, nu_init_change]
else:
# or from the parametric definition
nu_init = estr["events"][frq_pulse]["pulsedef"][
"nu_init"]
if np.isnan(act_par["strt"][nu_init_change]):
det_frq = np.arange(0, act_par["dim"]) * \
act_par["inc"][nu_init_change] + nu_init
else:
det_frq = np.arange(0, act_par["dim"]) * \
act_par["inc"][nu_init_change] + \
act_par["strt"][nu_init_change]
else:
# chirp pulse, both nu_init and nu_final need to be
# considered
nu_init = estr["events"][frq_pulse]["pulsedef"][
"nu_init"]
nu_final = estr["events"][frq_pulse]["pulsedef"][
"nu_final"]
# get the frequencies either from the
# vectorial definition
if "vec" in act_par.keys():
if nu_init_change != 0:
nu_init = act_par["vec"][:, nu_init_change]
if nu_final_change != 0:
nu_final = act_par["vec"][:, nu_final_change]
else:
# or from the parametric definition
if nu_init_change != 0:
if np.isnan(act_par["strt"][nu_init_change]):
nu_init = np.arange(0, act_par["dim"]) * \
act_par["inc"][nu_init_change] + \
nu_init
else:
nu_init = np.arange(0, act_par["dim"]) * \
act_par["inc"][nu_init_change] + \
act_par["strt"][nu_init_change]
if nu_final_change != 0:
if np.isnan(act_par["strt"][nu_init_change]):
nu_final = np.arange(0, act_par["dim"]) * \
act_par["inc"][nu_final_change] + \
nu_final
else:
nu_final = np.arange(0, act_par["dim"]) * \
act_par["inc"][nu_final_change] + \
act_par["strt"][nu_final]
det_frq = (nu_init + nu_final) / 2
parvar_pts = np.zeros(np.size(estr["parvars"]))
for ii in range(0, len(estr["parvars"])):
if "vec" in estr["parvars"][ii]:
parvar_pts[ii] = np.size(estr["parvars"][ii]["vec"], 0)
else:
parvar_pts[ii] = estr["parvars"][ii]["dim"]
# the number of data points entering one echo transient (due to reduction
# during acquisition or reduction of phasecycles just above)
n_traces = np.prod(parvar_pts) / np.prod(list(map(len, dta_x)))
if "dig_max" in conf.keys():
trace_maxlev = n_traces * estr["shots"] * conf["dig_max"]
else:
trace_maxlev = n_traces * estr["shots"] * 2**11
max_amp = np.amax(dta[0],0)
dig_level = np.amax(max_amp)
if "IFgain_levels" in conf["std"]:
# Check about eventual improvemnets by changing IF levels
possible_levels = dig_level * conf["std"]["IFgain_levels"] / \
conf["std"]["IFgain_levels"][estr["IFgain"]]
possible_levels[possible_levels > 0.75] = 0
best_lev = np.amax(possible_levels)
best_idx = np.argmax(possible_levels)
if (best_idx != estr["IFgain"]) & (verbosity > 0):
print(
f"You are currently using {dig_level} of the maximum"
f"possible level of the digitizer at an IFgain setting"
f"of {estr['IFgain']} \n It may be advantageous to use"
f"an IFgain setting of {best_idx} , where the maximum "
f"level will be on the order of {best_lev}.")
det_frqs_perc = calc_percieved_freq(fsmp,det_frq)
echo_len = dta[0].shape[0]
dt = 1/fsmp
t = np.linspace(0,echo_len//2,echo_len,endpoint=False)
# Create the matched filter
if (filter_pulse is None) and (filter_type.lower() == 'match'):
# If no filter pulse is given, use a rectangular pulse matching the length of the longest fixed pulse
tp = find_max_pulse_length(estr)
tp *= fsmp
AM,FM = np.zeros((2,echo_len))
AM[echo_len//2-tp//2:echo_len//2+tp//2] = 1
FM[echo_len//2-tp//2:echo_len//2+tp//2] = 0
FM_arg = 2*np.pi*cumulative_trapezoid(FM, initial=0) * dt
complex_shape = AM * (np.cos(FM_arg) +1j* np.sin(FM_arg))
filter_func = lambda dta, det_frq: match_filter_dc(dta,t,complex_shape,det_frq)
elif isinstance(filter_pulse,Pulse):
complex_shape = filter_pulse.build_shape(t)
filter_func = lambda dta, det_frq: match_filter_dc(dta,t,complex_shape,det_frq)
elif (filter_type.lower() == 'cheby2') or (filter_type.lower() == 'butter'):
if filter_width is None:
raise ValueError('You must provide a filter width for the cheby2 or butter filter')
filter_func = lambda dta, det_frq: scipy_filter_dc(dta,t,filter_width,det_frq,filter_type)
# Apply the matched filter
dta_c = np.apply_along_axis(sig.hilbert, 0, dta[0])
echo_len = dta_c.shape[0]
if exp_dim ==2:
dims = [len(dta_x[0]),len(dta_x[1])]
else:
dims = [len(dta_x[0])]
dta_c=dta_c.reshape(echo_len,*dims,order='F')
if isinstance(det_frqs_perc,np.ndarray) and (len(det_frq) > 1):
n_det_frqs = len(det_frqs_perc)
dta_filt_dc = np.array([np.apply_along_axis(filter_func, 0, np.take(dta_c,i,det_frq_dim+1), det_frqs_perc[i]) for i in range(n_det_frqs)])
dta_filt_dc = np.moveaxis(dta_filt_dc,0,det_frq_dim+1)
else:
dta_filt_dc = np.apply_along_axis(filter_func, 0, dta_c,det_frqs_perc)
peak_echo_idx = np.unravel_index(np.argmax(np.abs(dta_filt_dc).max(axis=0)),dims)
echo_pos = np.argmax(np.abs(dta_filt_dc[tuple([slice(None)]) + peak_echo_idx]))
dta_ev = dta_filt_dc[echo_pos,:]
if corr_phase is True:
dta_ang = np.angle(dta_ev)
if np.any(frq_change):
if exp_dim == 2:
corr_phase = dta_ang[..., peak_echo_idx[1]]
else:
corr_phase = dta_ang
else:
corr_phase = dta_ang[peak_echo_idx]
dta_ev = np.apply_along_axis(lambda x: x * np.exp(-1j * corr_phase), 0, dta_ev)
if sequence is None:
params = {'nAvgs': nAvgs, 'LO': estr['LO']+1.5, 'B': estr['B'],
'reptime': estr['reptime'], 'shots': estr['shots'], 'diglevel': dig_level}
axis = [];
for i in range(exp_dim):
if (dta_x[i].ndim == 1) or (dta_x[i].shape[1] == 1):
axis.append(dta_x[i])
else:
axis.append(dta_x[i][:, 0])
output = create_dataset_from_axes(dta_ev, axis, params)
else:
params = {'nAvgs': nAvgs,'diglevel': dig_level}
output = create_dataset_from_sequence(dta_ev, sequence,params)
return output
[docs]
def find_max_pulse_length(estr):
n_pulses = len(estr["events"])
tps = []
for i in range(n_pulses):
if 'pulsedef' in estr["events"][i].keys():
tps.append(estr["events"][i]['pulsedef']['tp'])
return max(tps)
[docs]
def digitally_upconvert(t,complex,fc):
upconvert = np.exp(1j*2*np.pi*fc*t)
return complex * upconvert
[docs]
def calc_percieved_freq(sampling_freq,fc):
return np.abs(fc - sampling_freq * np.around(sampling_freq/2))
[docs]
def match_filter_dc(pulse,t, win, offset_freq):
win_fc = digitally_upconvert(t,win,offset_freq)
filtered = sig.convolve(pulse,win_fc,mode='same')
filtered_dc = digitally_upconvert(t,filtered,-offset_freq)
return filtered_dc
[docs]
def scipy_filter_dc(dta,t,filter_width,offset_freq,filter_type='cheby2'):
if filter_type == 'cheby2':
filter_sos = sig.cheby2(10,40,(offset_freq-filter_width,offset_freq+filter_width),fs=2,btype="bandpass",output='sos')
elif filter_type == 'butter':
filter_sos = sig.butter(10,(offset_freq-filter_width,offset_freq+filter_width),fs=2,btype="bandpass",output='sos')
filtered = sig.sosfilt(filter_sos,dta)
filtered_dc = digitally_upconvert(t,filtered,-offset_freq)
return filtered_dc