"""Read, write and create tests for ICARTT FFI 1001 datasets."""

import datetime
import io
import pathlib
import unittest

import numpy as np

import icartt

# working directory, example files
wd = pathlib.Path(__file__).parent


def _same_numbers(inline, outline):  # pragma: no cover
    """Return True if both lines hold the same comma-separated float values.

    A line containing any field that ``float()`` cannot parse is not a
    numeric line, so the answer is False rather than an exception.
    """
    try:
        insteps = [float(x) for x in inline.split(",")]
        outsteps = [float(x) for x in outline.split(",")]
    except ValueError:
        return False
    if len(insteps) != len(outsteps):
        return False
    return all(a == b for a, b in zip(insteps, outsteps))


def _is_expanded_var_line(inline, outline):  # pragma: no cover
    """Return True if a 2-field input line was written out with 3 fields.

    The output is accepted when its first field equals the input's first
    field and both its second and third fields equal the input's second one
    (presumably a variable definition whose last field was filled in by the
    writer -- confirm against the writer implementation).
    """
    insteps = [x.strip() for x in inline.split(",")]
    outsteps = [x.strip() for x in outline.split(",")]
    if len(insteps) != 2 or len(outsteps) != 3:
        return False
    return (
        insteps[0] == outsteps[0]
        and insteps[1] == outsteps[1]
        and insteps[1] == outsteps[2]
    )


def compareFiles(fn, strIn, strOut, skiplines=0, nlines=-1):  # pragma: no cover
    """Compare two text streams line by line, tolerating formatting changes.

    Both streams are rewound, read completely and then closed. Lines are
    compared after stripping and removing all spaces. Two lines that still
    differ are accepted if they contain the same numbers (compared as
    floats), or if the output is a 3-field expansion of a 2-field input
    line (see ``_is_expanded_var_line``).

    Args:
        fn: name of the file under test; only used in the diagnostic output.
        strIn: readable, seekable text stream with the reference content.
        strOut: readable, seekable text stream with the content to check.
        skiplines: number of leading lines to ignore in both streams.
        nlines: number of lines to compare after ``skiplines``; any value
            <= 0 means "all remaining lines".

    Returns:
        True if all compared lines are equivalent. False if the number of
        lines differs or a line does not match, in which case the offending
        line is printed.
    """
    strOut.seek(0)
    strIn.seek(0)
    content_in = strIn.readlines()
    content_out = strOut.readlines()
    strIn.close()
    strOut.close()

    stop = skiplines + nlines if nlines > 0 else None
    content_in = content_in[skiplines:stop]
    content_out = content_out[skiplines:stop]

    if len(content_in) != len(content_out):
        return False

    for offset, (inline, outline) in enumerate(zip(content_in, content_out)):
        inline = inline.strip().replace(" ", "")
        outline = outline.strip().replace(" ", "")
        if inline == outline:
            continue
        # maybe this is a data line in which we only have different number
        # formatting, or a variable line that was expanded on output?
        if _same_numbers(inline, outline) or _is_expanded_var_line(inline, outline):
            continue
        # report the 1-based line number within the original streams
        lineno = skiplines + offset + 1
        print(f"{str(fn)}: line {lineno:d} differs:")
        print(f" input: {inline}")
        print(f" output: {outline}")
        return False

    return True


class Simple1001TestCase(unittest.TestCase):
    """Read and round-trip tests for an FFI 1001 example file."""

    def setUp(self):
        self.fn = wd / "example_data" / "expect_ok" / "NOx_RHBrown_20040830_R0.ict"
        # number of header lines the example file is expected to have
        self.nHeader = 41

    def tearDown(self):
        pass

    def testOpen(self):
        ict = icartt.Dataset(self.fn, loadData=False)
        self.assertIsInstance(ict, icartt.Dataset)

    def testFormat(self):
        ict = icartt.Dataset(self.fn, loadData=False)
        self.assertEqual(ict.format, icartt.Formats.FFI1001)

    def testN(self):
        ict = icartt.Dataset(self.fn, loadData=False)
        self.assertEqual(ict.nHeader, self.nHeader)
        self.assertEqual(len(ict.dependentVariables), 9)
        self.assertEqual(len(ict.normalComments), 18)
        self.assertEqual(len(ict.specialComments), 0)

    def testIvar(self):
        ict = icartt.Dataset(self.fn, loadData=False)
        ivar = ict.independentVariable
        self.assertEqual(ivar.shortname, "Start_UTC")
        self.assertEqual(ivar.units, "seconds")
        self.assertEqual(ivar.standardname, "number_of_seconds_from_0000_UTC")
        self.assertIsNone(ivar.longname)
        self.assertEqual(ivar.scale, 1.0)
        self.assertEqual(ivar.miss, -99999.0)

    def testDvar(self):
        ict = icartt.Dataset(self.fn, loadData=False)
        dvars = list(ict.dependentVariables.values())

        self.assertEqual(
            [dvar.shortname for dvar in dvars],
            [
                "Stop_UTC",
                "Mid_UTC",
                "DLat",
                "DLon",
                "Elev",
                "NO_ppbv",
                "NO_1sig",
                "NO2_ppbv",
                "NO2_1sig",
            ],
        )
        self.assertEqual(
            [dvar.units for dvar in dvars],
            [
                "seconds",
                "seconds",
                "deg_N",
                "deg_E",
                "meters",
                "ppbv",
                "ppbv",
                "ppbv",
                "ppbv",
            ],
        )
        self.assertEqual([dvar.standardname for dvar in dvars], [None] * 9)
        self.assertEqual([dvar.longname for dvar in dvars], [None] * 9)
        self.assertEqual([dvar.scale for dvar in dvars], ["1"] * 9)
        self.assertEqual([dvar.miss for dvar in dvars], ["-9999"] * 9)

    def testNCOM(self):
        ict = icartt.Dataset(self.fn, loadData=False)
        expected = {
            "PI_CONTACT_INFO": [
                "325 Broadway, Boulder, CO 80305; 303-497-3226; email:eric.j.williams@noaa.gov"
            ],
            "PLATFORM": ["NOAA research vessel Ronald H. Brown"],
            "LOCATION": [
                "Latitude, longitude and elevation data are included in the data records"
            ],
            "ASSOCIATED_DATA": ["N/A"],
            "INSTRUMENT_INFO": [
                "NO: chemiluminescence; NO2: narrow-band photolysis/chemiluminescence"
            ],
            "DATA_INFO": [
                "All data with the exception of the location data are in ppbv. All oneminute averages contain at least 35 seconds of data, otherwise missing."
            ],
            "UNCERTAINTY": [
                "included in the data records as variables with a _1sig suffix"
            ],
            "ULOD_FLAG": ["-7777"],
            "ULOD_VALUE": ["N/A"],
            "LLOD_FLAG": ["-8888"],
            "LLOD_VALUE": ["N/A, N/A, N/A, N/A, N/A, 0.005, N/A, 0.025, N/A"],
            "DM_CONTACT_INFO": ["N/A"],
            "PROJECT_INFO": [
                "ICARTT study; 1 July-15 August 2004; Gulf of Maine and North Atlantic Ocean"
            ],
            "STIPULATIONS_ON_USE": ["Use of these data requires PRIOR OK from the PI"],
            "OTHER_COMMENTS": ["N/A"],
        }
        for keyword, data in expected.items():
            # subTest keeps checking the remaining keywords after a failure
            with self.subTest(keyword=keyword):
                self.assertEqual(ict.normalComments.keywords[keyword].data, data)

    def testReadData(self):
        ict = icartt.Dataset(self.fn, loadData=True)
        self.assertIsInstance(ict, icartt.Dataset)

    def testWriteHeader(self):
        ict = icartt.Dataset(self.fn, loadData=False)
        strOut = io.StringIO()
        ict.writeHeader(f=strOut)
        # the with-block guarantees the file is closed even if the
        # comparison raises; compareFiles closing it first is harmless
        with open(self.fn) as strIn:
            self.assertTrue(compareFiles(self.fn, strIn, strOut, nlines=self.nHeader))

    def testWriteData(self):
        ict = icartt.Dataset(self.fn, loadData=True)
        strOut = io.StringIO()
        ict.write(f=strOut)
        with open(self.fn) as strIn:
            self.assertTrue(
                compareFiles(self.fn, strIn, strOut, skiplines=self.nHeader)
            )

    def testWrite(self):
        ict = icartt.Dataset(self.fn, loadData=True)
        strOut = io.StringIO()
        ict.write(f=strOut)
        with open(self.fn) as strIn:
            self.assertTrue(compareFiles(self.fn, strIn, strOut))


class Create1001TestCase(unittest.TestCase):
    """Build an FFI 1001 dataset from scratch, add data and write it out."""

    def testCreateDs(self):
        now = datetime.datetime.today()

        ict = icartt.Dataset(format=icartt.Formats.FFI1001)

        ict.PIName = "Knote, Christoph"
        ict.PIAffiliation = "Faculty of Medicine, University Augsburg, Germany"
        ict.dataSourceDescription = "Example data"
        ict.missionName = "MBEES"
        ict.dateOfCollection = now.timetuple()[:3]
        ict.dateOfRevision = now.timetuple()[:3]

        ict.dataIntervalCode = [0]

        ict.independentVariable = icartt.Variable(
            "Time_Start",
            "seconds_from_0_hours_on_valid_date",
            "Time_Start",
            "Time_Start",
            vartype=icartt.VariableType.IndependentVariable,
            scale=1.0,
            miss=-9999999,
        )

        ict.dependentVariables["Time_Stop"] = icartt.Variable(
            "Time_Stop",
            "seconds_from_0_hours_on_valid_date",
            "Time_Stop",
            "Time_Stop",
            scale=1.0,
            miss=-9999999,
        )

        ict.dependentVariables["Payload"] = icartt.Variable(
            "Payload", "some_units", "Payload", "Payload", scale=1.0, miss=-9999999
        )

        ict.specialComments.append("Some comments on this dataset:")
        ict.specialComments.append("They are just examples!")
        ict.specialComments.append("Adapt as needed.")

        # we can just use len of the list to check number of comments
        self.assertEqual(len(ict.specialComments), 3)

        # let's define some normal comments... 21 lines
        ncom = {
            "PI_CONTACT_INFO": "PI1 pi-email@mail.com\nPI2 more-email@what.com",
            "PLATFORM": "a platform",
            "LOCATION": "somewhere",
            "ASSOCIATED_DATA": "met sensor data",
            "INSTRUMENT_INFO": "super cool instrument",
            "DATA_INFO": f"icartt Python package version: {icartt.__version__}",
            "UNCERTAINTY": "not much",
            "ULOD_FLAG": "-7777",
            "ULOD_VALUE": "N/A",
            "LLOD_FLAG": "-8888",
            "LLOD_VALUE": "N/A",
            "DM_CONTACT_INFO": "datamanager@mail.edu",
            "PROJECT_INFO": "the campaign",
            "STIPULATIONS_ON_USE": "no",
            "OTHER_COMMENTS": "a lot more info\non multiple lines",
            "REVISION": (
                "R1\n"
                "R1: revised time synchronization.\n"
                "R0: initial, preliminary version."
            ),
        }
        for k, v in ncom.items():
            ict.normalComments.keywords[k].append(v)

        # we can check if nlines method of normalComments class works
        self.assertEqual(ict.normalComments.nlines, 21)

        ict.normalComments.freeform.append("free comment line 1")
        ict.normalComments.freeform.append("free comment line 2")

        self.assertEqual(ict.normalComments.nlines, 23)

        ict.endDefineMode()

        # we have not added data yet, so data must be None
        self.assertIsNone(ict.data[:])

        # and times must be NaT
        self.assertTrue(np.isnat(ict.times))

        # single line data adding
        # it is the user's job to ensure data consistency!
        # Time_Start, Time_Stop, Payload
        oneLineData = np.array([(12.3, 12.5, 23789423.2e5)])
        ict.data.add(oneLineData)

        # this also works for bulk data dumps
        multiLineData = np.array(
            [
                (12.6, 13.0, 2348925e5),
                (13.4, 14.0, 23425634e5),
                (14.1, 14.7, 23422344e5),
            ]
        )
        ict.data.add(multiLineData)

        # you can also be explicit, and define names and datatypes
        structuredData = np.array(
            [(14.5, 14.6, 24824525e5), (14.6, 14.7, 41225634e5)],
            dtype=[
                ("Time_Start", ict.data.default_dtype),
                ("Time_Stop", ict.data.default_dtype),
                ("Payload", ict.data.default_dtype),
            ],
        )
        ict.data.add(structuredData)

        # elements of the time array must be equal to our input
        t0 = np.datetime64(datetime.datetime(*now.timetuple()[:3]), "ns")
        for have, want in zip(ict.times, (12.3, 12.6, 13.4, 14.1, 14.5, 14.6)):
            self.assertEqual(int(have - t0), int(want * 10**9))

        # smoke test: writing the assembled dataset must not raise
        strOut = io.StringIO()
        ict.write(f=strOut)


if __name__ == "__main__":  # pragma: no cover
    unittest.main()