from autodeer.sequences import Sequence
from autodeer.pulses import RectPulse, Delay, Detection
from autodeer.utils import gcd, val_in_ns, val_in_us, transpose_list_of_dicts
import numpy as np
import re
import time
import importlib
from autodeer import __version__
import os
import logging
[docs]
MODULE_DIR = importlib.util.find_spec('autodeer').submodule_search_locations[0]
[docs]
hw_log = logging.getLogger('interface.Xepr')
header += "; " + \
f"Auto-generated PulseSpel {{}} file by autoDEER {__version__}\n"
header += "; " + "-" * 79 + "\n"
# =============================================================================
[docs]
class PSPhaseCycle:
def __init__(self, sequence, MPFU=None, OnlyDet=False) -> None:
BPhaseCycles = []
if MPFU is not None:
pulse_dicts = self._MPFU(sequence, MPFU)
else:
pulse_dicts = self._main(sequence)
detect_dicts =self._detect(sequence)
if not OnlyDet:
BPhaseCycles += pulse_dicts
BPhaseCycles += detect_dicts
[docs]
self.BPhaseCycles = BPhaseCycles
self.__str__()
pass
[docs]
def _MPFU(self, sequence, MPFU):
BPhaseCycles = []
for i,pulse in enumerate(sequence.pulses):
if type(pulse) == Delay:
continue
elif type(pulse) == Detection:
continue
dict = {}
dict["Pulse"] = i
dict["Cycle"] = []
if pulse.pcyc["Channels"] == "ELDOR":
dict["Cycle"].append("ELDOR")
else:
for j in pulse.pcyc["Channels"]:
dict["Cycle"].append(MPFU[j])
BPhaseCycles.append(dict)
return BPhaseCycles
[docs]
def _main(self, sequence):
num_pulses = len(sequence.pcyc_vars)
num_cycles = sequence.pcyc_dets.shape[0]
BPhaseCycles = []
# Normal pulses
for i in range(0, num_pulses):
dict = {}
dict["Pulse"] = sequence.pcyc_vars[i]
dict["Cycle"] = []
if sequence.pulses[i].pcyc["Channels"] == "ELDOR":
dict["Cycle"].append("ELDOR")
else:
for j in range(0, num_cycles):
phase = sequence.pcyc_cycles[j, i]
norm_phase = (phase / np.pi) % 2
if norm_phase == 0:
B_phase = "+x"
elif norm_phase == 1:
B_phase = "-x"
elif norm_phase == 0.5:
B_phase = "+y"
elif norm_phase == 1.5:
B_phase = "-y"
dict["Cycle"].append(B_phase)
BPhaseCycles.append(dict)
return BPhaseCycles
[docs]
def _detect(self, sequence):
num_cycles = sequence.pcyc_dets.shape[0]
BPhaseCycles = []
asg_dict = {"Cycle": [], "Pulse": 'a'}
bsg_dict = {"Cycle": [], "Pulse": 'b'}
for j in range(0, num_cycles):
phase = sequence.pcyc_dets[j]
if phase == 1:
a_phase = "+a"
b_phase = "+b"
elif phase == -1:
a_phase = "-a"
b_phase = "-b"
elif phase == 1j:
a_phase = "+a"
b_phase = "-b"
elif phase == -1j:
a_phase = "-a"
b_phase = "+b"
asg_dict["Cycle"].append(a_phase)
bsg_dict["Cycle"].append(b_phase)
BPhaseCycles.append(asg_dict)
BPhaseCycles.append(bsg_dict)
return BPhaseCycles
[docs]
def __str__(self) -> str:
full_str = "begin lists \"auto\" \n"
pcyc_hash = {}
for i, cycle_dict in enumerate(self.BPhaseCycles):
cycle = cycle_dict["Cycle"]
if type(cycle_dict["Pulse"]) != str:
pcyc_hash[cycle_dict["Pulse"]] = f"ph{i+1}"
line_str = f"ph{i+1} "
elif cycle_dict["Pulse"] == 'a':
line_str = f"asg1 "
elif cycle_dict["Pulse"] == 'b':
line_str = f"bsg1 "
for phase in cycle:
line_str += f"{phase} "
line_str += "\n"
full_str += line_str
full_str += "end lists \n"
self.pcyc_hash = pcyc_hash
return full_str
# =============================================================================
[docs]
class PSparvar:
def __init__(self, sequence: Sequence, id) -> None:
progTable = sequence.progTable
progTable_n = len(progTable["EventID"])
parvar = {}
parvar["variables"] = []
parvar["types"] = []
parvar["vec"] = []
parvar["step"] = []
for i in range(0, progTable_n): # Loop over all progressions
if progTable["axID"][i] == id: # Ignore all progressions that aren't for this axis
pulse_num = progTable["EventID"][i]
var = progTable["Variable"][i]
vec = progTable["axis"][i]
parvar["variables"].append(var)
if var == 'B':
continue
if var == 't':
#add this to the assoicated delay
parvar["types"].append("d")
self.events.append(pulse_num)
elif var == 'reptime':
parvar["types"].append('srt')
self.events.append('srt')
elif not isinstance(sequence.pulses[pulse_num], Detection):
parvar["types"].append("p")
self.events.append(pulse_num)
parvar["vec"].append(vec)
if len(np.unique(np.diff(vec))) != 1:
self.PulseSpel = False
parvar["step"].append(0)
else:
parvar["step"].append(np.unique(np.diff(vec))[0])
if "B" in parvar["variables"]:
self.Bsweep = True
else:
self.Bsweep = False
if "reptime" in parvar["variables"]:
self.repsweep = True
else:
self.repsweep = False
if len(parvar['step']) == 0:
self.ax_step = None
self.init = None
self.dim = None
self.parvar = parvar
else:
self.ax_step = parvar["step"][0]
self.init = parvar["vec"][0][0]
self.dim = parvar["vec"][0].shape[0]
self.parvar = parvar
pass
[docs]
def checkPulseSpel(self) -> bool:
"""Checks if parvar can be run in PulseSpel.\\
Criteria:
1) Only pulse/delay lengths changing
2) Constant integer step changes
Returns
-------
bool
_description_
"""
[docs]
possible_delays = [
"d1", "d2", "d3", "d4", "d5", "d6", "d7", "d8", "d10", "d11", "d12",
"d13", "d14", "d15", "d16", "d17", "d18", "d19", "d20", "d21", "d22",
"d23", "d24", "d25", "d26", "d27", "d28", "d29", "d30"]
[docs]
possible_vars = ['a','b','c','e']
[docs]
possible_pulses = [
"p0", "p1", "p2", "p3", "p4", "p5", "p6", "p7", "p8", "p9", "p10"]
[docs]
def convert_progtable(progtable):
"""
This function reformats the progtable to be compatible with the Bruker_tools.
This is done by converting the axis of each moving pulse to be relative to the previous moving pulse.
"""
n_entries = len(progtable['EventID'])
uaxes = np.unique(progtable['axID'])
n_uaxes = len(uaxes)
for ax in uaxes:
base_axis = None
total_axis = None
for i in range(n_entries):
if progtable['axID'][i] != ax:
continue
if progtable['Variable'][i] != 't':
continue
if base_axis is None:
base_axis = progtable['axis'][i]
total_axis = base_axis
else:
new_axis = progtable['axis'][i] - total_axis
total_axis = progtable['axis'][i]
progtable['axis'][i] = new_axis
return progtable
[docs]
def calc_rel_positions(sequence):
"""
Calcuates the starting relative positions of all pulses in a sequence.
"""
n_pulses = len(sequence.pulses)
positions = np.zeros(n_pulses)
for i,pulse in enumerate(sequence.pulses):
positions[i] = pulse.t.value
rel_positions = np.diff(positions,prepend=0)
return rel_positions
# =============================================================================
[docs]
class PulseSpel:
def __init__(self, sequence, d0, MPFU=None, AWG=False) -> None:
self._check_sequence(sequence)
self.convert_progtable(sequence.progTable)
[docs]
self.possible_delays = possible_delays.copy()
[docs]
self.possible_pulses = possible_pulses.copy()
[docs]
self.possible_vars = possible_vars.copy()
[docs]
self.sequence = sequence
# Build hash of initial pulses
# For every event and delay is created immediately prior to the event
rel_positions = calc_rel_positions(sequence)
for id, pulse in enumerate(sequence.pulses):
str = self._new_delay(f"{id}d")
self._addDef(f"{str} = {int(rel_positions[id])}")
if type(pulse) == Detection:
self.var_hash[id] = "pg"
self._addDef(f"pg = {int(pulse.tp.value)}")
else:
str = self._new_pulse(id)
self._addDef(f"{str} = {int(pulse.tp.value)}")
self._addDef(f"h = {sequence.shots.value}")
self._addDef(f"n = {sequence.averages.value}")
self._addDef(f"srt = {sequence.reptime.value:.0f} * srtu")
self._addDef(f"d0 = {d0:.0f} ")
# Build table of parvars
unique_parvars = np.unique(sequence.progTable["axID"])
for i in unique_parvars:
parvar = PSparvar(sequence, i)
self.parvars.append(parvar)
# TODO: Add checker that the only the first two parvars are bruker
# compatible.
# Incremented variables need both a placeholder variable and a stepper.
step_hash = {}
place_hash = {}
for i, parvar in enumerate(self.parvars):
for j, event in enumerate(parvar.events):
if event == 'srt': #srt loop
str = self._new_var('srt')
self._addDef(f"{str} = {parvar.parvar['step'][j]} * srtu")
str = self._new_var('srt_step')
step_hash[event] = str
self._addDef(f"{str} = {parvar.parvar['step'][j]} * srtu")
place_hash[event] = 'srt'
elif parvar.parvar['types'][j] == 'p': #pulse loop
str = self._new_delay(f"{event}_step")
step_hash[f"{event}p"] = str
self._addDef(f"{str} = {parvar.parvar['step'][j]}")
str = self._new_pulse(f"{event}_place")
place_hash[f"{event}p"] = str
elif parvar.parvar['types'][j] == 'd': #delay loop
str = self._new_delay(f"{event}_step")
step_hash[f"{event}d"] = str
self._addDef(f"{str} = {parvar.parvar['step'][j]}")
str = self._new_delay(f"{event}_place")
place_hash[f"{event}d"] = str
dim_step = self._new_delay(f"dim{i}_step")
self._addDef(f"{dim_step} = {parvar.ax_step}")
self._addDef(f"d{20+i} = {parvar.init}")
self.dims.append(parvar.dim)
# Build experiment file
if self.AWG:
self.pcyc_hash = {}
id = -1
for pulse_num,pulse in enumerate(sequence.pulses):
if (type(pulse) is Delay):
continue
elif (type(pulse) is Detection):
continue
id += 1
awg_id = self._addAWGPulse(sequence,pulse_num, id)
self.pcyc_hash[pulse_num] = awg_id
pcyc = PSPhaseCycle(self.sequence, MPFU=None, OnlyDet=True)
self.pcyc_str = pcyc.__str__() + self.pcyc_str
else:
self._addPhaseCycle()
for i in place_hash.keys():
self._addExp(f"{place_hash[i]} = {self.var_hash[i]}")
if self.parvars[0].Bsweep:
self._addExp(f"bsweep x=1 to sx")
elif self.parvars[0].repsweep:
self._addExp(f"for x=1 to sx")
else:
self._addExp(f"sweep x=1 to sx")
self._addExp(f"shot i=1 to h")
self._addExp(f"d9") # Just a stupid uncessary variable
# # Read pulse sequence with phase cycle
for id, pulse in enumerate(sequence.pulses):
# if id in place_hash:
# pulse_str = place_hash[id]
# else:
# pulse_str = self.var_hash[id]
# Add delays
if f"{id}d" in place_hash:
delay_str = place_hash[f"{id}d"]
else:
delay_str = self.var_hash[f"{id}d"]
self._addExp(f"{delay_str}")
# Add pulses
if f"{id}p" in place_hash:
pulse_str = place_hash[f"{id}p"]
else:
pulse_str = self.var_hash[id]
if id in self.pcyc_hash:
self._addExp(f"{pulse_str} [{self.pcyc_hash[id]}]")
elif type(pulse) == Detection:
self._addExp(f"d0\nacq [sg1]")
else:
self._addExp(f"{pulse_str}")
self._addExp(f"next i") # End of shots loop
for i in place_hash.keys():
self._addExp(f"{place_hash[i]} = {place_hash[i]} + {step_hash[i]}")
if not self.parvars[0].Bsweep:
self._addExp(f"dx = dx + {dim_step}")
self._addExp(f"next x") # End of scan loop
self._addScanLoop()
self._cmpl_Exp()
self._cmpl_def()
pass
[docs]
def _new_delay(self, key):
str = self.possible_delays.pop(0)
self.var_hash[key] = str
return str
[docs]
def _new_var(self, key):
str = self.possible_vars.pop(0)
self.var_hash[key] = str
return str
[docs]
def _new_pulse(self, key):
str = self.possible_pulses.pop(0)
self.var_hash[key] = str
return str
[docs]
def save(self, filepath):
fdef_path = f"{filepath}.def"
fexp_path = f"{filepath}.exp"
with open(fdef_path, "w") as fdef:
fdef.write(header.format("definition"))
fdef.write(self.def_file_str)
with open(fexp_path, "w") as fexp:
fexp.write(header.format("experiment"))
fexp.write(self._ExpDefs())
fexp.write("\n"*3)
fexp.write(self.pcyc_str)
fexp.write("\n"*3)
fexp.write(self.exp_file_str)
pass
[docs]
def _addDef(self, str):
self.def_file_str += str
self.def_file_str += "\n"
[docs]
def _addExp(self, str):
self.exp_file_str += str
self.exp_file_str += "\n"
[docs]
def _ExpDefs(self):
str = "begin defs \n"
if len(self.dims) == 1:
str += f"dim s[{self.dims[0]}] \n"
elif len(self.dims) == 2:
str += f"dim s[{self.dims[0]},{self.dims[1]}] \n"
str += "end defs \n"
return str
[docs]
def _addScanLoop(self):
self.exp_file_str = "for k=1 to n\n" + self.exp_file_str
self.exp_file_str += "next k \n"
pass
[docs]
def _addPhaseCycle(self):
self.pcyc = PSPhaseCycle(self.sequence, self.MPFU)
self.pcyc_hash = self.pcyc.pcyc_hash
self.pcyc_str = self.pcyc.__str__()
[docs]
def _addAWGPulse(self, sequence, pulse_num, id):
awg_id = id
pulse=sequence.pulses[pulse_num]
if type(pulse) is RectPulse:
shape = 0
init_freq = pulse.freq.value
final_freq = init_freq
if hasattr(pulse,"scale"):
amplitude = pulse.scale.value
else:
amplitude = 0
def add_AWG_line(elements, comment=None, indent=2, repeat=1):
string = ""
string += " "*indent
for i in range(0,repeat):
if isinstance(elements,list) or isinstance(elements,np.ndarray):
for ele in elements:
string += f"{ele:<4} "
else:
string += f"{elements:<4} "
if comment is not None:
string += "; " + comment
string += "\n"
return string
string = ""
# phase = np.round(np.degrees(pulse.pcyc["Phases"]))
phase = np.round(np.degrees(sequence.pcyc_cycles[:,sequence.pcyc_vars.index(pulse_num)]))
num_cycles = len(phase)
string += f"begin awg{awg_id}\n"
string += add_AWG_line(
init_freq, repeat=num_cycles, comment="Initial Frequency [MHz]")
string += add_AWG_line(
final_freq, repeat=num_cycles, comment="Final Frequency [MHz]")
string += add_AWG_line(
phase, repeat=1, comment="Phase")
string += add_AWG_line(
amplitude, repeat=num_cycles, comment="Amplitude")
string += add_AWG_line(
shape, repeat=num_cycles, comment="Shape")
string += f"end awg{awg_id}\n"
self.pcyc_str += string
return f"awg{id}"
[docs]
def _check_sequence(self, sequence):
for i, pulse in enumerate(sequence.pulses):
if (type(pulse) is not Delay):
continue
elif (type(pulse) is not Detection):
continue
elif pulse.scale is None:
raise RuntimeError(f"Missing scale in pulse {i}")
[docs]
def _cmpl_Exp(self):
self.exp_file_str = f"begin exp \"{'auto'}\" [INTG QUAD]\n" +\
self.exp_file_str
self.exp_file_str += "end exp\n"
[docs]
def _cmpl_def(self):
self.def_file_str = f"begin defs \n\n" +\
self.def_file_str
self.def_file_str += "end defs\n"
[docs]
def __str__(self) -> str:
output_str = "DEF File \n" + "#"*79 + "\n"
output_str += self.def_file_str
output_str += "#"*79 + "\n"
output_str += "EXP File \n" + "#"*79 + "\n"
output_str += self._ExpDefs() + "\n \n" + self.pcyc_str + "\n \n" + self.exp_file_str
return output_str
# =============================================================================
# Functions
# =============================================================================
[docs]
def run_general(
api, ps_file: tuple, exp: tuple, settings: dict, variables: dict,
run: bool = True) -> None:
"""A function to run a general Pulse Spel experiment through autoDeer.
Parameters
----------
api : _type_
The current Bruker Xepr class
ps_file : tuple
A tuple containing the file path to both the ".exp" and ".def" files.
exp : tuple
A tuple giving the name of the experiment and phase cycle.
settings : dict
A dictionary containing possible acquisition settings. Options include
['ReplaceMode','PhaseCycle','Acquisition_mode']
variables : dict
A dictionary containg pulse spel variables to choose from can, these
can also be dimension of experiment.
run : bool, optional
Should the experiment run or just compile, by default True
Raises
------
ValueError
If an input is of the wrong type.
"""
if os.path.isfile(ps_file[0]):
if os.path.isabs(ps_file[0]):
base_path = ""
else:
base_path = MODULE_DIR
if len(ps_file) == 1:
# Assuming that both the EXP file and the DEF file have the same
# name bar-extention
exp_file = base_path + ps_file[0] + ".exp"
def_file = base_path + ps_file[0] + ".def"
elif len(ps_file) == 2:
# EXP and DEF file have seperate name
exp_file = base_path + ps_file[0] + ".exp"
def_file = base_path + ps_file[1] + ".def"
else:
raise ValueError(
"ps_file must be of form ['EXP file'] or ['EXP file','DEF file']")
with open(exp_file, "r") as exp_file:
exp_text = exp_file.read()
with open(def_file, "r") as def_file:
def_text = def_file.read()
else:
exp_text = ps_file[0]
def_text = ps_file[1]
# # Identifying a dimension change in settings
# r = re.compile("dim([0-9]*)")
# match_list: list = list(filter(
# lambda list: list is not None, [r.match(i) for i in variables.keys()]))
# if len(match_list) >= 1:
# for i in range(0, len(match_list)):
# key = match_list[i][0]
# dim = int(r.findall(key)[0])
# new_length = int(variables[key])
# change_dimensions(exp_file, dim, new_length)
verbMsgParam = api.cur_exp.getParam('*ftEPR.PlsSPELVerbMsg')
plsSPELCmdParam = api.cur_exp.getParam('*ftEPR.PlsSPELCmd')
api.XeprCmds.aqPgSelectBuf(1)
api.cur_exp.getParam('*ftEpr.PlsSPELGlbTxt').value = def_text
api.XeprCmds.aqPgShowDef()
plsSPELCmdParam.value=3
while not "The variable values are set up" in verbMsgParam.value:
time.sleep(0.1)
api.XeprCmds.aqPgSelectBuf(2)
api.cur_exp.getParam('*ftEpr.PlsSPELPrgTxt').value = exp_text
plsSPELCmdParam.value=7
while not "Second pass ended" in verbMsgParam.value:
time.sleep(0.1)
if "ReplaceMode" in settings:
api.set_ReplaceMode(settings["ReplaceMode"])
else:
api.set_ReplaceMode(False)
if "PhaseCycle" in settings:
api.set_PhaseCycle(settings["PhaseCycle"])
else:
api.set_PhaseCycle(True)
if "Acquisition_mode" in settings:
api.set_Acquisition_mode(settings["Acquisition_mode"])
else:
api.set_Acquisition_mode(1)
for var in variables:
api.set_PulseSpel_var(var.lower(), variables[var])
api.set_PulseSpel_experiment(exp[0])
api.set_PulseSpel_phase_cycling(exp[1])
# Run Experiment
if run is True:
api.run_exp()
time.sleep(1)
pass
# =============================================================================
[docs]
def change_dimensions(path, dim: int, new_length):
"""A function to rewrite a pulseSpel experiment file with a new dimension
Parameters
----------
path : str
The full file path.
dim : int
The experiment number that needs to be changed
new_length : int
The new length can be a list of two if 2D.
Raises
------
ValueError
If there more than 2 dimesnions are supplied. Xepr can not handle 3+D
experiments.
"""
# dim_str = "dim" + str(int(dim))
# Open file
with open(path, 'r') as file:
data = file.readlines()
# Build Regular expression to search for
# This identifies the correct line and extracts the comment.
re_search = fr"dim{int(dim)}\s*s\[[0-9]+,*[0-9]*\]"
if type(new_length) == int:
new_string = f"dim{int(dim)} s[{int(new_length)}]"
elif type(new_length) == list:
if len(new_length) == 1:
new_string = f"dim{int(dim)} s[{int(new_length[0])}]"
elif len(new_length) == 2:
new_string = \
f"dim{int(dim)} s[{int(new_length[0])},{int(new_length[1])}]"
else:
raise ValueError("New length can't have more than 2 dimensions")
else:
raise ValueError("new_length must be either an int or a list")
data = [re.sub(re_search, new_string, line) for line in data]
with open(path, 'w') as file:
file.writelines(data)
[docs]
def _addAWGPulse(sequence, pulse_num, id, pcyc_str,amp_var=None):
awg_id = id
pulse=sequence.pulses[pulse_num]
if type(pulse) is RectPulse:
shape = 0
init_freq = pulse.freq.value
final_freq = init_freq
if amp_var is not None:
amplitude = amp_var
else:
if hasattr(pulse,"scale"):
amplitude = pulse.scale.value
else:
amplitude = 0
def add_AWG_line(elements, comment=None, indent=2, repeat=1):
string = ""
string += " "*indent
for i in range(0,repeat):
if isinstance(elements,list) or isinstance(elements,np.ndarray):
for ele in elements:
string += f"{ele:<4} "
else:
string += f"{elements:<4} "
if comment is not None:
string += "; " + comment
string += "\n"
return string
string = ""
# phase = np.round(np.degrees(pulse.pcyc["Phases"]))
phase = np.round(np.degrees(sequence.pcyc_cycles[:,sequence.pcyc_vars.index(pulse_num)]))
num_cycles = len(phase)
string += f"begin awg{awg_id}\n"
string += add_AWG_line(
init_freq, repeat=num_cycles, comment="Initial Frequency [MHz]")
string += add_AWG_line(
final_freq, repeat=num_cycles, comment="Final Frequency [MHz]")
string += add_AWG_line(
phase, repeat=1, comment="Phase")
string += add_AWG_line(
amplitude, repeat=num_cycles, comment="Amplitude")
string += add_AWG_line(
shape, repeat=num_cycles, comment="Shape")
string += f"end awg{awg_id}\n"
pcyc_str += string
return pcyc_str, f"awg{id}"
[docs]
def get_arange(array):
if np.ndim(array) > 1:
raise ValueError("The array must be 1D")
array = np.around(array,6)
unique_diff = np.unique(np.diff(array))
step = unique_diff[0]
start = array[0]
stop = array[-1]
return start,stop+step, step
[docs]
def build_unique_progtable(seq):
progtable = seq.progTable
unique_axs = np.unique(progtable["axID"])
u_progtable = []
d_shifts=[]
for axID in unique_axs:
axes = []
variables = []
for i,j in enumerate(progtable["axID"]):
if axID != j:
continue
axes.append(progtable["axis"][i])
variables.append((progtable["EventID"][i], progtable["Variable"][i]))
if np.any([a[1]=='scale' for a in variables]):
axes = [ax * 100 for ax in axes]
# Find unique axis
steps = []
for axis in axes:
start,_, step = get_arange(axis)
steps.append(step)
if np.any(steps):
if np.all(np.mod(steps,1) == 0):
common_step = gcd(steps)
else:
common_step = np.min(steps)
# idx = np.argmin(steps)
# start = get_arange(axes[idx])
multipliers = np.array(steps)/common_step
else:
common_step = 0
multipliers = np.ones(len(steps))
start = axes[0][0]/multipliers[0]
index = progtable["axID"].index(axID)
common_axis={"start":start,"dim":np.shape(axes[0])[0],"step":common_step,"reduce":progtable["reduce"][index]}
n_steps = len(steps)
vars = [{"variable": variables[i],"multiplier":multipliers[i]} for i in range(n_steps)]
n_pulses = len(seq.pulses)
t_shift = np.zeros(n_pulses)
for var,mul in zip(variables,multipliers):
if var[1] != 't':
continue
t_shift[var[0]] = mul
u_progtable.append({"axID":axID,"axis":common_axis,"variables":vars, "delay_shifts": np.diff(t_shift,prepend=0)})
return u_progtable
# def _addAWGPulse(self, sequence, pulse_num, id):
# awg_id = id
# pulse=sequence.pulses[pulse_num]
# if type(pulse) is RectPulse:
# shape = 0
# init_freq = pulse.freq.value
# final_freq = init_freq
# if hasattr(pulse,"scale"):
# amplitude = pulse.scale.value
# else:
# amplitude = 0
# def add_AWG_line(elements, comment=None, indent=2, repeat=1):
# string = ""
# string += " "*indent
# for i in range(0,repeat):
# if isinstance(elements,list) or isinstance(elements,np.ndarray):
# for ele in elements:
# string += f"{ele:<4} "
# else:
# string += f"{elements:<4} "
# if comment is not None:
# string += "; " + comment
# string += "\n"
# return string
# string = ""
# # phase = np.round(np.degrees(pulse.pcyc["Phases"]))
# phase = np.round(np.degrees(sequence.pcyc_cycles[:,sequence.pcyc_vars.index(pulse_num)]))
# num_cycles = len(phase)
# string += f"begin awg{awg_id}\n"
# string += add_AWG_line(
# init_freq, repeat=num_cycles, comment="Initial Frequency [MHz]")
# string += add_AWG_line(
# final_freq, repeat=num_cycles, comment="Final Frequency [MHz]")
# string += add_AWG_line(
# phase, repeat=1, comment="Phase")
# string += add_AWG_line(
# amplitude, repeat=num_cycles, comment="Amplitude")
# string += add_AWG_line(
# shape, repeat=num_cycles, comment="Shape")
# string += f"end awg{awg_id}\n"
# self.pcyc_str += string
# return f"awg{id}"
[docs]
def check_variable(var:str, uprog):
if isinstance(uprog,list):
for up in uprog:
if check_variable(var,up):
return True
return False
else:
for i in range(len(uprog['variables'])):
if uprog['variables'][i]["variable"][1] == var:
return True
return False
[docs]
def determine_TWT_split(sequence, MaxGate):
n_pulses = len(sequence.pulses)
uprogtable = build_unique_progtable(sequence)
TWT_gate_max = MaxGate *1e3
try:
idx = transpose_list_of_dicts(uprogtable[0]['variables'])['variable'].index((n_pulses-1, 't'))
except ValueError:
return False
multiplier = uprogtable[0]['variables'][idx]['multiplier']
axis = uprogtable[0]['axis']
max_time = axis['start'] + axis['step'] * axis['dim'] * multiplier
if max_time > TWT_gate_max:
hw_log.info('Time too long for TWT gate, need to split the experiment')
split_point = int(0.75 * TWT_gate_max / max_time * axis['dim'])
return split_point
else:
return False
[docs]
def write_pulsespel_file(sequence,d0, AWG=False, MPFU=False,MaxGate=40):
"""Write the pulsespel file for a given sequence.
Parameters
----------
sequence : Sequence
The sequence class to be converted.
AWG : bool, optional
Is this a pulse spel file for an AWG spectrometer, by default False
MPFU : list, optional
A list of MPFU channels, by default False
MaxGate : float, optional
The maximum gate time for the TWT gate in microseconds, by default 40 us
Returns
-------
str
The string for the definition file
str
The string for the experiment file
"""
split_point = determine_TWT_split(sequence,MaxGate=40)
uprogtable = build_unique_progtable(sequence)
possible_delays = [
"d1", "d2", "d3", "d4", "d5", "d6", "d7", "d8", "d10", "d11", "d12",
"d13", "d14", "d15", "d16", "d17", "d18", "d19", "d20", "d21", "d22",
"d23", "d24", "d25", "d26", "d27", "d28", "d29", "d30"]
possible_pulses= ["p0", "p1","p2", "p3", "p4", "p5", "p6", "p7", "p8", "p9"]
loop_iterators = ["i","j"]
loop_dims = ["m","f"]
letter_vars = ['b','c','e','f','g']
possible_amps = ["aa1","aa2","aa3","aa4","aa5","aa6","aa7","aa8","aa9","aa10",
"ab1","ab2","ab3","ab4","ab5","ab6","ab7","ab8","ab9","ab10",
"ac1","ac2","ac3","ac4","ac5","ac6","ac7","ac8","ac9","ac10"]
n_pulses = len(sequence.pulses)
n_axes = len(uprogtable)
static_delay_hash = {0:"d9"}
static_pulse_hash = {}
mv_delay_hash = {}
mv_pulse_hash = {}
axis_start_hash = {}
axis_step_hash = {}
axis_val_hash = {}
mv_pulse_amp_hash = {}
static_pulse_amp_hash = {}
axs_ids = []
scanhead = "begin exp \"auto\" [INTG QUAD] \n"
foot = "end exp"
def_file = "begin defs \n"
dims = "begin defs \n dim s["
pcyc_str = ""
# Add shot repetition time
if not check_variable("reptime", uprogtable):
def_file += f"srt = {val_in_us(sequence.reptime, False):.0f} * srtu\n"
def_file += f"d0 = {d0:.0f}\n"
prev_pulse = None
for i,pulse in enumerate(sequence.pulses):
static_delay_hash[i] = possible_delays.pop()
if prev_pulse is None:
def_file += f"{static_delay_hash[i]} = {pulse.t.value}\n"
else:
def_file += f"{static_delay_hash[i]} = {pulse.t.value - prev_pulse.t.value}\n"
if type(pulse) is Detection:
def_file += f"pg = {pulse.tp.value}\n"
else:
static_pulse_hash[i] = possible_pulses.pop()
def_file += f"{static_pulse_hash[i]} = {pulse.tp.value}\n"
if AWG and (not isinstance(pulse,Detection)):
static_pulse_amp_hash[i] = possible_amps.pop()
def_file += f"{static_pulse_amp_hash[i]} = {int(pulse.scale.value * 100):d}\n"
prev_pulse = pulse
for axis in uprogtable:
ax_ID = axis["axID"]
axs_ids.append(ax_ID)
axis_start_hash[ax_ID] = possible_delays.pop()
axis_step_hash[ax_ID] = possible_delays.pop()
axis_val_hash[ax_ID] = possible_delays.pop()
def_file += f"{axis_step_hash[ax_ID]} = {axis['axis']['step']}\n"
if not axis['axis']['reduce']:
dims += f"{axis['axis']['dim']},"
axs_ids.sort(reverse=True)
reduced_axes = np.array([uprogtable[i]["axis"]["reduce"] for i in range(n_axes)])
n_reduced = np.sum(reduced_axes)
if n_axes - n_reduced == 1:
kept_axes_param = ["x"]
elif n_axes - n_reduced == 2:
kept_axes_param = ["x","y"]
# elif n_axes - n_reduced == 0:
# raise RuntimeWarning("Transient experiments are not currently supported")
# elif n_axes - n_reduced > 2:
# raise RuntimeWarning("A maximum of 2 non reduced axes are allowed.")
# Setup averaging loop
scanhead += "for k=1 to m \ntotscans(m) \n"
def_file += f"m = {sequence.averages.value}\n"
scanfoot = "scansdone(k) \nnext k \n"+foot
foot = ""
head = ""
delay_build = ""
for ax in axs_ids:
index = [uprogtable[i]['axID'] for i in range(len(uprogtable))].index(ax)
uprog = uprogtable[index]
if check_variable("B", uprog):
reduce = False # You can not reduce a Bsweep
loop_str = "bsweep {param}=1 to {stop} \n"
else:
reduce = uprogtable[index]['axis']["reduce"]
if ax == 0 and not check_variable("reptime", uprog):
loop_str = "sweep {param}=1 to {stop} \n"
else:
loop_str = "for {param}=1 to {stop} \n"
if reduce:
param = loop_iterators.pop()
else:
param = kept_axes_param.pop()
foot = f"next {param} \n" + foot
if param == 'x':
stop = 'sx'
foot = f"dx=dx+{axis_step_hash[ax]}\n"+ foot
elif param == 'y':
stop = 'sy'
foot = f"dy=dy+{axis_step_hash[ax]}\n"+ foot
else:
stop = loop_dims.pop()
def_file += f"{stop} = {uprog['axis']['dim']}\n"
if check_variable("t", uprog):
changing_pulses = uprog["delay_shifts"].nonzero()[0]
for i in changing_pulses:
if i not in mv_delay_hash:
mv_delay_hash[i] = possible_delays.pop()
# delay_build = f"{mv_delay_hash[i]} = {static_delay_hash[i]}\n" + delay_build
scanhead += f"{mv_delay_hash[i]} = {static_delay_hash[i]}\n"
for k in range(int(np.abs(uprog["delay_shifts"][i]))):
if uprog["delay_shifts"][i] > 0:
sign = "+"
else:
sign = "-"
# foot = f"{axis_val_hash[ax]}={axis_val_hash[ax]}{sign}{axis_step_hash[ax]}\n"+ foot
foot = f"{mv_delay_hash[i]}={mv_delay_hash[i]}{sign}{axis_step_hash[ax]}\n"+ foot
# delay_build += f"{mv_delay_hash[i]} = {mv_delay_hash[i]} + {axis_val_hash[ax]} \n"
if check_variable("tp", uprog):
for var in uprog["variables"]:
if var["variable"][1] != "tp":
continue
pulse_id = var["variable"][0]
mult = var["multiplier"]
mv_pulse_hash[pulse_id] = possible_pulses.pop()
scanhead += f"{mv_pulse_hash[pulse_id]} = {static_pulse_hash[pulse_id]}\n"
if mult > 0:
sign = "+"
else:
sign = "-"
for k in range(int(np.abs(mult))):
foot = f"{mv_pulse_hash[pulse_id]}={mv_pulse_hash[pulse_id]}{sign}{axis_step_hash[ax]}\n"+ foot
if check_variable("reptime", uprog):
for var in uprog["variables"]:
if var["variable"][1] != "reptime":
continue
reptime0_var = letter_vars.pop()
scanhead += f"srt = {reptime0_var}\n"
axis = val_in_us(sequence.reptime, True)
def_file += f"{reptime0_var} = {axis[0]:.0f} * srtu\n"
srt_step_var = letter_vars.pop()
def_file += f"{srt_step_var} = {var['multiplier']*uprog['axis']['step']:.0f} * srtu\n"
foot = f"srt=srt + {srt_step_var}\n"+ foot
if check_variable("scale", uprog):
for var in uprog["variables"]:
if var["variable"][1] != "scale":
continue
pulse_id = var["variable"][0]
mult = var["multiplier"]
mv_pulse_amp_hash[pulse_id] = possible_amps.pop()
scanhead += f"{mv_pulse_amp_hash[pulse_id]} = {static_pulse_amp_hash[pulse_id]}\n"
if mult > 0:
sign = "+"
else:
sign = "-"
for k in range(int(np.abs(mult))):
foot = f"{mv_pulse_amp_hash[pulse_id]}={mv_pulse_amp_hash[pulse_id]}{sign}{axis_step_hash[ax]}\n"+ foot
# head += f"{axis_val_hash[ax]} = 0\n"
head += loop_str.format(param=param,stop=stop)
# Add moving pulses
# Add shot loop
head += delay_build
head += f"shot i=1 to h\n"
def_file += f"h = {sequence.shots.value}\n"
foot = "next i\n" + foot
if AWG:
pcyc_hash = {}
id = -1
for pulse_num,pulse in enumerate(sequence.pulses):
if (type(pulse) is Detection):
continue
id += 1
amp_var = mv_pulse_amp_hash[pulse_num] if pulse_num in mv_pulse_amp_hash else static_pulse_amp_hash[pulse_num]
pcyc_str, awg_id = _addAWGPulse(sequence,pulse_num, id, pcyc_str,amp_var)
pcyc_hash[pulse_num] = awg_id
pcyc = PSPhaseCycle(sequence, MPFU=None, OnlyDet=True)
pcyc_str = pcyc.__str__() + "\n"*1 + pcyc_str
elif MPFU is not None:
pcyc = PSPhaseCycle(sequence, MPFU)
pcyc_hash = pcyc.pcyc_hash
pcyc_str = pcyc.__str__()
else:
pcyc = PSPhaseCycle(sequence, MPFU)
pcyc_hash = pcyc.pcyc_hash
pcyc_str = pcyc.__str__()
# Add Pulse Sequence
pulse_str = "d9\n"
for id, pulse in enumerate(sequence.pulses):
if id in mv_delay_hash:
pulse_str += f"{mv_delay_hash[id]}\n"
else:
pulse_str += f"{static_delay_hash[id]}\n"
if type(pulse) == Detection:
pulse_str += f"d0\nacq [sg1]\n"
else:
if id in mv_pulse_hash:
pulse_str += f"{mv_pulse_hash[id]} [{pcyc_hash[id]}]\n"
else:
pulse_str += f"{static_pulse_hash[id]} [{pcyc_hash[id]}]\n"
dims = dims[:-1]
dims += "] \nend defs\n"
if split_point:
exp_file = dims + "\n"*2 + pcyc_str + "\n"*2 + scanhead
exp_file += head.replace('sx','w') + pulse_str + foot + "\n"
exp_file += head.replace('x=1','x=w') + pulse_str + foot + "\n"
exp_file += scanfoot
def_file += f"w = {split_point}\n"
def_file += "end defs"
else:
exp_file = dims + "\n"*2 + pcyc_str + "\n"*2 + scanhead+ head + pulse_str + foot + scanfoot
def_file += "end defs"
return def_file, exp_file