Commit f3433057 authored by Romain Baptiste Dominique Albert's avatar Romain Baptiste Dominique Albert
Browse files

Modification of the server/client for the use of different SignalHound...

Modification of the server/client for the use of different SignalHound instrument. Start of the implementation of the SM200C.
parent a70daeeb
# Version
import numpy as np
import Instruments.Drivers.SignalHound.api_sm as api
version = '1.0.0'
print('SM200C {}'.format(version))
class SM200C(object):
global version
def __init__(self, addr_host = '192.168.2.2', addr_device = '192.168.2.10', port=51665):
self.addr_host = addr_host
self.addr_device = addr_device
self.port = port
self.device = None
self.open_device()
print('End init SM200C')
def close_driver(self):
""" Close the driver. """
print('Close')
if self.device is not None:
api.smCloseDevice(self.device)
self.device = None
def open_device(self):
""" Open the driver. """
self.device = api.smOpenNetworkedDevice('192.168.2.2', '192.168.2.10', 51665)
def raise_exception(self):
raise Exception('Test')
def identify(self):
""" Return the device type, its serial number and the firmware version."""
info_device_serial = api.smGetDeviceInfo(self.device)
return 'Device type: '+info_device_serial['Device type']+\
', serial number: '+str(info_device_serial['Serial'])+\
', Firmware: '+api.smGetFirmwareVersion(self.device)
import ctypes
import os
import numpy as np
# API to access the SM type SignalHound spectrum analyzer
folder = os.path.dirname(os.path.realpath(__file__))
sm = ctypes.cdll.LoadLibrary(folder+'/libsm_api.so.2.1.17')
SM_MAX_DEVICES = 9
c_int_p = ctypes.POINTER(ctypes.c_int)
c_float_p = ctypes.POINTER(ctypes.c_float)
c_double_p = ctypes.POINTER(ctypes.c_double)
def check_status(status):
# Still to be completed
if status == 0:
return
elif status == -200:
raise Exception('The calibration file is not recognized.')
else:
raise Exception('Unknown error %d'%status)
def decode_device_type(type):
if type == 0:
return 'SM200A'
elif type == 1:
return 'SM200B'
elif type == 2:
return 'SM200C'
elif type == 3:
return 'SM435B'
else:
raise ValueError('The type of the device was not recognized')
# --------------- Connection and disconnection of the spectrum analyzer ------------------------ #
# For USB devices only:
# Notice that these function where not extensively tested as we do not have any USB instruments
def smGetDeviceList():
# Return a list of the serials number of connected usb signal hound devices (of SM type)
serials = np.zeros(SM_MAX_DEVICES, dtype = np.int32)
device_count = ctypes.c_int()
sm.smGetDeviceList.argtypes = [np.ctypeslib.ndpointer(ctypes.c_int32, flags='C_CONTIGUOUS'),
c_int_p]
sm.smGetDeviceList.restype = ctypes.c_int
status = sm.smGetDeviceList(serials, ctypes.byref(device_count))
check_status(status)
return [serials[i] for i in range(device_count.value)]
def smGetDeviceList2():
# Return a list of the serials number and device type of connected usb signal hound devices (of SM type)
serials = np.zeros(SM_MAX_DEVICES, dtype = np.int32)
device_types = np.zeros(SM_MAX_DEVICES, dtype = np.int32)
device_count = ctypes.c_int()
sm.smGetDeviceList2.argtypes = [np.ctypeslib.ndpointer(ctypes.c_int32, flags='C_CONTIGUOUS'),
np.ctypeslib.ndpointer(ctypes.c_int32, flags='C_CONTIGUOUS'),
c_int_p]
sm.smGetDeviceList2.restype = ctypes.c_int
status = sm.smGetDeviceList2(serials, device_types, ctypes.byref(device_count))
check_status(status)
return [[serials[i], decode_device_type(device_types[i])] for i in range(device_count.value)]
def smOpenDevice():
# Open the first USB spectrum analyzer detected
device_handle = ctypes.c_int()
sm.smOpenDevice.argtypes = [c_int_p]
sm.smOpenDevice.restype = ctypes.c_int
status = sm.smOpenDevice(ctypes.byref(device_handle))
check_status(status)
return device_handle
def smOpenDeviceBySerial(serial_number):
# Open the USB spectrum analyzer with the precised serials number
device_handle = ctypes.c_int()
sm.smOpenDeviceBySerial.argtypes = [c_int_p, c_int]
sm.smOpenDeviceBySerial.restype = ctypes.c_int
status = sm.smOpenDeviceBySerial(ctypes.byref(device_handle), serial_number)
check_status(status)
return device_handle
def smPresetSerial(serial_number):
# Reset the spectrum anbalyzer identify by the serial number. Note the spectrum analyzer MUST
# be cloed when doing this operation.
sm.smPresetSerial.argtypes = [ctypes.c_int]
sm.smPresetSerial.restype = ctypes.c_int
status = sm.smPresetSerial(serial_number)
check_status(status)
# For ethernet decices only:
def smBroadcastNetworkConfig(host_addr, device_addr, port, non_volatile = False):
# Fix the ip address of the spectrum analyzer connected to the host_addr with the specified port.
# the non_volatile option change permanently the configuration of the device but damaged over time
# the flash memory, DON'T use repeteably.
sm.smBroadcastNetworkConfig.argtypes = [ctypes.c_char_p,
ctypes.c_char_p,
ctypes.c_uint16,
ctypes.c_int]
sm.smBroadcastNetworkConfig.restype = ctypes.c_int
status = sm.smGetDeviceList2(host_addr.encode('ascii'), device_addr.encode('ascii'), port, int(non_volatile))
check_status(status)
def smOpenNetworkedDevice(host_addr, device_addr, port):
# Connect to the device
device_handle = ctypes.c_int()
sm.smOpenNetworkedDevice.argtypes = [c_int_p,
ctypes.c_char_p,
ctypes.c_char_p,
ctypes.c_uint16]
sm.smOpenNetworkedDevice.restype = ctypes.c_int
status = sm.smOpenNetworkedDevice(ctypes.byref(device_handle),
host_addr.encode('ascii'),
device_addr.encode('ascii'),
port)
check_status(status)
return device_handle
def smNetworkedSpeedTest(device_handle, duration):
# Perform a network test for the duration in seconds. It returns the transfer speed in
# bytes per seconds
speed = ctypes.c_double()
sm.smNetworkedSpeedTest.argtypes = [ctypes.c_int, ctypes.c_double, c_double_p]
sm.smNetworkedSpeedTest.restype = ctypes.c_int
status = sm.smNetworkedSpeedTest(device_handle, duration, ctypes.byref(speed))
check_status(status)
return speed.value
def smGetSFPDiagnostics(device_handle):
# Show diagnostic for the SFP
sm.smGetSFPDiagnostics.argtypes = [ctypes.c_int,
c_float_p,
c_float_p,
c_float_p]
sm.smGetSFPDiagnostics.restype = ctypes.c_int
temperature = ctypes.c_float()
voltage = ctypes.c_float()
transmission_power = ctypes.c_float()
reception_power = ctypes.c_float()
status = sm.smGetSFPDiagnostics(device_handle,
ctypes.byref(temperature),
ctypes.byref(voltage),
ctypes.byref(transmission_power),
ctypes.byref(reception_power))
check_status(status)
return {'Temperature': temperature.value,
'Voltage': voltage.value,
'Transmission power': transmission_power.value,
'Reception power': reception_power.value}
# For all devices:
def smCloseDevice(device_handle):
# Closed the device
sm.smCloseDevice.argtypes = [ctypes.c_int]
sm.smCloseDevice.restype = ctypes.c_int
status = sm.smCloseDevice(device_handle)
check_status(status)
def smPreset(device_handle):
# Reset the spectrum anbalyzer and CLOSED the device. You will need to open the device again
# to reuse it!
sm.smPreset.argtypes = [ctypes.c_int]
sm.smPreset.restype = ctypes.c_int
status = sm.smPreset(device_handle)
check_status(status)
# ----------------------------- Get information from the device state ----------------------------- #
def smGetDeviceInfo(device_handle):
# Return the serials number and the type of the device connected.
sm.smGetDeviceInfo.argtypes = [ctypes.c_int, c_int_p, c_int_p]
sm.smGetDeviceInfo.restype = ctypes.c_int
device_type = ctypes.c_int()
serial = ctypes.c_int()
status = sm.smGetDeviceInfo(device_handle, ctypes.byref(device_type), ctypes.byref(serial))
check_status(status)
return {'Serial': serial.value, 'Device type': decode_device_type(device_type.value)}
def smGetFirmwareVersion(device_handle):
# Return the firmware version
sm.smGetFirmwareVersion.argtypes = [ctypes.c_int, c_int_p, c_int_p, c_int_p]
sm.smGetFirmwareVersion.restype = ctypes.c_int
major = ctypes.c_int()
minor = ctypes.c_int()
revision = ctypes.c_int()
status = sm.smGetFirmwareVersion(device_handle, ctypes.byref(major), ctypes.byref(minor), ctypes.byref(revision))
check_status(status)
return "%d.%d.%d"%(major.value, minor.value, revision.value)
def smGetDeviceDiagnostics(device_handle):
# Return the input voltage, input current and internal temperature of the device
sm.smGetDeviceDiagnostics.argtypes = [ctypes.c_int, c_float_p, c_float_p, c_float_p]
sm.smGetDeviceDiagnostics.restype = ctypes.c_int
voltage = ctypes.c_float()
current = ctypes.c_float()
temperature = ctypes.c_float()
status = sm.smGetDeviceDiagnostics(device_handle, ctypes.byref(voltage), ctypes.byref(current), ctypes.byref(temperature))
check_status(status)
return {'Voltage power supply': voltage.value,
'Current power supply': current.value,
'Device temperature': temperature.value}
class SmDeviceDiagnostics(ctypes.Structure):
_fields_ = [("voltage", ctypes.c_float),
("currentInput", ctypes.c_float),
("currentOCXO", ctypes.c_float),
("current58", ctypes.c_float),
("tempFPGAInternal", ctypes.c_float),
("tempFPGANear", ctypes.c_float),
("tempOCXO", ctypes.c_float),
("tempVCO", ctypes.c_float),
("tempRFBoardLO", ctypes.c_float),
("tempPowerSupply", ctypes.c_float)]
SmDeviceDiagnostics_p = ctypes.POINTER(SmDeviceDiagnostics)
def smGetFullDeviceDiagnostics(device_handle):
# Return the full measure parameter of the device.
sm.smGetFullDeviceDiagnostics.argtypes = [ctypes.c_int, SmDeviceDiagnostics_p]
sm.smGetFullDeviceDiagnostics.restype = ctypes.c_int
diagnostic = SmDeviceDiagnostics()
status = sm.smGetFullDeviceDiagnostics(device_handle, ctypes.byref(diagnostic))
check_status(status)
return {'Voltage power supply': diagnostic.voltage,
'Current power supply': diagnostic.currentInput,
'Current to internal oscillator': diagnostic.currentOCXO,
'Current to 58?': diagnostic.current58,
'Internal FPGA temperature': diagnostic.tempFPGAInternal,
'Device temperature': diagnostic.tempFPGANear,
'Internal oscillator temperature': diagnostic.tempOCXO,
'Voltage control oscillator temperature': diagnostic.tempVCO,
'Local oscillator board temperature': diagnostic.tempRFBoardLO,
'Power supply temperature': diagnostic.tempPowerSupply}
# ---------------------- General configuration of the instrument ----------------------------- #
def smSetPowerState(device_handle, power_state):
# Set the power state of the instrument. power_state must be either 'on' or 'standby' (only
# possible in sweep mode)
if power_state not in ['on', 'standby']:
raise ValueError('The power state must be either "On" or "Standby".')
sm.smSetPowerState.argtypes = [ctypes.c_int, ctypes.c_int]
sm.smSetPowerState.restype = ctypes.c_int
state = ctypes.c_int(0) if power_state == 'on' else ctypes.c_int(1)
status = sm.smSetPowerState(device_handle, state)
check_status(status)
def smGetPowerState(device_handle):
# Get the power state of the instrument
sm.smGetPowerState.argtypes = [ctypes.c_int, c_int_p]
sm.smGetPowerState.restype = ctypes.c_int
state = ctypes.c_int()
status = sm.smGetPowerState(device_handle, ctypes.byref(state))
check_status(status)
return 'on' if state.value == 0 else 'standby'
def smSetAttenuator(device_handle, attenuation):
# Set the attenuation at the input of the device. Can be either "auto" or can be any value
# between 0 and 30dB.
if attenuation not in [*5*np.arange(7, dtype=np.int32), 'auto']:
raise ValueError('Attenuation must be between 0 and 30dB with 5dB steps or fix to "auto".')
sm.smSetAttenuator.argtypes = [ctypes.c_int, ctypes.c_int]
sm.smSetAttenuator.restype = ctypes.c_int
if attenuation == 'auto':
atten = ctypes.c_int(-1)
else:
atten = ctypes.c_int(int(attenuation/5))
status = sm.smSetAttenuator(device_handle, atten)
check_status(status)
def smGetAttenuator(device_handle):
# Get the current state of the attenuator
sm.smGetAttenuator.argtypes = [ctypes.c_int, c_int_p]
sm.smGetAttenuator.restype = ctypes.c_int
atten = ctypes.c_int()
status = sm.smGetAttenuator(device_handle, ctypes.byref(atten))
check_status(status)
return 'auto' if atten.value == -1 else 5*atten.value
def smSetRefLevel(device_handle, reference_level):
# Set the refrence level of the instrument. Notice that it should be below +20dBm as any power
# over +20dBm will damage the instrument.
if reference_level > 20.:
raise ValueError('Reference level has to be smaller than +20dBm. ')
sm.smSetRefLevel.argtypes = [ctypes.c_int, ctypes.c_double]
sm.smSetRefLevel.restype = ctypes.c_int
ref = ctypes.c_double(reference_level)
status = sm.smSetRefLevel(device_handle, ref)
check_status(status)
def smGetRefLevel(device_handle):
# Get the power state of the instrument
sm.smGetRefLevel.argtypes = [ctypes.c_int, c_double_p]
sm.smGetRefLevel.restype = ctypes.c_int
ref = ctypes.c_double()
status = sm.smGetRefLevel(device_handle, ctypes.byref(ref))
check_status(status)
return ref.value
def smSetPreselector(device_handle, preselector_state):
# Activate or disactivate the preselector (filter at the input of the spectrum analyzer to
# remove noise and signal out of the analysis bandwidth) by sending "on" or "off"
# Notice that preselector is always on for frequency higher than 645MHz!
if preselector_state not in ['on', 'off']:
raise ValueError('Preselector has to be either "on" or "off".')
sm.smSetPreselector.argtypes = [ctypes.c_int, ctypes.c_int]
sm.smSetPreselector.restype = ctypes.c_int
if preselector_state == 'on':
state = ctypes.c_int(1)
else:
state = ctypes.c_int(0)
status = sm.smSetPreselector(device_handle, state)
check_status(status)
def smGetPreselector(device_handle):
# Get the state of the preselector.
sm.smGetPreselector.argtypes = [ctypes.c_int, c_int_p]
sm.smGetPreselector.restype = ctypes.c_int
state = ctypes.c_int()
status = sm.smGetPreselector(device_handle, ctypes.byref(state))
check_status(status)
return 'on' if state.value == 1 else 'off'
def smSetExternalReference(device_handle, external_reference):
# Activate or disactivate the external reference OUTPUT
if external_reference not in ['on', 'off']:
raise ValueError('The external reference has to be either "on" or "off".')
sm.smSetExternalReference.argtypes = [ctypes.c_int, ctypes.c_int]
sm.smSetExternalReference.restype = ctypes.c_int
if external_reference == 'on':
state = ctypes.c_int(1)
else:
state = ctypes.c_int(0)
status = sm.smSetExternalReference(device_handle, state)
check_status(status)
def smGetExternalReference(device_handle):
# Get the state of the external reference OUTPUT
sm.smGetExternalReference.argtypes = [ctypes.c_int, c_int_p]
sm.smGetExternalReference.restype = ctypes.c_int
state = ctypes.c_int()
status = sm.smGetExternalReference(device_handle, ctypes.byref(state))
check_status(status)
return 'on' if state.value == 1 else 'off'
def smSetReference(device_handle, reference):
# Set the reference to the "internal" reference or to the "external" reference
if reference not in ['internal', 'external']:
raise ValueError('The reference has to be either "internal" or "external".')
sm.smSetReference.argtypes = [ctypes.c_int, ctypes.c_int]
sm.smSetReference.restype = ctypes.c_int
if reference == 'external':
ref = ctypes.c_int(1)
else:
ref = ctypes.c_int(0)
status = sm.smSetReference(device_handle, ref)
check_status(status)
def smGetReference(device_handle):
# Get the state of the external reference OUTPUT
sm.smGetReference.argtypes = [ctypes.c_int, c_int_p]
sm.smGetReference.restype = ctypes.c_int
ref = ctypes.c_int()
status = sm.smGetReference(device_handle, ctypes.byref(ref))
check_status(status)
return 'external' if ref.value == 1 else 'internal'
# ------------------------------ Specific about the use of the GPIO ----------------------------- #
def smSetGPIOState():
raise NotImplemented()
# SM_API SmStatus smSetGPIOState(int device, SmGPIOState lowerState, SmGPIOState upperState);
def smGetGPIOState():
raise NotImplemented()
#SM_API SmStatus smGetGPIOState(int device, SmGPIOState *lowerState, SmGPIOState *upperState);
def smWriteGPIOImm():
raise NotImplemented()
#SM_API SmStatus smWriteGPIOImm(int device, uint8_t data);
def smReadGPIOImm():
raise NotImplemented()
#SM_API SmStatus smReadGPIOImm(int device, uint8_t *data);
def smWriteSPI():
raise NotImplemented()
#SM_API SmStatus smWriteSPI(int device, uint32_t data, int byteCount);
def smSetGPIOSweepDisabled():
raise NotImplemented()
#SM_API SmStatus smSetGPIOSweepDisabled(int device);
def smSetGPIOSweep():
raise NotImplemented()
#SM_API SmStatus smSetGPIOSweep(int device, SmGPIOStep *steps, int stepCount);
def smSetGPIOSwitchingDisabled():
raise NotImplemented()
#SM_API SmStatus smSetGPIOSwitchingDisabled(int device);
def smSetGPIOSwitching():
raise NotImplemented()
#SM_API SmStatus smSetGPIOSwitching(int device, uint8_t *gpio, uint32_t *counts, int gpioSteps);
# ---------------------- Use of the GPS receiver for disiplined OCXO -------------------------- #
def smSetGPSTimebaseUpdate():
raise NotImplemented()
#SM_API SmStatus smSetGPSTimebaseUpdate(int device, SmBool enabled);
def smGetGPSTimebaseUpdate():
raise NotImplemented()
#SM_API SmStatus smGetGPSTimebaseUpdate(int device, SmBool *enabled);
def smGetGPSHoldoverInfo():
raise NotImplemented()
#SM_API SmStatus smGetGPSHoldoverInfo(int device, SmBool *usingGPSHoldover, uint64_t *lastHoldoverTime);
def smGetGPSState():
raise NotImplemented()
#SM_API SmStatus smGetGPSState(int device, SmGPSState *GPSState);
# ---------------------- General configuration for the acquistion -------------------------- #
def smConfigure(device_handle, mode):
# Set the mode of operation of the device, it must be either "idle", "sweep", "real_time",
# "IQ_streaming", "IQ_segmented", "IQ_sweep", "audio".
# Of these mode, only the "idle", "sweep" and "real_time" are currently fully implemented
if mode not in ['idle', 'sweep', 'real_time', 'IQ_streaming', 'IQ_segmented', 'IQ_sweep', 'audio']:
raise ValueError('The mode must be either idle", "sweep", "real_time", '\
'"IQ_streaming", "IQ_segmented", "IQ_sweep", "audio".')
sm.smConfigure.argtypes = [ctypes.c_int, ctypes.c_int]
sm.smConfigure.restype = ctypes.c_int
if mode == 'idle':
mode = ctypes.c_int(0)
elif mode == 'sweep':
mode = ctypes.c_int(1)
elif mode == 'real_time':
mode = ctypes.c_int(2)
elif mode == 'IQ_streaming':
mode = ctypes.c_int(3)
raise NotImplemented() # Remove after implementing the function for this mode!
elif mode == 'IQ_segmented':
mode = ctypes.c_int(5)
raise NotImplemented() # Remove after implementing the function for this mode!
elif mode == 'IQ_sweep':
mode = ctypes.c_int(6)
raise NotImplemented() # Remove after implementing the function for this mode!
elif mode == 'audio':
mode = ctypes.c_int(4)
raise NotImplemented() # Remove after implementing the function for this mode!
status = sm.smConfigure(device_handle, mode)
check_status(status)
def smGetCurrentMode(device_handle):
# Get the current mode of operation
sm.smGetCurrentMode.argtypes = [ctypes.c_int, c_int_p]
sm.smGetCurrentMode.restype = ctypes.c_int
mode = ctypes.c_int()
status = sm.smGetCurrentMode(device_handle, ctypes.byref(mode))
check_status(status)
if mode.value == 0:
return 'idle'
elif mode.value == 1:
return 'sweep'
elif mode.value == 2:
return 'real_time'
elif mode.value == 3:
return 'IQ_streaming'
elif mode.value == 5:
return 'IQ_segmented'
elif mode.value == 6:
return 'IQ_sweep'
elif mode.value == 4:
return 'audio'
def smAbort(device_handle):
# Stop the current measurement
sm.smAbort.argtypes = [ctypes.c_int]
sm.smAbort.restype = ctypes.c_int
status = sm.smAbort(device_handle)
check_status(status)
# ---------------------- Sweep configuration function -------------------------- #
def smSetSweepSpeed(device_handle, speed):
# Set the sweep speed. Must be either auto, normal or fast. Auto maximize the speed, normal
# maximize acuracy and fast allow to do even faster acquisition for some specific parameter.
if speed not in ['auto', 'normal', 'fast']:
raise ValueError('The speed must be either "auto", "normal" or "fast".')
sm.smSetSweepSpeed.argtypes = [ctypes.c_int, ctypes.c_int]
sm.smSetSweepSpeed.restype = ctypes.c_int
if speed == 'auto':
speed = ctypes.c_int(0)
elif speed == 'normal':
speed = ctypes.c_int(1)
elif speed == 'fast':
speed = ctypes.c_int(2)
status = sm.smSetSweepSpeed(device_handle, speed)
check_status(status)
def smSetSweepCenterSpan(device_handle, center_frequency, span):
# Set the frequency of the acquisitionby fixing the center and the span of the measurement.
if center_frequency > 20e9 or center_frequency < 100e3:
raise ValueError('The central frequency must bewteen 100kHz and 20GHz for our spectrum '\
'analyzer.')
if span > 10e9:
raise ValueError('The span cannot be bigger than 10GHz.')
sm.smSetSweepCenterSpan.argtypes = [ctypes.c_int, ctypes.c_double, ctypes.c_double]
sm.smSetSweepCenterSpan.restype = ctypes.c_int
status = sm.smSetSweepCenterSpan(device_handle, center_frequency, span)
check_status(status)
def smSetSweepStartStop(device_handle, start_frequency, stop_frequency):
# Set the frequency of the acquisitionby fixing the start and the stop frequency of the
# measurement
if start_frequency > 20e9 or start_frequency < 100e3:
raise ValueError('The start frequency must bewteen 100kHz and 20GHz for our spectrum '\
'analyzer.')
if stop_frequency > 20e9 or stop_frequency < 100e3:
raise ValueError('The stop frequency must bewteen 100kHz and 20GHz for our spectrum '\
'analyzer.')
sm.smSetSweepStartStop.argtypes = [ctypes.c_int, ctypes.c_double, ctypes.c_double]
sm.smSetSweepStartStop.restype = ctypes.c_int
status = sm.smSetSweepStartStop(device_handle, start_frequency, stop_frequency)
check_status(status)
def smSetSweepCoupling(device_handle, resolution_bandwidth, video_bandwidth = None, sweep_time = None):
# Set the resolution bandwidth, video bandwidth and sweep time. If the video bandwidth is None,
# then the video bandwidth is identical to the resolution bandwidth. If sweep time is None, then
# the minimal time for the acquisition is taken.
# It should be note that the videos bandwidth is a concept coming from old spectrum analyzer
# and is modifying here the number of FFT averaged.
if resolution_bandwidth > 10e6 or resolution_bandwidth < 0.1:
raise ValueError('The resolution must be between 0.1Hz and 10MHz. Be careful limitations '\
'exist depending on the span and measurement mode.')
if video_bandwidth is None:
video_bandwidth = resolution_bandwidth
else:
if resolution_bandwidth < video_bandwidth:
raise ValueError('The video bandwidth has to be smaller than the resolution bandwidth.')
if sweep_time is None: