Commit 53fcfcdf authored by Romain Baptiste Dominique Albert's avatar Romain Baptiste Dominique Albert
Browse files

- VNA drivers refarctoring

- Add the possibility to perform sequential or simultaneous measurement using the sequential argument.
parent 9dcefde5
# -*- coding: utf-8 -*-
"""
Created on Fri Mar 28 09:43:05 2014
@author: Seyed Iman Mirzaei, Oscar Gargiulo, Christian Schneider, David Zoepfl, Romain Albert
@author: Seyed Iman Mirzaei, Oscar Gargiulo, Christian Schneider, David Zoepfl
Driver for ENA E5071C and
Driver for ENA E5071C
v2.2.0 - OSC:
- changed all units to SI
v2.1.1 - CHR:
- Changed to binary data format (to be able to use pyvisa)
"""
v2.0.0 - CHR:
- Adapted to Driver/Instrument structure
v2.0.1 - OSC:
- migrated to VISA
"""
import visa
import pyvisa as visa
import time
import numpy as np
from tqdm import tqdm_notebook
import struct
from Instruments.Drivers.Keysight.VNA_Keysight import VNAKeysight
# Versoin
version = '2.2.0'
version = '3.0.0'
print('E5071C {}'.format(version))
class E5071C(object):
class E5071C(VNAKeysight):
global version
def __init__(self, ip, *pars, **kwargs):
rm = visa.ResourceManager('@py')
self._inst = rm.open_resource('TCPIP::{}::INSTR'.format(ip))
self.version = version
self.ip = ip
# Set timeout to a minute
......@@ -44,9 +34,8 @@ class E5071C(object):
else:
self._inst.timeout = 60*1000 # Set to a minute
print(self._inst.timeout)
self.average()
super().__init__()
def com(self, command, arg="?", raw=False):
"""Function to communicate with the device. Gives the current status
if no arg is given
......@@ -89,6 +78,10 @@ class E5071C(object):
self._inst.write("{} {}".format(command, arg))
return 0
def close(self):
'''Close connection to the instrument'''
self._inst.close()
def get_complex_value(self, command):
"""Function to communicate with the device. Gives the current status
if no arg is given
......@@ -102,73 +95,16 @@ class E5071C(object):
self._inst.write(':form:bord norm')
return resp
def identify(self):
"""Returns Identification String of the device"""
return self.com('*OPT')
def close(self):
'''Close connection to the instrument'''
self._inst.close()
def output(self, arg='?'):
"""Turns RF output power on/off
Give no argument to query current status.
Parameters
-----------
arg : int, str
Set state to 'ON', 'OFF', 1, 0
"""
return self.com(":OUTP", arg)
def power(self, power='?', channel=''):
"""Set or read current power"""
return float(self.com(":SOUR{}:POW:LEV:IMM:AMPL".format(channel), power))
def average(self, number_average=1):
"""Set/reset all parameters for doing an average measurement"""
self.number_average = number_average
super().average(number_average)
if number_average == 1:
self.average_state(0)
self.com(':TRIG:SEQ:AVER', 'OFF')
else:
self.average_state(1)
self.average_count(number_average)
self.com(':TRIG:SEQ:AVER', 'ON')
def average_reset(self, channel=''):
"""Reset averages"""
return self.com(":SENS{}:AVER:CLE".format(channel), "")
def average_count(self, count='?', channel=''):
"""Set/query number of averages"""
return int(self.com(":SENS{}:AVER:COUN".format(channel), count))
def average_state(self, state='?', channel=''):
"""Sets/query averaging state"""
return self.com(":SENS{}:AVER:STAT".format(channel), state)
def freq_start(self, freq='?', channel=''):
"""Set/query start frequency"""
return float(self.com(":SENS{}:FREQ:STAR".format(channel), freq))
def freq_stop(self, freq='?', channel=''):
"""Set/query stop frequency"""
return float(self.com(":SENS{}:FREQ:STOP".format(channel), freq))
def freq_center(self, freq='?', channel=''):
"""Set/query center frequency in Hz"""
return float(self.com(":SENS{}:FREQ:CENT".format(channel), freq))
def freq_span(self, freq='?', channel=''):
"""Set/query span in Hz"""
return float(self.com(":SENS{}:FREQ:SPAN".format(channel), freq))
def freq_npoints(self, points='?', channel=''):
"""Set/Query number of points"""
return int(self.com(":SENS{}:SWE:POIN".format(channel), points))
def freq_semgents(self, segments, BW=100, power=-50):
"""Defined the VNA frequency through segments.
......@@ -202,14 +138,6 @@ class E5071C(object):
self.com(':SENS:SWE:TYPE', 'SEGM')
self.com(':SENS:SEGM:ARB', 1) # Allow arb. segments (reversed, etc)
self.com(':SENS:SEGM:DATA', segment_str)
def IFBW(self, BW='?', channel=''):
"""Set/query IF Bandwidth for specified channel"""
return self.com(":SENS{}:BAND:RES".format(channel), BW)
def sweep_type(self, type='?', channel=''):
"""Set/query the type of sweep. Must be either linear, logartihmic, segmented or power."""
return self.com(":SENS{}:SWE:TYPE".format(channel), type)
def Spar(self, Par='?', trace=1):
if type(Par) != str:
......@@ -223,13 +151,13 @@ class E5071C(object):
else:
raise Exception('''S parameter must be either 'S11', 'S12', 'S21' or 'S22'.''')
def traces_number(self, num='1', channel='1'):
"""Set number of traces"""
return int(self.com("CALC{}:PAR:COUN".format(channel), num))
def trace_select(self, num=1, channel='1'):
"""Select trace number num"""
self.com('CALC{}:PAR{}:SEL'.format(channel, num), '')
def set_electrical_delay(self, delay=0., num=1, channel=''):
""" Set the electrical delay of the trace. Notice that a trace should be already selected."""
self.com("CALC{}:SEL:CORR:EDEL:TIME".format(channel), delay)
def Format(self, Format='?', Trace=1):
"""Set Data Format
......@@ -247,7 +175,7 @@ class E5071C(object):
| 'PPH': Positive phase
| '' (def): the format will be queried
"""
self.com("CALC:SEL:CORR:EDEL:TIME", 0)
return self.com('CALC1:SEL:FORM', Format)
# READING data
......@@ -266,71 +194,6 @@ class E5071C(object):
"""Return the last raw S measurement (before correction) in complex form."""
return self.get_complex_value(':CALC:TRACe{}:DATA:SDATa?'.format(trace))
def read_corrected_measurement(self, spar='S21'):
"""Return the last calibrated S measurement (after correction) in complex form."""
data = self.com(':SENSe:DATA:CORRdata', '? {}'.format(spar), raw=True)
return np.asarray(data[0::2], dtype=np.float) +1.j*np.asarray(data[1::2], dtype=np.float)
def read_settings(self):
"""Returns current state of VNA parameters as dict
Frequency in GHz.
"""
freq_start = self.freq_start()
freq_stop = self.freq_stop()
freq_span = freq_stop - freq_start
freq_npoints = self.freq_npoints()
BW = self.IFBW()
Spar = self.Spar()
format_meas = self.Format()
power = self.power()
output = self.output()
avg = self.average_count()
par = {'f_start (Hz)': freq_start,
'f_stop (Hz)': freq_stop,
'f_start (GHz)': freq_start * 1e-9,
'f_stop (GHz)': freq_stop * 1e-9,
'IF - BW (Hz)': BW,
'S_parameter ': Spar,
'Format': format_meas,
'Span (Hz)': freq_span,
'Points': freq_npoints,
'power (dBm)': power,
'output': output,
'averages': avg,
'averaging': self.average_state(),
'correction': self.correction()}
return par
def set_settings(self, **kwargs):
com_dict = {
'f_start (Hz)': self.freq_start,
'f_stop (Hz)': self.freq_stop,
'IF - BW (Hz)': self.IFBW,
'S_parameter ': self.Spar,
'Format': self.Format,
'Points': self.freq_npoints,
'power (dBm)': self.power,
'averaging': self.average_state,
'averages': self.average_count
}
for k in kwargs.keys():
try:
com_dict[k](kwargs[k])
except KeyError:
pass
def correction(self, state='?'):
"""Query or set correction status (to ON/OFF)
Parameters
-----------
state : '?', 'ON', 'OFF', 1, 0
State of correction. Use nothing or '?' for query
"""
return self.com('SENS:CORR:STAT', state)
def current_correction(self):
"""Return the current type correction used."""
......@@ -346,6 +209,13 @@ class E5071C(object):
self.com(':TRIG:AVER', 'OFF')
self.com(':TRIG:SEQ:SOUR', 'INT')
def send_trigger(self):
""" Send the trigger to initiate the measurement. """
self.com(':TRIG:SEQ:SINGLE', '')
def start_and_wait_end_measurement(self):
"""Start the measurement and wait up to the end of the measurment.
......
......@@ -23,6 +23,7 @@ class P5003A(object):
def __init__(self, ip, *pars, **kwargs):
self._inst = pyhislip.HiSLIP()
ip = ip.split('::')[0]
self._inst.connect(ip)
self._inst.set_max_message_size(int(1<<10))
self.version = version
......@@ -69,68 +70,10 @@ class P5003A(object):
'''close connection to the instrument'''
return
def identify(self):
"""Returns Identification String of the device"""
str1 = '*IDN'
return self.com(str1)
def output(self, arg='?'):
"""Turns RF output power on/off
Give no argument to query current status.
Parameters
-----------
arg : int, str
Set state to 'ON', 'OFF', 1, 0
"""
return self.com(":OUTP", arg)
def power(self, power='?', channel=''):
"""Set or read current power"""
return float(self.com(":SOUR{}:POW:LEV:IMM:AMPL".format(channel), power))
def average(self, number_average=1):
"""Set/reset all parameters for doing an average measurement"""
self.number_average = number_average
if number_average == 1:
self.average_state(0)
##self.com(':TRIG:SEQ:AVER', 'OFF')
else:
self.average_state(1)
self.average_count(number_average)
##self.com(':TRIG:SEQ:AVER', 'ON')
def average_reset(self, channel=''):
"""Reset averages"""
return self.com(":SENS{}:AVER:CLE".format(channel), "")
def average_count(self, count='?', channel=''):
"""Set/query number of averages"""
return int(self.com(":SENS{}:AVER:COUN".format(channel), count))
def average_state(self, state='?', channel=''):
"""Sets/query averaging state"""
return self.com(":SENS{}:AVER:STAT".format(channel), state)
def freq_start(self, freq='?', channel=''):
"""Set/query start frequency in Hz"""
return float(self.com(":SENS{}:FREQ:STAR".format(channel), freq))
def freq_stop(self, freq='?', channel=''):
"""Set/query stop frequency in Hz"""
return float(self.com(":SENS{}:FREQ:STOP".format(channel), freq))
def freq_center(self, freq='?', channel=''):
"""Set/query center frequency in Hz"""
return float(self.com(":SENS{}:FREQ:CENT".format(channel), freq))
def freq_span(self, freq='?', channel=''):
"""Set/query span in Hz"""
return float(self.com(":SENS{}:FREQ:SPAN".format(channel), freq))
def freq_npoints(self, points='?', channel=''):
"""Set/Query number of points"""
return int(self.com(":SENS{}:SWE:POIN".format(channel), points))
super().average_count(count, channel)
self.com('SENS:SWE:GRO:COUNT', count)
def freq_semgents(self, segments, BW=100, power=-50):
"""Defined the VNA frequency through segments.
......@@ -146,21 +89,11 @@ class P5003A(object):
'BW':1000, 'power': -20}
"""
raise NotImplemented
def IFBW(self, BW='?', channel=''):
"""Set/query IF Bandwidth for specified channel"""
return float(self.com(":SENS{}:BAND:RES".format(channel), BW))
def sweep_type(self, type='?', channel=''):
"""Set/query the type of sweep. Must be either linear, logartihmic, segmented or power."""
return self.com(":SENS{}:SWE:TYPE".format(channel), type)
def Spar(self, Par='?', Trace=1):
if type(Par) != str:
print('Par must be a string')
raise Exception('PAREXC')
raise Exception('S parameter must be a string.')
elif ((Par == 'S11') or (Par == 'S12') or (Par == 'S21') or
(Par == 'S22')):
......@@ -168,16 +101,15 @@ class P5003A(object):
elif Par == '?':
return self.com(':CALC1:MEAS{}:PAR'.format(Trace)).strip('\"')
else:
print('No valid Par inserted')
raise Exception('PAREXC')
def traces_number(self, num=1, channel=''):
"""Set number of traces"""
return int(self.com("CALC{}:PAR:COUN".format(channel), num))
raise Exception('''S parameter must be either 'S11', 'S12', 'S21' or 'S22'.''')
def trace_select(self, num=1, channel=''):
"""Select trace number num"""
self.com("CALC{}:PAR:MNUM".format(channel), num)
def set_electrical_delay(self, delay=0., num=1, channel=''):
""" Set the electrical delay of the trace."""
self.com("CALC{}:MEAS{}:CORR:EDEL".format(channel, num), delay)
def Format(self, Format='?', Trace=1):
"""Set Data Format
......@@ -195,8 +127,7 @@ class P5003A(object):
| 'PPH': Positive phase
| '' (def): the format will be queried
"""
com_str = 'CALC1:MEAS{}:FORM'.format(Trace)
return self.com(com_str, Format)
return self.com('CALC1:MEAS{}:FORM'.format(Trace), Format)
# READING data
def freq_read(self, channel=''):
......@@ -209,67 +140,28 @@ class P5003A(object):
data = self.com('CALC{}:DATA'.format(channel), arg='?', suffix='SDATA', raw=True)
return data.view(np.complex128)
def read_settings(self):
"""Returns current state of VNA parameters as dict
def current_correction(self):
"""Return the current type correction used."""
raise NotImplemented
return self._inst.query("SENS1:CORR:TYPE:CAT?").split(",")
def set_trigger(self):
"""Set trigger to bus and activated the continuous measurement."""
self.com('TRIG:SOUR', 'IMM')
self.com('TRIG:SCOP', 'ALL')
self.com('SENS:SWE:MODE', 'HOLD')
"""
freq_start = self.freq_start()
freq_stop = self.freq_stop()
freq_span = freq_stop - freq_start
freq_npoints = self.freq_npoints()
BW = self.IFBW()
Spar = self.Spar()
format_meas = self.Format()
power = self.power()
output = self.output()
avg = self.average_count()
par = {'f_start (Hz)': freq_start,
'f_stop (Hz)': freq_stop,
'f_start (GHz)': freq_start * 1e-9,
'f_stop (GHz)': freq_stop * 1e-9,
'IF - BW (Hz)': BW,
'S_parameter ': Spar,
'Format': format_meas,
'Span (Hz)': freq_span,
'Points': freq_npoints,
'power (dBm)': power,
'output': output,
'averages': avg,
'averaging': self.average_state(),
'correction': self.correction()}
return par
def set_settings(self, **kwargs):
com_dict = {
'f_start (Hz)': self.freq_start,
'f_stop (Hz)': self.freq_stop,
'IF - BW (Hz)': self.IFBW,
'S_parameter ': self.Spar,
'Format': self.Format,
'Points': self.freq_npoints,
'power (dBm)': self.power,
'averaging': self.average_state,
'averages': self.average_count
}
for k in kwargs.keys():
try:
com_dict[k](kwargs[k])
except KeyError:
pass
def correction(self, state='?'):
"""Query or set correction status (to ON/OFF)
Parameters
-----------
state : '?', 'ON', 'OFF', 1, 0
State of correction. Use nothing or '?' for query
"""
return self.com('SENS:CORR:STAT', state)
def reset_trigger(self):
"""Set trigger to internal and turn off the averaging trigger."""
self.com('TRIG:SOUR', 'IMM')
self.com('SENS:SWE:MODE', 'CONT')
def send_trigger(self):
""" Send the trigger to initiate the measurement. """
if self.number_average > 1:
self.com('SENS:SWE:MODE', 'GROUPS')
else:
self.com('SENS:SWE:MODE', 'SING')
def meas(self, f_range, npoints=1601, navg=10, power=-50,
Spar='S21', BW=1e3, Format='MLOG', power_port2=False,
......
This diff is collapsed.
......@@ -301,9 +301,12 @@ class VNA(Instrument):
self.devtype = 'VNA'
# Load driver ##########################################################
if id_string == 'VNA3' or id_string[:4] == "VNA4":
if id_string == 'VNA3':
from .Drivers.Keysight.E5080 import E5080
self.driver = E5080
elif id_string[:4] == "VNA4":
from .Drivers.Keysight.P5003A import P5003A
self.driver = P5003A
elif id_string == 'VNA1' or id_string == 'VNA2':
from .Drivers.Keysight.E5071C import E5071C
self.driver = E5071C
......@@ -327,6 +330,8 @@ class VNA(Instrument):
## We need to keep track of them to be able to store them in the metadata of the measurement
self.segments = None
self.S_parameters = None
def start(self):
self.instr.set_trigger()
......@@ -373,7 +378,8 @@ class VNA(Instrument):
S_parameters = ['S21'],
correction = False,
ignore_security = False,
format_vna = 'MLOG'):
format_vna = 'MLOG',
synchrone = True):
"""
Measure the S scattering matrix and return a DataSet with the result (complex number).
......@@ -399,49 +405,78 @@ class VNA(Instrument):
| 'IMAG' imaginary part of the complex data
| 'UPH': Extended phase
| 'PPH': Positive phase
synchrone: bool
If True, this method will perform the measurement and wait for the end of the measurement.
In that case, this method will send back the dataset with the result.
If False, this method only start the measurement and the data will need to be retrieve
using the get_data methods later.
"""
S_parameters = [spar.upper() for spar in S_parameters]
if correction:
self.instr.correction('ON')
else:
self.instr.correction('OFF')
if not ignore_security:
calibration = self.instr.current_correction()
if correction and calibration[0] == "SOLT2":
raise Exception('The correction is activated and will send power to port 2. If '\
'it is volontary, please put ignore_security to True.')
for spar in S_parameters:
if spar in ['S12', 'S22']:
raise Exception('One of the S parameter you want to measure will send power '\
'to port 2. If it is volontary, please put ignore_security to True.')
if before_measurement:
## We define all the important parameter before the first measurement
number_trace = len(S_parameters)
self.instr.traces_number(number_trace)
self.S_parameters = [spar.upper() for spar in S_parameters]
for i, spar in enumerate(S_parameters):
self.instr.trace_select(i+1)
self.instr.Spar(spar, i+1)
self.instr.Format(format_vna)
if correction:
self.instr.correction('ON')
else:
self.instr.correction('OFF')
if not ignore_security:
calibration = self.instr.current_correction()
if correction and calibration[0] == "SOLT2":
raise Exception('The correction is activated and will send power to port 2. If '\
'it is volontary, please put ignore_security to True.')
for spar in S_parameters:
if spar in ['S12', 'S22']:
raise Exception('One of the S parameter you want to measure will send power '\
'to port 2. If it is volontary, please put ignore_security to True.')
number_trace = len(S_parameters)
self.instr.traces_number(number_trace)
for i, spar in enumerate(S_parameters):
self.instr.trace_select(i+1)
self.instr.Spar(spar, i+1)
self.instr.Format(format_vna)
self.set_electrical_delay(0, i+1)
if before_measurement:
print(DataSet({'VNA Frequency': Parameter('VNA Frequency', np.asarray(self.get_frequencies(), dtype='float'), 'Hz')},
{spar : Measurement(spar, np.empty((1), dtype=np.complex128), '', ['VNA Frequency']) for spar in S_parameters},
{'Correction VNA': correction, 'Segments': str(self.segments)}).parameters['VNA Frequency'].unit)
return DataSet({'VNA Frequency': Parameter('VNA Frequency', np.asarray(self.get_frequencies(), dtype='float'), 'Hz')},
{spar : Measurement(spar, np.empty((1), dtype=np.complex128), '', ['VNA Frequency']) for spar in S_parameters},
{spar : Measurement(spar, np.empty((1), dtype=np.complex128), '', ['VNA Frequency']) for spar in self.S_parameters},
{'Correction VNA': correction, 'Segments': str(self.segments)})
else:
self.instr.start_and_wait_end_measurement()
if synchrone:
self.instr.start()