"""
.. module:: skrf.io.csv

========================================
csv (:mod:`skrf.io.csv`)
========================================

Functions for reading and writing standard csv files
----------------------------------------------------

.. autosummary::
    :toctree: generated/

    read_all_csv
    AgilentCSV

Reading/Writing Agilent
------------------------

.. autosummary::
    :toctree: generated/

    read_pna_csv
    pna_csv_2_ntwks
    pna_csv_2_ntwks3
    pna_csv_2_df

Reading/Writing R&S
--------------------

.. autosummary::
    :toctree: generated/

    read_zva_dat
    read_all_zva_dat
    zva_dat_2_ntwks

Reading/Writing Anritsu VectorStar
-----------------------------------

.. autosummary::
    :toctree: generated/

    vectorstar_csv_2_ntwks
    read_vectorstar_csv
"""
from __future__ import annotations
import os
from logging import getLogger
from warnings import warn
import numpy as np
from .. import mathFunctions as mf
from .. import util
from ..constants import FREQ_UNITS, FrequencyUnitT
from ..frequency import Frequency
from ..network import Network
logger = getLogger(__name__)
# delayed imports
# from pandas import Series, Index, DataFrame
[docs]
def read_pna_csv(filename, *args, **kwargs) -> tuple[str, str, np.ndarray]:
    r"""
    Read the first data block of a csv file written by an Agilent PNA.

    This function returns a triplet containing the header, comments,
    and data. It emits a :class:`DeprecationWarning` on every call.

    Parameters
    ----------
    filename : str
        the file
    \*args :
        accepted for backward compatibility; not used
    \*\*kwargs :
        passed to :func:`numpy.genfromtxt`

    Returns
    -------
    header : str
        The header string, which is the line following the first 'BEGIN'
    comments : str
        All lines that begin with a '!' (leading '!' removed)
    data : :class:`numpy.ndarray`
        An array containing the data. The meaning of which depends on
        the header. The first column is multiplied by the matching
        ``FREQ_UNITS`` factor when the header names a known unit.

    Raises
    ------
    ValueError
        If no header line follows a 'BEGIN' marker, or if scaling the
        frequency column fails.

    See Also
    --------
    pna_csv_2_ntwks : Reads a csv file which contains s-parameter data

    Examples
    --------
    >>> header, comments, data = rf.read_pna_csv('myfile.csv')
    """
    warn("deprecated", DeprecationWarning, stacklevel=2)

    begin_line = -2   # sentinel: ``k == begin_line + 1`` can never match
    end_line = -1
    n_END = 0
    header = None
    comment_lines = []
    k = -1            # stays -1 when the file is empty
    with open(filename) as fid:
        for k, line in enumerate(fid):
            if line.startswith('!'):
                comment_lines.append(line[1:])
            elif line.startswith('BEGIN') and n_END == 0:
                begin_line = k
            elif line.startswith('END'):
                if n_END == 0:
                    # first END spotted -> set end_line to read first data block only
                    end_line = k
                # increment n_END to allow for CR correction in genfromtxt
                n_END += 1
            if k == begin_line + 1:
                header = line

    if header is None:
        # previously surfaced as an UnboundLocalError further down
        raise ValueError(
            f"'{filename}' has no header line following a 'BEGIN' marker"
        )

    comments = ''.join(comment_lines)
    footer = k - end_line

    try:
        data = np.genfromtxt(
            filename,
            delimiter=',',
            skip_header=begin_line + 2,
            skip_footer=footer - (n_END - 1)*2,
            **kwargs
        )
    except ValueError:
        # carriage returns require a doubling of skiplines
        data = np.genfromtxt(
            filename,
            delimiter=',',
            skip_header=(begin_line + 2)*2,
            skip_footer=footer,
            **kwargs
        )

    # pna uses unicode coding for degree symbol, but we dont need that
    header = header.replace('\xb0', 'deg').rstrip('\n').rstrip('\r')

    units_dict: dict[str, FrequencyUnitT] = {unit.lower(): unit for unit in FREQ_UNITS.keys()}

    # Get the frequency unit from the first column name, e.g. 'Freq(Hz)'.
    # Prefer the text between the parentheses; ``str.strip('Freq')`` removes
    # a *set of characters*, so it mangles names such as 'Frequency(GHz)'.
    first_col = header.split(',')[0]
    _, paren, tail = first_col.partition('(')
    if paren:
        unit_raw = tail.partition(')')[0].strip()
    else:
        # legacy parse, kept for headers that do not use parentheses
        unit_raw = first_col.strip('Freq')[1:-1]

    try:
        unit_key = units_dict.get(unit_raw.lower())
        if unit_key is not None:
            # convert the frequency column to Hz
            data[:, 0] *= FREQ_UNITS[unit_key]
    except Exception as exc:
        raise ValueError(f"Could not parse frequency unit '{unit_raw}'") from exc

    return header, comments, data
[docs]
def pna_csv_2_df(filename, *args, **kwargs):
    r"""
    Reads data from a csv file written by an Agilent PNA as a pandas DataFrame.

    Emits a :class:`DeprecationWarning` on every call.

    Parameters
    ----------
    filename : string
        filename
    \*args, \*\*kwargs :
        forwarded to :func:`read_pna_csv`. Accepted so that callers which
        pass extra arguments through (e.g. :func:`pna_csv_2_ntwks2`) no
        longer fail with a ``TypeError``.

    Returns
    -------
    df : `pandas.DataFrame`
        One column per header name after the first; the first data
        column becomes the index, named after the first header name.
    """
    warn("deprecated", DeprecationWarning, stacklevel=2)
    # delayed import: pandas is only needed by the DataFrame helpers
    from pandas import DataFrame, Index

    header, _comments, d = read_pna_csv(filename, *args, **kwargs)
    names = header.split(',')

    index = Index(d[:, 0], name=names[0])
    columns = {names[k]: d[:, k] for k in range(1, len(names))}
    return DataFrame(columns, index=index)
def pna_csv_2_ntwks2(filename, *args, **kwargs):
    r"""
    Read a PNA csv file into a 2-port Network, or a dict of one-ports.

    Emits a :class:`DeprecationWarning` on every call.

    Parameters
    ----------
    filename : str
        filename
    \*args, \*\*kwargs :
        forwarded to :func:`pna_csv_2_df`

    Returns
    -------
    out : :class:`~skrf.network.Network` or dict
        A 2-port Network when S11, S12, S21 and S22 are all present,
        otherwise a dictionary mapping each parameter prefix (first three
        characters of the column name) to its one-port Network.
    """
    warn("deprecated", DeprecationWarning, stacklevel=2)
    df = pna_csv_2_df(filename, *args, **kwargs)
    _header, comments, _data = read_pna_csv(filename)

    f = df.index.values
    ntwk_dict = {}
    for param in {col[:3] for col in df.columns}:
        try:
            s = mf.dbdeg_2_reim(
                df[f'{param} Log Mag(dB)'].values,
                df[f'{param} Phase(deg)'].values,
            )
        except KeyError:
            # real/imag columns are already cartesian: combine them directly
            # instead of running them through the dB/deg conversion
            s = df[f'{param} (REAL)'].values + 1j*df[f'{param} (IMAG)'].values
        ntwk_dict[param] = Network(f=f, s=s, name=param, comments=comments, f_unit='Hz')

    try:
        s = np.zeros((len(f), 2, 2), dtype=complex)
        s[:, 0, 0] = ntwk_dict['S11'].s.flatten()
        s[:, 1, 1] = ntwk_dict['S22'].s.flatten()
        s[:, 1, 0] = ntwk_dict['S21'].s.flatten()
        s[:, 0, 1] = ntwk_dict['S12'].s.flatten()
    except KeyError:
        # not a full 2-port data set -> hand back the individual traces
        return ntwk_dict

    name = os.path.splitext(os.path.basename(filename))[0]
    # f_unit given explicitly, matching the one-port networks built above
    return Network(f=f, s=s, name=name, comments=comments, f_unit='Hz')
[docs]
def pna_csv_2_ntwks3(filename):
    """
    Read a CSV file exported from an Agilent PNA in dB/deg format.

    Parameters
    ----------
    filename : str
        full path or filename

    Returns
    -------
    out : n
        2-Port Network. When the header does not mention both 'db' and
        'deg' a warning is issued and nothing is returned.
    """
    header, comments, d = read_pna_csv(filename)
    col_headers = pna_csv_header_split(filename)

    # reference impedance fixed at 50 Ohm (doesn't matter for now)
    z0 = 50*np.ones(np.shape(d)[0])
    # frequency axis is the first data column
    f = d[:, 0]
    name = os.path.splitext(os.path.basename(filename))[0]

    header_lc = header.lower()
    if not ('db' in header_lc and 'deg' in header_lc):
        warn("File does not seem to be formatted properly (only dB/deg supported for now)", stacklevel=2)
        return None

    # dB/deg csv -> convert each magnitude/phase column pair to complex.
    # Insertion order mirrors the order in which the tags are tested.
    slots = {'s11': (0, 0), 's21': (1, 0), 's12': (0, 1), 's22': (1, 1)}
    s = np.zeros((len(f), 2, 2), dtype=complex)
    for k, col_name in enumerate(col_headers[1:]):
        col_lc = col_name.lower()
        if 'db' not in col_lc:
            continue
        for tag, (row, col) in slots.items():
            if tag in col_lc:
                # magnitude sits in data column k+1, phase in the next one
                s[:, row, col] = mf.dbdeg_2_reim(d[:, k+1], d[:, k+2])
                break

    return Network(f=f, s=s, z0=z0, name=name, f_unit="Hz")
[docs]
def read_all_csv(dir='.', contains = None):
    """
    Read all CSV files in a directory.

    Every file is first tried as an Agilent PNA dB/deg csv via
    :func:`pna_csv_2_ntwks3`; if that fails or yields nothing the file
    is handed to :class:`~skrf.network.Network`. Files that neither
    reader can load are skipped.

    Parameters
    ----------
    dir : str, optional
        the directory to load from, default '.'
    contains : str, optional
        if not None, only files containing this substring will be loaded

    Returns
    -------
    out : dictionary
        dictionary containing all loaded CSV objects. keys are the
        filenames without extensions, and the values are the objects
    """
    out = {}
    for filename in os.listdir(dir):
        if contains is not None and contains not in filename:
            continue
        fullname = os.path.join(dir, filename)
        keyname = os.path.splitext(filename)[0]

        loaded = None
        try:
            loaded = pna_csv_2_ntwks3(fullname)
        except Exception:
            # best effort: not a readable PNA csv, try the generic reader
            loaded = None

        if loaded is None:
            # pna_csv_2_ntwks3 returns None (after a warning) for files that
            # are not dB/deg; fall through instead of storing that None
            try:
                loaded = Network(fullname)
            except Exception:
                # best effort: unreadable files are left out of the result
                continue

        out[keyname] = loaded
    return out
[docs]
class AgilentCSV:
    """
    Agilent-style csv file representing either scalar traces vs frequency
    or complex data vs. frequency.

    The file is parsed once, at construction; ``header``, ``comments``
    and ``data`` hold the result of :meth:`read`.
    """

    def __init__(self, filename, *args, **kwargs):
        r"""
        Init.

        Parameters
        ----------
        filename : str
            filename
        \*args :
            stored on the instance as ``args``
        \*\*kwargs :
            stored as ``kwargs`` and passed to Network.__init__ in
            :func:`networks` and :func:`scalar_networks`
        """
        self.filename = filename
        self.header, self.comments, self.data = self.read()
        self.args, self.kwargs = args, kwargs

    def read(self):
        """
        Reads data from file.

        This function returns a triplet containing the header, comments,
        and data.

        Returns
        -------
        header : str
            The header string, which is the line following the 'BEGIN'
        comments : str
            All lines that begin with a '!' (leading '!' removed)
        data : :class:`numpy.ndarray`
            An array containing the data. The meaning of which depends on
            the header.

        Raises
        ------
        ValueError
            If no header line follows a 'BEGIN' marker.
        """
        begin_line = -2   # sentinel: ``k == begin_line + 1`` can never match
        end_line = -1
        header = None
        comment_lines = []
        k = -1            # stays -1 when the file is empty
        with open(self.filename) as fid:
            for k, line in enumerate(fid):
                if line.startswith('!'):
                    comment_lines.append(line[1:])
                elif line.startswith('BEGIN'):
                    begin_line = k
                elif line.startswith('END'):
                    end_line = k
                if k == begin_line + 1:
                    header = line

        if header is None:
            # previously surfaced as an UnboundLocalError further down
            raise ValueError(
                f"'{self.filename}' has no header line following a 'BEGIN' marker"
            )

        comments = ''.join(comment_lines)
        footer = k - end_line

        try:
            data = np.genfromtxt(
                self.filename,
                delimiter=',',
                skip_header=begin_line + 2,
                skip_footer=footer,
            )
        except ValueError:
            # carriage returns require a doubling of skiplines
            data = np.genfromtxt(
                self.filename,
                delimiter=',',
                skip_header=(begin_line + 2)*2,
                skip_footer=footer,
            )

        # pna uses unicode coding for degree symbol, but we dont need that
        header = header.replace('\xb0', 'deg').rstrip('\n').rstrip('\r')
        return header, comments, data

    @property
    def frequency(self):
        """
        Frequency object : :class:`~skrf.frequency.Frequency`.

        The unit is taken from the parentheses of the first column name
        and falls back to 'hz' when there are none.
        """
        cols = self.columns
        try:
            f_unit = cols[0].split('(')[1].split(')')[0]
        except IndexError:
            # first column name carries no '(unit)' part
            f_unit = 'hz'
        return Frequency.from_f(self.data[:, 0], unit=f_unit)

    @property
    def n_traces(self):
        """
        number of data traces : int
        """
        # every column except the leading frequency column
        return self.data.shape[1] - 1

    @property
    def columns(self):
        """
        List of column names : list of str.

        This function is needed because Agilent allows the delimiter
        of a csv file (ie `,`) to be present in the header name. ridiculous.

        If splitting the header fails, then a suitable list is returned of
        the correct length, which looks like::

            ['Freq(?)','filename-0','filename-1',..]
        """
        header = self.header
        n_traces = self.data.shape[1] - 1  # because there is one frequency column

        if header.count(',') == n_traces:
            return header.split(',')

        # the header contains too many delimiters; maybe we can split it
        # on `),` instead
        if header.count('),') == n_traces:
            cols = header.split('),')
            # add back the parenthesis we split on to all but the last column
            return [col + ')' for col in cols[:-1]] + [cols[-1]]

        # no known way to separate the column names
        warn("Cant decipher header, so I'm creating one. check output. ", stacklevel=2)
        stem = util.basename_noext(self.filename)
        return ['Freq(?)'] + ['%s-%i' % (stem, k) for k in range(n_traces)]

    @property
    def scalar_networks(self):
        """
        Returns list of Networks for each column.

        .. note::
            The data is stored in the Network's `.s` property, so its up
            to you to interpret results. if 'db' is in the column name then
            it is converted to linear before being store into `s`.

        Returns
        --------
        out : list of :class:`~skrf.network.Network` objects
            list of Networks representing the data contained in each column
        """
        comments = self.comments
        d = self.data
        n_traces = d.shape[1] - 1  # because there is one frequency column
        cols = self.columns
        freq = self.frequency

        # loop through columns and create a single network for each column
        ntwk_list = []
        for k in range(1, n_traces + 1):
            s = d[:, k]
            if 'db' in cols[k].lower():
                s = mf.db_2_mag(s)
            ntwk_list.append(
                Network(
                    frequency=freq, s=s, comments=comments,
                    name=cols[k], **self.kwargs)
            )
        return ntwk_list

    @property
    def networks(self):
        """
        Returns a list of one-port Networks, one per pair of data columns.

        Column pairs named dB/deg are converted to complex values, pairs
        named real/imag are combined directly. Any other pair is treated
        as real/imag after a warning. With fewer than two traces the
        result of :func:`scalar_networks` is returned instead.

        Returns
        -------
        out : list of :class:`~skrf.network.Network` objects
            list of Networks representing the data contained in column pairs
        """
        names = self.columns
        comments = self.comments
        d = self.data

        n_pairs = self.n_traces // 2
        if n_pairs == 0:
            # this isn't complex data
            return self.scalar_networks

        freq = self.frequency
        ntwk_list = []
        for k in range(n_pairs):
            first, second = 2*k + 1, 2*k + 2
            # Inspect the names of the two columns actually being combined.
            # (Indexing with k / k+1 looked at the frequency column for the
            # first pair, so dB/deg data was never recognised.)
            first_name, second_name = names[first], names[second]
            if 'db' in first_name.lower() and 'deg' in second_name.lower():
                s = mf.dbdeg_2_reim(d[:, first], d[:, second])
            elif 'real' in first_name.lower() and 'imag' in second_name.lower():
                s = d[:, first] + 1j*d[:, second]
            else:
                warn(f'CSV format unrecognized in "{first_name}" or "{second_name}". '
                     'It\'s up to you to interpret the resulting network correctly.', stacklevel=2)
                s = d[:, first] + 1j*d[:, second]
            ntwk_list.append(
                Network(frequency=freq, s=s, name=first_name,
                        comments=comments, **self.kwargs)
            )
        return ntwk_list

    @property
    def dict(self):
        """
        Dictionary representation of csv file.

        Returns
        -------
        dict : dict
            maps every column name (frequency column included) to its data
        """
        cols = self.columns
        return {cols[k]: self.data[:, k] for k in range(self.n_traces + 1)}

    @property
    def dataframe(self):
        """
        Pandas DataFrame representation of csv file.

        Returns
        -------
        df : `pandas.DataFrame`
            one column per trace, indexed by the scaled frequency
        """
        # delayed import: pandas is only needed here
        from pandas import DataFrame, Index

        freq = self.frequency
        cols = self.columns
        index = Index(freq.f_scaled, name=f'Frequency({freq.unit})')
        return DataFrame(
            {cols[k]: self.data[:, k] for k in range(1, self.n_traces + 1)},
            index=index,
        )
def pna_csv_header_split(filename):
    """
    Split a Agilent csv file's header into a list

    This function is needed because Agilent allows the delimiter
    of a csv file (ie `,`) to be present in the header name. ridiculous.

    If splitting the header fails, then a suitable list is returned of
    the correct length, which looks like

     * ['Freq(?)','filename-0','filename-1',..]

    Emits a :class:`DeprecationWarning` on every call.

    Parameters
    ------------
    filename : str
        csv filename

    Returns
    --------
    cols : list of str's
        list of column names
    """
    warn("deprecated", DeprecationWarning, stacklevel=2)
    header, _comments, d = read_pna_csv(filename)

    n_traces = d.shape[1] - 1  # because there is one frequency column

    if header.count(',') == n_traces:
        return header.split(',')

    # the header contains too many delimiters; maybe we can split it
    # on `),` instead
    if header.count('),') == n_traces:
        cols = header.split('),')
        # add back the parenthesis we split on to all but the last column
        return [col + ')' for col in cols[:-1]] + [cols[-1]]

    # no known way to separate the column names
    warn('Cant decipher header, so im creating one. check output. ', stacklevel=2)
    stem = util.basename_noext(filename)
    # 'Freq(?)' without a trailing comma, as documented above
    return ['Freq(?)'] + ['%s-%i' % (stem, k) for k in range(n_traces)]
[docs]
def pna_csv_2_ntwks(filename):
    """
    Reads a PNAX csv file, and returns a list of one-port Networks.

    .. deprecated::
        Use :func:`pna_csv_2_ntwks3` instead.

    Column pairs named dB/deg are converted to complex values, pairs
    named real/imag are combined directly. Any other pair is treated as
    real/imag after a logged warning.

    Parameters
    ----------
    filename : str
        filename

    Returns
    -------
    out : list of :class:`~skrf.network.Network` objects
        list of Networks representing the data contained in column pairs.
        A file with fewer than two traces yields a single Network built
        from the first trace instead of a list.

    Raises
    ------
    NotImplementedError
        If the file holds fewer than two traces and its header does not
        mention 'db'.
    """
    warn("deprecated", DeprecationWarning, stacklevel=2)
    #TODO: check the data's format (Real-imag or db/angle , ..)
    header, comments, d = read_pna_csv(filename)
    names = pna_csv_header_split(filename)

    f = d[:, 0]
    # floor division: true division made this 0 only for files with no
    # trace at all, so single-trace files came back as an empty list
    n_pairs = (d.shape[1] - 1) // 2

    if n_pairs == 0:
        # this isn't complex data
        if 'db' not in header.lower():
            raise NotImplementedError
        s = mf.db_2_mag(d[:, 1])
        name = os.path.splitext(os.path.basename(filename))[0]
        return Network(f=f, s=s, name=name, comments=comments, f_unit='Hz')

    ntwk_list = []
    for k in range(n_pairs):
        first, second = 2*k + 1, 2*k + 2
        # use the names of the two columns actually being combined;
        # names[0] is the frequency column
        first_name, second_name = names[first], names[second]
        if 'db' in first_name.lower() and 'deg' in second_name.lower():
            s = mf.dbdeg_2_reim(d[:, first], d[:, second])
        elif 'real' in first_name.lower() and 'imag' in second_name.lower():
            s = d[:, first] + 1j*d[:, second]
        else:
            logger.warning("csv format unrecognized. It's up to you to interpret the resultant network correctly.")
            s = d[:, first] + 1j*d[:, second]
        ntwk_list.append(
            Network(f=f, s=s, name=first_name, comments=comments, f_unit='Hz')
        )
    return ntwk_list
def pna_csv_2_freq(filename):
    """
    Build a Frequency object from the first data column of a PNA csv file.

    Emits a :class:`DeprecationWarning` on every call.

    Parameters
    ----------
    filename : str
        filename

    Returns
    -------
    freq : :class:`~skrf.frequency.Frequency`
        created with ``Frequency.from_f`` and unit "Hz"
    """
    warn("deprecated", DeprecationWarning, stacklevel=2)
    _header, _comments, data = read_pna_csv(filename)
    return Frequency.from_f(data[:, 0], unit="Hz")
def pna_csv_2_scalar_ntwks(filename, *args, **kwargs):
    r"""
    Reads a PNAX csv file containing scalar traces, returning Networks

    Emits a :class:`DeprecationWarning` on every call.

    Parameters
    -----------
    filename : str
        filename
    \*args :
        not used
    \*\*kwargs :
        passed to the Network constructor of every trace

    Returns
    --------
    out : list of :class:`~skrf.network.Network` objects
        one Network per data column; columns whose name contains 'db'
        are converted with ``mf.db_2_mag`` first
    """
    warn("deprecated", DeprecationWarning, stacklevel=2)
    _header, comments, data = read_pna_csv(filename)

    trace_count = data.shape[1] - 1  # one column is the frequency axis
    col_names = pna_csv_header_split(filename)
    freq = Frequency.from_f(data[:, 0], unit='Hz')

    # one network per trace column
    return [
        Network(
            frequency=freq,
            s=mf.db_2_mag(data[:, idx]) if 'db' in col_names[idx].lower() else data[:, idx],
            comments=comments,
            name=col_names[idx],
            **kwargs,
        )
        for idx in range(1, trace_count + 1)
    ]
[docs]
def read_zva_dat(filename, *args, **kwargs):
    r"""
    Reads data from a dat file written by a R&S ZVA in dB/deg or re/im format.

    This function returns a triplet containing header, comments and data.

    Parameters
    ----------
    filename : str
        the file
    \*args :
        not used
    \*\*kwargs :
        passed to :func:`numpy.genfromtxt`

    Returns
    -------
    header : str
        The last line that begins with a '%', unmodified
    comments : str
        All lines that begin with a '%' (leading '%' removed)
    data : :class:`numpy.ndarray`
        An array containing the data found after the header line. The
        meaning of which depends on the header.

    Raises
    ------
    ValueError
        If the file contains no line beginning with '%'.
    """
    header = None
    comment_lines = []
    begin_line = 0
    with open(filename) as fid:
        for k, line in enumerate(fid):
            if line.startswith('%'):
                comment_lines.append(line[1:])
                # the last '%' line wins: it names the data columns
                header = line
                begin_line = k + 1

    if header is None:
        # previously surfaced as an UnboundLocalError at the return
        raise ValueError(f"'{filename}' has no header line starting with '%'")

    data = np.genfromtxt(
        filename,
        delimiter=',',
        skip_header=begin_line,
        **kwargs
    )
    return header, ''.join(comment_lines), data
[docs]
def zva_dat_2_ntwks(filename):
    """
    Read a dat file exported from a R&S ZVA in re/im or dB format.

    The layout is chosen from the header line: 're' together with 'im'
    selects the re/im branch; otherwise 'db' without 'deg' selects the
    dB branch.

    Parameters
    ----------
    filename : str
        full path or filename

    Returns
    -------
    out : n
        2-Port Network. When the header matches neither layout a warning
        is issued and nothing is returned.
    """
    header, _comments, d = read_zva_dat(filename)
    col_headers = header.split(',')
    header_lc = header.lower()

    # set impedance to 50 Ohm (doesn't matter for now)
    z0 = np.ones(np.shape(d)[0])*50
    # read f values, convert to GHz
    # NOTE(review): no f_unit is passed to Network below, so this relies on
    # the Network default matching GHz - confirm against Network.__init__
    f = d[:, 0]/1e9
    name = os.path.splitext(os.path.basename(filename))[0]

    if 're' in header_lc and 'im' in header_lc:
        # this is a csv in re/im format
        # -> no conversion required
        s = np.zeros((len(f), 2, 2), dtype=complex)
        for k, h in enumerate(col_headers):
            h_lc = h.lower()
            if 're' not in h_lc:
                continue
            if 's11' in h_lc:
                s[:, 0, 0] = d[:, k] + 1j*d[:, k+1]
            elif 's21' in h_lc:
                s[:, 1, 0] = d[:, k] + 1j*d[:, k+1]
            elif 's12' in h_lc:
                # NOTE(review): only column k+1 is used here, unlike the
                # s11/s21 pairs - kept as found, verify against a real file
                s[:, 0, 1] = d[:, k+1]
            elif 's22' in h_lc:
                # NOTE(review): same single-column handling as s12
                s[:, 1, 1] = d[:, k+1]
    elif 'db' in header_lc and 'deg' not in header_lc:
        # this is a csv in db format (no deg values)
        # -> conversion required
        s = np.zeros((len(f), 2, 2), dtype=complex)
        for k, h in enumerate(col_headers):
            # this doesn't always work! (depends on no. of channels, sequence of adding traces etc.
            # -> Needs changing!
            h_lc = h.lower()
            if 'db' not in h_lc:
                continue
            if 's11' in h_lc:
                s[:, 0, 0] = mf.dbdeg_2_reim(d[:, k], d[:, k+2])
            elif 's21' in h_lc:
                s[:, 1, 0] = mf.dbdeg_2_reim(d[:, k], d[:, k+2])
    else:
        warn("File does not seem to be formatted properly (dB/deg or re/im)", stacklevel=2)
        return None

    # Both recognised layouts end here. The re/im branch used to fill `s`
    # and then fall off the end of the function without building a Network.
    return Network(f=f, s=s, z0=z0, name=name)
[docs]
def read_all_zva_dat(dir='.', contains = None):
    """
    Read all DAT files in a directory (from R&S ZVA).

    Every file is first tried with :func:`zva_dat_2_ntwks`; if that
    fails or yields nothing the file is handed to
    :class:`~skrf.network.Network`. Files that neither reader can load
    are skipped.

    Parameters
    ----------
    dir : str, optional
        the directory to load from, default '.'
    contains : str, optional
        if not None, only files containing this substring will be loaded

    Returns
    -------
    out : dictionary
        dictionary containing all loaded DAT objects. keys are the
        filenames without extensions, and the values are the objects
    """
    out = {}
    for filename in os.listdir(dir):
        if contains is not None and contains not in filename:
            continue
        fullname = os.path.join(dir, filename)
        keyname = os.path.splitext(filename)[0]

        loaded = None
        try:
            loaded = zva_dat_2_ntwks(fullname)
        except Exception:
            # best effort: not a readable ZVA dat file, try the generic reader
            loaded = None

        if loaded is None:
            # a None result means the dat reader did not recognise the
            # layout; fall through instead of storing that None
            try:
                loaded = Network(fullname)
            except Exception:
                # best effort: unreadable files are left out of the result
                continue

        out[keyname] = loaded
    return out
[docs]
def read_vectorstar_csv(filename, *args, **kwargs):
    r"""
    Reads data from a csv file written by an Anritsu VectorStar.

    Parameters
    ----------
    filename : str
        the file
    \*args, \*\*kwargs :
        not used

    Returns
    -------
    header : list of str
        Every line that begins with 'PNT', unmodified
    comments : str
        All lines that begin with a '!', joined, with every '!' and
        carriage return removed
    data : :class:`numpy.ndarray`
        An array containing the data. The meaning of which depends on
        the header.
    """
    with open(filename) as fid:
        lines = fid.readlines()

    bang_lines = []
    header = []
    for line in lines:
        if line.startswith('!'):
            bang_lines.append(line)
        if line.startswith('PNT'):
            header.append(line)

    table = np.genfromtxt(
        filename,
        delimiter=',',
        comments='!',
        skip_header=1)

    comments = ''.join(bang_lines).replace('\r', '').replace('!', '')
    # the first parsed row is dropped
    return header, comments, table[1:]
[docs]
def vectorstar_csv_2_ntwks(filename):
    """
    Reads a vectorstar csv file, and returns a list of one-port Networks.

    .. note::
        Note this only works if csv is save in Real/Imaginary format for now

    Network names come from the first comment line that starts with
    'PARAMETER'. Data columns are consumed in groups of three
    (frequency, real, imaginary) after the leading column.

    Parameters
    ----------
    filename : str
        filename

    Returns
    -------
    out : list of :class:`~skrf.network.Network` objects
        list of Networks representing the data contained in column pairs
    """
    #TODO: check the data's format (Real-imag or db/angle , ..)
    _header, comments, d = read_vectorstar_csv(filename)

    parameter_lines = [line for line in comments.split('\n')
                       if line.startswith('PARAMETER')]
    names = parameter_lines[0].split(',')[1:]

    # floor division: range() rejects the float that `/` produces
    n_ntwks = d.shape[1] // 3

    return [
        Network(
            f=d[:, k*3+1],
            s=d[:, k*3+2] + 1j*d[:, k*3+3],
            z0=50,
            name=names[k].rstrip(),
            comments=comments,
        )
        for k in range(n_ntwks)
    ]