Source code for pyepr.hardware.Bruker_MPFU

from pyepr.classes import Interface, Parameter
from pyepr.pulses import Delay, Detection, RectPulse
from pyepr.hardware.XeprAPI_link import XeprAPILink
from pyepr.hardware.Bruker_tools import PulseSpel, run_general,build_unique_progtable,PSPhaseCycle, write_pulsespel_file
from pyepr.sequences import Sequence, HahnEchoSequence
from pyepr.utils import save_file, transpose_list_of_dicts, transpose_dict_of_list, round_step
from pyepr import create_dataset_from_sequence, create_dataset_from_axes

import tempfile
import time
from scipy.optimize import minimize_scalar, curve_fit
import numpy as np
import os
from pathlib import Path
import datetime
from deerlab import correctphase
import matplotlib.pyplot as plt
import logging
# =============================================================================
[docs] hw_log = logging.getLogger('interface.Xepr')
class BrukerMPFU(Interface): """ Represents the interface for connecting to MPFU based Bruker ELEXSYS-II Spectrometers. """ def __init__(self, config_file:str) -> None: """An interface for connecting to MPFU based Bruker ELEXSYS-II Spectrometers. Getting Started ------------------ Before a connection can be made an appropriate configuration file first needs to be written. 1. Open Xepr 2. Processing -> XeprAPI -> Enable XeprAPI 3. `BrukerAWG.connect()` Parameters ---------- config_file : str The path to a YAML configuration file. Attributes ---------- bg_thread: None or threading.Thread If a background thread is needed, it is stored here. """ self.api = XeprAPILink(config_file) self.spec_config = self.api.config["Spectrometer"] self.bridge_config = self.api.spec_config["Bridge"] self.MPFU = self.bridge_config["MPFU Channels"] if self.MPFU == 'None': self.MPFU = None self.temp_dir = tempfile.mkdtemp("autoDEER") self.d0 = self.bridge_config["d0"] self.bg_thread = None self.bg_data = None self.cur_exp = None self.tuning = False self.savename = '' self.savefolder = str(Path.home()) self.setup_flag=False super().__init__() def connect(self, d0=None) -> None: self.api.connect() time.sleep(1) self.setup(d0) return super().connect() def setup(self,d0=None): self.api.hidden['BrPlsMode'].value = True self.api.hidden['OpMode'].value = 'Operate' self.api.hidden['RefArm'].value = 'On' self.api.set_attenuator('Main',0) self.api.cur_exp['VideoBW'].value = 20 if d0 is None: self.calc_d0() else: self.d0 = d0 self.api.hidden['Detection'].value = 'Signal' self.setup_flag = True pass def acquire_dataset(self): if self.bg_data is None: if not self.isrunning(): if (self.savename is not None) and (self.savename != ''): self.api.xepr_save(os.path.join(self.savefolder,self.savename)) data = self.api.acquire_dataset(self.cur_exp) else: data = self.api.acquire_scan(self.cur_exp) else: data = create_dataset_from_sequence(self.bg_data, self.cur_exp) return super().acquire_dataset(data) def _launch_complex_thread(self,sequence,axID=1,tune=True): uProgTable = build_unique_progtable(sequence) reduced_seq = sequence.copy() reduced_seq.progTable = transpose_list_of_dicts([transpose_dict_of_list(sequence.progTable)[0]]) uProgTable_py = uProgTable[axID] axis = uProgTable_py['axis'] reduced_seq.averages.value = 1 py_ax_dim = uProgTable_py['axis']['dim'] self.bg_data = np.zeros((uProgTable[0]['axis']['dim'],py_ax_dim),dtype=np.complex128) print("Initial PulseSpel Launch") self.launch(reduced_seq,savename='test',tune=tune, update_pulsespel=True, start=False,reset_bg_data=False,reset_cur_exp=False) self.terminate() variables = uProgTable_py['variables'] # print("Creating Thread") # # thread = threading.Thread(target=step_parameters,args=[self,reduced_seq,py_ax_dim,variables]) # # self.bg_thread = self.pool.submit(step_parameters, self,reduced_seq,py_ax_dim,variables) # self.bg_thread = Worker(step_parameters, self,reduced_seq,py_ax_dim,variables) # self.pool.start(self.bg_thread) # print("Started Thread") step_parameters(self,reduced_seq,py_ax_dim,variables) # thread.start() pass def launch(self, sequence: Sequence, savename: str, start=True, tune=True, MPFU_overwrite=None,update_pulsespel=True, reset_bg_data = True, reset_cur_exp=True,**kwargs): sequence.shift_detfreq_to_zero() if self.isrunning(): self.terminate(now=True) time.sleep(4) if reset_bg_data: self.bg_data = None if reset_cur_exp: timestamp = datetime.datetime.now().strftime(r'%Y%m%d_%H%M_') self.savename = timestamp+savename # self.savename = savename self.cur_exp = sequence # First check if the sequence is pulsespel compatible if not test_if_MPFU_compatability(sequence): print("Launching complex sequence") self._launch_complex_thread(sequence,1,tune) return None if self.MPFU is None: SPFU_flip_power,ELDOR_flip_angle = _SPFU_channels(sequence) if tune: self.tuning = True if ELDOR_flip_angle is not None: dif_freq=None for pulse in sequence.pulses: if isinstance(pulse,Detection) or isinstance(pulse,Delay): continue if pulse.pcyc["Channels"] != "ELDOR": continue elif dif_freq is None: dif_freq = pulse.freq.value elif pulse.freq.value != dif_freq: raise ValueError('Only one ELDOR frequency is possible') if ELDOR_flip_angle == np.inf: self.api.set_attenuator('ELDOR',0) self.api.set_ELDOR_freq(sequence.LO.value + dif_freq) else: ELDORtune(self,sequence,freq=dif_freq, MPFU=False) SPFUtune(self,sequence,SPFU_flip_power) self.tuning = False MPFU_chans=None else: channels = _MPFU_channels(sequence) # pcyc = PSPhaseCycle(sequence,self.MPFU) N_channels = len(channels) if N_channels > len(self.MPFU): raise RuntimeError( f"This sequence requires {N_channels} MPFU" "Channels." "Only {len(self.MPFU)} are avaliable on this spectrometer.") if tune: self.tuning = True if 'ELDOR' in channels: dif_freq=None for pulse in sequence.pulses: if pulse.freq.value == 0: continue elif dif_freq is None: dif_freq = pulse.freq.value elif pulse.freq.value != dif_freq: raise ValueError('Only one ELDOR frequency is possible') ELDORtune(self,sequence,freq=dif_freq) MPFUtune(self,sequence, channels) self.tuning = False if MPFU_overwrite is None: MPFU_chans = self.MPFU else: MPFU_chans = MPFU_overwrite PSpel_file = self.temp_dir + "/autoDEER_PulseSpel" if update_pulsespel: # PSpel = PulseSpel(sequence, MPFU=self.MPFU) def_text, exp_text = write_pulsespel_file(sequence,self.d0,False,MPFU_chans) verbMsgParam = self.api.cur_exp.getParam('*ftEPR.PlsSPELVerbMsg') plsSPELCmdParam = self.api.cur_exp.getParam('*ftEPR.PlsSPELCmd') self.api.XeprCmds.aqPgSelectBuf(1) self.api.XeprCmds.aqPgSelectBuf(2) self.api.XeprCmds.aqPgSelectBuf(3) self.api.cur_exp.getParam('*ftEpr.PlsSPELGlbTxt').value = def_text self.api.XeprCmds.aqPgShowDef() plsSPELCmdParam.value=3 time.sleep(5) # while not "The variable values are set up" in verbMsgParam.value: # time.sleep(0.1) self.api.XeprCmds.aqPgSelectBuf(2) self.api.cur_exp.getParam('*ftEpr.PlsSPELPrgTxt').value = exp_text plsSPELCmdParam.value=7 time.sleep(5) # while not "Second pass ended" in verbMsgParam.value: # time.sleep(0.1) self.api.set_field(sequence.B.value) self.api.set_freq(sequence.LO.value) if 'B' in sequence.progTable['Variable']: idx = sequence.progTable['Variable'].index('B') B_axis = sequence.progTable['axis'][idx] self.api.set_sweep_width(B_axis.max()-B_axis.min()) self.api.set_ReplaceMode(False) self.api.set_Acquisition_mode(1) self.api.set_PhaseCycle(True) pg = sequence.pulses[-1].tp.value pg = round_step(pg/2,self.bridge_config['Pulse dt']) d0 = self.d0-pg if d0 <0: d0=0 d0 = round_step(d0,self.bridge_config['Pulse dt']) self.api.set_PulseSpel_var('d0', d0) self.api.set_PulseSpel_var('pg', pg*2) self.api.set_PulseSpel_experiment('auto') self.api.set_PulseSpel_phase_cycling('auto') if start: self.api.run_exp() pass def tune_rectpulse(self,*, tp,**kwargs): """Mocks the tune_rectpulse command and returns a pair of RectPulses with the given tp and 2*tp respectively. No scale is set.""" p90 = RectPulse(tp=tp,freq=0,t=0,flipangle=np.pi/2) p180 = RectPulse(tp=tp*2,freq=0,t=0,flipangle=np.pi) self.pulses[f"p90_{tp}"] = p90 self.pulses[f"p180_{tp*2}"] = p180 return self.pulses[f"p90_{tp}"], self.pulses[f"p180_{tp*2}"] def tune_pulse(self, pulse, *args, **kwargs): """Mocks the tune_pulse command and returns the pulse unchanged. """ return pulse def tune(self, sequence, B0, LO) -> None: channels = _MPFU_channels(sequence) for i,channel in enumerate(channels): ps_length = int(np.pi /(channel[0]*2)) phase = channel[1] if phase == 0: echo = "R+" elif phase == np.pi/2: echo = "I+" elif phase == np.pi: echo = "R-" elif (phase == -np.pi/2) or (phase == 3*np.pi/2): echo = "I-" mpfu_tune = MPFUtune(self.api,B0=B0,LO=LO, echo="Hahn",ps_length=ps_length) mpfu_tune.tune({self.MPFU[i]: echo}) pass def isrunning(self) -> bool: if self.tuning: return True if self.bg_thread is None: return self.api.is_exp_running() else: return self.bg_thread.running() def terminate(self,now=False) -> None: self.tuning = False if self.bg_thread is None: if now: return self.api.abort_exp() else: return self.api.stop_exp() else: attempt = self.bg_thread.cancel() if not attempt: raise RuntimeError("Thread failed to be canceled!") def calc_d0(self): """ This creates an initial guess for d0. A better estimate can only be found after the field sweep. """ hw_log.info('Calcuating d0') hw_log.debug('Setting Detection = TM') self.api.hidden['Detection'].value = 'TM' B = self.api.get_field() LO = self.api.get_counterfreq() self.api.set_attenuator('+<x>',100) d0=0 self.d0=d0 seq = Sequence(name='single_pulse',B=B,LO=LO,reptime=3e3,averages=1,shots=4) det_tp = Parameter('tp',value=16,dim=4,step=0) seq.addPulse(RectPulse(tp=det_tp,t=0,flipangle=np.pi,pcyc={'phases':[0],'dets':[1]})) seq.addPulse(Detection(tp=16,t=d0)) seq.evolution([det_tp]) self.launch(seq,savename='test',tune=False) time.sleep(5) # self.terminate(now=True) # time.sleep(3) self.api.cur_exp['ftEPR.StartPlsPrg'].value = True self.api.hidden['specJet.NoOfAverages'].value = 20 self.api.hidden['specJet.NOnBoardAvgs'].value = 20 if not self.api.hidden['specJet.AverageStart'].value: self.api.hidden['specJet.AverageStart'].value = True self.api.hidden['specJet.NoOfPoints'].value = 1024 time.sleep(3) optimal = False while not optimal: max_value = np.abs(get_specjet_data(self)).max() y_max = np.abs(self.api.hidden['specjet.DataRange'][0]) vg =self.api.get_video_gain() if max_value > 0.7* y_max: self.api.set_video_gain(vg - 3) time.sleep(0.5) elif max_value < 0.3* y_max: self.api.set_video_gain(vg + 3) time.sleep(0.5) else: optimal=True specjet_data = np.abs(get_specjet_data(self)) calc_d0 = d0 + self.api.hidden['specJet.Abs1Data'][specjet_data.argmax()] d0 = calc_d0 - 256 seq = Sequence(name='single_pulse',B=B,LO=LO,reptime=3e3,averages=1,shots=20) det_tp = Parameter('tp',value=16,dim=4,step=0) seq.addPulse(RectPulse(tp=det_tp,t=0,flipangle=np.pi)) seq.addPulse(Detection(tp=16,t=d0)) seq.evolution([det_tp]) self.launch(seq,savename='test',tune=False) self.terminate() self.api.cur_exp['ftEPR.StartPlsPrg'].value = True if not self.api.hidden['specJet.AverageStart'].value: self.api.hidden['specJet.AverageStart'].value = True self.api.hidden['specJet.NoOfPoints'].value = 512 time.sleep(3) specjet_data = np.abs(get_specjet_data(self)) calc_d0 = d0 + self.api.hidden['specJet.Abs1Data'][specjet_data.argmax()] self.d0 = calc_d0 + 64 # 64ns added to compensate for hahn echo center in field sweep hw_log.info(f"d0 set to {self.d0}") self.api.hidden['Detection'].value = 'Signal' def calc_d0_from_Hahn_Echo(self, B=None, LO=None): B = self.api.get_field() LO = self.api.get_counterfreq() if B is not None: self.api.set_field(B) if LO is not None: self.api.set_freq(LO) d0 = self.d0 # self.api.set_PulseSpel_var('d0',d0) self.api.run_exp() self.api.abort_exp() while self.api.is_exp_running(): time.sleep(1) self.api.cur_exp['ftEPR.StartPlsPrg'].value = True self.api.hidden['specJet.NoOfAverages'].value = 20 self.api.hidden['specJet.NOnBoardAvgs'].value = 20 if not self.api.hidden['specJet.AverageStart'].value: self.api.hidden['specJet.AverageStart'].value = True self.api.hidden['specJet.NoOfPoints'].value = 512 optimal = False while not optimal: max_value = np.abs(get_specjet_data(self)).max() y_max = np.abs(self.api.hidden['specjet.DataRange'][0]) vg =self.api.get_video_gain() if max_value > 0.7* y_max: self.api.set_video_gain(vg - 3) time.sleep(0.5) elif max_value < 0.3* y_max: self.api.set_video_gain(vg + 3) time.sleep(0.5) else: optimal=True specjet_data = np.abs(get_specjet_data(self)) calc_d0 = d0 - 64 + self.api.hidden['specJet.Abs1Data'][specjet_data.argmax()] self.d0 = calc_d0 hw_log.info(f"d0 set to {self.d0}") return self.d0 # =============================================================================
[docs] def step_parameters(interface, reduced_seq, dim, variables): for i in range(dim): new_seq =reduced_seq.copy() # Change all variables in the sequence for var in variables: if var['variable'][0] is not None: raise ValueError('Only exp parameters are supported at the moment') attr = getattr(new_seq,var['variable'][1]) shift = attr.get_axis()[i] attr.value = (shift) setattr(new_seq,var['variable'][1],attr) print(f"{var['variable'][1]}: {getattr(new_seq,var['variable'][1]).value} ") # self.launch(new_seq,savename='test',tune=False, update_pulsespel=False) interface.api.set_field(new_seq.B.value) interface.api.set_freq(new_seq.LO.value) interface.api.run_exp() while interface.api.is_exp_running(): time.sleep(1) single_scan_data = interface.api.acquire_dataset() interface.bg_data[:,i] += single_scan_data.data
[docs] def _MPFU_channels(sequence): """Idenitifies how many unique MPFU channels are needed for a sequence and applies the correct Channel infomation to each pulse. """ channels = [] for iD, pulse in enumerate(sequence.pulses): if type(pulse) is Delay: continue if type(pulse) is Detection: continue if ('Channels' in pulse.pcyc) and (pulse.pcyc['Channels'] == 'ELDOR'): continue if pulse.tp.value == 0: flip_power = np.inf else: flip_power = pulse.flipangle.value / pulse.tp.value if (pulse.freq.value != 0): # This is an ELDOR pulse pulse.pcyc["Channels"] = "ELDOR" if "ELDOR" not in channels: channels.append("ELDOR") continue if not "Channels" in pulse.pcyc: pulse.pcyc["Channels"] = [] elif pulse.pcyc is None: pulse.pcyc = {} pulse.pcyc["Channels"] = [] for phase in pulse.pcyc["Phases"]: power_phase = (flip_power, phase) if power_phase in channels: channel = channels.index(power_phase) else: channels.append(power_phase) channel = channels.index(power_phase) pulse.pcyc["Channels"].append(channel) return channels
[docs] def _SPFU_channels(sequence,ELDOR=True): """Idenitifies how many unique MPFU channels are needed for a sequence and applies the correct Channel infomation to each pulse. """ channels = [] SPFU_flip_power = None ELDOR_flip_power = None for iD, pulse in enumerate(sequence.pulses): if type(pulse) is Delay: continue if type(pulse) is Detection: continue if ('Channels' in pulse.pcyc) and (pulse.pcyc['Channels'] == 'ELDOR'): continue if (pulse.tp.value == 0) or (pulse.freq.value != 0): if pulse.flipangle.value == 'Hard': flip_power = np.inf else: flip_power = pulse.flipangle.value / pulse.tp.value if ELDOR: if ELDOR_flip_power is None: ELDOR_flip_power = flip_power if ELDOR_flip_power != flip_power: raise RuntimeError("All pulses on the ELDOR channel must have equal flip power.") pulse.pcyc["Channels"] = "ELDOR" else: raise ValueError("An ELDOR Channel is needed for this sequence") continue else: flip_power = pulse.flipangle.value / pulse.tp.value if SPFU_flip_power is None: SPFU_flip_power = flip_power if SPFU_flip_power != flip_power: raise RuntimeError("In SPFU mode all pulses must have equal filp power") pulse.pcyc["Channels"] = "SPFU" return SPFU_flip_power, ELDOR_flip_power
# =============================================================================
[docs] def get_specjet_data(interface): n_points = interface.api.hidden['specJet.Data'].aqGetParDimSize(0) array = np.zeros(n_points,dtype=np.complex128) for i in range(n_points): array[i] = interface.api.hidden['specJet.Data'][i] + 1j* interface.api.hidden['specJet.Data1'][i] return array
[docs] def tune_power( interface, channel: str, tol=0.1, maxiter=30, bounds=[0, 100],hardware_wait=3, echo='abs',save=True) -> float: """Tunes the attenuator of a given channel to a given target using the standard scipy optimisation scripts. Parameters ---------- channel : str The chosen MPFU channel. Options: ['+<x>', '-<x>', '+<y>', '-<y>'] tol : float, optional The tolerance in attenuator parameter, by default 0.1 maxiter : int, optional The maximum number of iterations in the optimisation, by default 30 Returns ------- float The optimal value of the attenuator parameter """ channel_opts = ['+<x>', '-<x>', '+<y>', '-<y>','ELDOR','Main'] if channel not in channel_opts: raise ValueError(f'Channel must be one of: {channel_opts}') lb = bounds[0] ub = bounds[1] if channel == '+<x>': atten_channel = 'BrXAmp' dx=51 elif channel == '-<x>': atten_channel = 'BrMinXAmp' dx=51 elif channel == '+<y>': atten_channel = 'BrYAmp' dx=51 elif channel == '-<y>': atten_channel = 'BrMinYAmp' dx=51 elif channel == 'ELDOR': atten_channel = 'ELDORAtt' dx=31 elif channel == 'Main': atten_channel = 'PowerAtten' dx=ub+1 if echo=='abs': tran_sum = lambda x: -1 * np.sum(np.abs(x)) elif echo=='R+': tran_sum = lambda x: -1 * np.sum(np.real(x)) elif echo=='R-': tran_sum = lambda x: 1 * np.sum(np.real(x)) elif echo=='I+': tran_sum = lambda x: -1 * np.sum(np.real(x)) elif echo=='I-': tran_sum = lambda x: 1 * np.sum(np.real(x)) def objective(x, *args): interface.api.hidden[atten_channel].value = x # Set phase to value time.sleep(hardware_wait) data = get_specjet_data(interface) val = tran_sum(data) print(f'Power Setting = {x:.1f} \t Echo Amplitude = {-1*val:.2f}') return val # Rough Scan start_point = 0 loops = 0 limit = np.abs(interface.api.hidden['specjet.DataRange'][0]) while start_point ==0: overflow_flag = False x = np.linspace(lb,ub,dx,endpoint=True) y = np.zeros_like(x) vg = interface.api.get_video_gain() interface.api.hidden[atten_channel].value = x[0] time.sleep(2) for i,xi in enumerate(x): interface.api.hidden[atten_channel].value = xi time.sleep(0.1) data = get_specjet_data(interface) if (np.abs(data.real).max() > 0.7*limit) or (np.abs(data.imag).max() > 0.7*limit): interface.api.set_video_gain(vg - 9) overflow_flag= True print('overflow') break val = tran_sum(data) print(f'Power Setting = {xi:.1f} \t Echo Amplitude = {-1*val:.2f}') y[i] = val if not overflow_flag: if y[np.abs(y).argmax()].max() < 1: y *= -1 start_point = x[np.argmax(y)] elif (np.abs(y.real).max() < 0.3*limit) and (np.abs(y.imag).max() < 0.3*limit): interface.api.set_video_gain(vg + 6) overflow_flag= True print('underflow') continue loops += 1 if loops > 10: raise RuntimeError('Failed to optimise videogain') # span = ub-lb # ub = np.min([start_point+0.25*span,ub]) # lb = np.max([start_point-0.25*span,lb]) # output = minimize_scalar( # objective, method='bounded', bounds=[lb, ub], # options={'xatol': tol, 'maxiter': maxiter}) # result = output.x if save: dataset = create_dataset_from_axes(y,x,axes_labels=['amp']) timestamp = datetime.datetime.now().strftime(r'%Y%m%d_%H%M_') fullname = timestamp + channel + 'amptune.h5' dataset.to_netcdf(os.path.join(interface.savefolder,fullname),engine='h5netcdf',invalid_netcdf=True) result = start_point print(f"Optimal Power Setting for {atten_channel} is: {result:.1f}") hw_log.debug(f"Optimal Power Setting for {atten_channel} is: {result:.1f}") interface.api.hidden[atten_channel].value = result return result
[docs] def tune_phase(interface, channel: str, target: str, tol=0.1, maxiter=30,bounds=[0, 100],hardware_wait=3) -> float: """Tunes the phase of a given channel to a given target using the standard scipy optimisation scripts. Parameters ---------- channel : str The chosen MPFU channel. Options: ['+<x>', '-<x>', '+<y>', '-<y>'] target : str The target echo position, this can either be maximising (+) or minimising (-) either the real (R) or imaginary (I) of the echo. Options: ['R+', 'R-', 'I+', 'I-'] tol : float, optional The tolerance in phase parameter, by default 0.1 maxiter : int, optional The maximum number of iterations in the optimisation, by default 30 Returns ------- float The optimal value of the phase parameter """ channel_opts = ['+<x>', '-<x>', '+<y>', '-<y>','Main'] phase_opts = ['R+', 'R-', 'I+', 'I-'] if channel not in channel_opts: raise ValueError(f'Channel must be one of: {channel_opts}') if target not in phase_opts: raise ValueError(f'Phase target must be one of: {phase_opts}') if channel == '+<x>': phase_channel = 'BrXPhase' elif channel == '-<x>': phase_channel = 'BrMinXPhase' elif channel == '+<y>': phase_channel = 'BrYPhase' elif channel == '-<y>': phase_channel = 'BrMinYPhase' elif channel == 'Main': phase_channel = 'SignalPhase' if target == 'R+': test_fun = lambda x: -1 * np.real(x) elif target == 'R-': test_fun = lambda x: 1 * np.real(x) elif target == 'I+': test_fun = lambda x: -1 * np.imag(x) elif target == 'I-': test_fun = lambda x: 1 * np.imag(x) lb = bounds[0] ub = bounds[1] def objective(x, *args): # x = x[0] interface.api.hidden[phase_channel].value = x # Set phase to value time.sleep(hardware_wait) data = get_specjet_data(interface) val = test_fun(np.sum(data)) print(f'Phase Setting = {x:.1f} \t Echo Amplitude = {-1*val:.2f}') return val output = minimize_scalar( objective, method='bounded', bounds=[lb, ub], options={'xatol': tol, 'maxiter': maxiter}) result = output.x print(f"Optimal Phase Setting for {phase_channel} is: {result:.1f}") hw_log.debug(f"Optimal Phase Setting for {phase_channel} is: {result:.1f}") try: interface.api.hidden[phase_channel].value = result except: pass return result
[docs] def MPFUtune(interface, sequence, channels, echo='Hahn',tol: float = 0.1, bounds=[0, 100],tau_value=550): hardware_wait=1 def phase_to_echo(phase): if np.isclose(phase,0): return 'R+' elif np.isclose(phase,np.pi): return 'R-' elif np.isclose(phase,np.pi/2): return 'I+' elif np.isclose(phase,3*np.pi/2) or np.isclose(phase,-np.pi/2): return 'I-' else: raise ValueError('Phase must be a multiple of pi/2') for i,channel in enumerate(channels): if channel =='ELDOR': continue MPFU_chanel = interface.MPFU[i] if MPFU_chanel == '+<x>': phase_cycle = 'BrXPhase' elif MPFU_chanel == '-<x>': phase_cycle = 'BrMinXPhase' elif MPFU_chanel == '+<y>': phase_cycle = 'BrYPhase' elif MPFU_chanel == '-<y>': phase_cycle = 'BrMinYPhase' if channel[0] == np.inf: interface.api.set_attenuator(MPFU_chanel,100) continue tau = Parameter("tau",tau_value,dim=4,step=0) exc_pulse = RectPulse(freq=0, tp = np.around(np.pi/2 /channel[0]), scale=1, flipangle = np.pi/2) ref_pulse = RectPulse(freq=0, tp = np.around(np.pi / channel[0] ), scale=1, flipangle = np.pi) seq = HahnEchoSequence( B=sequence.B,LO=sequence.LO,reptime=sequence.reptime,averages=1, shots=10, tau=tau, pi2_pulse=exc_pulse, pi_pulse=ref_pulse) seq.pulses[0].pcyc = {'Phases': [0], 'DetSigns': [1.0]} seq._buildPhaseCycle() seq.evolution([tau]) interface.launch(seq, savename="autoTUNE", start=True, tune=False, MPFU_overwrite=[MPFU_chanel,MPFU_chanel],reset_bg_data=False,reset_cur_exp=False) time.sleep(3) interface.terminate() interface.api.cur_exp['ftEPR.StartPlsPrg'].value = True interface.api.hidden['specJet.NoOfPoints'].value = 512 interface.api.hidden['specJet.NoOfAverages'].value = 20 interface.api.hidden['specJet.NOnBoardAvgs'].value = 20 if not interface.api.hidden['specJet.AverageStart'].value: interface.api.hidden['specJet.AverageStart'].value = True # interface.api.set_PulseSpel_phase_cycling('auto') print(f"Tuning channel: {MPFU_chanel}") tune_power(interface, MPFU_chanel, tol=tol, bounds=bounds,hardware_wait=hardware_wait) tune_phase(interface, MPFU_chanel, phase_to_echo(channel[1]), tol=tol)
[docs] def ELDORtune(interface, sequence, freq, MPFU=True, tau_value=550,test_tp = 16,plot=False,save=True): sequence_gyro = sequence.B.value / sequence.LO.value new_freq = sequence.LO.value + freq new_B = new_freq * sequence_gyro interface.api.set_ELDOR_freq(new_freq) ref_echoseq = Sequence(name='ELDOR tune',B=new_B, LO=new_freq, reptime=sequence.reptime, averages=1, shots=10) # tune a pair of 90/180 pulses at the eldor frequency if MPFU: channels = [(np.pi/2 / test_tp,0),(np.pi / test_tp,0)] MPFUtune(interface, ref_echoseq, channels,echo='Hahn') test_tp_pi=test_tp else: test_tp_pi=test_tp*2 flip_angle = np.pi/2 / test_tp SPFUtune(interface,ref_echoseq,flip_angle) tp = Parameter("tp",test_tp) long_delay = Parameter("long_delay",2000) tau = Parameter("tau",tau_value,dim=4,step=0) ref_echoseq.addPulse(RectPulse(freq=0, t=0, tp=tp, flipangle=np.pi)) ref_echoseq.addPulse(RectPulse(freq=0, t=long_delay,tp=tp, flipangle=np.pi/2)) ref_echoseq.addPulse(RectPulse(freq=0, t=long_delay+tau,tp=test_tp_pi, flipangle=np.pi)) ref_echoseq.addPulse(Detection(t=long_delay+2*tau, tp=512)) ref_echoseq.evolution([tau]) ref_echoseq.pulses[0].pcyc["Channels"] = "ELDOR" interface.launch(ref_echoseq, savename="autoTUNE", start=True, tune=False,reset_bg_data=False,reset_cur_exp=False) time.sleep(3) interface.terminate() interface.api.cur_exp['ftEPR.StartPlsPrg'].value = True interface.api.hidden['specJet.NoOfPoints'].value = 512 interface.api.hidden['specJet.NoOfAverages'].value = 20 interface.api.hidden['specJet.NOnBoardAvgs'].value = 20 if not interface.api.hidden['specJet.AverageStart'].value: interface.api.hidden['specJet.AverageStart'].value = True print(f"Tuning channel: ELDOR") # tune_phase(interface, 'Main', tol=5, bounds=[0,4096],target='R-') atten_axis = np.arange(30,0,-1) data = np.zeros(atten_axis.shape,dtype=np.complex128) for i,x in enumerate(atten_axis): interface.api.set_attenuator('ELDOR',x) # Set phase to value time.sleep(1) data[i] = np.trapz(get_specjet_data(interface)) data = correctphase(data) if data[np.abs(data).argmax()].max() < 1: data = -1*data if plot: plt.plot(atten_axis,data) plt.xlabel('Attenuator (dB)') if save: dataset = create_dataset_from_axes(data,atten_axis,axes_labels=['amp']) timestamp = datetime.datetime.now().strftime(r'%Y%m%d_%H%M_') fullname = timestamp + 'ELDOR_amptune.h5' dataset.to_netcdf(os.path.join(interface.savefolder,fullname),engine='h5netcdf',invalid_netcdf=True) new_value = np.around(atten_axis[data.argmin()],2) print(f"ELDOR Atten Set to: {new_value}") hw_log.debug(f"ELDOR Atten Set to: {new_value}") interface.api.set_attenuator('ELDOR',new_value)
# tune_power(interface, 'ELDOR', tol=1, bounds=[0,30],echo='R-')
[docs] def SPFUtune(interface, sequence, flip_power, echo='Hahn',tol: float = 0.1, bounds=[0, 60],tau_value=550): hardware_wait=1 tau = Parameter("tau",tau_value,dim=4,step=0) exc_pulse = RectPulse(freq=0, tp = np.around(np.pi/2 /flip_power), scale=1, flipangle = np.pi/2) ref_pulse = RectPulse(freq=0, tp = np.around(np.pi / flip_power ), scale=1, flipangle = np.pi) seq = HahnEchoSequence( B=sequence.B,LO=sequence.LO,reptime=sequence.reptime,averages=1, shots=10, tau=tau, pi2_pulse=exc_pulse, pi_pulse=ref_pulse) seq.pulses[0].pcyc = {'Phases': [0], 'DetSigns': [1.0]} seq._buildPhaseCycle() seq.evolution([tau]) interface.launch(seq, savename="autoTUNE", start=True, tune=False,reset_bg_data=False,reset_cur_exp=False) time.sleep(3) interface.terminate() interface.api.cur_exp['ftEPR.StartPlsPrg'].value = True interface.api.hidden['specJet.NoOfPoints'].value = 512 interface.api.hidden['specJet.NoOfAverages'].value = 20 interface.api.hidden['specJet.NOnBoardAvgs'].value = 20 if not interface.api.hidden['specJet.AverageStart'].value: interface.api.hidden['specJet.AverageStart'].value = True # interface.api.set_PulseSpel_phase_cycling('auto') print(f"Tuning SPFU channels:") tune_power(interface, 'Main', tol=tol, bounds=bounds,hardware_wait=hardware_wait)
[docs] def test_if_MPFU_compatability(seq): table = seq.progTable if 'LO' in table['Variable']: return False elif np.unique(table['axID']).size > 2: return False else: return True