from pyepr.classes import Interface
from pyepr.hardware.XeprAPI_link import XeprAPILink
from pyepr.hardware.Bruker_tools import PulseSpel, run_general, build_unique_progtable,write_pulsespel_file,step_parameters, get_specjet_data
from pyepr.dataset import *
from pyepr.utils import transpose_list_of_dicts, transpose_dict_of_list,round_step
from pyepr.pulses import RectPulse, Delay
from pyepr.sequences import HahnEchoSequence
import tempfile
import time
from scipy.optimize import minimize_scalar, curve_fit
import numpy as np
import os
from pathlib import Path
import datetime
from deerlab import correctphase
import matplotlib.pyplot as plt
import logging
import warnings
# =============================================================================
# Module-level logger used for hardware-related messages emitted by this
# interface (see the calls in the d0 calibration routines).
hw_log = logging.getLogger('interface.Xepr')
class BrukerAWG(Interface):
    """
    Represents the interface for connecting to AWG based Bruker ELEXSYS-II
    Spectrometers.

    All communication with the spectrometer goes through an ``XeprAPILink``
    instance that is created in the constructor and stored as ``self.api``.
    """
def __init__(self, config_file) -> None:
    """Create an interface to an AWG based Bruker ELEXSYS-II spectrometer.

    Getting Started
    ------------------
    Before a connection can be made an appropriate configuration file first
    needs to be written.

    1. Open Xepr
    2. Processing -> XeprAPI -> Enable XeprAPI
    3. `BrukerAWG.connect()`

    Parameters
    ----------
    config_file : str
        The path to a YAML configuration file.

    Attributes
    ----------
    bg_thread: None or threading.Thread
        If a background thread is needed, it is stored here.
    """
    # Link to Xepr plus the configuration sections taken from it.
    self.api = XeprAPILink(config_file)
    self.spec_config = self.api.config["Spectrometer"]
    self.bridge_config = self.api.spec_config["Bridge"]

    # Scratch directory for temporary files.
    self.temp_dir = tempfile.mkdtemp("autoDEER")

    # Initial d0 comes from the configuration; setup() may replace it.
    self.d0 = self.bridge_config["d0"]

    # Run-time state: nothing is running or loaded yet.
    self.bg_thread = None
    self.bg_data = None
    self.cur_exp = None
    self.pool = None  # QThreadPool
    self.tuning = False
    self.setup_flag = False

    # Default save location; the name is filled in by launch().
    self.savefolder = str(Path.home())
    self.savename = ''

    super().__init__(config_file)
def connect(self, d0=None) -> None:
    """Connect to Xepr and configure the spectrometer.

    Parameters
    ----------
    d0 : optional
        Passed on to `setup`. When None, `setup` measures d0 itself.
    """
    self.api.connect()
    # Short pause between opening the link and configuring the hardware.
    time.sleep(1)
    self.setup(d0)
    outcome = super().connect()
    return outcome
def setup(self,d0=None):
self.api.hidden['BrPlsMode'].value = True
self.api.hidden['OpMode'].value = 'Operate'
# self.api.hidden['RefArm'].value = 'On'
# TODO add detection of VAMP-III model
video_bw = self.bridge_config.get('Video BW')
# self.api.cur_exp['VideoBW'].value = 20
self.time_base = 1/(video_bw*2e-3)
self.api.hidden['specJet.TimeBase'].value = self.time_base
if d0 is None:
self.calc_d0()
else:
self.d0 = d0
self.api.hidden['Detection'].value = 'Signal'
self.setup_flag = True
pass
def acquire_dataset(self, **kwargs):
    """Collect the current data and hand it to the parent implementation.

    Three sources are possible:

    * ``self.bg_data`` is set: a dataset is built from it and the current
      sequence.
    * an experiment is running: the data is read with ``api.acquire_scan``
      (``kwargs`` are forwarded).
    * nothing is running: the experiment is first saved through Xepr (when
      a save name is set) and then read with ``api.acquire_dataset``.
    """
    if self.bg_data is not None:
        data = create_dataset_from_sequence(self.bg_data, self.cur_exp)
    elif self.isrunning():
        data = self.api.acquire_scan(self.cur_exp,**kwargs)
    else:
        has_name = (self.savename is not None) and (self.savename != '')
        if has_name:
            candidate = os.path.join(self.savefolder,self.savename)
            # Avoid clobbering an existing file of the same name.
            if Path(candidate).is_file():
                self.savename = self.savename+"_1"
            self.api.xepr_save(os.path.join(self.savefolder,self.savename))
        data = self.api.acquire_dataset(self.cur_exp)

    return super().acquire_dataset(data)
def _launch_complex_thread(self,sequence,axID=1,tune=True):
    """Launch a sequence that cannot be run purely through PulseSpel.

    A reduced copy of the sequence (first progTable entry only, a single
    average) is loaded without being started, and the axis selected by
    ``axID`` is then stepped from Python through `step_parameters`.
    """
    unique_table = build_unique_progtable(sequence)

    reduced_seq = sequence.copy()
    first_entry = transpose_dict_of_list(sequence.progTable)[0]
    reduced_seq.progTable = transpose_list_of_dicts([first_entry])

    stepped = unique_table[axID]
    reduced_seq.averages.value = 1
    n_steps = stepped['axis']['dim']

    # Buffer for the complex data: (axis 0 length, python-stepped length).
    buffer_shape = (unique_table[0]['axis']['dim'],n_steps)
    self.bg_data = np.zeros(buffer_shape,dtype=np.complex128)

    print("Initial PulseSpel Launch")
    self.launch(reduced_seq,savename='test',tune=tune, update_pulsespel=True, start=False,reset_bg_data=False,reset_cur_exp=False)
    self.terminate()

    step_parameters(self,reduced_seq,n_steps,stepped['variables'])
    # thread.start()
def launch(self, sequence: Sequence, savename: str, start=True, tune=True,
           MPFU_overwrite=None,update_pulsespel=True, reset_bg_data = True,
           reset_cur_exp=True, IFgain=None, **kwargs):
    """Load a sequence onto the spectrometer and optionally start it.

    Parameters
    ----------
    sequence : Sequence
        The sequence to run. ``shift_detfreq_to_zero`` is called on it, so
        the object passed in is modified.
    savename : str
        Base name for the saved data; a timestamp prefix is added.
    start : bool, optional
        If True the experiment is started once everything is set up.
    tune : bool, optional
        Only forwarded to `_launch_complex_thread`.
    MPFU_overwrite : optional
        Not used by this method.
    update_pulsespel : bool, optional
        If True the PulseSpel definitions, program and pulse shapes are
        regenerated from the sequence and compiled.
    reset_bg_data : bool, optional
        If True ``self.bg_data`` is cleared.
    reset_cur_exp : bool, optional
        If True ``self.savename`` and ``self.cur_exp`` are updated.
    IFgain : optional
        If given, the video gain is set to this value.
    **kwargs
        Accepted and ignored.

    Returns
    -------
    None
    """
    sequence.shift_detfreq_to_zero()

    # Anything still running is aborted before the hardware is reprogrammed.
    if self.isrunning():
        self.terminate(now=True)
        time.sleep(4)

    if reset_bg_data:
        self.bg_data = None

    if reset_cur_exp:
        timestamp = datetime.datetime.now().strftime(r'%Y%m%d_%H%M_')
        # Fall back to second resolution if the minute-resolution name is taken.
        if Path(os.path.join(self.savefolder,timestamp+savename+'.DSC')).is_file():
            timestamp = datetime.datetime.now().strftime(r'%Y%m%d_%H%M%S_')
        self.savename = timestamp+savename
        self.cur_exp = sequence

    # First check if the sequence is pulsespel compatible
    if not test_if_MPFU_compatability(sequence):
        print("Launching complex sequence")
        self._launch_complex_thread(sequence,1,tune)
        return None

    # pg is half the length of the last pulse and d0 is reduced by it; both
    # are rounded to the bridge's pulse step. Computed once and reused for
    # the PulseSpel file and the PulseSpel variables.
    pulse_dt = self.bridge_config['Pulse dt']
    pg = round_step(sequence.pulses[-1].tp.value/2, pulse_dt)
    d0 = self.d0-pg
    if d0 < 0:
        d0 = 0
    d0 = round_step(d0, pulse_dt)

    if update_pulsespel:
        # Logging the sequence is best effort and must never stop a launch,
        # but interrupts (KeyboardInterrupt/SystemExit) are not swallowed.
        try:
            self.log.debug(sequence.__str__())
        except Exception:
            hw_log.debug("Could not log the sequence", exc_info=True)

        MaxGate = np.min([40,sequence.reptime.value*self.bridge_config['DutyCycle']*1e-2])
        def_text, exp_text, pulse_shapes = write_pulsespel_file(sequence,d0,AWG=True,MaxGate=MaxGate)

        plsSPELCmdParam = self.api.cur_exp.getParam('*ftEPR.PlsSPELCmd')
        self.api.XeprCmds.aqPgSelectBuf(1)
        self.api.XeprCmds.aqPgSelectBuf(2)
        self.api.XeprCmds.aqPgSelectBuf(3)

        # Merge the pulse shapes into one string
        pulse_shapes = '\n'.join(pulse_shapes)
        self.api.cur_exp.getParam('*ftEpr.PlsSPELShpTxt').value = pulse_shapes
        plsSPELCmdParam.value=9  # Processing shapes
        time.sleep(2)

        self.api.XeprCmds.aqPgSelectBuf(1)
        self.api.cur_exp.getParam('*ftEpr.PlsSPELGlbTxt').value = def_text
        self.api.XeprCmds.aqPgShowDef()
        plsSPELCmdParam.value=3  # Compile variables
        time.sleep(2)

        self.api.XeprCmds.aqPgSelectBuf(2)
        self.api.cur_exp.getParam('*ftEpr.PlsSPELPrgTxt').value = exp_text
        plsSPELCmdParam.value=5  # Compile experiments
        # TODO: replace the fixed sleeps with polling of
        # '*ftEPR.PlsSPELVerbMsg' for the compiler's completion messages.
        time.sleep(4)

    self.api.set_field(sequence.B.value)
    self.api.set_freq(sequence.freq.value)
    self.api.hidden['specJet.TimeBase'].value = self.time_base
    if IFgain is not None:
        self.api.set_video_gain(IFgain)
    self.IFgain = self.api.get_video_gain()
    self.api.set_PhaseCycle(self.bridge_config.get('On Board Pcyc',False))

    # Field sweeps need the sweep width and their own phase-cycle setting.
    if 'B' in sequence.progTable['Variable']:
        idx = sequence.progTable['Variable'].index('B')
        B_axis = sequence.progTable['axis'][idx]
        self.api.set_sweep_width(B_axis.max()-B_axis.min())
        self.api.set_PhaseCycle(self.bridge_config.get('On Board Pcyc (EDFS)',False))
        self.api.set_field(sequence.B.value)
        self.api.cur_exp['fieldCtrl.AtLeftBorder'].value = True
        time.sleep(5)

    self.api.set_ReplaceMode(False)
    self.api.set_Acquisition_mode(1)

    time.sleep(1.0)
    self.api.set_PulseSpel_var('d0', d0)
    self.api.set_PulseSpel_var('pg', pg*2)
    self.api.set_PulseSpel_experiment('auto')
    self.api.set_PulseSpel_phase_cycling('auto')

    if start:
        self.api.run_exp()
def tune_rectpulse(self,*,tp, freq, B, reptime, shots=400):
    """Generates a rectangular pi and pi/2 pulse of the given length at
    the given field position.

    A Hahn echo is recorded while a shared ``scale`` parameter on both
    pulses is stepped; the scale at the largest absolute signal is used for
    both returned pulses. Afterwards the SpecJet transient is compared with
    the SpecJet data range: if it over- or underflows, the video gain is
    changed and the whole procedure is repeated.

    Parameters
    ----------
    tp : float
        Pulse length in ns
    freq : float
        Central frequency of this pulse in GHz
    B : float
        Magnetic B0 field position in Gauss
    reptime: float
        Shot repetion time in us.
    shots: int
        The number of shots

    Returns
    -------
    p90: RectPulse
        A tuned rectangular pi/2 pulse of length tp
    p180: RectPulse
        A tuned rectangular pi pulse of length tp

    Raises
    ------
    RuntimeError
        If the optimal scale is above 0.95.
    """
    time.sleep(5)
    amp_tune =HahnEchoSequence(
        B=B, freq=freq, reptime=reptime, averages=1, shots=shots
    )

    # One shared scale parameter so both pulses are stepped together.
    scale = Parameter("scale",0,dim=45,step=0.02)
    amp_tune.pulses[0].tp.value = tp
    amp_tune.pulses[0].scale = scale
    amp_tune.pulses[1].tp.value = tp * 2
    amp_tune.pulses[1].scale = scale

    amp_tune.evolution([scale])
    # Despite the name, this flag is also set again on underflow: it means
    # "the video gain was changed, so tune again".
    overflow_flag= True
    while overflow_flag:
        # Remember the gain used for this pass; adjustments below are
        # relative to it.
        vg = self.api.get_video_gain()
        self.launch(amp_tune, "autoDEER_amptune")
        time.sleep(10)
        while self.isrunning():
            time.sleep(5)
        time.sleep(3)
        dataset = self.acquire_dataset()
        dataset = dataset.epr.correctphase

        # The scale at the maximum of the absolute signal is taken as optimal.
        data = np.abs(dataset.data)
        scale_amp = np.around(dataset.pulse0_scale[data.argmax()].data,2)
        if scale_amp > 0.95:
            raise RuntimeError("Not enough power avaliable.")

        if scale_amp == 0:
            warnings.warn("Pulse tuned with a scale of zero!")
        p90 = amp_tune.pulses[0].copy(
            scale=scale_amp)
        p180 = amp_tune.pulses[1].copy(
            scale=scale_amp)

        # Now tune the corresponding videoGain
        # A short scan starting at the tuned scale is launched and then
        # stopped, so that the transient can be inspected with the SpecJet.
        amp_tune_short = amp_tune.copy()
        scale = Parameter("scale",scale_amp,dim=4,step=0.01)
        amp_tune_short.pulses[0].scale = scale
        amp_tune_short.pulses[1].scale = scale
        amp_tune_short.evolution([scale])
        self.launch(amp_tune_short, "autoDEER_amptune")
        time.sleep(5)
        self.terminate()
        time.sleep(2)
        while self.api.is_exp_running():
            time.sleep(1)
        self.api.cur_exp['ftEPR.StartPlsPrg'].value = True
        TimeBase = self.api.hidden['specJet.TimeBase'].value
        self.api.hidden['specJet.NoOfPoints'].value = int(512/TimeBase)
        self.api.hidden['specJet.NoOfAverages'].value = 1
        self.api.hidden['specJet.NOnBoardAvgs'].value = 1
        if not self.api.hidden['specJet.AverageStart'].value:
            self.api.hidden['specJet.AverageStart'].value = True
        time.sleep(3)
        limit = np.abs(self.api.hidden['specjet.DataRange'][0])
        data = get_specjet_data(self)
        # More than 95% of the range in either channel counts as overflow,
        # less than 2% in both channels as underflow.
        if (np.abs(data.real).max() > 0.95*limit) or (np.abs(data.imag).max() > 0.95*limit):
            self.api.set_video_gain(vg - 12)
            overflow_flag= True
            print(f'overflow, videogain now {vg - 12}')
        elif (np.abs(data.real).max() < 0.02*limit) and (np.abs(data.imag).max() < 0.02*limit):
            self.api.set_video_gain(vg + 6)
            overflow_flag= True
            print(f'underflow, videogain now {vg + 6}')
        else:
            overflow_flag = False

    return p90, p180
def tune_pulse(self, pulse, mode, freq, B , reptime, shots=400,tp=12):
    """Tunes a single pulse a range of methods.

    A rectangular pi/2 and pi pulse are first tuned at the pulse's central
    frequency with `tune_rectpulse`; they are then used to detect the
    effect of the pulse being tuned while its amplitude is stepped.

    Parameters
    ----------
    pulse : Pulse
        The Pulse object in need of tuning.
    mode : str
        The method to be used: "amp_hahn" or "amp_nut".
    freq : float
        The local oscilator frequency in GHz
    B : float
        Magnetic B0 field position in Gauss
    reptime : us
        Shot repetion time in us.
    shots: int
        The number of shots
    tp: float
        The length of the pi/2 pulse used for Hahn Echo, by default 12 ns. The pi pulse will be twice the length

    Returns
    -------
    Tunned Pulse: Pulse
        The returned pulse object that is now tunned. This is the object
        passed in, with its ``scale`` replaced. For any other ``mode``
        nothing is tuned and None is returned.

    Raises
    ------
    RuntimeError
        If ``pulse`` is a Delay or Detection, if its central frequency
        cannot be determined, or (mode "amp_nut") if its flip angle is
        neither pi nor pi/2.
    """
    # Check pulse is a pulse
    if isinstance(pulse, (Delay, Detection)):
        raise RuntimeError("Delays and detection pulses cannot be tuned.")

    # Get absolute central frequency
    if hasattr(pulse,"freq"):
        c_frq = pulse.freq.value + freq
    elif hasattr(pulse, "init_freq") and hasattr(pulse, "BW"):
        c_frq = pulse.init_freq.value + 0.5*pulse.BW.value + freq
    elif hasattr(pulse, "final_freq") and hasattr(pulse, "BW"):
        c_frq = pulse.final_freq.value - 0.5*pulse.BW.value + freq
    elif hasattr(pulse, "init_freq") and hasattr(pulse, "final_freq"):
        # Midpoint of the sweep: average of the start and end frequencies.
        c_frq = 0.5*(pulse.init_freq.value + pulse.final_freq.value) + freq
    else:
        raise RuntimeError(
            "Cannot determine the central frequency of the pulse.")

    pi2_pulse, pi_pulse = self.tune_rectpulse(tp=tp, B=B, freq=c_frq, reptime=reptime)

    if mode == "amp_hahn":
        # The pulse under test replaces the pi/2 pulse of a Hahn echo.
        amp_tune =HahnEchoSequence(
            B=B, freq=freq,
            reptime=reptime, averages=1, shots=shots,
            pi2_pulse = pulse, pi_pulse=pi_pulse
        )

        scale = Parameter('scale',0,unit=None,step=0.02, dim=51, description='The amplitude of the pulse 0-1')
        amp_tune.pulses[0].scale = scale
        amp_tune.evolution([scale])

        self.launch(amp_tune, "autoDEER_amptune")

        while self.isrunning():
            time.sleep(10)
        dataset = self.acquire_dataset()
        new_amp = np.around(dataset.pulse0_scale[dataset.data.argmax()].data,2)
        pulse.scale = Parameter('scale',new_amp,unit=None,description='The amplitude of the pulse 0-1')
        return pulse

    elif mode == "amp_nut":
        # The pulse under test precedes a Hahn echo that reads out its effect.
        nut_tune = Sequence(
            name="nut_tune", B=(B/freq*c_frq), freq=freq, reptime=reptime,
            averages=1,shots=shots
        )
        nut_tune.addPulse(pulse.copy(
            t=0, pcyc={"phases":[0],"dets":[1]}, scale=0))
        nut_tune.addPulse(
            pi2_pulse.copy(t=2e3,
                           pcyc={"phases":[0, np.pi],"dets":[1, -1]},
                           freq=c_frq-freq))
        nut_tune.addPulse(
            pi_pulse.copy(t=2.5e3, pcyc={"phases":[0],"dets":[1]},
                          freq=c_frq-freq))
        nut_tune.addPulse(Detection(t=3e3, tp=512, freq=c_frq-freq))

        scale = Parameter('scale',0,unit=None,step=0.02, dim=51, description='The amplitude of the pulse 0-1')
        nut_tune.pulses[0].scale = scale
        nut_tune.evolution([scale])
        self.launch(nut_tune, "autoDEER_amptune")

        while self.isrunning():
            time.sleep(10)
        dataset = self.acquire_dataset()
        dataset = dataset.epr.correctphase
        data = dataset.data
        axis = scale.get_axis()
        # Flip the sign so that the first point is non-negative.
        if data[0] < 0:
            data *= -1

        if np.isclose(pulse.flipangle.value, np.pi):
            # pi pulse: amplitude at the minimum of the signal.
            new_amp = np.around(axis[data.argmin()].data,2)
        elif np.isclose(pulse.flipangle.value, np.pi/2):
            # pi/2 pulse: amplitude at the first sign change of the signal.
            sign_changes = np.diff(np.sign(np.real(data)))
            new_amp = np.around(axis[np.nonzero(sign_changes)[0][0]].data,2)
        else:
            raise RuntimeError(
                "Target pulse can only have a flip angle of either: "
                "pi or pi/2.")
        pulse.scale = Parameter('scale',new_amp,unit=None,description='The amplitude of the pulse 0-1')

        return pulse
def phasetune_pulse(self, pulse):
    """Set the spectrometer up for observing a pulse with the SpecJet.

    A short test sequence (detection followed by a copy of the pulse) is
    launched at the current field and counter frequency and then stopped,
    after which the SpecJet is configured. As written, the method only
    performs this setup: it does not optimise anything and returns None.

    Raises
    ------
    RuntimeError
        If ``pulse`` is a Detection.
    """
    # Bruker SpecJet-I has a phase issue. The pulse is observed and optimised through the transmision mode.
    if isinstance(pulse, Detection):
        raise RuntimeError("Detection pulses cannot be phase tuned.")

    current_B = self.api.get_field()
    current_freq = self.api.get_counterfreq()

    # Test sequence: detection from t=0 with the pulse placed at t=200.
    test_seq = Sequence(name='test_seq',B=current_B,freq=current_freq,reptime=250,averages=1,shots=20)
    test_seq.addPulse(Detection(tp=1024,t=0))
    test_seq.addPulse(pulse.copy(t=200))
    test_seq.evolution([])
    self.launch(test_seq, "autoDEER_phasetune")
    time.sleep(1)
    self.terminate()

    # Start the pulse program and configure SpecJet averaging and length.
    self.api.cur_exp['ftEPR.StartPlsPrg'].value = True
    self.api.hidden['specJet.NoOfAverages'].value = 20
    self.api.hidden['specJet.NOnBoardAvgs'].value = 20
    self.api.hidden['specJet.NoOfPoints'].value = 1024
def isrunning(self) -> bool:
    """Report whether an experiment or tuning procedure is in progress.

    While ``self.tuning`` is set the answer is always True. Otherwise the
    background thread is asked if there is one, and Xepr if there is not.
    """
    if self.tuning:
        return True

    worker = self.bg_thread
    return self.api.is_exp_running() if worker is None else worker.running()
def terminate(self,now=False) -> None:
    """Stop the current experiment.

    Parameters
    ----------
    now : bool, optional
        Without a background thread: abort the experiment if True,
        otherwise stop it. Ignored when a background thread exists.

    Raises
    ------
    RuntimeError
        If a background thread exists and could not be cancelled.
    """
    self.tuning = False

    worker = self.bg_thread
    if worker is not None:
        if not worker.cancel():
            raise RuntimeError("Thread failed to be canceled!")
        return None

    if now:
        return self.api.abort_exp()
    return self.api.stop_exp()
# Override the terminate method to allow for a more complex termination that reduces the number of calls to the XeprAPI
def terminate_at(self, criterion, test_interval=2, keep_running=True, verbosity=0,autosave=True):
    """Terminates the experiment upon a specific condition being
    satisified.

    Parameters
    ----------
    criterion : criterion object or list of criterion objects
        The criteria to be tested. Each must provide ``test(data,
        verbosity)`` and an ``end_signal`` attribute. With a list, the
        experiment ends as soon as any one of them is met.
    test_interval : int, optional
        How often should the criteria be tested in minutes, by default 2.
    keep_running : bool, optional
        If True, an error will not be raised if the experiment finishes before the criteria is met, by default True.
    verbosity : int, optional
        The verbosity level, by default 0.
    autosave : bool, optional
        If True, the data will be autosaved, by default True.

    Raises
    ------
    RuntimeError
        If the experiment finishes before a criterion is met and
        ``keep_running`` is False.
    """
    test_interval_seconds = test_interval * 60
    condition = False
    last_scan = 0
    # Estimated duration of a single scan (average) of the sequence.
    seq_time_scan = self.cur_exp._estimate_time() / self.cur_exp.averages.value

    while not condition:
        time.sleep(np.max([seq_time_scan/4,10]))

        if not self.isrunning():
            if keep_running:
                self.terminate()
                return None
            else:
                msg = "Experiments has finished before criteria met."
                raise RuntimeError(msg)

        start_time = time.time()

        # TODO: make this behavour a property of the criteria not the sequence
        # NOTE: the experiment is currently restarted for every sequence
        # type. A DEERSequence-specific choice used to be computed here but
        # was unconditionally overridden, so only the effective value is kept.
        restart_exp = True
        data = self.acquire_dataset(after_scan=last_scan,restart_exp=restart_exp)
        if autosave:
            self.log.debug(f"Autosaving to {os.path.join(self.savefolder,self.savename)}")
            data.to_netcdf(os.path.join(self.savefolder,self.savename),engine='h5netcdf',invalid_netcdf=True)

        try:
            nAvgs = data.attrs['nAvgs']
        except (AttributeError, KeyError):
            self.log.warning("WARNING: Dataset missing number of averages(nAvgs)!")
            nAvgs = 1

        # Nothing new to test: no complete scan yet, or no scan finished
        # since the last test. Wait half a scan and look again.
        if nAvgs < 1 or nAvgs <= last_scan:
            time.sleep(seq_time_scan/2)  # TODO: Replace with single scan time
            continue

        last_scan = nAvgs
        if verbosity > 0:
            print("Testing")

        if isinstance(criterion,list):
            conditions = [crit.test(data, verbosity) for crit in criterion]
            condition = any(conditions)
        else:
            condition = criterion.test(data, verbosity)

        if not condition:
            if not restart_exp:
                self.api.rerun_exp()
            # Pad the iteration so tests are at least test_interval apart.
            end_time = time.time()
            if (end_time - start_time) < test_interval_seconds:
                if verbosity > 0:
                    print("Sleeping")
                time.sleep(test_interval_seconds - (end_time - start_time))

    # Notify every criterion that was met.
    if isinstance(criterion,list):
        for crit, met in zip(criterion, conditions):
            if met and callable(crit.end_signal):
                crit.end_signal()
    else:
        if callable(criterion.end_signal):
            criterion.end_signal()

    self.terminate()
    time.sleep(3)
def calc_d0(self):
    """
    This creates an initial guess for d0. A better estimate can only be found after the field sweep.

    A single-pulse sequence is launched twice and the position of the
    largest absolute SpecJet value is read each time: first with the
    detection at t=0, then with the detection moved to the first estimate
    minus 256. The result is stored in ``self.d0``; nothing is returned.
    The 'Detection' setting is switched to 'TM' for the measurement and back
    to 'Signal' at the end.
    """

    hw_log.info('Calcuating d0')
    hw_log.debug('Setting Detection = TM')
    self.api.hidden['Detection'].value = 'TM'
    B = self.api.get_field()
    freq = self.api.get_counterfreq()

    # NOTE(review): '+<x>' looks like a channel name — confirm against
    # XeprAPILink.set_attenuator.
    self.api.set_attenuator('+<x>',100)

    # First pass: detection at t=0.
    d0=0
    self.d0=d0

    seq = Sequence(name='single_pulse',B=B,freq=freq,reptime=3e3,averages=1,shots=20)
    det_tp = Parameter('tp',value=16,dim=4,step=0)
    seq.addPulse(RectPulse(tp=det_tp,t=0,flipangle=np.pi,scale=1))
    seq.addPulse(Detection(tp=16,t=d0))

    seq.evolution([det_tp])
    self.launch(seq,savename='test',tune=False)
    time.sleep(3)
    while self.isrunning():
        time.sleep(1)
    # self.terminate(now=True)
    # time.sleep(1.5)
    self.api.cur_exp['ftEPR.StartPlsPrg'].value = True
    self.api.hidden['specJet.NoOfAverages'].value = 20
    self.api.hidden['specJet.NOnBoardAvgs'].value = 20
    if not self.api.hidden['specJet.AverageStart'].value:
        self.api.hidden['specJet.AverageStart'].value = True

    # Set the time base of specjet to a minimum
    time_base = self.api.hidden['specJet.TimeBase'].aqGetParMinValue()
    self.api.hidden['specJet.TimeBase'].value = time_base
    n_points = int(1024/time_base)
    self.api.hidden['specJet.NoOfPoints'].value = n_points
    time.sleep(3)

    # Step the video gain until the largest absolute SpecJet value lies
    # between 30% and 70% of the SpecJet data range.
    optimal = False
    while not optimal:
        max_value = np.abs(get_specjet_data(self)).max()
        y_max = np.abs(self.api.hidden['specjet.DataRange'][0])
        vg =self.api.get_video_gain()
        vg_step = self.api.get_video_gain_step()
        if max_value > 0.7* y_max:
            self.api.set_video_gain(vg - vg_step)
            time.sleep(0.5)
        elif max_value < 0.3* y_max:
            self.api.set_video_gain(vg + vg_step)
            time.sleep(0.5)
        else:
            optimal=True

    # First estimate: axis position of the largest absolute value.
    specjet_data = np.abs(get_specjet_data(self))
    calc_d0 = d0 + self.api.hidden['specJet.Abs1Data'][specjet_data.argmax()]

    # Second pass: detection moved to 256 before the first estimate.
    d0 = calc_d0 - 256

    seq = Sequence(name='single_pulse',B=B,freq=freq,reptime=3e3,averages=1,shots=20)
    det_tp = Parameter('tp',value=16,dim=4,step=0)
    seq.addPulse(RectPulse(tp=det_tp,t=0,flipangle=np.pi,scale=1))
    seq.addPulse(Detection(tp=16,t=d0))

    seq.evolution([det_tp])
    self.launch(seq,savename='test',tune=False)
    time.sleep(3)
    while self.isrunning():
        time.sleep(1)
    # self.terminate()
    self.api.cur_exp['ftEPR.StartPlsPrg'].value = True
    if not self.api.hidden['specJet.AverageStart'].value:
        self.api.hidden['specJet.AverageStart'].value = True
    self.api.hidden['specJet.NoOfPoints'].value = int(512/time_base)
    time.sleep(3)

    specjet_data = np.abs(get_specjet_data(self))

    calc_d0 = d0 + self.api.hidden['specJet.Abs1Data'][specjet_data.argmax()]

    self.d0 = calc_d0 + 64 # 64ns added to compensate for hahn echo center in field sweep
    hw_log.info(f"d0 set to {self.d0}")
    self.api.hidden['Detection'].value = 'Signal'
def calc_d0_from_Hahn_Echo(self, B=None, freq=None):
    """Refine ``self.d0`` from the transient currently seen by the SpecJet.

    Parameters
    ----------
    B : optional
        If given, the field is set to this value first (followed by a 5 s
        wait). If None the field is left untouched.
    freq : optional
        If given, the frequency is set to this value first. If None the
        frequency is left untouched.

    Returns
    -------
    The new value of ``self.d0``: the previous d0, minus 64, plus the
    'specJet.Abs1Data' axis value at the largest absolute SpecJet value.
    """
    # Only touch field/frequency when the caller asks for it; the arguments
    # must not be overwritten with the current hardware values.
    if B is not None:
        self.api.set_field(B)
        time.sleep(5)
    if freq is not None:
        self.api.set_freq(freq)

    d0 = self.d0

    # Wait for any running experiment before using the SpecJet.
    while self.api.is_exp_running():
        time.sleep(1)

    self.api.cur_exp['ftEPR.StartPlsPrg'].value = True
    self.api.hidden['specJet.NoOfAverages'].value = 20
    self.api.hidden['specJet.NOnBoardAvgs'].value = 20
    if not self.api.hidden['specJet.AverageStart'].value:
        self.api.hidden['specJet.AverageStart'].value = True

    # Use the smallest SpecJet time base.
    time_base = self.api.hidden['specJet.TimeBase'].aqGetParMinValue()
    self.api.hidden['specJet.TimeBase'].value = time_base
    self.api.hidden['specJet.NoOfPoints'].value = int(512/time_base)

    # Step the video gain until the largest absolute SpecJet value lies
    # between 30% and 70% of the SpecJet data range.
    optimal = False
    while not optimal:
        max_value = np.abs(get_specjet_data(self)).max()
        y_max = np.abs(self.api.hidden['specjet.DataRange'][0])
        vg = self.api.get_video_gain()
        vg_step = self.api.get_video_gain_step()
        if max_value > 0.7* y_max:
            self.api.set_video_gain(vg - vg_step)
            time.sleep(0.5)
        elif max_value < 0.3* y_max:
            self.api.set_video_gain(vg + vg_step)
            time.sleep(0.5)
        else:
            optimal = True

    specjet_data = np.abs(get_specjet_data(self))

    calc_d0 = d0 - 64 + self.api.hidden['specJet.Abs1Data'][specjet_data.argmax()]

    self.d0 = calc_d0
    hw_log.info(f"d0 set to {self.d0}")
    return self.d0
# =============================================================================
def test_if_MPFU_compatability(seq):
table = seq.progTable
if 'freq' in table['Variable']:
return False
elif np.unique(table['axID']).size > 2:
return False
else:
return True