# External imports
import numpy as np
import time
import re
import warnings
import requests
import pickle
import datetime
import logging
# PyEPR imports
from pyepr.classes import Interface, Parameter
from pyepr.pulses import Delay, Detection
from pyepr.sequences import Sequence, HahnEchoSequence
[docs]
log = logging.getLogger("interface")
class PyEPRControlInterface(Interface):
"""
Represents the interface for spectrometers built using PyEPR and connected with a local flask server.
"""
def __init__(self,config_file_path:str):
"""
Parameters
-----------
config_file_path : float
The file path for the spectrometer configuration file.
"""
super().__init__()
self.IFgain_options = np.array([0, 20, 40])
self.IFgain = 2
self.config_file = config_file_path
self.server = None
@property
def savefolder(self):
return self._savefolder
@savefolder.setter
def savefolder(self, folder):
self._savefolder = folder
if hasattr(self,'server') and self.server is not None:
response = requests.post(self.server + '/set_savefolder', json={'savefolder': folder})
if 'error' in response.json():
raise RuntimeError(response.json()['error'])
def connect(self,ip='localhost',port=5000):
## Connect to the server
self.ip = ip
self.port = port
self.server = f'http://{ip}:{port}'
try:
response = requests.get(self.server + '/isconfigured')
if response.json()['isconfigured']:
log.info("Spectrometer already configured")
print("Spectrometer already configured")
else:
if self.config_file is None:
raise ValueError("No config file provided. Please provide a config file to connect to the spectrometer.")
response = requests.post(self.server + '/connect', json={'config_file': self.config_file})
if 'error' in response.json():
raise RuntimeError(response.json()['detail'])
else:
log.info(response.json()['message'])
except requests.exceptions.ConnectionError:
raise RuntimeError(f"Could not connect to server at {self.server}. Please check the server is running.")
if self.savefolder is not None:
self.savefolder = self.savefolder
def disconnect(self):
response = requests.post(self.server + '/disconnect')
log.info(response.json()['message'])
self.server = None
return True
def acquire_dataset(self,verbosity=0,sum_scans=True, **kwargs):
for i in range(60):
args = kwargs.copy()
args['downconvert'] = True
response = requests.get(self.server + '/get_data', json={'downconvert': True})
if 'error' in response.json():
if verbosity > 0:
log.warning(response.json()['error'])
time.sleep(5)
continue
elif 'data' in response.json():
break
time.sleep(2)
if 'data' in response.json():
data = pickle.loads(response.json()['data'].encode('latin1')) #xr.datarray
if sum_scans and 'Scan' in data.dims:
return data.sum('Scan',keep_attrs = True)
else:
return data
else:
raise RuntimeError("No data returned from server")
def get_buffer(self,verbosity=0,sum_scans=True, **kwargs):
for i in range(60):
args = kwargs.copy()
args['downconvert'] = False
response = requests.get(self.server + '/get_databuffer', json=args)
if 'error' in response.json():
if verbosity > 0:
log.warning(response.json()['error'])
time.sleep(5)
continue
elif 'data' in response.json():
break
time.sleep(2)
if 'data' in response.json():
data = pickle.loads(response.json()['data'].encode('latin1')) #xr.datarray
if sum_scans and 'Scan' in data.dims:
return data.sum('Scan',keep_attrs = True)
else:
return data
else:
raise RuntimeError("No data returned from server")
def set_param(self, param: str, value: float):
"""Set a parameter for the spectrometer.
Parameters
----------
param : str
The parameter to set. Possible values are 'LO', 'temp', 'reptime', 'field', 'videoGain'.
value : float
The value to set the parameter to.
"""
response = requests.post(self.server + '/set_param', json={'param': param, 'value': value})
if 'error' in response.json():
raise RuntimeError(response.json()['error'])
else:
log.info(response.json()['message'])
return True
def terminate(self):
response = requests.post(self.server + '/terminate')
log.debug(response.json()['message'])
return True
def launch(self, sequence: Sequence , savename: str, IFgain=None, *args,**kwargs):
# increase the detection length to a minimum 1024ns
for pulse in sequence.pulses:
if isinstance(pulse, Detection):
if pulse.tp.value < 128:
pulse.tp.value = 128
if (IFgain is None) or (IFgain is True):
test_IF = True
while test_IF:
self.terminate()
self._launch(sequence,savename,self.IFgain, *args,**kwargs)
scan_time = sequence._estimate_time() / sequence.averages.value
check_1stScan = True
while check_1stScan:
time.sleep(10)
dataset = self.acquire_dataset()
# if dataset.nAvgs == 0:
# time.sleep(np.min([scan_time//10,2]))
# continue
dig_level = dataset.attrs['diglevel']
pos_levels = dig_level * self.IFgain_options / self.IFgain_options[self.IFgain]
pos_levels[pos_levels > 0.85] = 0
if dig_level == 0:
continue
if (pos_levels[self.IFgain] > 0.85) or (pos_levels[self.IFgain] < 0.03):
best_IFgain = np.argmax(pos_levels)
else:
best_IFgain = self.IFgain
if np.all(pos_levels==0):
log.critical('Saturation detected with IF gain 0. Please check the power levels.')
raise RuntimeError('Saturation detected with IF gain 0. Please check the power levels.')
elif (best_IFgain < self.IFgain) and (dataset.nAvgs == 0):
new_IFgain = np.max([self.IFgain - 1,best_IFgain])
log.info(f"IF gain changed from {self.IFgain} to {new_IFgain}")
self.IFgain = new_IFgain
check_1stScan = False
elif (best_IFgain != self.IFgain) and (dataset.nAvgs >= 1):
new_IFgain = np.min([self.IFgain +1,best_IFgain])
log.info(f"IF gain changed from {self.IFgain} to {new_IFgain}")
self.IFgain = new_IFgain
check_1stScan = False
elif dataset.nAvgs >=1:
log.debug(f"IF gain {self.IFgain} is optimal")
check_1stScan = False
test_IF = False
elif (IFgain is not None) and isinstance(IFgain, (int, float)):
self._launch(sequence,savename,IFgain, *args,**kwargs)
else:
raise ValueError(f"IFgain must be of type [None, bool, int, float]. {IFgain} is not valid.")
def _launch(self, sequence: Sequence , savename: str, IFgain=0,reset_cur_exp=True,*args,**kwargs):
timestamp = datetime.datetime.now().strftime(r'%Y%m%d_%H%M_')
self.savename = timestamp + savename + '.h5'
if reset_cur_exp:
self.cur_exp = sequence
new_IFgain = self.IFgain_options[IFgain]
if isinstance(new_IFgain, np.ndarray):
log.warning(f"IF gain {new_IFgain} is not a valid option. Using {new_IFgain[-1]}")
new_IFgain = new_IFgain[-1]
log.debug(f"Launching sequence {savename} with IF gain {new_IFgain}")
launch_arg = {
'seq': sequence,
'savefile': self.savename,
'IFgain': new_IFgain
}
launch_arg.update(kwargs)
response = requests.post(self.server + '/launch', data= pickle.dumps(launch_arg))
if 'error' in response.json():
raise RuntimeError(response.json()['error'])
elif response.status_code == 500:
raise RuntimeError(response.json()['detail'])
pass
def isrunning(self) -> bool:
response = requests.get(self.server + '/isrunning')
return response.json()['isrunning']
def tune_rectpulse(self,*,tp, freq, B, reptime, shots=400):
"""Generates a rectangular pi and pi/2 pulse of the given length at
the given field position. This value is stored in the pulse cache.
Parameters
----------
tp : float
Pulse length of pi/2 pulse in ns
freq : float
Central frequency of this pulse in GHz
B : float
Magnetic B0 field position in Gauss
reptime: float
Shot repetion time in us.
shots: int
The number of shots
Returns
-------
p90: RectPulse
A tuned rectangular pi/2 pulse of length tp
p180: RectPulse
A tuned rectangular pi pulse of length tp
"""
amp_tune =HahnEchoSequence(
B=B, freq=freq, reptime=reptime, averages=1, shots=shots
)
scale = Parameter("scale",0,dim=45,step=0.02)
amp_tune.pulses[0].tp.value = tp
amp_tune.pulses[0].scale = scale
amp_tune.pulses[1].tp.value = tp * 2
amp_tune.pulses[1].scale = scale
amp_tune.evolution([scale])
self.launch(amp_tune, "autoDEER_amptune")
while self.isrunning():
time.sleep(10)
dataset = self.acquire_dataset()
dataset = dataset.epr.correctphase
data = np.abs(dataset.data)
scale = np.around(dataset.pulse0_scale[data.argmax()].data,2)
log.debug(f"Optimal scale for {tp} ns pulse is {scale}")
if scale > 0.9:
raise RuntimeError("Not enough power avaliable.")
if scale == 0:
warnings.warn("Pulse tuned with a scale of zero!")
p90 = amp_tune.pulses[0].copy(
scale=scale, freq=amp_tune.freq)
p180 = amp_tune.pulses[1].copy(
scale=scale, freq=amp_tune.freq)
return p90, p180
def tune_pulse(self, pulse, mode, freq, B , reptime, shots=400):
"""Tunes a single pulse a range of methods.
Parameters
----------
pulse : Pulse
The Pulse object in need of tuning.
mode : str
The method to be used.
freq : float
The local oscilator frequency in GHz
B : float
Magnetic B0 field position in Gauss
reptime : us
Shot repetion time in us.
shots: int
The number of shots
Returns
-------
Tunned Pulse: Pulse
The returned pulse object that is now tunned.
"""
# Check pulse is a pulse
if type(pulse) == Delay:
pass
if type(pulse) == Detection:
pass
# Get absolute central frequency
if hasattr(pulse,"freq"):
c_frq = pulse.freq.value + freq
elif hasattr(pulse, "init_freq") & hasattr(pulse, "BW"):
c_frq = pulse.init_freq.value + 0.5*pulse.BW.value + freq
elif hasattr(pulse, "final_freq") & hasattr(pulse, "BW"):
c_frq = pulse.final_freq.value - 0.5*pulse.BW.value + freq
elif hasattr(pulse, "init_freq") & hasattr(pulse, "final_freq"):
c_frq = 0.5*(pulse.final_freq.value + pulse.final_freq.value) + freq
# Find rect pulses
if mode == "amp_hahn":
if pulse.flipangle.value == np.pi:
tp = pulse.tp.value / 2
elif pulse.flipangle.value == np.pi/2:
tp = pulse.tp.value
pi2_pulse, pi_pulse = self.tune_rectpulse(tp=tp, B=B, freq=c_frq, reptime=reptime)
amp_tune =HahnEchoSequence(
B=B, freq=freq,
reptime=reptime, averages=1, shots=shots,
pi2_pulse = pulse, pi_pulse=pi_pulse
)
scale = Parameter('scale',0,unit=None,step=0.02, dim=45, description='The amplitude of the pulse 0-1')
amp_tune.pulses[0].scale = scale
amp_tune.evolution([scale])
self.launch(amp_tune, "autoDEER_amptune")
while self.isrunning():
time.sleep(10)
dataset = self.acquire_dataset()
dataset = dataset.epr.correctphase
data = np.abs(dataset.data)
new_amp = np.around(dataset.pulse0_scale[dataset.data.argmax()].data,2)
if new_amp > 0.9:
raise RuntimeError("Not enough power avaliable.")
if new_amp == 0:
warnings.warn("Pulse tuned with a scale of zero!")
pulse.scale = Parameter('scale',new_amp,unit=None,description='The amplitude of the pulse 0-1')
return pulse
elif mode == "amp_nut":
pi2_pulse, pi_pulse = self.tune_rectpulse(tp=12, B=B, freq=c_frq, reptime=reptime)
nut_tune = Sequence(
name="nut_tune", B=(B/freq*c_frq), freq=freq, reptime=reptime,
averages=1,shots=shots
)
nut_tune.addPulse(pulse.copy(
t=0, pcyc={"phases":[0],"dets":[1]}, scale=0))
nut_tune.addPulse(
pi2_pulse.copy(t=2e3,
pcyc={"phases":[0, np.pi],"dets":[1, -1]},
freq=c_frq-freq))
nut_tune.addPulse(
pi_pulse.copy(t=2.5e3, pcyc={"phases":[0],"dets":[1]},
freq=c_frq-freq))
nut_tune.addPulse(Detection(t=3e3, tp=512, freq=c_frq-freq))
scale = Parameter('scale',0,unit=None,step=0.02, dim=45, description='The amplitude of the pulse 0-1')
nut_tune.pulses[0].scale = scale
nut_tune.evolution([scale])
# nut_tune.addPulsesProg(
# pulses=[0],
# variables=["scale"],
# axis_id = 0,
# axis= np.arange(0,0.9,0.02)
# )
self.launch(nut_tune, "autoDEER_amptune")
while self.isrunning():
time.sleep(10)
dataset = self.acquire_dataset()
dataset = dataset.epr.correctphase
data = dataset.data
axis = dataset.pulse0_scale
# data = correctphase(dataset.data)
if data[0] < 0:
data *= -1
if np.isclose(pulse.flipangle.value, np.pi):
new_amp = np.around(axis[data.argmin()].data,2)
elif np.isclose(pulse.flipangle.value, np.pi/2):
sign_changes = np.diff(np.sign(np.real(data)))
new_amp = np.around(axis[np.nonzero(sign_changes)[0][0]].data,2)
else:
raise RuntimeError("Target pulse can only have a flip angle of either: ",
"pi or pi/2.")
pulse.scale = Parameter('scale',new_amp,unit=None,description='The amplitude of the pulse 0-1')
return pulse
def tune(self,*, sequence=None, mode="amp_hahn", freq=None, gyro=None):
if mode == "rect_tune":
if freq is None:
raise ValueError("freq must be given for rect_tune")
if gyro is None:
raise ValueError("gyro must be give")
elif gyro >1:
raise ValueError("Gyromagnetic ratio must be give in GHz/G")
amp_tune =HahnEchoSequence(
B=freq/gyro, freq=freq, reptime=2e3, averages=1, shots=400
)
tp = 12
amp_tune.pulses[0].tp.value = tp
amp_tune.pulses[0].scale.value = 0
amp_tune.pulses[1].tp.value = tp*2
amp_tune.pulses[1].scale.value = 0
amp_tune.addPulsesProg(
pulses=[0,1],
variables=['scale','scale'],
axis_id=0,
axis=np.arange(0,0.9,0.02),
)
self.launch(amp_tune, "autoDEER_amptune", IFgain=0)
while self.isrunning():
time.sleep(10)
dataset = self.acquire_dataset()
scale = np.around(dataset.pulse0_scale[dataset.data.argmax()].data,2)
if scale > 0.9:
raise RuntimeError("Not enough power avaliable.")
self.pulses[f"p90_{tp}"] = amp_tune.pulses[0].copy(
scale=scale, freq=amp_tune.freq)
self.pulses[f"p180_{tp*2}"] = amp_tune.pulses[1].copy(
scale=scale, freq=amp_tune.freq)
elif mode == "amp_hahn":
for pulse in sequence.pulses:
if type(pulse) == Delay:
continue
if type(pulse) == Detection:
continue
all_pulses = list(self.pulses.keys())
pulse_matches = []
for pulse_name in all_pulses:
if not re.match(r"^p180_",pulse_name):
continue
if not np.abs((self.pulses[pulse_name].freq.value + self.pulses[pulse_name].freq.value) - (sequence.freq.value + pulse.freq.value)) < 0.01:
continue
pulse_matches.append(pulse_name)
ps_length_best =1e6
for pulse_name in pulse_matches:
ps_length = int(re.search(r"p180_(\d+)",pulse_name).groups()[0])
if ps_length < ps_length_best:
ps_length_best = ps_length
pi_pulse = self.pulses[f"p180_{ps_length_best}"]
amp_tune =HahnEchoSequence(
B=sequence.B.value, freq=sequence.freq.value,
reptime=sequence.reptime.value, averages=1, shots=400,
pi2_pulse = pulse, pi_pulse=pi_pulse
)
amp_tune.pulses[0].scale.value = 0
amp_tune.addPulsesProg(
pulses=[0],
variables=['scale'],
axis_id=0,
axis=np.arange(0,0.9,0.02),
)
self.launch(amp_tune, "autoDEER_amptune")
while self.isrunning():
time.sleep(10)
dataset = self.acquire_dataset()
scale = np.around(dataset.pulse0_scale[dataset.data.argmax()].data,2)
pulse.scale.value = scale
return sequence