Source code for skrf.vi.vna.hp.hp8510c

"""
.. module:: skrf.vi.vna.hp.hp8510c
=================================================
HP 8510C (:mod:`skrf.vi.vna.hp.hp8510c`)
=================================================

HP8510C Class
================

.. autosummary::
    :nosignatures:
    :toctree: generated/
"""

import time

import numpy as np
import pyvisa

import skrf
import skrf.network
from skrf.vi.vna import VNA

from .hp8510c_sweep_plan import SweepPlan


[docs] class HP8510C(VNA): ''' HP 8510 driver that is capable of compound sweeps, segmented sweeps, and fast binary transfers. These features make this venerable old instrument much more pleasant to use in the 21st century. Likely works with "A" and "B" versions of the instrument as well. Compound sweeps occur automatically when the user requests a sweep larger than what the instrument natively supports (51/101/201/401/801pts). This driver takes multiple shorter sweeps and stitches them together. Segmented sweeps occur automatically when the user requests a short or irregularly spaced sweep (see "Advanced Example" below). The 8510 actually does support sweeps other than 51/101/201/401/801pts, but only in a separate mode (segmented sweep mode) and with significant restrictions. This driver knows how to handle segmented sweep mode to get what it wants. Examples ============ Basic one-port: .. code-block:: python vna = skrf.vi.vna.hp.HP8510C(address='TCPIP::ad007-right.lan::gpib0,16::INSTR', backend='@py') vna.set_frequency_sweep(2e9,3e9,201) vna.get_snp_network(ports=(1,)) Basic two-port: .. code-block:: python vna = skrf.vi.vna.hp.HP8510C(address='TCPIP::ad007-right.lan::gpib0,16::INSTR', backend='@py') vna.set_frequency_sweep(2e9,3e9,201) vna.get_snp_network(ports=(1,2)) Intermediate example -- note that 1001 point sweeps are not natively supported by the instrument; this driver takes multiple sweeps and stitches the results together. .. code-block:: python vna = skrf.vi.vna.hp.HP8510C(address='TCPIP::ad007-right.lan::gpib0,16::INSTR', backend='@py') vna.set_frequency_sweep(2e9,3e9,1001) vna.get_snp_network(ports=(1,2)) Advanced example. The driver is handed a bucket of frequencies containing two separate bands mashed together. Behind the scenes it will construct a sweep plan consisting of one native sweep and one segmented sweep, perform both, and stitch the results together -- but all the user need worry about is constructing the request and interpreting the results. .. code-block:: python vna = skrf.vi.vna.hp.HP8510C(address='TCPIP::ad007-right.lan::gpib0,16::INSTR', backend='@py') freq_block_1 = np.linspace(1e9,2e9,801) freq_block_2 = [10e9,11e9,12e9] freqs = np.concatenate((freq_block_1, freq_block_2)) vna.frequency = skrf.Frequency.from_f(freqs) vna.get_snp_network(ports=(1,2)) ''' min_hz = None #: Minimum frequency supported by instrument max_hz = None #: Maximum frequency supported by instrument compound_sweep_plan = None #: If None, get_snp_network()/one_port()/two_port() just ask the VNA for data. # If populated, those methods perform the multiple sweeps in the plan and stitch together the results.
[docs] def __init__(self, address : str, backend : str = "@py", **kwargs): super().__init__(address, backend, **kwargs) # Tested 2024-03-09: # HP8510C.07.14 # AD007 (a VXI11-complaint GPIB-Ethernet adapter) # address="TCPIP::ad007-right.lan::gpib0,16::INSTR", backend="@py" # 8510s are slow. This check ensures we won't wait 60s for connection error. self._resource.timeout = 2_000 id_str = self.query('OUTPIDEN;') assert('HP8510' in id_str) # example: 'HP8510C.07.14: Aug 26 1998 ' # 8510s are slow. Actual work might take acutal 60 seconds. self._resource.timeout = 60_000 self._resource.read_termination = False # Binary mode doesn't work if we allow premature termination on \n self.read_raw = self._resource.read_raw self.reset() self.min_hz = self.freq_start self.max_hz = self.freq_stop self.write('STEP;') # If compound_sweep_plan is None, we rely on the internal state of the analyzer. # If compound_sweep_plan is a SweepPlan object, we must take multiple short sweeps # on the 8510 and stitch them into a single compound sweep ourselves. self.compound_sweep_plan = None
@property def id(self): ''' Instrument ID string ''' return self.query("OUTPIDEN;")
[docs] def reset(self): ''' Preset instrument. ''' self.write("FACTPRES;") self.wait_until_finished()
[docs] def clear(self): self._resource.clear()
[docs] def wait_until_finished(self): self.query("OUTPIDEN;")
[docs] def get_snp_network(self, ports, **kwargs): ''' MAIN METHOD for obtaining S parameters, like get_snp_network((1,)) or get_snp_network((1,2)). ''' ports = tuple(ports) sweep = kwargs.get("sweep", True) # name = kwargs.get("name", "") # raw_data = kwargs.get("raw_data", True) if ports==(1,): self.write('s11;') return self.one_port(fresh_sweep=sweep) elif ports==(2,): self.write('s22;') return self.one_port(fresh_sweep=sweep) elif ports==(1,2) or ports==(2,1): return self.two_port(fresh_sweep=sweep) else: raise(ValueError("Invalid ports "+str(ports)+". Options: (1,) (2,) (1,2)."))
[docs] def get_switch_terms(self, ports=(1, 2), **kwargs): ''' Returns (forward_one_port,reverse_one_port) switch terms. The ports short be connected with a half decent THRU before calling. These measure how much signal is reflected from the imperfect switched termination on the non-stimulated port. ''' return self.switch_terms()
@property def error(self): ''' Error from OUTPERRO ''' return self.query('OUTPERRO') @property def is_continuous(self): ''' True iff sweep mode is continuous. Can be set. ''' answer_dict={'\"HOLD\"':False,'\"CONTINUAL\"':True} return answer_dict[self.query('GROU?')] @is_continuous.setter def is_continuous(self, choice): if choice: self.write('CONT;') elif not choice: self.write('SING;') else: raise(ValueError('takes a boolean')) @property def averaging(self): ''' Averaging factor for AVERON ''' raise NotImplementedError @averaging.setter def averaging(self, factor ): self.write('AVERON %i;'%factor ) @property def _frequency(self): ''' Frequencies of non-compound sweep ''' freq=skrf.Frequency( self.freq_start, self.freq_stop, self._npoints, unit='hz' ) return freq @property def frequency(self): ''' Frequencies of compound sweep ''' if self.compound_sweep_plan is None: return self._frequency return skrf.Frequency.from_f(self.compound_sweep_plan.get_hz(), unit='hz') @frequency.setter def frequency(self, frequency_obj: skrf.Frequency): ''' List sweep using hz, an array of frequencies. If hz is too long, multiple sweeps will automatically be performed.''' hz = frequency_obj.f valid = (self.min_hz<=hz) & (hz<=self.max_hz) if not np.all(valid): print("set_frequency called with %i/%i points out of VNA frequency range. Dropping them." %(np.sum(valid),len(valid))) hz = hz[valid] self.compound_sweep_plan = SweepPlan.from_hz(hz)
[docs] def set_frequency_sweep(self, f_start, f_stop, f_npoints, **kwargs): ''' Interprets units and calls set_frequency_step ''' f_unit = kwargs.get("f_unit", "hz").lower() if f_unit != "hz": hz_start = self.to_hz(f_start, f_unit) hz_stop = self.to_hz(f_stop, f_unit) else: hz_start, hz_stop = f_start, f_stop self.set_frequency_step(hz_start, hz_stop, f_npoints)
[docs] def set_frequency_step(self, hz_start, hz_stop, npoint=801): ''' Step (slow, synthesized) sweep + logic to handle lots of npoint. ''' if (self._instrument_natively_supports_steps(npoint)): self.compound_sweep_plan = None self._set_instrument_step_state(hz_start, hz_stop, npoint) else: self.compound_sweep_plan = SweepPlan.from_ssn(hz_start, hz_stop, npoint)
[docs] def set_frequency_ramp(self, hz_start, hz_stop, npoint=801): ''' Ramp (fast, not synthesized) sweep. Must have standard npoint. ''' if npoint not in [51,101,201,401,801]: print("Warning: 8510C only supports NPOINT in [51,101,201,401,801]") self.resource.clear() self.write('RAMP; STAR %f; STOP %f; POIN%i;'%(hz_start,hz_stop,npoint))
@property def freq_start(self): ''' Start frequency [hz] ''' return float(self.query('STAR;OUTPACTI;')) @freq_start.setter def freq_start(self, new_start_hz): self.write(f'STEP; STAR {new_start_hz};') @property def freq_stop(self): ''' Stop frequency [hz] ''' return float(self.query('STOP;OUTPACTI;')) @freq_stop.setter def freq_stop(self, new_stop_hz): self.write(f'STEP; STOP {new_stop_hz};') @property def _npoints(self): ''' Number of points in non-compound sweep ''' instrument_ret_val = self.query('POIN;OUTPACTI;') v1 = instrument_ret_val.strip() v2 = float(v1) v3 = int(v2) return v3 @property def npoints(self): ''' Number of points in compound sweep (if programmed) or non-compound sweep ''' if self.compound_sweep_plan is None: return self._npoints return len(self.compound_sweep_plan.get_hz()) @npoints.setter def npoints(self, npoint): ''' Set number of points in sweep''' hz_start, hz_stop = self.freq_start, self.freq_stop if (self._instrument_natively_supports_steps(npoint)): self.compound_sweep_plan = None self._set_instrument_step_state(hz_start, hz_stop, npoint) else: self.compound_sweep_plan = SweepPlan.from_ssn(hz_start, hz_stop, npoint) def _instrument_natively_supports_steps(self, npoint): if npoint in [51,101,201,401,801]: return True if npoint<=792: # Supported with a single native list sweep return True return False def _set_instrument_step_state(self, hz_start, hz_stop, npoint=801): assert(self._instrument_natively_supports_steps(npoint)) if self._resource is not None: self._resource.clear() if npoint in [51,101,201,401,801]: # If it's a directly supported step sweep npoints, use regular sweep self.write('STEP; STAR %f; STOP %f; POIN%i;'%(hz_start,hz_stop,npoint)) elif npoint<=792: # List sweep lets us support, for example, 401<npoints<801 self.write('STEP;') self.write('EDITLIST;') self.write('CLEL;') self.write('SADD;') self.write('STAR %f; STOP %f; POIN %i;'%(hz_start,hz_stop,npoint)) self.write('SDON; EDITDONE; LISFREQ;') def _set_instrument_cwstep_state(self, hz_list): assert(len(hz_list)<=30) # 8510 only supports CW lists up to length 30 self.write('STEP;') self.write('EDITLIST;') self.write('CLEL;') for hz in hz_list: self.write('SADD;') self.write(f'CWFREQ {int(hz)};') self.write('SDON; EDITDONE; LISFREQ;')
[docs] def ask_for_cmplx(self, outp_cmd, timeout_s=30): """Like ask_for_values, but use FORM2 binary transfer, much faster than ASCII.""" # Benchmarks: # %time i.write('FORM4; OUTPDATA'); i._read(); None # 7960ms # %time i.write('FORM2; OUTPDATA'); i._read_raw(); None # 399ms self.wait_for_status(max_wait_seconds=timeout_s) self.write('FORM2;') self.write(outp_cmd) buf = self.read_raw() float_bin = buf[4:] # Skip 4 header bytes and trailing newline try: floats = np.frombuffer(float_bin, dtype='>f4').reshape((-1,2)) except ValueError as e: print(buf) print("len(buf): %i"%(len(buf),)) raise(e) cmplxs = (floats[:,0] + 1j*floats[:,1]).flatten() return cmplxs
def _one_port(self, expected_hz=None, fresh_sweep=True): ''' Perform a single sweep and return Network data. ''' if fresh_sweep: self.write('SING;') # Poll for sweep status s = self.ask_for_cmplx('OUTPDATA') ntwk = skrf.Network() ntwk.s = s hz = expected_hz if expected_hz is not None else self._frequency.f assert(len(s)==len(hz)) ntwk.frequency = skrf.Frequency.from_f(hz,unit='hz') return ntwk
[docs] def one_port(self, **kwargs): ''' Performs a single sweep OR COMPOUND SWEEP and returns Network data. ''' if self.compound_sweep_plan is None: return self._one_port() old_start_hz, old_stop_hz = self.freq_start, self.freq_stop stitched_network = None for sweep_section in self.compound_sweep_plan.get_sections(): sweep_section.apply_8510(self) chunk_net_nomask = self._one_port(expected_hz=sweep_section.get_raw_hz()) chunk_net = sweep_section.mask_8510(chunk_net_nomask) stitched_network = (chunk_net if stitched_network is None else skrf.network.stitch(stitched_network, chunk_net)) self.freq_start = old_start_hz self.freq_stop = old_stop_hz return stitched_network
def _two_port(self, expected_hz=None, fresh_sweep=True): ''' Performs a single sweep and returns Network data. ''' # ASCII vs Binary transfer performance: # ascii_xfer separate_sweep: 32s # bin_xfer separate_sweep: 08s # ------------------------------ # Decision: spend time to implement binary transfer. It matters. # # Sweep mode: # source mode strategy time # 83651 ramp FOUPOVER 12.8s # 83651 ramp 4xSxx 11.6s <-- in ramp mode + faststep, consecutive Sxx sweeps are fastest # 83651 ramp FULL2PORT 14.5s # 83651 step FOUPOVER 50.0s <-- in step mode, FOUPOVER is a tad faster. # 83651 step 4xSxx 52.5s # 83651 step FULL2PORT 52.3s # -------------------------- # Decision: use consecutive Sxx sweeps everywhere for simplicity + max speed of fast sweeps self.write('s11;') s11 = self._one_port(expected_hz=expected_hz, fresh_sweep=fresh_sweep).s[:,0,0] self.write('s12;') s12 = self._one_port(expected_hz=expected_hz, fresh_sweep=fresh_sweep).s[:,0,0] self.write('s22;') s22 = self._one_port(expected_hz=expected_hz, fresh_sweep=fresh_sweep).s[:,0,0] self.write('s21;') s21 = self._one_port(expected_hz=expected_hz, fresh_sweep=fresh_sweep).s[:,0,0] ntwk = skrf.Network() ntwk.s = np.array(\ [[s11,s21],\ [ s12, s22]]\ ).transpose().reshape(-1,2,2) hz = expected_hz if expected_hz is not None else self._frequency.f assert(len(s11)==len(hz)) ntwk.frequency= skrf.Frequency.from_f(hz,unit='hz') return ntwk
[docs] def two_port(self, **kwargs): ''' Performs a single sweep OR COMPOUND SWEEP and returns Network data. ''' if self.compound_sweep_plan is None: return self._two_port() old_start_hz, old_stop_hz = self.freq_start, self.freq_stop stitched_network = None for sweep_chunk in self.compound_sweep_plan.get_sections(): sweep_chunk.apply_8510(self) chunk_net_nomask = self._two_port(expected_hz=sweep_chunk.get_raw_hz()) chunk_net = sweep_chunk.mask_8510(chunk_net_nomask) stitched_network = (chunk_net if stitched_network is None else skrf.network.stitch(stitched_network,chunk_net)) self.freq_start = old_start_hz self.freq_stop = old_stop_hz return stitched_network
[docs] def wait_for_status(self, max_wait_seconds=30): ''' Fetches status bytes respecting max_wait_seconds even if it must tank a few timeouts along the way. ''' t0 = time.time() while True: try: status_str = self.query('OUTPSTAT') s0str, s1str = status_str.strip().split(',') s0 = int(s0str) s1 = int(s1str) break except pyvisa.errors.VisaIOError as e: waited = time.time()-t0 if waited > max_wait_seconds: raise e return s0,s1
[docs] def switch_terms(self): ''' Returns (forward_one_port,reverse_one_port) switch terms. The ports short be connected with a half decent THRU before calling. These measure how much signal is reflected from the imperfect switched termination on the non-stimulated port. ''' print('forward') self.write('USER2;DRIVPORT1;LOCKA1;NUMEB2;DENOA2;CONV1S;') forward = self.one_port() forward.name = 'forward switch term' print ('reverse') self.write('USER1;DRIVPORT2;LOCKA2;NUMEB1;DENOA1;CONV1S;') reverse = self.one_port() reverse.name = 'reverse switch term' return (forward,reverse)