Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in
Toggle navigation
Menu
Open sidebar
Institute of Experimental Physics
Quantum Circuits
python-repo
Commits
55a7a5d9
Commit
55a7a5d9
authored
Feb 05, 2021
by
Romain Baptiste Dominique Albert
Browse files
Merge branch 'phase_shifter' into 'master'
Phase shifter See merge request
exphys/quantum-circuits/python-repo!12
parents
613189ad
71bf76f0
Changes
7
Hide whitespace changes
Inline
Side-by-side
Showing
7 changed files
with
924 additions
and
0 deletions
+924
-0
Instruments/Drivers/Vaunix/VNX_dps64.dll
Instruments/Drivers/Vaunix/VNX_dps64.dll
+0
-0
Instruments/Drivers/Vaunix/api_phase_shifter.py
Instruments/Drivers/Vaunix/api_phase_shifter.py
+312
-0
Instruments/Drivers/Vaunix/phase_shifter.py
Instruments/Drivers/Vaunix/phase_shifter.py
+99
-0
Instruments/Drivers/Vaunix/server.py
Instruments/Drivers/Vaunix/server.py
+233
-0
Instruments/Drivers/Vaunix/vnx_lps_api.h
Instruments/Drivers/Vaunix/vnx_lps_api.h
+126
-0
Instruments/PS.py
Instruments/PS.py
+153
-0
Instruments/__init__.py
Instruments/__init__.py
+1
-0
No files found.
Instruments/Drivers/Vaunix/VNX_dps64.dll
0 → 100644
View file @
55a7a5d9
File added
Instruments/Drivers/Vaunix/api_phase_shifter.py
0 → 100644
View file @
55a7a5d9
import
ctypes
import
os
import
numpy
as
np
# This is the binding for the vaunix api which is compatible with LPS-402,
# LPS-802, and LPS-123
folder
=
os
.
path
.
dirname
(
os
.
path
.
realpath
(
__file__
))
vaunix
=
ctypes
.
cdll
.
LoadLibrary
(
folder
+
'\VNX_dps64.dll'
)
# ----------- Global Equates ------------
MAXDEVICES
=
64
MAX_MODELNAME
=
32
# Bit masks and equates for the Ramp command byte (stored in Sweep_mode, and reported also in Status)
LPS_SWP_BIDIR
=
ctypes
.
c_int
(
0x10
).
value
# MASK: bit = 0 for ramp style sweep, 1 for triangle style sweep
LPS_SWP_DIRECTION
=
ctypes
.
c_int
(
0x04
).
value
# MASK: bit = 0 for sweep up, 1 for sweep down
LPS_SWP_CONTINUOUS
=
ctypes
.
c_int
(
0x02
).
value
# MASK: bit = 1 for continuous sweeping
LPS_SWP_ONCE
=
ctypes
.
c_int
(
0x01
).
value
# MASK: bit = 1 for single sweep
# ----------- Command Equates -----------
# Status returns for commands
STATUS_OK
=
ctypes
.
c_int
(
0
).
value
BAD_PARAMETER
=
ctypes
.
c_int
(
0x80010000
).
value
# out of range input -- frequency outside min/max etc.
BAD_HID_IO
=
ctypes
.
c_int
(
0x80020000
).
value
DEVICE_NOT_READY
=
ctypes
.
c_int
(
0x80030000
).
value
# device isn't open, no handle, etc.
# Status returns for DevStatus
INVALID_DEVID
=
ctypes
.
c_int
(
0x80000000
).
value
# MSB is set if the device ID is invalid
DEV_CONNECTED
=
ctypes
.
c_int
(
0x00000001
).
value
# LSB is set if a device is connected
DEV_OPENED
=
ctypes
.
c_int
(
0x00000002
).
value
# set if the device is opened
SWP_ACTIVE
=
ctypes
.
c_int
(
0x00000004
).
value
# set if the device is sweeping
SWP_UP
=
ctypes
.
c_int
(
0x00000008
).
value
# set if the device is ramping up
SWP_REPEAT
=
ctypes
.
c_int
(
0x00000010
).
value
# set if the device is in continuous ramp mode
SWP_BIDIRECTIONAL
=
ctypes
.
c_int
(
0x00000020
).
value
# set if the device is in bi-directional ramp mode
PROFILE_ACTIVE
=
ctypes
.
c_int
(
0x00000040
).
value
# set if a profile is playing
def
fnLPS_SetTestMode
(
test_mode
):
vaunix
.
fnLPS_SetTestMode
.
argtypes
=
[
ctypes
.
c_bool
]
vaunix
.
fnLPS_SetTestMode
.
restype
=
None
vaunix
.
fnLPS_SetTestMode
(
test_mode
)
def
fnLPS_GetNumDevices
():
vaunix
.
fnLPS_GetNumDevices
.
argtypes
=
[]
vaunix
.
fnLPS_GetNumDevices
.
restype
=
ctypes
.
c_int
return
vaunix
.
fnLPS_GetNumDevices
()
def
fnLPS_GetDevInfo
():
vaunix
.
fnLPS_GetDevInfo
.
argtypes
=
\
[
np
.
ctypeslib
.
ndpointer
(
ctypes
.
c_uint
,
flags
=
'C_CONTIGUOUS'
)]
vaunix
.
fnLPS_GetDevInfo
.
restype
=
ctypes
.
c_int
data
=
np
.
zeros
(
MAXDEVICES
,
dtype
=
np
.
uint
)
returned_value
=
vaunix
.
fnLPS_GetDevInfo
(
data
)
return
returned_value
,
data
def
fnLPS_GetModelNameA
(
device_id
):
buffer
=
ctypes
.
create_string_buffer
(
MAX_MODELNAME
)
vaunix
.
fnLPS_GetModelNameA
.
argtypes
=
[
ctypes
.
c_uint
,
]
vaunix
.
fnLPS_GetModelNameA
.
restype
=
ctypes
.
c_int
vaunix
.
fnLPS_GetModelNameA
(
device_id
,
buffer
)
return
buffer
.
value
.
decode
(
'ascii'
)
def
fnLPS_InitDevice
(
device_id
):
vaunix
.
fnLPS_InitDevice
.
argtypes
=
[
ctypes
.
c_uint
]
vaunix
.
fnLPS_InitDevice
.
restype
=
ctypes
.
c_int
return
vaunix
.
fnLPS_InitDevice
(
device_id
)
def
fnLPS_CloseDevice
(
device_id
):
vaunix
.
fnLPS_CloseDevice
.
argtypes
=
[
ctypes
.
c_uint
]
vaunix
.
fnLPS_CloseDevice
.
restype
=
ctypes
.
c_int
return
vaunix
.
fnLPS_CloseDevice
(
device_id
)
def
fnLPS_GetSerialNumber
(
device_id
):
vaunix
.
fnLPS_GetSerialNumber
.
argtypes
=
[
ctypes
.
c_uint
]
vaunix
.
fnLPS_GetSerialNumber
.
restype
=
ctypes
.
c_int
return
vaunix
.
fnLPS_GetSerialNumber
(
device_id
)
def
fnLPS_GetDeviceStatus
(
device_id
):
vaunix
.
fnLPS_GetDeviceStatus
.
argtypes
=
[
ctypes
.
c_uint
]
vaunix
.
fnLPS_GetDeviceStatus
.
restype
=
ctypes
.
c_int
return
vaunix
.
fnLPS_GetDeviceStatus
(
device_id
)
def
fnLPS_GetDLLVersion
():
vaunix
.
fnLPS_GetDLLVersion
.
argtypes
=
[]
vaunix
.
fnLPS_GetDLLVersion
.
restype
=
ctypes
.
c_int
return
vaunix
.
fnLPS_GetDLLVersion
()
def
fnLPS_SetPhaseAngle
(
device_id
,
phase
):
vaunix
.
fnLPS_SetPhaseAngle
.
argtypes
=
[
ctypes
.
c_uint
,
ctypes
.
c_int
]
vaunix
.
fnLPS_SetPhaseAngle
.
restype
=
ctypes
.
c_int
return
vaunix
.
fnLPS_SetPhaseAngle
(
device_id
,
phase
)
def
fnLPS_SetWorkingFrequency
(
device_id
,
frequency
):
vaunix
.
fnLPS_SetWorkingFrequency
.
argtypes
=
[
ctypes
.
c_uint
,
ctypes
.
c_int
]
vaunix
.
fnLPS_SetWorkingFrequency
.
restype
=
ctypes
.
c_int
return
vaunix
.
fnLPS_SetWorkingFrequency
(
device_id
,
frequency
)
def
fnLPS_SetRampStart
(
device_id
,
ramp_start
):
vaunix
.
fnLPS_SetRampStart
.
argtypes
=
[
ctypes
.
c_uint
,
ctypes
.
c_int
]
vaunix
.
fnLPS_SetRampStart
.
restype
=
ctypes
.
c_int
return
vaunix
.
fnLPS_SetRampStart
(
device_id
,
ramp_start
)
def
fnLPS_SetRampEnd
(
device_id
,
ramp_stop
):
vaunix
.
fnLPS_SetRampEnd
.
argtypes
=
[
ctypes
.
c_uint
,
ctypes
.
c_int
]
vaunix
.
fnLPS_SetRampEnd
.
restype
=
ctypes
.
c_int
return
vaunix
.
fnLPS_SetRampEnd
(
device_id
,
ramp_stop
)
def
fnLPS_SetPhaseAngleStep
(
device_id
,
phase_setp
):
vaunix
.
fnLPS_SetPhaseAngleStep
.
argtypes
=
[
ctypes
.
c_uint
,
ctypes
.
c_int
]
vaunix
.
fnLPS_SetPhaseAngleStep
.
restype
=
ctypes
.
c_int
return
vaunix
.
fnLPS_SetPhaseAngleStep
(
device_id
,
phase_setp
)
def
fnLPS_SetPhaseAngleStepTwo
(
device_id
,
phase_setp
):
vaunix
.
fnLPS_SetPhaseAngleStepTwo
.
argtypes
=
[
ctypes
.
c_uint
,
ctypes
.
c_int
]
vaunix
.
fnLPS_SetPhaseAngleStepTwo
.
restype
=
ctypes
.
c_int
return
vaunix
.
fnLPS_SetPhaseAngleStepTwo
(
device_id
,
phase_setp
)
def
fnLPS_SetDwellTime
(
device_id
,
dwell_time
):
vaunix
.
fnLPS_SetDwellTime
.
argtypes
=
[
ctypes
.
c_uint
,
ctypes
.
c_int
]
vaunix
.
fnLPS_SetDwellTime
.
restype
=
ctypes
.
c_int
return
vaunix
.
fnLPS_SetDwellTime
(
device_id
,
dwell_time
)
def
fnLPS_SetDwellTimeTwo
(
device_id
,
dwell_time
):
vaunix
.
fnLPS_SetDwellTimeTwo
.
argtypes
=
[
ctypes
.
c_uint
,
ctypes
.
c_int
]
vaunix
.
fnLPS_SetDwellTimeTwo
.
restype
=
ctypes
.
c_int
return
vaunix
.
fnLPS_SetDwellTimeTwo
(
device_id
,
dwell_time
)
def
fnLPS_SetIdleTime
(
device_id
,
idle_time
):
vaunix
.
fnLPS_SetIdleTime
.
argtypes
=
[
ctypes
.
c_uint
,
ctypes
.
c_int
]
vaunix
.
fnLPS_SetIdleTime
.
restype
=
ctypes
.
c_int
return
vaunix
.
fnLPS_SetIdleTime
(
device_id
,
idle_time
)
def
fnLPS_SetHoldTime
(
device_id
,
hold_time
):
vaunix
.
fnLPS_SetHoldTime
.
argtypes
=
[
ctypes
.
c_uint
,
ctypes
.
c_int
]
vaunix
.
fnLPS_SetHoldTime
.
restype
=
ctypes
.
c_int
return
vaunix
.
fnLPS_SetHoldTime
(
device_id
,
hold_time
)
def
fnLPS_SetProfileElement
(
device_id
,
index
,
phase_angle
):
vaunix
.
fnLPS_SetProfileElement
.
argtypes
=
[
ctypes
.
c_uint
,
ctypes
.
c_int
,
ctypes
.
c_int
]
vaunix
.
fnLPS_SetProfileElement
.
restype
=
ctypes
.
c_int
return
vaunix
.
fnLPS_SetProfileElement
(
device_id
,
index
,
phase_angle
)
def
fnLPS_SetProfileCount
(
device_id
,
profile_count
):
vaunix
.
fnLPS_SetProfileCount
.
argtypes
=
[
ctypes
.
c_uint
,
ctypes
.
c_int
]
vaunix
.
fnLPS_SetProfileCount
.
restype
=
ctypes
.
c_int
return
vaunix
.
fnLPS_SetProfileCount
(
device_id
,
profile_count
)
def
fnLPS_SetProfileIdleTime
(
device_id
,
idle_time
):
vaunix
.
fnLPS_SetProfileIdleTime
.
argtypes
=
[
ctypes
.
c_uint
,
ctypes
.
c_int
]
vaunix
.
fnLPS_SetProfileIdleTime
.
restype
=
ctypes
.
c_int
return
vaunix
.
fnLPS_SetProfileIdleTime
(
device_id
,
idle_time
)
def
fnLPS_SetProfileDwellTime
(
device_id
,
dwell_time
):
vaunix
.
fnLPS_SetProfileDwellTime
.
argtypes
=
[
ctypes
.
c_uint
,
ctypes
.
c_int
]
vaunix
.
fnLPS_SetProfileDwellTime
.
restype
=
ctypes
.
c_int
return
vaunix
.
fnLPS_SetProfileDwellTime
(
device_id
,
dwell_time
)
def
fnLPS_StartProfile
(
device_id
,
mode
):
vaunix
.
fnLPS_StartProfile
.
argtypes
=
[
ctypes
.
c_uint
,
ctypes
.
c_int
]
vaunix
.
fnLPS_StartProfile
.
restype
=
ctypes
.
c_int
return
vaunix
.
fnLPS_StartProfile
(
device_id
,
mode
)
def
fnLPS_SetRampDirection
(
device_id
,
up
):
vaunix
.
fnLPS_SetRampDirection
.
argtypes
=
[
ctypes
.
c_uint
,
ctypes
.
c_bool
]
vaunix
.
fnLPS_SetRampDirection
.
restype
=
ctypes
.
c_int
return
vaunix
.
fnLPS_SetRampDirection
(
device_id
,
up
)
def
fnLPS_SetRampMode
(
device_id
,
mode
):
vaunix
.
fnLPS_SetRampMode
.
argtypes
=
[
ctypes
.
c_uint
,
ctypes
.
c_bool
]
vaunix
.
fnLPS_SetRampMode
.
restype
=
ctypes
.
c_int
return
vaunix
.
fnLPS_SetRampMode
(
device_id
,
mode
)
def
fnLPS_SetRampBidirectional
(
device_id
,
bidir_enable
):
vaunix
.
fnLPS_SetRampBidirectional
.
argtypes
=
[
ctypes
.
c_uint
,
ctypes
.
c_bool
]
vaunix
.
fnLPS_SetRampBidirectional
.
restype
=
ctypes
.
c_int
return
vaunix
.
fnLPS_SetRampBidirectional
(
device_id
,
bidir_enable
)
def
fnLPS_StartRamp
(
device_id
,
go
):
vaunix
.
fnLPS_StartRamp
.
argtypes
=
[
ctypes
.
c_uint
,
ctypes
.
c_bool
]
vaunix
.
fnLPS_StartRamp
.
restype
=
ctypes
.
c_int
return
vaunix
.
fnLPS_StartRamp
(
device_id
,
go
)
def
fnLPS_SaveSettings
(
device_id
):
vaunix
.
fnLPS_SaveSettings
.
argtypes
=
[
ctypes
.
c_uint
]
vaunix
.
fnLPS_SaveSettings
.
restype
=
ctypes
.
c_int
return
vaunix
.
fnLPS_SaveSettings
(
device_id
)
def
fnLPS_GetPhaseAngle
(
device_id
):
vaunix
.
fnLPS_GetPhaseAngle
.
argtypes
=
[
ctypes
.
c_uint
]
vaunix
.
fnLPS_GetPhaseAngle
.
restype
=
ctypes
.
c_int
return
vaunix
.
fnLPS_GetPhaseAngle
(
device_id
)
def
fnLPS_GetWorkingFrequency
(
device_id
):
vaunix
.
fnLPS_GetWorkingFrequency
.
argtypes
=
[
ctypes
.
c_uint
]
vaunix
.
fnLPS_GetWorkingFrequency
.
restype
=
ctypes
.
c_int
return
vaunix
.
fnLPS_GetWorkingFrequency
(
device_id
)
def
fnLPS_GetRampStart
(
device_id
):
vaunix
.
fnLPS_GetRampStart
.
argtypes
=
[
ctypes
.
c_uint
]
vaunix
.
fnLPS_GetRampStart
.
restype
=
ctypes
.
c_int
return
vaunix
.
fnLPS_GetRampStart
(
device_id
)
def
fnLPS_GetRampEnd
(
device_id
):
vaunix
.
fnLPS_GetRampEnd
.
argtypes
=
[
ctypes
.
c_uint
]
vaunix
.
fnLPS_GetRampEnd
.
restype
=
ctypes
.
c_int
return
vaunix
.
fnLPS_GetRampEnd
(
device_id
)
def
fnLPS_GetDwellTime
(
device_id
):
vaunix
.
fnLPS_GetDwellTime
.
argtypes
=
[
ctypes
.
c_uint
]
vaunix
.
fnLPS_GetDwellTime
.
restype
=
ctypes
.
c_int
return
vaunix
.
fnLPS_GetDwellTime
(
device_id
)
def
fnLPS_GetDwellTimeTwo
(
device_id
):
vaunix
.
fnLPS_GetDwellTimeTwo
.
argtypes
=
[
ctypes
.
c_uint
]
vaunix
.
fnLPS_GetDwellTimeTwo
.
restype
=
ctypes
.
c_int
return
vaunix
.
fnLPS_GetDwellTimeTwo
(
device_id
)
def
fnLPS_GetIdleTime
(
device_id
):
vaunix
.
fnLPS_GetIdleTime
.
argtypes
=
[
ctypes
.
c_uint
]
vaunix
.
fnLPS_GetIdleTime
.
restype
=
ctypes
.
c_int
return
vaunix
.
fnLPS_GetIdleTime
(
device_id
)
def
fnLPS_GetHoldTime
(
device_id
):
vaunix
.
fnLPS_GetHoldTime
.
argtypes
=
[
ctypes
.
c_uint
]
vaunix
.
fnLPS_GetHoldTime
.
restype
=
ctypes
.
c_int
return
vaunix
.
fnLPS_GetHoldTime
(
device_id
)
def
fnLPS_GetPhaseAngleStep
(
device_id
):
vaunix
.
fnLPS_GetPhaseAngleStep
.
argtypes
=
[
ctypes
.
c_uint
]
vaunix
.
fnLPS_GetPhaseAngleStep
.
restype
=
ctypes
.
c_int
return
vaunix
.
fnLPS_GetPhaseAngleStep
(
device_id
)
def
fnLPS_GetPhaseAngleStepTwo
(
device_id
):
vaunix
.
fnLPS_GetPhaseAngleStepTwo
.
argtypes
=
[
ctypes
.
c_uint
]
vaunix
.
fnLPS_GetPhaseAngleStepTwo
.
restype
=
ctypes
.
c_int
return
vaunix
.
fnLPS_GetPhaseAngleStepTwo
(
device_id
)
def
fnLPS_GetProfileElement
(
device_id
,
index
):
vaunix
.
fnLPS_GetProfileElement
.
argtypes
=
[
ctypes
.
c_uint
,
ctypes
.
c_int
]
vaunix
.
fnLPS_GetProfileElement
.
restype
=
ctypes
.
c_int
return
vaunix
.
fnLPS_GetProfileElement
(
device_id
)
def
fnLPS_GetProfileCount
(
device_id
):
vaunix
.
fnLPS_GetProfileCount
.
argtypes
=
[
ctypes
.
c_uint
]
vaunix
.
fnLPS_GetProfileCount
.
restype
=
ctypes
.
c_int
return
vaunix
.
fnLPS_GetProfileCount
(
device_id
)
def
fnLPS_GetProfileDwellTime
(
device_id
):
vaunix
.
fnLPS_GetProfileDwellTime
.
argtypes
=
[
ctypes
.
c_uint
]
vaunix
.
fnLPS_GetProfileDwellTime
.
restype
=
ctypes
.
c_int
return
vaunix
.
fnLPS_GetProfileDwellTime
(
device_id
)
def
fnLPS_GetProfileIdleTime
(
device_id
):
vaunix
.
fnLPS_GetProfileIdleTime
.
argtypes
=
[
ctypes
.
c_uint
]
vaunix
.
fnLPS_GetProfileIdleTime
.
restype
=
ctypes
.
c_int
return
vaunix
.
fnLPS_GetProfileIdleTime
(
device_id
)
def
fnLPS_GetProfileIndex
(
device_id
):
vaunix
.
fnLPS_GetProfileIndex
.
argtypes
=
[
ctypes
.
c_uint
]
vaunix
.
fnLPS_GetProfileIndex
.
restype
=
ctypes
.
c_int
return
vaunix
.
fnLPS_GetProfileIndex
(
device_id
)
def
fnLPS_GetMaxPhaseShift
(
device_id
):
vaunix
.
fnLPS_GetMaxPhaseShift
.
argtypes
=
[
ctypes
.
c_uint
]
vaunix
.
fnLPS_GetMaxPhaseShift
.
restype
=
ctypes
.
c_int
return
vaunix
.
fnLPS_GetMaxPhaseShift
(
device_id
)
def
fnLPS_GetMinPhaseShift
(
device_id
):
vaunix
.
fnLPS_GetMinPhaseShift
.
argtypes
=
[
ctypes
.
c_uint
]
vaunix
.
fnLPS_GetMinPhaseShift
.
restype
=
ctypes
.
c_int
return
vaunix
.
fnLPS_GetMinPhaseShift
(
device_id
)
def
fnLPS_GetMinPhaseStep
(
device_id
):
vaunix
.
fnLPS_GetMinPhaseStep
.
argtypes
=
[
ctypes
.
c_uint
]
vaunix
.
fnLPS_GetMinPhaseStep
.
restype
=
ctypes
.
c_int
return
vaunix
.
fnLPS_GetMinPhaseStep
(
device_id
)
def
fnLPS_GetMaxWorkingFrequency
(
device_id
):
vaunix
.
fnLPS_GetMaxWorkingFrequency
.
argtypes
=
[
ctypes
.
c_uint
]
vaunix
.
fnLPS_GetMaxWorkingFrequency
.
restype
=
ctypes
.
c_int
return
vaunix
.
fnLPS_GetMaxWorkingFrequency
(
device_id
)
def
fnLPS_GetMinWorkingFrequency
(
device_id
):
vaunix
.
fnLPS_GetMinWorkingFrequency
.
argtypes
=
[
ctypes
.
c_uint
]
vaunix
.
fnLPS_GetMinWorkingFrequency
.
restype
=
ctypes
.
c_int
return
vaunix
.
fnLPS_GetMinWorkingFrequency
(
device_id
)
Instruments/Drivers/Vaunix/phase_shifter.py
0 → 100644
View file @
55a7a5d9
# -*- coding: utf-8 -*-
"""
Library for Vaunix phase shifter
"""
import
socket
from
Instruments.Drivers.Vaunix.server
import
MSGLEN
,
PORT
class
LPS
(
object
):
"""Instrument Driver for Vaunix LPS-802 phase shifter
This code should be compatible with the other phase shifter of Vaunix
"""
def
__init__
(
self
,
ip
,
*
pars
):
"""
*pars are not used, however required
to be compatible to other devices.
Parameters
-----------
ip : str
IP of the windows machine to which is connected the phase shifter
Notice that the server for vaunix must be started before trying to
access to it.
"""
self
.
_ip
=
ip
self
.
__version__
=
'0.0'
def
com
(
self
,
command
,
arg
=
''
):
"""Function to communicate with the device. Gives the current status
if no arg is given
"""
command
=
"{} {}"
.
format
(
command
,
arg
).
encode
(
"ascii"
,
"ignore"
)
if
len
(
command
)
>
MSGLEN
:
raise
ValueError
(
'The command with the arg should'
\
' not excess {} characters'
.
format
(
MSGLEN
))
try
:
client
=
socket
.
socket
(
socket
.
AF_INET
,
socket
.
SOCK_STREAM
)
client
.
connect
((
self
.
_ip
,
PORT
))
except
ConnectionRefusedError
:
raise
Exception
(
'The phase shifter cannot be found at the ip {s}'
.
format
(
self
.
_ip
))
client
.
send
(
command
)
answer
=
client
.
recv
(
MSGLEN
).
decode
(
"ascii"
)
client
.
close
()
if
'ERROR:'
in
answer
:
raise
Exception
(
answer
)
return
answer
def
identify
(
self
):
"""Query identify string of the device"""
return
self
.
com
(
'IDN'
,
'?'
)
def
frequency
(
self
,
arg
=
'?'
):
"""Set/query working frequency.
Parameters
-----------
arg : '?', 'min?', 'max?', float
Frequency in Hz. Use '?' to query the current value, 'min?' for the
minimum working frequency and 'max?' for the maximum.
Note that the frequency step of the instrument is 100kHz, however you
can use it for other frequencies even if the phase step will not be
calibrated. If you frequency is not a multiple of 100kHz, the working
frequency is fixed to the nearest multiple.
"""
try
:
frequency
=
round
(
float
(
arg
)
/
100e3
)
self
.
com
(
'FREQ'
,
frequency
)
except
ValueError
:
return
float
(
self
.
com
(
'FREQ'
,
arg
))
*
100e3
def
phase
(
self
,
arg
=
'?'
):
"""Set/query phase
Parameters
-----------
arg : '?', 'min?', 'max?', 'step?', int
Phase in degree. Use '?' to query the current value, 'min?' for the
minimum phase, 'max?' for the maximum phase and 'step?' for the
minimum step value.
"""
try
:
phase
=
int
(
arg
)
self
.
com
(
'PHASE'
,
phase
)
except
ValueError
:
return
self
.
com
(
'PHASE'
,
arg
)
\ No newline at end of file
Instruments/Drivers/Vaunix/server.py
0 → 100644
View file @
55a7a5d9
# -*- coding: utf-8 -*-
"""
Server for the Vaunix phase shifter.
This code is divided in two parts, first there is an object LPSServer which is
always running and detects when a phase shifter is connected to the computer.
When it is detected, it create a NetworkServer thread which is handling the
communication through the network. The goal of this architecture is have a not
blocking server.
To shutdown the server you just need to press enter.
"""
from
threading
import
Thread
from
queue
import
Queue
,
Empty
import
socket
import
time
MSGLEN
=
2048
PORT
=
10802
class
LPS
(
object
):
""" This object is handling the main thread which detect the connection
of the phase shifter and start the network server thread when it is the
case. It handles all communication with the phase shifter?
"""
def
__init__
(
self
,
stop_queue
):
"""
Parameters
-----------
stop_queue : Queue
It is the Queue used to kill the main thread, putting anything to
this queue will stop it.
"""
self
.
_stop_queue
=
stop_queue
api
.
fnLPS_SetTestMode
(
False
)
# Two queues are used, one to send message from the the network
# server to this object and another one for the other way around.
self
.
_network_lps_queue
=
Queue
()
self
.
_lps_network_queue
=
Queue
()
self
.
_network_server_running
=
False
self
.
_network_server
=
NetworkServer
(
self
.
_network_lps_queue
,
self
.
_lps_network_queue
)
self
.
_phase_shifter_id
=
None
def
run
(
self
):
while
True
:
# Check if we are trying to stop the script
try
:
self
.
_stop_queue
.
get
(
False
)
self
.
stop_network
()
break
except
Empty
:
pass
number_phase_shifter
=
api
.
fnLPS_GetNumDevices
()
if
number_phase_shifter
and
not
self
.
_network_server_running
:
self
.
start_network
()
if
not
number_phase_shifter
and
self
.
_network_server_running
:
self
.
stop_network
()
if
self
.
_network_server_running
:
try
:
self
.
treat_instruction
(
self
.
_network_lps_queue
.
get
(
False
))
except
Empty
:
pass
time
.
sleep
(
1.
)
def
start_network
(
self
):
self
.
_network_server_running
=
True
self
.
_network_server_thread
=
Thread
(
target
=
self
.
_network_server
.
run
)
self
.
_network_server_thread
.
start
()
self
.
_phase_shifter_id
=
api
.
fnLPS_GetDevInfo
()[
0
]
def
stop_network
(
self
):
self
.
_lps_network_queue
.
put
(
'stop'
)
self
.
_network_server_thread
.
join
()
self
.
_network_server_running
=
False
self
.
_phase_shifter_id
=
None
def
treat_instruction
(
self
,
command
):
try
:
command
,
arg
=
command
.
lower
().
split
()
except
ValueError
:
self
.
_lps_network_queue
.
put
(
'ERROR: The command is not '
\
'correctly formated.'
)
api
.
fnLPS_InitDevice
(
self
.
_phase_shifter_id
)
if
command
==
'freq'
:
if
arg
==
'?'
:
self
.
_lps_network_queue
.
put
(
api
.
fnLPS_GetWorkingFrequency
(
self
.
_phase_shifter_id
))
elif
arg
==
'min?'
:
self
.
_lps_network_queue
.
put
(
api
.
fnLPS_GetMinWorkingFrequency
(
self
.
_phase_shifter_id
))
elif
arg
==
'max?'
:
self
.
_lps_network_queue
.
put
(
api
.
fnLPS_GetMaxWorkingFrequency
(
self
.
_phase_shifter_id
))
else
:
try
:
freq
=
int
(
arg
)
ret
=
api
.
fnLPS_SetWorkingFrequency
(
self
.
_phase_shifter_id
,
freq
)
self
.
_lps_network_queue
.
put
(
self
.
check_error
(
ret
))
except
ValueError
:
self
.
_lps_network_queue
.
put
(
'ERROR: The frequency is not an integer'
)
elif
command
==
'phase'
:
if
arg
==
'?'
:
self
.
_lps_network_queue
.
put
(
api
.
fnLPS_GetPhaseAngle
(
self
.
_phase_shifter_id
))
elif
arg
==
'min?'
:
self
.
_lps_network_queue
.
put
(
api
.
fnLPS_GetMinPhaseShift
(
self
.
_phase_shifter_id
))
elif
arg
==
'max?'
:
self
.
_lps_network_queue
.
put
(
api
.
fnLPS_GetMaxPhaseShift
(
self
.
_phase_shifter_id
))
elif
arg
==
'step?'
:
self
.
_lps_network_queue
.
put
(
api
.
fnLPS_GetMinPhaseStep
(
self
.
_phase_shifter_id
))
else
:
try
:
phase
=
int
(
arg
)
ret
=
api
.
fnLPS_SetPhaseAngle
(
self
.
_phase_shifter_id
,
phase
)
self
.
_lps_network_queue
.
put
(
self
.
check_error
(
ret
))
except
ValueError
:
self
.
_lps_network_queue
.
put
(
'ERROR: The phase is not an integer'
)
elif
command
==
'idn'
and
arg
==
'?'
:
name
=
api
.
fnLPS_GetModelNameA
(
self
.
_phase_shifter_id
)
serial_number
=
api
.
fnLPS_GetSerialNumber
(
self
.
_phase_shifter_id
)
self
.
_lps_network_queue
.
put
(
name
+
' serial number: {}'
.
format
(
serial_number
))
else
:
self
.
_lps_network_queue
.
put
(
'ERROR: The command was not reconnized.'
)
api
.
fnLPS_CloseDevice
(
self
.
_phase_shifter_id
)
def
check_error
(
self
,
ret
):
"""
Check the return for the function sending back a LPSTATUS (see
vnx_lps_api.h and the binding to know exactly which one).
Parameters
-----------
ret : int
(LPSTATUS return code in vnx_lps_api.h)
"""
if
ret
==
api
.
STATUS_OK
:
return
'OK'
elif
ret
==
api
.
BAD_PARAMETER
:
return
'ERROR: The parameters is outside of the possible range.'
elif
ret
==
api
.
BAD_HID_IO
:
return
'ERROR: An error appends during the communication with the device.'
elif
ret
==
api
.
DEVICE_NOT_READY
:
return
'ERROR: The phase shifter is not yet ready.'
class
NetworkServer
(
object
):
def
__init__
(
self
,
network_lps_queue
,
lps_network_queue
):
"""
Parameters
-----------
network_lps_queue : Queue