
code refactor

Merged Florian Obersteiner requested to merge (removed):master into master
Files: 3 changed, +132 −134
@@ -4,31 +4,33 @@ import pathlib
import collections
import re
import warnings
from enum import IntEnum
import numpy as np
from enum import Enum, IntEnum
DEFAULT_NUM_FORMAT = "%f"
DEFAULT_FIELD_DELIM = ", "
class Formats(IntEnum): # TODO: FormatIndex
"""File Format Indicators (FFI)""" # TODO: File Format Indices
class Formats(IntEnum):
"""File Format Indices (FFI)"""
FFI1001 = 1001
FFI2110 = 2110
class VariableType(Enum): # TODO: could use IntEnum here as well
class VariableType(IntEnum):
IndependentVariable = 1
IndependentBoundedVariable = 2
AuxiliaryVariable = 3
DependentVariable = 4
def sanitize(val, miss):
# TODO: should not use == for float comparison instead, use np.isclose
return float(val) if not float(val) == float(miss) else np.NaN
def vmiss_to_npnan(val, miss):
"""converts value to np.nan if is (almost) equal to miss"""
val, miss = map(float, (val, miss))
return np.NaN if np.isclose(val, miss) else val
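# Illustration (a sketch) of why np.isclose beats == for the missing-value
# check: binary floats carry rounding error, so a bit-exact == can miss.
assert (0.1 + 0.2) != 0.3                 # representation error defeats ==
assert np.isclose(0.1 + 0.2, 0.3)         # tolerant compare (rtol=1e-05, atol=1e-08 by default)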
class DataStore1001:
@@ -64,21 +66,20 @@ class DataStore1001:
if not self.ivarname in kwargs.keys():
raise Exception("Need independent variable data.")
ivarvalue = sanitize(kwargs[self.ivarname], self.missvals[self.ivarname])
ivarvalue = vmiss_to_npnan(kwargs[self.ivarname], self.missvals[self.ivarname])
newline = np.array(np.NaN, dtype=[(v, "f8") for v in self.varnames])
for key in kwargs.keys():
for key in kwargs:
if key in self.varnames:
newline[key] = sanitize(kwargs[key], self.missvals[key])
newline[key] = vmiss_to_npnan(kwargs[key], self.missvals[key])
if type(self.data) is type(None): # TODO: None is the sole instance of None ;-) => can use "if self.data is None"
if self.data is None:
self.data = newline
self.data = self.data.reshape(1) # don't even ask
else:
if ivarvalue in self.data[self.ivarname]:
raise Exception("Cannot replace data (yet).")
else:
self.data = np.append(self.data, newline)
self.data = np.append(self.data, newline)
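# Illustration (a sketch) of the reshape(1) above: a structured array built
# from a scalar is 0-dimensional, and np.append has no axis to grow there.
line = np.array(np.NaN, dtype=[("time", "f8")])
assert line.shape == ()                   # 0-d scalar record
assert line.reshape(1).shape == (1,)      # 1-d, so np.append can extend it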
def denanify(self, d):
dd = d.copy()
@@ -125,7 +126,7 @@ class DataStore2110(collections.UserDict):
self.add(**newdata)
def addBulkDep(self, ivar, raw):
nlines, nvars = raw.shape # nvars not used
nlines, _ = raw.shape # _ : nvars not used
self._addDeplines(ivar, raw, nlines)
def _addDeplines(self, ivar, raw, n):
@@ -142,7 +143,7 @@ class DataStore2110(collections.UserDict):
def _addBulk(self, raw, n):
cur = 0
while cur < n:
ivarvalue = sanitize(raw[cur][0], self.missvals[self.ivarname])
ivarvalue = vmiss_to_npnan(raw[cur][0], self.missvals[self.ivarname])
self._addAuxline(raw[cur])
cur += 1
@@ -160,10 +161,10 @@ class DataStore2110(collections.UserDict):
if not self.ivarname in kwargs.keys():
raise Exception("Need independent variable data.")
ivarvalue = sanitize(kwargs[self.ivarname], self.missvals[self.ivarname])
ivarvalue = vmiss_to_npnan(kwargs[self.ivarname], self.missvals[self.ivarname])
# this is an AUX line
if any([x in self.auxvarnames for x in kwargs.keys()]): # TODO: any (all as well) do not require a comprehension; pure generator expression works fine
if any(x in self.auxvarnames for x in kwargs):
# and we create the whole dataset if needed
if not ivarvalue in self.data.keys():
self.data[ivarvalue] = {
@@ -173,7 +174,7 @@ class DataStore2110(collections.UserDict):
self.data[ivarvalue]["AUX"].add(**kwargs)
# this is a DEP line
if any([x in self.dvarnames for x in kwargs.keys()]): # TODO: see above: remove [ ]
if any(x in self.dvarnames for x in kwargs):
if not self.ibvarname in kwargs.keys():
raise Exception("Need independent (bounded) variable data.")
@@ -200,17 +201,18 @@ class KeywordComment:
self.data.append(data)
def __str__(self):
d = "\n".join(self.data) if not self.data is [] else "N/A" # TODO: use implicit boolean instead of "is []"
d = "\n".join(self.data) if self.data else "N/A"
return self.key + ": " + d
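# Illustration (a sketch) of the fix above: 'is' tests object identity, and
# the literal [] creates a fresh list each time, so 'self.data is []' can
# never be True; truthiness is the idiomatic emptiness test.
data = []
assert not data                           # empty list is falsy
assert data == [] and id(data) != id([])  # equal by value, distinct objects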
class StandardNormalComments(collections.UserList):
@property
def nlines(self):
# "+ 1" -> shortnames line, and keywords might be multiline...
return (
len(self.freeform) + 1 + sum([len(k.data) for k in self.keywords.values()])
)
n = 1 # shortnames line, and keywords might be multiline...
n += len(self.freeform)
n += sum(len(k.data) or 1 for k in self.keywords.values())
# was: len(self.freeform) + 1 + sum([len(k.data) for k in self.keywords.values()])
return n
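# Illustration (a sketch) of the 'or 1' above: a keyword without data is
# still rendered as a single "KEY: N/A" line by KeywordComment.__str__,
# so it must count as one line here.
assert (len([]) or 1) == 1                # empty keyword -> one N/A line
assert (len(["a", "b"]) or 1) == 2        # populated keyword -> one line per entry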
@property
def data(self):
@@ -251,7 +253,7 @@ class StandardNormalComments(collections.UserList):
for key in self.keywords:
if self.keywords[key].data == []:
warnings.warn(
"Normal comments: required keyword {:s} is missing.".format(key) # TODO: in genaral: use f-strings
f"Normal comments: required keyword {str(key)} is missing."
)
def __init__(self):
@@ -308,7 +310,7 @@ class Variable:
:type miss: float, defaults to -99999.0
"""
def desc(self, splitChar=","):
def desc(self, splitChar=", "):
"""Variable description string as it appears in an ICARTT file
:return: description string
@@ -321,15 +323,15 @@ class Variable:
descstr += [str(self.longname)]
return splitChar.join(descstr)
def isValidVariablename(self, name):
def isValidVariablename(self, name): # TODO: this could be a 'utils' function
# ICARTT Standard v2 2.1.1 2)
# Variable short names and variable standard names:
# Uppercase and lowercase ASCII alphanumeric characters
# and underscores.
def isAsciiAlphaOrUnderscore(x):
def isAsciiAlphaOrUnderscore(x): # TODO: this could be a 'utils' function
return re.match("[a-zA-Z0-9_]", x)
allAreAlphaOrUnderscore = all([isAsciiAlphaOrUnderscore(x) for x in name])
allAreAlphaOrUnderscore = all(isAsciiAlphaOrUnderscore(x) for x in name)
# The first character must be a letter,
firstIsAlpha = bool(re.match("[a-zA-Z]", name[0]))
# and the name can be at most 31 characters in length.
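# Hypothetical names checked against the rules above (a sketch, assuming a
# Variable instance 'v'):
#   v.isValidVariablename("Time_Start")   -> True: alphanumerics/underscore, starts with a letter
#   v.isValidVariablename("2theta")       -> False: first character must be a letter
#   v.isValidVariablename("a" * 32)       -> False: at most 31 characters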
@@ -350,9 +352,7 @@ class Variable:
"""Constructor method"""
if not self.isValidVariablename(shortname):
warnings.warn(
"Variable short name {:s} does not comply with ICARTT standard v2".format(
shortname
)
f"Variable short name {str(shortname)} does not comply with ICARTT standard v2"
)
self.shortname = shortname
@@ -363,10 +363,12 @@ class Variable:
self.scale = scale
self.miss = miss
# TODO: should this also get a __str__ ?
def __repr__(self):
# TODO: this should be something other than __str__?
return self.desc()
def __repr__(self): # TODO: this could be more meaningful?
return "ICARTT Variable description"
def __str__(self):
return self.desc()
class Dataset:
@@ -385,8 +387,6 @@ class Dataset:
:param format:
"""
# TODO: should this also get a __str__ and a __repr__ ?
@property
def nHeader(self):
"""Header line count
@@ -421,9 +421,10 @@ class Dataset:
:return: list of time steps
:rtype: list
"""
# TODO: this method currently does not work at all.
# suggest to also change to return a numpy array of numpy.datetime64
# for consistency with other data output
return [
# TODO see commit f5208db0 - this will do unexpected things if
# self.dateOfCollection is a naive datetime object (think DST transitions...)
self.dateOfCollection + datetime.timedelta(seconds=x)
for x in self.independentVariable
]
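# A sketch of the aware-datetime alternative mentioned above: anchoring the
# reference date to UTC keeps timedelta arithmetic immune to DST jumps.
#   ref = datetime.datetime(2021, 3, 28, tzinfo=datetime.timezone.utc)
#   ref + datetime.timedelta(hours=4)     # exactly 4 h later, across any DST line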
@@ -435,22 +436,22 @@ class Dataset:
:return: dictionary of all variables
:rtype: dict of Variable(s)
"""
vars = {} # TODO rename - "vars" is a Python built-in
if not self.independentVariable is None: # TODO use "is not None" for readability
vars[self.independentVariable.shortname] = self.independentVariable
if not self.independentBoundedVariable is None: # TODO use "is not None" for readability
vars[
variables = {}
if self.independentVariable is not None:
variables[self.independentVariable.shortname] = self.independentVariable
if self.independentBoundedVariable is not None:
variables[
self.independentBoundedVariable.shortname
] = self.independentBoundedVariable
vars = {**vars, **self.dependentVariables, **self.auxiliaryVariables}
variables = {**variables, **self.dependentVariables, **self.auxiliaryVariables}
return vars
return variables
def readHeader(self, splitChar=","):
"""Read the ICARTT header (from file)"""
class FilehandleWithLinecounter:
class FilehandleWithLinecounter: # TODO: this could be a 'utils' class
def __init__(self, f, splitChar):
self.f = f
self.line = 0
@@ -463,20 +464,12 @@ class Dataset:
dmp = [word.strip(" ") for word in dmp.split(self.splitChar)]
return dmp
if self.inputFhandle.closed:
self.inputFhandle = open(self.inputFhandle.name, encoding='utf-8')
if self.inputFhandle:
if self.inputFhandle.closed:
self.inputFhandle = open(self.inputFhandle.name, encoding='utf-8')
# TODO: refactor following try/except statement
# this one is challenging since we need an "unpopulated" file pointer if no file was specified :)
try:
f = FilehandleWithLinecounter(self.inputFhandle, splitChar)
self._readHeader(f)
del f
except:
a = 1
finally:
self.inputFhandle.close()
def _readHeader(self, f):
@@ -486,15 +479,10 @@ class Dataset:
nHeaderSuggested = int(dmp[0])
# TODO: refactor following try/except statement
try:
self.format = Formats(int(dmp[1]))
except:
raise ValueError("ICARTT format {:d} not implemented".format(dmp[1]))
# TODO except clause could be re-written like
# except ValueError as ve:
# raise ValueError(f"ICARTT format {dmp[1]:d} not implemented") from ve
# to show the error trace
except ValueError as ve:
raise NotImplementedError(f"ICARTT format {dmp[1]} not implemented") from ve
if len(dmp) > 2:
self.version = dmp[2]
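# Illustration (a sketch) of the chaining: 'raise ... from ve' records the
# ValueError as __cause__, so the traceback shows both errors.
#   try:
#       Formats(9999)                     # unknown FFI -> ValueError
#   except ValueError as ve:
#       raise NotImplementedError("ICARTT format 9999 not implemented") from ve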
@@ -522,12 +510,11 @@ class Dataset:
# line 7 - UTC date when data begin, UTC date of data reduction or revision
# - comma delimited (yyyy, mm, dd, yyyy, mm, dd).
dmp = f.readline()
self.dateOfCollection = datetime.datetime.strptime(
"".join(["{:s}".format(x) for x in dmp[0:3]]), "%Y%m%d"
)
self.dateOfRevision = datetime.datetime.strptime(
"".join(["{:s}".format(x) for x in dmp[3:6]]), "%Y%m%d"
)
self.dateOfCollection = datetime.datetime(*map(int, dmp[:3]))
self.dateOfRevision = datetime.datetime(*map(int, dmp[3:6]))
# TODO: we should either use aware datetime (UTC), date objects or
# numpy.datetime64 here to avoid some 'rough edges' of Python's datetime library...
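# Illustration (a sketch): the constructor parses the same header fields
# without a string round-trip, and it also tolerates non-zero-padded fields,
# which the join + strptime variant did not.
#   dmp = ["2021", "3", "29"]
#   "".join(dmp[:3])                      -> "2021329", rejected by strptime
#   datetime.datetime(*map(int, dmp[:3])) -> datetime.datetime(2021, 3, 29, 0, 0)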
# line 8 - Data Interval (This value describes the time spacing (in seconds)
# between consecutive data records. It is the (constant) interval between
@@ -547,8 +534,8 @@ class Dataset:
# here that the independent variable should monotonically increase even when
# crossing over to a second day.
def extractVardesc(dmp):
shortname = dmp[0]
def extractVardesc(dmp): # TODO: could be a 'utils' function or one line,
shortname = dmp[0] # shortname, units, standardname, longname, *_ = dmp + [None] * 3
units = dmp[1]
standardname = dmp[2] if len(dmp) > 2 else None
longname = dmp[3] if len(dmp) > 3 else None
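# Illustration (a sketch) of the one-liner from the TODO: padding with None
# lets the optional fields default cleanly.
#   dmp = ["Time_Start", "seconds"]
#   shortname, units, standardname, longname, *_ = dmp + [None] * 3
#   (standardname, longname)              -> (None, None)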
@@ -613,8 +600,11 @@ class Dataset:
vstandardname += [standardname]
vlongname += [longname]
return { # TODO: refactor dict comp for readability?
shortname: Variable(
d = {}
for shortname, unit, standardname, longname, scale, miss in zip(
vshortname, vunits, vstandardname, vlongname, vscale, vmiss
):
d[shortname] = Variable(
shortname,
unit,
standardname,
@@ -623,10 +613,10 @@ class Dataset:
miss=miss,
vartype=vtype,
)
for shortname, unit, standardname, longname, scale, miss in zip(
vshortname, vunits, vstandardname, vlongname, vscale, vmiss
)
}
return d
self.dependentVariables = readVars(f, VariableType.DependentVariable)
@@ -663,39 +653,33 @@ class Dataset:
# this section of the header along with the relevant data listed after the
# colon. For key words where information is not needed or applicable, simply
# enter N/A.).
rawNcom = [f.readline(doSplit=False) for i in range(0, nncom)]
rawNcom = [f.readline(doSplit=False) for _ in range(nncom)]
self.normalComments.ingest(rawNcom)
self.nHeaderFile = f.line
if self.nHeader != nHeaderSuggested:
warnings.warn(
"Number of header lines suggested in line 1 ({:d}) do not match actual header lines read ({:d})".format(
nHeaderSuggested, self.nHeader
)
f"Number of header lines suggested in line 1 ({int(nHeaderSuggested)}) do not match actual header lines read ({int(self.nHeader)})"
)
def readData(self, splitChar=","):
"""Read ICARTT data (from file)"""
if self.inputFhandle.closed:
self.inputFhandle = open(self.inputFhandle.name, encoding='utf-8')
if self.inputFhandle:
if self.inputFhandle.closed:
self.inputFhandle = open(self.inputFhandle.name, encoding='utf-8')
# TODO: refactor following try/except statement
try:
nul = [self.inputFhandle.readline() for i in range(self.nHeaderFile)]
for _ in range(self.nHeaderFile):
self.inputFhandle.readline()
raw = [line.split(splitChar) for line in self.inputFhandle]
nul = self.data.addBulkFromTxt(raw)
except:
pass
finally:
self.data.addBulkFromTxt(raw)
self.inputFhandle.close()
def read(self, splitChar=","):
"""Read ICARTT data and header"""
self.readHeader(splitChar)
self.endDefineMode(splitChar) # TODO: endDefineMode does not take arg splitChar
self.endDefineMode()
self.readData(splitChar)
def makeFileName(self, dateFormat="%Y%m%d"):
@@ -724,19 +708,19 @@ class Dataset:
return fn + ".ict"
def isValidFileName(self, name):
def isValidFileName(self, name): # TODO: this could be a 'utils' function
# ICARTT standard v2 2.1.1 3)
# Filename: Uppercase and lowercase ASCII alphanumeric
# characters (i.e. A-Z, a-z, 0-9), underscore, period,
# and hyphen. File names can be a maximum 127
# characters in length.
def isAsciiAlpha(x):
def isAsciiAlpha(x): # TODO: this could be a 'utils' function
return re.match("[a-zA-Z0-9-_.]", x)
allAsciiAlpha = all([isAsciiAlpha(x) for x in name])
allAsciiAlpha = all(isAsciiAlpha(x) for x in name)
lessThan128Characters = len(name) < 128
return allAsciiAlpha and lessThan128Characters
return allAsciiAlpha and lessThan128Characters and name.endswith('.ict')
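# Hypothetical names checked against the tightened rule above (a sketch,
# assuming a Dataset instance 'ds'; note the new .ict requirement):
#   ds.isValidFileName("DATA_PLATFORM_20210329_R0.ict")  -> True
#   ds.isValidFileName("DATA_PLATFORM_20210329_R0.txt")  -> False: must end in .ict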
def writeHeader(self, f=sys.stdout, delimiter=DEFAULT_FIELD_DELIM):
"""Write header
@@ -745,7 +729,7 @@ class Dataset:
:type f: file handle or StringIO stream, defaults to sys.stdout
"""
def prnt(txt): # TODO: rename? it writes to file, so "write_to_file"?
def write_to_file(txt):
f.write(str(txt) + "\n")
# Number of lines in header, file format index (most files use 1001) - comma delimited.
@@ -754,87 +738,90 @@ class Dataset:
versInfo.append(self.version)
txt = delimiter.join([str(x) for x in versInfo])
prnt(txt)
write_to_file(txt)
# PI last name, first name/initial.
prnt(self.PIName)
write_to_file(self.PIName)
# Organization/affiliation of PI.
prnt(self.PIAffiliation)
write_to_file(self.PIAffiliation)
# Data source description (e.g., instrument name, platform name, model name, etc.).
prnt(self.dataSourceDescription)
write_to_file(self.dataSourceDescription)
# Mission name (usually the mission acronym).
prnt(self.missionName)
write_to_file(self.missionName)
# File volume number, number of file volumes (these integer values are used when the data require more than one file per day; for data that require only one file these values are set to 1, 1) - comma delimited.
prnt(
write_to_file(
delimiter.join(
[str(self.fileVolumeNumber), str(self.totalNumberOfFileVolumes)]
)
)
# UTC date when data begin, UTC date of data reduction or revision - comma delimited (yyyy, mm, dd, yyyy, mm, dd).
prnt(
write_to_file(
delimiter.join(
[
# TODO: if we use anything other than datetime.datetime, we'll have to ensure this still works...
datetime.datetime.strftime(x, delimiter.join(["%Y", "%m", "%d"]))
for x in [self.dateOfCollection, self.dateOfRevision]
]
)
)
# Data Interval (This value describes the time spacing (in seconds) between consecutive data records. It is the (constant) interval between values of the independent variable. For 1 Hz data the data interval value is 1 and for 10 Hz data the value is 0.1. All intervals longer than 1 second must be reported as Start and Stop times, and the Data Interval value is set to 0. The Mid-point time is required when it is not at the average of Start and Stop times. For additional information see Section 2.5 below.).
prnt(delimiter.join([str(x) for x in self.dataIntervalCode]))
write_to_file(delimiter.join([str(x) for x in self.dataIntervalCode]))
if self.format == Formats.FFI2110:
# Description or name of independent (bound) variable (This is the name chosen for the start time. It always refers to the number of seconds UTC from the start of the day on which measurements began. It should be noted here that the independent variable should monotonically increase even when crossing over to a second day.).
prnt(self.independentBoundedVariable.desc(delimiter))
write_to_file(self.independentBoundedVariable.desc(delimiter))
# Description or name of independent variable (This is the name chosen for the start time. It always refers to the number of seconds UTC from the start of the day on which measurements began. It should be noted here that the independent variable should monotonically increase even when crossing over to a second day.).
prnt(self.independentVariable.desc(delimiter))
write_to_file(self.independentVariable.desc(delimiter))
# Number of variables (Integer value showing the number of dependent variables: the total number of columns of data is this value plus one.).
prnt(len(self.dependentVariables))
write_to_file(len(self.dependentVariables))
# Scale factors (1 for most cases, except where grossly inconvenient) - comma delimited.
prnt(
write_to_file(
delimiter.join(
[str(DVAR.scale) for DVAR in self.dependentVariables.values()]
)
)
# Missing data indicators (This is -9999 (or -99999, etc.) for any missing data condition, except for the main time (independent) variable which is never missing) - comma delimited.
prnt(
write_to_file(
delimiter.join(
[str(DVAR.miss) for DVAR in self.dependentVariables.values()]
)
)
# Variable names and units (Short variable name and units are required, and optional long descriptive name, in that order, and separated by commas. If the variable is unitless, enter the keyword "none" for its units. Each short variable name and units (and optional long name) are entered on one line. The short variable name must correspond exactly to the name used for that variable as a column header, i.e., the last header line prior to start of data.).
_ = [prnt(DVAR.desc(delimiter)) for DVAR in self.dependentVariables.values()]
for DVAR in self.dependentVariables.values():
write_to_file(DVAR.desc(delimiter))
if self.format == Formats.FFI2110:
# Number of variables (Integer value showing the number of dependent variables: the total number of columns of data is this value plus one.).
prnt(len(self.auxiliaryVariables))
write_to_file(len(self.auxiliaryVariables))
# Scale factors (1 for most cases, except where grossly inconvenient) - comma delimited.
prnt(
write_to_file(
delimiter.join(
[str(AUXVAR.scale) for AUXVAR in self.auxiliaryVariables.values()]
)
)
# Missing data indicators (This is -9999 (or -99999, etc.) for any missing data condition, except for the main time (independent) variable which is never missing) - comma delimited.
prnt(
write_to_file(
delimiter.join(
[str(AUXVAR.miss) for AUXVAR in self.auxiliaryVariables.values()]
)
)
# Variable names and units (Short variable name and units are required, and optional long descriptive name, in that order, and separated by commas. If the variable is unitless, enter the keyword "none" for its units. Each short variable name and units (and optional long name) are entered on one line. The short variable name must correspond exactly to the name used for that variable as a column header, i.e., the last header line prior to start of data.).
_ = [
prnt(AUXVAR.desc(delimiter))
for AUXVAR in self.auxiliaryVariables.values()
]
for AUXVAR in self.auxiliaryVariables.values():
write_to_file(AUXVAR.desc(delimiter))
# Number of SPECIAL comment lines (Integer value indicating the number of lines of special comments, NOT including this line.).
prnt("{:d}".format(len(self.specialComments)))
write_to_file(f"{len(self.specialComments)}")
# Special comments (Notes of problems or special circumstances unique to this file. An example would be comments/problems associated with a particular flight.).
_ = [prnt(x) for x in self.specialComments]
for x in self.specialComments:
write_to_file(x)
# Number of Normal comments (i.e., number of additional lines of SUPPORTING information: Integer value indicating the number of lines of additional information, NOT including this line.).
prnt("{:d}".format(self.normalComments.nlines))
write_to_file(f"{self.normalComments.nlines}")
# Normal comments (SUPPORTING information: This is the place for investigators to more completely describe the data and measurement parameters. The supporting information structure is described below as a list of key word: value pairs. Specifically include here information on the platform used, the geo-location of data, measurement technique, and data revision comments. Note the non-optional information regarding uncertainty, the upper limit of detection (ULOD) and the lower limit of detection (LLOD) for each measured variable. The ULOD and LLOD are the values, in the same units as the measurements that correspond to the flags -7777s and -8888s within the data, respectively. The last line of this section should contain all the short variable names on one line. The key words in this section are written in BOLD below and must appear in this section of the header along with the relevant data listed after the colon. For key words where information is not needed or applicable, simply enter N/A.).
# re-create last line out of actual data if missing...
if self.normalComments.shortnames == []:
if not self.normalComments.shortnames:
self.normalComments.shortnames = delimiter.join(
[self.variables[x].shortname for x in self.variables]
)
_ = [prnt(x) for x in self.normalComments]
for x in self.normalComments:
write_to_file(x)
def writeData(
self, f=sys.stdout, fmt=DEFAULT_NUM_FORMAT, delimiter=DEFAULT_FIELD_DELIM
@@ -876,14 +863,20 @@ class Dataset:
)
def __del__(self):
# TODO: refactor following try/except statement
try:
if self.inputFhandle:
if not self.inputFhandle.closed:
self.inputFhandle.close()
except:
pass
def __init__(self, f=None, loadData=True, splitChar=",", format=Formats.FFI1001): # TODO: why is init coming last?
def __repr__(self):
# TODO: this could be more meaningful
return "ICARTT Dataset object repr"
def __str__(self):
# TODO: this could be more meaningful
return "ICARTT Dataset string representation"
# TODO: why is init coming last?
def __init__(self, f=None, loadData=True, splitChar=",", format=Formats.FFI1001):
"""Constructor method"""
self.format = format
self.version = None
@@ -901,8 +894,10 @@ class Dataset:
self.dataSourceDescription = "Musterdatenprodukt"
self.missionName = "MUSTEREX"
# TODO: not 100% sure if this is relevant here, but those dates should refer to UTC
# - if you leave the datetime objects naive (no tzinfo set), they represent local time.
# TODO: see also comment on _readData method.
# we should either use aware datetime (UTC), date objects or
# numpy.datetime64 here to avoid some 'rough edges' of Python's
# datetime library...
self.dateOfCollection = datetime.datetime.today()
self.dateOfRevision = datetime.datetime.today()
self.dataIntervalCode = [0.0]
@@ -923,6 +918,8 @@ class Dataset:
self.defineMode = True
self.inputFhandle = None
# read data if f is not None
if f is not None:
if isinstance(f, (str, pathlib.Path)):
@@ -930,9 +927,10 @@ class Dataset:
else:
self.inputFhandle = f
# TODO should we add a filename characters check, as we have it for the variable names?
if not self.isValidFileName(pathlib.Path(f).name):
warnings.warn(f"{pathlib.Path(f).name} is not a valid ICARTT filename")
self.readHeader(splitChar)
if loadData:
self.endDefineMode()
self.readData(splitChar)
\ No newline at end of file
self.readData(splitChar)