Commit 52ce600e authored by Christian Schneider's avatar Christian Schneider
Browse files

Merge branch 'integrated_swept' into resonator

parents 21b1b195 53fcfcdf
import numpy as np
from plotly.subplots import make_subplots
import plotly.graph_objects as go
class Parameter():
"""Structure to keep the name, data, unit and metadata associated to a Parameter together.
......@@ -97,7 +99,8 @@ class DataSet():
# Do it more clever
self.metadata.update(dataset.metadata)
"""
......
......@@ -7,6 +7,9 @@ from collections import ChainMap
import functools
from DataHandling.core import Parameter, Measurement, DataSet
import plotly
import plotly.graph_objects as go
class HDF5Saver(h5py.File):
"""Save a Dataset in HDF5 format using h5py.
......@@ -29,17 +32,14 @@ class HDF5Saver(h5py.File):
super().__init__(fname, mode='w-', libver='latest')
self.swmr_mode = True # Allow to read the file when it is being written.
print([[parameter.name, parameter.data] for parameter in dataset.parameters.values()])
self.parameters = self.create_group('Parameters')
for parameter in dataset.parameters.values():
parameter_data_set = self.parameters.create_dataset(parameter.name,
data=parameter.data)
parameter_data_set.attrs['Unit'] = parameter.unit
for key, value in parameter.metadata.items():
parameter_data_set.attrs[key] = value
parameter_data_set.attrs['Unit'] = parameter.unit
self.shape = {parameter.name: len(parameter.data) for parameter in dataset.parameters.values()}
......@@ -63,9 +63,9 @@ class HDF5Saver(h5py.File):
for i, parameter in enumerate(measurement.indexing):
measurement_data_set.dims[i].label = parameter
for key, value in dataset.metadata.items():
self.attrs[key] = value
print(key, value)
self.flush()
......@@ -79,6 +79,22 @@ class HDF5Saver(h5py.File):
def load_hdf5(fname):
return HDF5Loader(fname).dataset
def plot_hdf5(fname, swept_parameter, fixed_parameter, measurement):
dataset = load_hdf5(fname)
fig = go.Figure()
#for i, parameter in enumerate(dataset.measurements[measurement].indexing):
for i, value in enumerate(dataset.parameters[swept_parameter].data):
y = dataset.measurements[measurement].data[i]
x = dataset.parameters[fixed_parameter].data
fig.add_trace(go.Scatter(x = x, y = 20*np.log10(np.abs(y)),mode='lines', name=dataset.parameters[swept_parameter].name + str(value), line = dict(width = 2)))
fig.update_layout(plot_bgcolor='white')
fig.update_xaxes(showline = True, linewidth=2, linecolor='black',
showgrid=True, gridwidth=1, gridcolor='Black', title_text = 'f, GHz')
fig.update_yaxes(showline = True, linewidth=2, linecolor='black',
showgrid=True, gridwidth=1, gridcolor='Black', title_text = '{}'.format(measurement))
fig.show()
class HDF5Loader(h5py.File):
"""Load an HDF5 file to a Dataset
......
# -*- coding: utf-8 -*-
"""
Created on Fri Mar 28 09:43:05 2014
@author: Seyed Iman Mirzaei, Oscar Gargiulo, Christian Schneider, David Zoepfl, Romain Albert
@author: Seyed Iman Mirzaei, Oscar Gargiulo, Christian Schneider, David Zoepfl
Driver for ENA E5071C and
Driver for ENA E5071C
v2.2.0 - OSC:
- changed all units to SI
v2.1.1 - CHR:
- Changed to binary data format (to be able to use pyvisa)
"""
v2.0.0 - CHR:
- Adapted to Driver/Instrument structure
v2.0.1 - OSC:
- migrated to VISA
"""
import visa
import pyvisa as visa
import time
import numpy as np
from tqdm import tqdm_notebook
import struct
from Instruments.Drivers.Keysight.VNA_Keysight import VNAKeysight
# Versoin
version = '2.2.0'
version = '3.0.0'
print('E5071C {}'.format(version))
class E5071C(object):
class E5071C(VNAKeysight):
global version
def __init__(self, ip, *pars, **kwargs):
rm = visa.ResourceManager('@py')
self._inst = rm.open_resource('TCPIP::{}::INSTR'.format(ip))
self.version = version
self.ip = ip
# Set timeout to a minute
......@@ -44,9 +34,8 @@ class E5071C(object):
else:
self._inst.timeout = 60*1000 # Set to a minute
print(self._inst.timeout)
self.average()
super().__init__()
def com(self, command, arg="?", raw=False):
"""Function to communicate with the device. Gives the current status
if no arg is given
......@@ -89,6 +78,10 @@ class E5071C(object):
self._inst.write("{} {}".format(command, arg))
return 0
def close(self):
'''Close connection to the instrument'''
self._inst.close()
def get_complex_value(self, command):
"""Function to communicate with the device. Gives the current status
if no arg is given
......@@ -102,73 +95,16 @@ class E5071C(object):
self._inst.write(':form:bord norm')
return resp
def identify(self):
"""Returns Identification String of the device"""
return self.com('*OPT')
def close(self):
'''Close connection to the instrument'''
self._inst.close()
def output(self, arg='?'):
"""Turns RF output power on/off
Give no argument to query current status.
Parameters
-----------
arg : int, str
Set state to 'ON', 'OFF', 1, 0
"""
return self.com(":OUTP", arg)
def power(self, power='?', channel=''):
"""Set or read current power"""
return float(self.com(":SOUR{}:POW:LEV:IMM:AMPL".format(channel), power))
def average(self, number_average=1):
"""Set/reset all parameters for doing an average measurement"""
self.number_average = number_average
super().average(number_average)
if number_average == 1:
self.average_state(0)
self.com(':TRIG:SEQ:AVER', 'OFF')
else:
self.average_state(1)
self.average_count(number_average)
self.com(':TRIG:SEQ:AVER', 'ON')
def average_reset(self, channel=''):
"""Reset averages"""
return self.com(":SENS{}:AVER:CLE".format(channel), "")
def average_count(self, count='?', channel=''):
"""Set/query number of averages"""
return int(self.com(":SENS{}:AVER:COUN".format(channel), count))
def average_state(self, state='?', channel=''):
"""Sets/query averaging state"""
return self.com(":SENS{}:AVER:STAT".format(channel), state)
def freq_start(self, freq='?', channel=''):
"""Set/query start frequency"""
return float(self.com(":SENS{}:FREQ:STAR".format(channel), freq))
def freq_stop(self, freq='?', channel=''):
"""Set/query stop frequency"""
return float(self.com(":SENS{}:FREQ:STOP".format(channel), freq))
def freq_center(self, freq='?', channel=''):
"""Set/query center frequency in Hz"""
return float(self.com(":SENS{}:FREQ:CENT".format(channel), freq))
def freq_span(self, freq='?', channel=''):
"""Set/query span in Hz"""
return float(self.com(":SENS{}:FREQ:SPAN".format(channel), freq))
def freq_npoints(self, points='?', channel=''):
"""Set/Query number of points"""
return int(self.com(":SENS{}:SWE:POIN".format(channel), points))
def freq_semgents(self, segments, BW=100, power=-50):
"""Defined the VNA frequency through segments.
......@@ -202,14 +138,6 @@ class E5071C(object):
self.com(':SENS:SWE:TYPE', 'SEGM')
self.com(':SENS:SEGM:ARB', 1) # Allow arb. segments (reversed, etc)
self.com(':SENS:SEGM:DATA', segment_str)
def IFBW(self, BW='?', channel=''):
"""Set/query IF Bandwidth for specified channel"""
return self.com(":SENS{}:BAND:RES".format(channel), BW)
def sweep_type(self, type='?', channel=''):
"""Set/query the type of sweep. Must be either linear, logartihmic, segmented or power."""
return self.com(":SENS{}:SWE:TYPE".format(channel), type)
def Spar(self, Par='?', trace=1):
if type(Par) != str:
......@@ -223,13 +151,13 @@ class E5071C(object):
else:
raise Exception('''S parameter must be either 'S11', 'S12', 'S21' or 'S22'.''')
def traces_number(self, num='1', channel='1'):
"""Set number of traces"""
return int(self.com("CALC{}:PAR:COUN".format(channel), num))
def trace_select(self, num=1, channel='1'):
"""Select trace number num"""
self.com('CALC{}:PAR{}:SEL'.format(channel, num), '')
def set_electrical_delay(self, delay=0., num=1, channel=''):
""" Set the electrical delay of the trace. Notice that a trace should be already selected."""
self.com("CALC{}:SEL:CORR:EDEL:TIME".format(channel), delay)
def Format(self, Format='?', Trace=1):
"""Set Data Format
......@@ -247,7 +175,7 @@ class E5071C(object):
| 'PPH': Positive phase
| '' (def): the format will be queried
"""
self.com("CALC:SEL:CORR:EDEL:TIME", 0)
return self.com('CALC1:SEL:FORM', Format)
# READING data
......@@ -266,71 +194,6 @@ class E5071C(object):
"""Return the last raw S measurement (before correction) in complex form."""
return self.get_complex_value(':CALC:TRACe{}:DATA:SDATa?'.format(trace))
def read_corrected_measurement(self, spar='S21'):
"""Return the last calibrated S measurement (after correction) in complex form."""
data = self.com(':SENSe:DATA:CORRdata', '? {}'.format(spar), raw=True)
return np.asarray(data[0::2], dtype=np.float) +1.j*np.asarray(data[1::2], dtype=np.float)
def read_settings(self):
"""Returns current state of VNA parameters as dict
Frequency in GHz.
"""
freq_start = self.freq_start()
freq_stop = self.freq_stop()
freq_span = freq_stop - freq_start
freq_npoints = self.freq_npoints()
BW = self.IFBW()
Spar = self.Spar()
format_meas = self.Format()
power = self.power()
output = self.output()
avg = self.average_count()
par = {'f_start (Hz)': freq_start,
'f_stop (Hz)': freq_stop,
'f_start (GHz)': freq_start * 1e-9,
'f_stop (GHz)': freq_stop * 1e-9,
'IF - BW (Hz)': BW,
'S_parameter ': Spar,
'Format': format_meas,
'Span (Hz)': freq_span,
'Points': freq_npoints,
'power (dBm)': power,
'output': output,
'averages': avg,
'averaging': self.average_state(),
'correction': self.correction()}
return par
def set_settings(self, **kwargs):
com_dict = {
'f_start (Hz)': self.freq_start,
'f_stop (Hz)': self.freq_stop,
'IF - BW (Hz)': self.IFBW,
'S_parameter ': self.Spar,
'Format': self.Format,
'Points': self.freq_npoints,
'power (dBm)': self.power,
'averaging': self.average_state,
'averages': self.average_count
}
for k in kwargs.keys():
try:
com_dict[k](kwargs[k])
except KeyError:
pass
def correction(self, state='?'):
"""Query or set correction status (to ON/OFF)
Parameters
-----------
state : '?', 'ON', 'OFF', 1, 0
State of correction. Use nothing or '?' for query
"""
return self.com('SENS:CORR:STAT', state)
def current_correction(self):
"""Return the current type correction used."""
......@@ -346,6 +209,13 @@ class E5071C(object):
self.com(':TRIG:AVER', 'OFF')
self.com(':TRIG:SEQ:SOUR', 'INT')
def send_trigger(self):
""" Send the trigger to initiate the measurement. """
self.com(':TRIG:SEQ:SINGLE', '')
def start_and_wait_end_measurement(self):
"""Start the measurement and wait up to the end of the measurment.
......
# -*- coding: utf-8 -*-
"""
Python Library driver for basic functionalities for the P5003A
Author: Christian Schneider <c.schneider@uibk.ac.at>
Romain Albert <romain.albert@uibk.ac.at>
"""
import struct
import pyhislip
import time
import numpy as np
from tqdm import tqdm_notebook
# Version
version = '3.0.0'
print('P5003A {}'.format(version))
class P5003A(object):
global version
def __init__(self, ip, *pars, **kwargs):
self._inst = pyhislip.HiSLIP()
ip = ip.split('::')[0]
self._inst.connect(ip)
self._inst.set_max_message_size(int(1<<10))
self.version = version
self.ip = ip
# Set timeout to a minute
if 'timeout' in kwargs:
self._inst.timeout = kwargs['timeout']
else:
self._inst.timeout = 60*1000 # Set to a minute
def com(self, command, arg="?", raw=False, suffix=''):
"""Function to communicate with the device. Gives the current status
if no arg is given
Use raw = True for binary data. Data will be acquired using binary
and converted to float.
Suffix is used to get the data of the trace
"""
if arg == "?":
if raw:
self._inst.write('FORM REAL,64')
self._inst.write('FORM:BORD SWAP')
resp = self._inst.ask("{}? {}".format(command, suffix), reqRaw=raw)
if raw:
n_digits = int(resp[1:2]) # digits for number of bytes
n_bytes = int(resp[2:2 + n_digits])
return np.frombuffer(resp[2+n_digits:-1], dtype=np.float64)
else:
try:
return float(resp)
except:
return resp
else:
self._inst.write("{} {} {}".format(command, arg, suffix))
return 0
def close(self):
'''close connection to the instrument'''
return
def average_count(self, count='?', channel=''):
"""Set/query number of averages"""
super().average_count(count, channel)
self.com('SENS:SWE:GRO:COUNT', count)
def freq_semgents(self, segments, BW=100, power=-50):
"""Defined the VNA frequency through segments.
If optional entries in segment dictionary are not given it will take
the overall BW and power set the arguments of this function.
Parameters
-----------
segments : list
[segment1, segment2, ...] where
segment1 = {'start': 1, 'stop': 10, 'npoints': 1601,
'BW':1000, 'power': -20}
"""
raise NotImplemented
def Spar(self, Par='?', Trace=1):
if type(Par) != str:
raise Exception('S parameter must be a string.')
elif ((Par == 'S11') or (Par == 'S12') or (Par == 'S21') or
(Par == 'S22')):
self.com(':CALC1:MEAS{}:PAR'.format(Trace), "\'{}\'".format(Par))
elif Par == '?':
return self.com(':CALC1:MEAS{}:PAR'.format(Trace)).strip('\"')
else:
raise Exception('''S parameter must be either 'S11', 'S12', 'S21' or 'S22'.''')
def trace_select(self, num=1, channel=''):
"""Select trace number num"""
self.com("CALC{}:PAR:MNUM".format(channel), num)
def set_electrical_delay(self, delay=0., num=1, channel=''):
""" Set the electrical delay of the trace."""
self.com("CALC{}:MEAS{}:CORR:EDEL".format(channel, num), delay)
def Format(self, Format='?', Trace=1):
"""Set Data Format
Parameters
-----------
Format : str
Choose from
| 'MLOG' : magntidue in dB
| 'PHAS': phase
| 'MLIN': linear magnitude
| 'REAL': real part of the complex data
| 'IMAG' imaginary part of the complex data
| 'UPH': Extended phase
| 'PPH': Positive phase
| '' (def): the format will be queried
"""
return self.com('CALC1:MEAS{}:FORM'.format(Trace), Format)
# READING data
def freq_read(self, channel=''):
"""Return the frequency currently used for the measurement."""
return self.com("CALC{}:MEAS:X".format(channel), raw=True)
def read_measurement(self, trace=1, channel=''):
"""Return the S measurement on the last selected trace;"""
self.trace_select(trace)
data = self.com('CALC{}:DATA'.format(channel), arg='?', suffix='SDATA', raw=True)
return data.view(np.complex128)
def current_correction(self):
"""Return the current type correction used."""
raise NotImplemented
return self._inst.query("SENS1:CORR:TYPE:CAT?").split(",")
def set_trigger(self):
"""Set trigger to bus and activated the continuous measurement."""
self.com('TRIG:SOUR', 'IMM')
self.com('TRIG:SCOP', 'ALL')
self.com('SENS:SWE:MODE', 'HOLD')
def reset_trigger(self):
"""Set trigger to internal and turn off the averaging trigger."""
self.com('TRIG:SOUR', 'IMM')
self.com('SENS:SWE:MODE', 'CONT')
def send_trigger(self):
""" Send the trigger to initiate the measurement. """
if self.number_average > 1:
self.com('SENS:SWE:MODE', 'GROUPS')
else:
self.com('SENS:SWE:MODE', 'SING')
def meas(self, f_range, npoints=1601, navg=10, power=-50,
Spar='S21', BW=1e3, Format='MLOG', power_port2=False,
scale="Lin"):
"""Measure and return data
Parameters
-----------
f_range : array
Frequency range [[f_start, f_stop]]
npoints : int
Number of Points
navg : int
Number of averages
power : float
VNA output power in dBm
Spar : str
S Parameter to measure. E.g. 'S21'
BW : int
IF Bandwidth in Hz
Format : str
Format for measurement. Choose from
| 'MLOG' : magnitude in dB
| 'PHAS': phase
| 'MLIN': linear magnitude
| 'REAL': real part of the complex data
| 'IMAG' imaginary part of the complex data
| 'UPH': Extended phase
| 'PPH': Positive phase
scale : "Log", "Lin"
Choose "Log" or "Lin" scale for frequencies
Returns
--------
np.array, np.array
Frequencies (GHz), Data
"""
# Check calibration and put error if power is also on port2
# cal_type = self.com("CALC:CORR:TYPE")
# if cal_type[0] == "SOLT2" and not power_port2:
# raise Exception("Excitation on Port2. Careful with HEMTS! " +
# "If you want to measure S22 and S21 set " +
# "power_port2=True. Else use Enhanced " +
# "Calibration of VNA.")
# Save currents pars
VNA_pars = self.read_settings()
# Set VNA parameters
self.IFBW(BW)
self.freq_npoints(npoints)
self.freq_start(f_range[0] * 1e9)
self.freq_stop(f_range[1] * 1e9)
self.power(power)
self.traces_number(1)
self.Spar(Spar, 1)
self.Format(Format)
# Set averages
if navg == 0:
self.average_state(0)
else:
self.com(':SENS1:AVER', 'ON')
self.average_count(navg)
# Set device to external Control
self.com('INIT:CONT', 'OFF')
self.com("SENS1:AVER:CLE",'')
self.com("SENS1:SWE:MODE", "CONT")
# Turn off delay
self.com("CALC1:MEAS1:CORR:EDEL", "0")
self.com("CALC1:MEAS2:CORR:EDEL", "0")
# Set sweep type
if scale.lower() == "log":
self.com("sense:sweep:type", "logarithmic")
else:
self.com("sense:sweep:type", "linear")
# Make the sweep
self.output(1)
self.com('INIT:CONT', 'ON')
# Wait until averaging finished
sweep_time = self.com("SENS:SWE:TIME")
if navg > 1:
self.com('sens:swe:gro:count', navg)
self.com('sens:swe:mode', 'groups')
self.com('*ESE', '1')
self.com('*CLS', '')
self.com('*OPC'