"""
.. module:: skrf.vi.vna.vna
=================================================
vna (:mod:`skrf.vi.vna.vna`)
=================================================
Provides the VNA base class
"""
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
pass
import functools
import inspect
import re
from abc import ABC
from enum import Enum, auto
import numpy as np
import pyvisa
from ..scpi_errors import SCPIError
from ..validators import Validator
def _format_cmd(cmd: str, **kwargs) -> str:
def sub(match_obj):
prefix = match_obj.group("prefix")
attr = match_obj.group("attr")
if prefix:
obj = kwargs[prefix]
return str(getattr(obj, attr))
else:
return str(kwargs[attr])
param_re = re.compile(r"\<(?:(?P<prefix>\w+):)?(?P<attr>\w+)\>")
return re.sub(param_re, sub, cmd)
class ValuesFormat(Enum):
"""How values are written to and queried from the insturment"""
#: 32 bits per value
BINARY_32 = auto()
#: 64 bits per value
BINARY_64 = auto()
#: Transferred as ASCII (e.g. representing numbers as strings)
ASCII = auto()
class Channel:
"""
A single channel of the instrument.
This is only for those instruments which support channels, and should be
subclassed in those instrument classes.
.. warning::
This class should not be instantiated directly
"""
def __init__(
self, parent, cnum: int | None = None, cname: str | None = None
) -> None:
self.parent = parent
self.cnum = cnum
self.name = cname
self.read = self.parent.read
self.read_values = self.parent.read_values
self.write = self.parent.write
self.write_values = self.parent.write_values
self.query = self.parent.query
self.query_values = self.parent.query_values
class VNA(ABC):
_scpi = True # Set to false in subclasses that don't use SCPI
def __init__(
self, address: str, backend: str = "@py", timeout: int | None = None
) -> None:
rm = pyvisa.ResourceManager(backend)
self._resource = rm.open_resource(address, timeout=timeout)
# Reading and setting the query values format is instrument specific
# and must be done for each subclass. We default to using ASCII
self._values_fmt: ValuesFormat = ValuesFormat.ASCII
if self._scpi:
self._setup_scpi()
self.echo = False
def __init_subclass__(cls):
if "Channel" in [c[0] for c in inspect.getmembers(cls, inspect.isclass)]:
cls._add_channel_support()
@classmethod
def _add_channel_support(cls):
def create_channel(self, cnum: int, cname: str) -> None:
ch_id = f"ch{cnum}"
if hasattr(self, ch_id):
raise RuntimeError(f"Channel {cnum} already exists")
new_channel = self.Channel(self, cnum, cname)
setattr(self, ch_id, new_channel)
def delete_channel(self, cnum: str) -> None:
ch_id = f"ch{cnum}"
if not hasattr(self, ch_id):
return
ch = getattr(self, ch_id)
if hasattr(ch, "_on_delete"):
ch._on_delete()
delattr(self, ch_id)
def _channels(self) -> list[Channel]:
return [getattr(self, ch) for ch in dir(self) if re.fullmatch(r"ch\d+", ch)]
def __getattr__(self, k):
if not hasattr(self.Channel, k):
raise AttributeError(f"{type(self).__name__} has no attribute {k}")
return getattr(self.ch1, k)
cls.create_channel = create_channel
cls.delete_channel = delete_channel
cls.channels = property(_channels)
cls.__getattr__ = __getattr__
def _setup_scpi(self) -> None:
self.__class__.wait_for_complete = lambda self: self.query("*OPC?")
self.__class__.status = property(lambda self: self.query("*STB?"))
self.__class__.options = property(lambda self: self.query("*OPT?"))
self.__class__.id = property(lambda self: self.query("*IDN?"))
self.__class__.clear_errors = lambda self: self.write("*CLS")
def errcheck(self) -> None:
err = self.query("SYST:ERR?")
errno = int(err.split(",")[0])
if errno == 0:
return
else:
raise SCPIError(errno)
self.__class__.check_errors = errcheck
[docs]
@staticmethod
def command(
get_cmd: str | None = None,
set_cmd: str | None = None,
doc: str | None = None,
validator: Validator | None = None,
values: bool = False,
values_container: type | None = np.array,
complex_values: bool = False,
) -> property:
"""
Create a property for the instrument.
This method is used to add a property to an instrument. These properties
can be read-only, write-only, read-write, and can validate values before
sending to the instrument as well as validate responses from the
instrument to return proper types.
Parameters
----------
get_cmd
Command sent to the instrument to request data
set_cmd
Command sent to the instrument to set data
doc
The docstring for the property
validator
The :class:`Validator` that will be used to transform data to the
proper format before sending and after querying
values
Whether or not this command is using a `Sequence` to set data, or
expects a `Sequence` in response.
values_container:
If values is true, you set set this to the type of container the
values should be returned in. For example, this is np.array by
default, meaning instead of return a `list`, you will get a numpy
array.
complex_values:
If the values expected from the instrument are complex. If so, the
values will be converted from [real[0], imag[0], real[1], imag[1], ...]
to [complex(real[0], imag[0]), complex(real[1], imag[1]), ...]
Returns
-------
property
The property constructed from the parameters passed. Should be set
to a class variable
"""
def fget(self, get_cmd=get_cmd, validator=validator):
if get_cmd is None:
raise LookupError("Property cannot be read")
cmd = _format_cmd(get_cmd, self=self)
if values:
arg = self.query_values(
cmd, container=values_container, complex_values=complex_values
)
else:
arg = self.query(cmd)
if hasattr(self, "wait_for_complete"):
self.wait_for_complete()
if validator:
return validator.validate_output(arg)
else:
return arg
def fset(self, arg, set_cmd=set_cmd, validator=validator):
if set_cmd is None:
raise LookupError("Property cannot be set")
if validator:
arg = validator.validate_input(arg)
cmd = _format_cmd(set_cmd, self=self, arg=arg)
self.write(cmd)
if hasattr(self, "wait_for_complete"):
self.wait_for_complete()
fget.__doc__ = doc
# TODO: Potentially add the validator docstring to add the verbosity to
# the generated docs, but keep the code less verbose?
return property(fget=fget, fset=fset)
@property
def timeout(self) -> int | None:
return self._resource.timeout
@timeout.setter
def timeout(self, timeout: int | None) -> None:
self._resource.timeout = timeout
def read(self, **kwargs) -> None:
if isinstance(self._resource, pyvisa.resources.MessageBasedResource):
fn = self._resource.read
elif isinstance(self._resource, pyvisa.resources.RegisterBasedResource):
raise NotImplementedError()
else:
raise RuntimeError("unreachable")
return fn(**kwargs)
def read_values(self, **kwargs) -> None: # noqa: B027
pass
def write(self, cmd, **kwargs) -> None:
if self.echo:
print(cmd)
if isinstance(self._resource, pyvisa.resources.MessageBasedResource):
fn = self._resource.write
elif isinstance(self._resource, pyvisa.resources.RegisterBasedResource):
raise NotImplementedError()
else:
raise RuntimeError("unreachable")
fn(cmd, **kwargs)
def write_values(self, cmd, values, complex_values: bool = False, **kwargs) -> None:
if self.echo:
print(cmd)
if complex_values:
values = np.array([(x.real, x.imag) for x in values]).flatten()
if isinstance(self._resource, pyvisa.resources.MessageBasedResource):
if self._values_fmt == ValuesFormat.ASCII:
fn = self._resource.write_ascii_values
elif self._values_fmt == ValuesFormat.BINARY_32:
fn = self._resource.write_binary_values
elif self._values_fmt == ValuesFormat.BINARY_64:
fn = functools.partial(self._resource.write_binary_values, datatype="d")
elif isinstance(self._resource, pyvisa.resources.RegisterBasedResource):
raise NotImplementedError()
else:
raise RuntimeError("unreachable")
return fn(cmd, values, **kwargs)
def query(self, cmd, **kwargs) -> None:
if self.echo:
print(cmd)
if isinstance(self._resource, pyvisa.resources.MessageBasedResource):
fn = self._resource.query
elif isinstance(self._resource, pyvisa.resources.RegisterBasedResource):
raise NotImplementedError()
else:
raise RuntimeError("unreachable")
return fn(cmd, **kwargs)
def query_values(self, cmd, complex_values: bool = False, **kwargs) -> None:
if self.echo:
print(cmd)
if isinstance(self._resource, pyvisa.resources.MessageBasedResource):
if self._values_fmt == ValuesFormat.ASCII:
fn = self._resource.query_ascii_values
elif self._values_fmt == ValuesFormat.BINARY_32:
fn = self._resource.query_binary_values
elif self._values_fmt == ValuesFormat.BINARY_64:
fn = functools.partial(self._resource.query_binary_values, datatype="d")
elif isinstance(self._resource, pyvisa.resources.RegisterBasedResource):
raise NotImplementedError()
else:
raise RuntimeError("unreachable")
vals = fn(cmd, **kwargs)
if complex_values:
vals = np.vectorize(complex)(vals[::2], vals[1::2])
return vals