import PyQt6.QtCore as QtCore
from autodeer import RectPulse, ChirpPulse, HSPulse, Detection, DEERCriteria, SNRCriteria, TimeCriteria
from autodeer.sequences import *
import time
import numpy as np
from threadpoolctl import threadpool_limits
[docs]
class autoDEERSignals(QtCore.QObject):
'''
Defines the signals available from a running worker thre
Supported signals are:
finished
No data
error
tuple (exctype, value, traceback.format_exc() )
result
object data returned from processing, anything
progress
int indicating % progress
'''
[docs]
finished = QtCore.pyqtSignal()
[docs]
error = QtCore.pyqtSignal(tuple)
[docs]
result = QtCore.pyqtSignal(object)
[docs]
progress = QtCore.pyqtSignal(int)
[docs]
status = QtCore.pyqtSignal(str)
[docs]
fsweep_result = QtCore.pyqtSignal(object)
[docs]
respro_result = QtCore.pyqtSignal(object)
# optimise_pulses = QtCore.pyqtSignal(object)
[docs]
relax_result = QtCore.pyqtSignal(object)
[docs]
Relax2D_result = QtCore.pyqtSignal(object)
[docs]
T2_result = QtCore.pyqtSignal(object)
[docs]
quickdeer_result = QtCore.pyqtSignal(object)
[docs]
longdeer_result = QtCore.pyqtSignal(object)
[docs]
quickdeer_update = QtCore.pyqtSignal(object)
[docs]
longdeer_update = QtCore.pyqtSignal(object)
[docs]
reptime_scan_result = QtCore.pyqtSignal(object)
[docs]
timeout = QtCore.pyqtSignal()
[docs]
class autoDEERWorker(QtCore.QRunnable):
'''
Worker thread
Inherits from QRunnable to handler worker thread setup, signals and wrap-up.
:param callback: The function callback to run on this worker thre Supplied args and
kwargs will be passed through to the runner.
:type callback: function
:param args: Arguments to pass to the callback function
:param kwargs: Keywords to pass to the callback function
'''
def __init__(self, interface, wait:QtCore.QWaitCondition, mutex:QtCore.QMutex,pulses:dict, results:dict, LO, gyro,AWG=True, user_inputs:dict = None, *args, **kwargs):
super(autoDEERWorker,self).__init__()
# Store constructor arguments (re-used for processing)
[docs]
self.interface = interface
[docs]
self.signals = autoDEERSignals()
[docs]
self.samplename = user_inputs['sample']
[docs]
self.project = user_inputs['project']
[docs]
self.quick_deer_state = True
if (self.project is None) or (self.project == ''):
def savename(exp, suffix=""):
if suffix != "":
return f"({self.samplename})_({exp})_({suffix})"
else:
return f"({self.samplename})_({exp})"
else:
def savename(exp,suffix=""):
if suffix != "":
return f"({self.project})_({self.samplename})_({exp})_({suffix})"
else:
return f"({self.project})_({self.samplename})_({exp})"
[docs]
self.savename = savename
if 'SampleConc' in self.user_inputs:
self.noise_mode = self.user_inputs['SampleConc']
else:
self.noise_mode = 1
if not 'DEER_update_func' in self.user_inputs:
self.user_inputs['DEER_update_func'] = None
if 'cores' in kwargs:
self.cores = kwargs['cores']
else:
self.cores = 1
if 'tp' in kwargs:
self.tp = kwargs['tp']
else:
self.tp=12
if "night_hours" in kwargs:
night_hours = kwargs['night_hours']
[docs]
self.EndTimeCriteria = TimeCriteria('End Time',time.time() + self.user_inputs['MaxTime']*3600, "Overall end time",end_signal=self.signals.timeout.emit,night_hours=night_hours)
[docs]
self.test_interval = 0.5
# # Add the callback to our kwargs
# self.kwargs['progress_callback'] = self.signals.progress
[docs]
def pause_and_wait(self):
self.mutex.lock()
self.wait.wait(self.mutex)
self.mutex.unlock()
[docs]
def run_fsweep(self):
'''
Initialise the runner function with passed args, kwargs.
'''
LO = self.LO
gyro_N = self.gyro
reptime = self.reptime
p90, p180 = self.interface.tune_rectpulse(tp=self.tp, LO=LO, B=LO/gyro_N, reptime = reptime,shots=int(100*self.noise_mode))
shots = int(50*self.noise_mode)
shots = np.min([shots,20])
fsweep = FieldSweepSequence(
B=LO/gyro_N, LO=LO,reptime=reptime,averages=50,shots=int(50*self.noise_mode),
Bwidth = 250,
pi2_pulse=p90, pi_pulse=p180,
)
self.interface.launch(fsweep,savename=self.savename("EDFS_Q"),IFgain=1)
self.signals.status.emit('Field-sweep running')
self.interface.terminate_at(SNRCriteria(30),test_interval=self.test_interval)
while self.interface.isrunning():
time.sleep(self.updaterate)
self.signals.status.emit('Field-sweep complete')
self.signals.fsweep_result.emit(self.interface.acquire_dataset())
[docs]
def run_respro(self):
'''
Initialise the runner function for resonator profile.
'''
LO = self.LO
gyro=self.gyro
reptime = self.reptime
p90, p180 = self.interface.tune_rectpulse(tp=self.tp, LO=LO, B=LO/gyro, reptime = reptime,shots=int(100*self.noise_mode))
RPseq = ResonatorProfileSequence(
B=LO/gyro, LO=LO,reptime=reptime,averages=10,shots=int(50*self.noise_mode),
pi2_pulse=p90, pi_pulse=p180,fwidth=0.15
)
self.interface.launch(RPseq,savename=self.savename("ResPro"),IFgain=1)
self.signals.status.emit('Resonator Profile running')
self.interface.terminate_at(SNRCriteria(5),test_interval=self.test_interval)
while self.interface.isrunning():
time.sleep(self.updaterate)
self.signals.status.emit('Resonator Profile complete')
self.signals.respro_result.emit(self.interface.acquire_dataset())
self.pause_and_wait()
if np.abs(LO-self.LO) > 0.1:
# Rerun Resonator Profile
self.run_respro()
return 'skip'
[docs]
def run_CP_relax(self,dt=200):
'''
Initialise the runner function for relaxation.
'''
self.signals.status.emit('Running Carr-Purcell Experiment')
LO = self.LO
gyro = self.gyro
reptime = self.reptime
shots = int(40*self.noise_mode)
shots = np.min([shots,10])
relax = DEERSequence(
B=LO/gyro, LO=LO,reptime=reptime,averages=10,shots=shots,
tau1=0.5, tau2=0.5, tau3=0.2, dt=15,
exc_pulse=self.pulses['exc_pulse'], ref_pulse=self.pulses['ref_pulse'],
pump_pulse=self.pulses['pump_pulse'], det_event=self.pulses['det_event']
)
relax.five_pulse(relaxation=True, re_step=dt)
if self.AWG:
relax.select_pcyc("16step_5p")
else:
relax.select_pcyc("DC")
relax.shots.value *= 8
relax._estimate_time();
relax.pulses[1].scale.value = 0
relax.pulses[3].scale.value = 0
self.interface.launch(relax,savename=self.savename("CP_Q"),IFgain=1)
self.interface.terminate_at(SNRCriteria(50),test_interval=self.test_interval)
while self.interface.isrunning():
time.sleep(self.updaterate)
self.signals.relax_result.emit(self.interface.acquire_dataset())
self.signals.status.emit('Carr-Purcell experiment complete')
[docs]
def run_T2_relax(self,dt=60):
self.signals.status.emit('Running T2 experiment')
LO = self.LO
gyro = self.gyro
reptime = self.reptime
shots = int(40*self.noise_mode)
shots = np.min([shots,10])
seq = T2RelaxationSequence(
B=LO/gyro, LO=LO,reptime=reptime,averages=10,shots=shots,
step=dt,dim=200,pi2_pulse=self.pulses['exc_pulse'],
pi_pulse=self.pulses['ref_pulse'], det_event=self.pulses['det_event'])
self.interface.launch(seq,savename=self.savename("T2_Q"),IFgain=1)
self.interface.terminate_at(SNRCriteria(50),test_interval=self.test_interval)
while self.interface.isrunning():
time.sleep(self.updaterate)
self.signals.T2_result.emit(self.interface.acquire_dataset())
self.signals.status.emit('T2relax experiment complete')
[docs]
def run_2D_relax(self):
self.signals.status.emit('Running 2D decoherence experiment')
LO = self.LO
gyro = self.gyro
reptime = self.reptime
seq = RefocusedEcho2DSequence(
B=LO/gyro, LO=LO,reptime=reptime,averages=10,shots=int(25*self.noise_mode),
tau=self.max_tau, pi2_pulse=self.pulses['exc_pulse'],
pi_pulse=self.pulses['ref_pulse'], det_event=self.pulses['det_event'])
self.interface.launch(seq,savename=self.savename("2D_DEC"),IFgain=2)
self.interface.terminate_at(SNRCriteria(15),test_interval=self.test_interval)
while self.interface.isrunning():
time.sleep(self.updaterate)
self.signals.Relax2D_result.emit(self.interface.acquire_dataset())
self.signals.status.emit('2D decoherence experiment complete')
[docs]
def run_quick_deer(self):
if not self.quick_deer_state:
return 'skip'
self.signals.status.emit('Running QuickDEER')
DEER_crit = DEERCriteria(mode="speed",verbosity=2,update_func=self.signals.quickdeer_update.emit)
time_crit = TimeCriteria('Max time criteria', time.time() + 4*60*60 ,'Max time criteria')
total_crit = [DEER_crit, time_crit]
signal = self.signals.quickdeer_result.emit
self.run_deer(total_crit,signal, dt=16,shot=15,averages=150 )
[docs]
def run_long_deer(self):
self.signals.status.emit('Running LongDEER')
DEER_crit = DEERCriteria(mode="high",verbosity=2,update_func=self.signals.longdeer_update.emit)
total_crit = [DEER_crit, self.EndTimeCriteria]
signal = self.signals.longdeer_result.emit
self.run_deer(total_crit,signal, dt=16,shot=50,averages=1e4)
[docs]
def run_deer(self,end_criteria,signal, dt=16,shot=50,averages=1000,):
LO = self.LO
reptime = self.reptime
if ('tau1' in self.user_inputs) and (self.user_inputs['tau1'] != 0):
tau1 = self.user_inputs['tau1']
tau2 = self.user_inputs['tau2']
deertype = self.user_inputs['ExpType']
if deertype == '5pDEER':
tau3 = self.user_inputs['tau3']
else:
tau3 = None
elif self.deer_inputs != {}:
tau1 = self.deer_inputs['tau1']
tau2 = self.deer_inputs['tau2']
deertype = self.deer_inputs['ExpType']
if deertype == '5pDEER':
tau3 = self.deer_inputs['tau3']
else:
tau3 = None
dt = self.deer_inputs['dt']
else:
rec_tau = self.results['quickdeer'].rec_tau_max
dt = self.results['quickdeer'].rec_dt * 1e3
max_tau = self.results['relax'].max_tau
tau = np.min([rec_tau,max_tau])
deertype = '5pDEER'
tau1 = tau
tau2 = tau
tau3 = 0.3
if 'ESEEM' in self.deer_inputs:
ESEEM = self.deer_inputs['ESEEM']
else:
ESEEM = None
deer = DEERSequence(
B=LO/self.gyro, LO=LO,reptime=reptime,averages=averages,shots=int(50*self.noise_mode),
tau1=tau1, tau2=tau2, tau3=tau3, dt=dt,
exc_pulse=self.pulses['exc_pulse'], ref_pulse=self.pulses['ref_pulse'],
pump_pulse=self.pulses['pump_pulse'], det_event=self.pulses['det_event'],
ESEEM_avg = ESEEM
)
if deertype == '5pDEER':
deer.five_pulse()
savename_type = 'DEER_5P_Q'
savename_suffix = f"{tau1:.3f}us_{tau2:.3f}us_{tau3:.3f}us"
elif deertype == '4pDEER':
deer.four_pulse()
savename_type = 'DEER_4P_Q'
savename_suffix = f"{tau1:.3f}us_{tau2:.3f}us"
elif deertype == '3pDEER':
deer.four_pulse()
savename_type = 'DEER_3P_Q'
savename_suffix = f"{tau1:.3f}u"
if not self.AWG:
deer.select_pcyc('DC')
deer.shots.value *= 8
elif deertype == '5pDEER':
deer.select_pcyc("16step_5p")
elif deertype == '4pDEER':
deer.select_pcyc("16step_4p")
elif deertype == '3pDEER':
deer.select_pcyc("8step_3p")
deer._estimate_time();
self.interface.launch(deer,savename=self.savename(savename_type,savename_suffix),IFgain=2)
time.sleep(30) # Always wait for the experiment to properly start
with threadpool_limits(limits=self.cores, user_api='blas'):
self.interface.terminate_at(end_criteria,verbosity=2,test_interval=self.test_interval) # Change criteria backagain
while self.interface.isrunning():
time.sleep(self.updaterate)
signal(self.interface.acquire_dataset())
self.signals.status.emit('DEER experiment complete')
[docs]
def run_reptime_opt(self):
reptime_guess = self.reptime
LO = self.LO
p90, p180 = self.interface.tune_rectpulse(tp=self.tp, LO=LO, B=LO/self.gyro, reptime = reptime_guess,shots=int(100*self.noise_mode))
n_shots = int(np.min([int(50*self.noise_mode),50]))
scan = ReptimeScan(B=LO/self.gyro, LO=LO,reptime=reptime_guess, reptime_max=12e3, averages=10, shots=n_shots,
pi2_pulse=p90, pi_pulse=p180)
self.interface.launch(scan,savename=f"{self.samplename}_reptimescan",IFgain=1)
self.interface.terminate_at(SNRCriteria(15),verbosity=2,test_interval=self.test_interval)
while self.interface.isrunning():
time.sleep(self.updaterate)
self.signals.status.emit('Reptime scan complete')
self.signals.reptime_scan_result.emit(self.interface.acquire_dataset())
[docs]
def tune_pulses(self):
# Tune the pulses
self.signals.status.emit('Tuning pulses')
pump_pulse = self.pulses['pump_pulse']
ref_pulse = self.pulses['ref_pulse']
exc_pulse = self.pulses['exc_pulse']
det_event = self.pulses['det_event']
self.signals.status.emit('Tuning pulses')
exc_pulse = self.interface.tune_pulse(exc_pulse, mode="amp_nut", B=self.LO/self.gyro,LO=self.LO,reptime=self.reptime,shots=int(100*self.noise_mode))
ref_pulse = self.interface.tune_pulse(ref_pulse, mode="amp_nut", B=self.LO/self.gyro,LO=self.LO,reptime=self.reptime,shots=int(100*self.noise_mode))
pump_pulse = self.interface.tune_pulse(pump_pulse, mode="amp_nut", B=self.LO/self.gyro,LO=self.LO,reptime=self.reptime,shots=int(100*self.noise_mode))
return 'skip'
[docs]
def _build_methods(self):
seq = self.deer_inputs['ExpType']
if (self.deer_inputs['tau1'] == 0) and (self.deer_inputs['tau2'] == 0):
quick_deer = True
else:
quick_deer = False
methods = [self.run_fsweep,self.run_reptime_opt,self.run_respro,self.run_fsweep,
self.tune_pulses]
if (seq is None) or (seq == '5pDEER'):
methods.append(self.run_CP_relax)
methods.append(self.run_T2_relax)
elif (seq == '4pDEER') or (seq == 'Ref2D'):
methods.append(self.run_CP_relax)
methods.append(self.run_T2_relax)
methods.append(self.run_2D_relax)
if seq == 'Ref2D':
return methods
if quick_deer:
methods.append(self.run_quick_deer)
methods.append(self.run_long_deer)
return methods
@QtCore.pyqtSlot()
[docs]
def run(self):
self.reptime = 3e3
self.stop_flag = False
# methods = [self.run_fsweep,self.run_reptime_opt,self.run_respro,self.run_fsweep,
# self.tune_pulses,self.run_CP_relax,self.run_T2_relax,self.run_quick_deer,
# self.run_long_deer]
methods = self._build_methods()
for method in methods:
if self.stop_flag:
self.signals.finished.emit()
return None
flag = method()
if flag is None:
self.pause_and_wait()
elif flag == 'skip':
continue
if self.stop_flag:
self.signals.finished.emit()
return None
self.signals.finished.emit()
[docs]
def new_data(self, data):
self.results = data
[docs]
def new_pulses(self, pulses):
self.pulses = pulses
[docs]
def update_LO(self,LO):
self.LO = LO
[docs]
def update_reptime(self,reptime):
self.reptime = reptime
[docs]
def update_gyro(self,gyro):
self.gyro = gyro
[docs]
def set_2D_max_tau(self, max_tau):
self.max_tau = max_tau
[docs]
def update_deersettings(self,deer_settings):
self.deer_inputs = deer_settings
[docs]
def stop(self):
self.stop_flag = True
self.signals.status.emit('Stopping experiment')
[docs]
def set_noise_mode(self,level):
self.noise_mode = level