"""Unit tests for reading, writing and creating ICARTT FFI 1001 datasets."""

import unittest
import pathlib
import io
import datetime
import numpy as np
import icartt

# working directory of this test module; the example files live below it
wd = pathlib.Path(__file__).parent

# Path.glob() returns a one-shot generator, so a second consumer would silently
# iterate over nothing. Materialize the matches as sorted lists so they can be
# reused and are visited in a deterministic order.
# NOTE(review): judging by the names, fns_pass are files expected to load and
# fns_fail files expected to be rejected -- confirm against the consumers.
fns_pass = sorted((wd / "example_data").glob("*.ict"))
fns_fail = sorted((wd / "example_data" / "will_fail").glob("*.ict"))

def _same_numbers(inline, outline):
    """Return True if both comma-separated lines hold pairwise equal floats."""
    try:
        insteps = [float(x) for x in inline.split(",")]
        outsteps = [float(x) for x in outline.split(",")]
    except ValueError:
        # at least one field is not numeric, so this is not a data line
        return False
    # list equality also covers a differing number of fields
    return insteps == outsteps


def _same_variable(inline, outline):
    """Return True if a 2-field input line matches a 3-field output line.

    The output is accepted when its first two fields equal the input fields
    and its third field repeats the second one.
    """
    insteps = [x.strip() for x in inline.split(",")]
    outsteps = [x.strip() for x in outline.split(",")]
    if len(insteps) != 2 or len(outsteps) != 3:
        return False
    return (
        insteps[0] == outsteps[0]
        and insteps[1] == outsteps[1]
        and insteps[1] == outsteps[2]
    )


def compareFiles(fn, strIn, strOut, skiplines=0, nlines=-1):  # pragma: no cover
    """Compare two text streams line by line, tolerating formatting differences.

    Lines are compared with surrounding whitespace and all blanks removed.
    Lines that still differ are accepted if they hold the same numbers
    (compared as floats) or if the output merely repeats the second field of
    a two-field input line as a third field.

    Both streams are rewound before reading and closed afterwards.

    Parameters
    ----------
    fn :
        name of the file being checked; only used in the mismatch message
    strIn :
        readable, seekable text stream with the reference content
    strOut :
        readable, seekable text stream with the content to check
    skiplines : int
        number of leading lines to ignore in both streams
    nlines : int
        number of lines to compare after skipping; any value <= 0 compares
        everything up to the end

    Returns
    -------
    bool
        True if all compared lines are equivalent, False otherwise (including
        the case of a differing number of lines)
    """
    try:
        strOut.seek(0)
        strIn.seek(0)
        content_in = strIn.readlines()
        content_out = strOut.readlines()
    finally:
        # callers hand over ownership of both streams
        strIn.close()
        strOut.close()

    stop = skiplines + nlines if nlines > 0 else None
    content_in = content_in[skiplines:stop]
    content_out = content_out[skiplines:stop]

    if len(content_in) != len(content_out):
        return False

    # report 1-based line numbers relative to the start of the file
    for lineno, (inline, outline) in enumerate(
        zip(content_in, content_out), start=skiplines + 1
    ):
        inline = inline.strip().replace(" ", "")
        outline = outline.strip().replace(" ", "")
        if inline == outline:
            continue

        # maybe this is a data line in which we only have different number
        # formatting, or a variable line with a repeated field?
        if _same_numbers(inline, outline) or _same_variable(inline, outline):
            continue

        print(f"{str(fn)}: line {lineno:d} differs:")
        print(f"  input: {inline}")
        print(f" output: {outline}")
        return False

    return True

class Simple1001TestCase(unittest.TestCase):
    """Read/write checks against the example file NOx_RHBrown_20040830_R0.ict."""

    def setUp(self):
        self.fn = wd / "example_data" / "NOx_RHBrown_20040830_R0.ict"
        # number of header lines the tests expect for the example file
        self.nHeader = 41

    def tearDown(self):
        # nothing to clean up: every test builds its own Dataset and streams
        pass

    def testOpen(self):
        ict = icartt.Dataset(self.fn, loadData=False)
        self.assertIsInstance(ict, icartt.Dataset)

    def testFormat(self):
        ict = icartt.Dataset(self.fn, loadData=False)
        self.assertEqual(ict.format, icartt.Formats.FFI1001)

    def testN(self):
        ict = icartt.Dataset(self.fn, loadData=False)
        self.assertEqual(ict.nHeader, self.nHeader)
        self.assertEqual(len(ict.dependentVariables), 9)
        self.assertEqual(len(ict.normalComments), 18)
        self.assertEqual(len(ict.specialComments), 0)

    def testIvar(self):
        ict = icartt.Dataset(self.fn, loadData=False)
        ivar = ict.independentVariable
        self.assertEqual(ivar.shortname, "Start_UTC")
        self.assertEqual(ivar.units, "seconds")
        self.assertEqual(ivar.standardname, "number_of_seconds_from_0000_UTC")
        self.assertEqual(ivar.longname, None)
        self.assertEqual(ivar.scale, 1.0)
        self.assertEqual(ivar.miss, -99999.0)

    def testDvar(self):
        ict = icartt.Dataset(self.fn, loadData=False)
        dvars = list(ict.dependentVariables.values())

        self.assertEqual(
            [DVAR.shortname for DVAR in dvars],
            [
                "Stop_UTC",
                "Mid_UTC",
                "DLat",
                "DLon",
                "Elev",
                "NO_ppbv",
                "NO_1sig",
                "NO2_ppbv",
                "NO2_1sig",
            ],
        )

        self.assertEqual(
            [DVAR.units for DVAR in dvars],
            [
                "seconds",
                "seconds",
                "deg_N",
                "deg_E",
                "meters",
                "ppbv",
                "ppbv",
                "ppbv",
                "ppbv",
            ],
        )

        self.assertEqual([DVAR.standardname for DVAR in dvars], [None] * 9)
        self.assertEqual([DVAR.longname for DVAR in dvars], [None] * 9)
        # scale and missing value of dependent variables are compared as strings
        self.assertEqual([DVAR.scale for DVAR in dvars], ["1"] * 9)
        self.assertEqual([DVAR.miss for DVAR in dvars], ["-9999"] * 9)

    def testNCOM(self):
        ict = icartt.Dataset(self.fn, loadData=False)

        # keyword -> expected content lines in the normal comments section
        expected = {
            "PI_CONTACT_INFO": [
                "325 Broadway, Boulder, CO 80305; 303-497-3226; email:eric.j.williams@noaa.gov"
            ],
            "PLATFORM": ["NOAA research vessel Ronald H. Brown"],
            "LOCATION": [
                "Latitude, longitude and elevation data are included in the data records"
            ],
            "ASSOCIATED_DATA": ["N/A"],
            "INSTRUMENT_INFO": [
                "NO: chemiluminescence; NO2: narrow-band photolysis/chemiluminescence"
            ],
            "DATA_INFO": [
                "All data with the exception of the location data are in ppbv. All oneminute averages contain at least 35 seconds of data, otherwise missing."
            ],
            "UNCERTAINTY": [
                "included in the data records as variables with a _1sig suffix"
            ],
            "ULOD_FLAG": ["-7777"],
            "ULOD_VALUE": ["N/A"],
            "LLOD_FLAG": ["-8888"],
            "LLOD_VALUE": ["N/A, N/A, N/A, N/A, N/A, 0.005, N/A, 0.025, N/A"],
            "DM_CONTACT_INFO": ["N/A"],
            "PROJECT_INFO": [
                "ICARTT study; 1 July-15 August 2004; Gulf of Maine and North Atlantic Ocean"
            ],
            "STIPULATIONS_ON_USE": ["Use of these data requires PRIOR OK from the PI"],
            "OTHER_COMMENTS": ["N/A"],
        }

        # subTest reports every mismatching keyword instead of only the first
        for keyword, want in expected.items():
            with self.subTest(keyword=keyword):
                self.assertEqual(ict.normalComments.keywords[keyword].data, want)

    def testReadData(self):
        ict = icartt.Dataset(self.fn, loadData=True)
        self.assertIsInstance(ict, icartt.Dataset)

    def testWriteHeader(self):
        ict = icartt.Dataset(self.fn, loadData=False)

        strOut = io.StringIO()
        # the context manager closes the input file even if writing fails;
        # closing it a second time after compareFiles is harmless
        with open(self.fn) as strIn:
            ict.writeHeader(f=strOut)
            self.assertTrue(compareFiles(self.fn, strIn, strOut, nlines=self.nHeader))

    def testWriteData(self):
        ict = icartt.Dataset(self.fn, loadData=True)

        strOut = io.StringIO()
        with open(self.fn) as strIn:
            ict.write(f=strOut)
            # only the lines after the header are compared here
            self.assertTrue(
                compareFiles(self.fn, strIn, strOut, skiplines=self.nHeader)
            )

    def testWrite(self):
        ict = icartt.Dataset(self.fn, loadData=True)

        strOut = io.StringIO()
        with open(self.fn) as strIn:
            ict.write(f=strOut)
            self.assertTrue(compareFiles(self.fn, strIn, strOut))


class Create1001TestCase(unittest.TestCase):
    """Build an FFI 1001 dataset from scratch, fill it with data and write it."""

    def testCreateDs(self):
        # captured once so that collection date, revision date and the time
        # reference used further down all refer to the same day
        now = datetime.datetime.today()

        ict = icartt.Dataset(format=icartt.Formats.FFI1001)

        ict.PIName = "Knote, Christoph"
        ict.PIAffiliation = "Faculty of Medicine, University Augsburg, Germany"
        ict.dataSourceDescription = "Example data"
        ict.missionName = "MBEES"
        # (year, month, day)
        ict.dateOfCollection = now.timetuple()[:3]
        ict.dateOfRevision = now.timetuple()[:3]

        ict.dataIntervalCode = [0]

        ict.independentVariable = icartt.Variable(
            "Time_Start",
            "seconds_from_0_hours_on_valid_date",
            "Time_Start",
            "Time_Start",
            vartype=icartt.VariableType.IndependentVariable,
            scale=1.0,
            miss=-9999999,
        )

        ict.dependentVariables["Time_Stop"] = icartt.Variable(
            "Time_Stop",
            "seconds_from_0_hours_on_valid_date",
            "Time_Stop",
            "Time_Stop",
            scale=1.0,
            miss=-9999999,
        )

        ict.dependentVariables["Payload"] = icartt.Variable(
            "Payload", "some_units", "Payload", "Payload", scale=1.0, miss=-9999999
        )

        ict.specialComments.append("Some comments on this dataset:")
        ict.specialComments.append("They are just examples!")
        ict.specialComments.append("Adapt as needed.")

        # we can just use len of the list to check number of comments
        self.assertEqual(len(ict.specialComments), 3)

        # let's define some normal comments... 21 lines
        ncom = {
            "PI_CONTACT_INFO": "PI1 pi-email@mail.com\nPI2 more-email@what.com",
            "PLATFORM": "a platform",
            "LOCATION": "somewhere",
            "ASSOCIATED_DATA": "met sensor data",
            "INSTRUMENT_INFO": "super cool instrument",
            "DATA_INFO": f"icartt Python package version: {icartt.__version__}",
            "UNCERTAINTY": "not much",
            "ULOD_FLAG": "-7777",
            "ULOD_VALUE": "N/A",
            "LLOD_FLAG": "-8888",
            "LLOD_VALUE": "N/A",
            "DM_CONTACT_INFO": "datamanager@mail.edu",
            "PROJECT_INFO": "the campaign",
            "STIPULATIONS_ON_USE": "no",
            "OTHER_COMMENTS": "a lot more info\non multiple lines",
            "REVISION": (
                "R1\n"
                "R1: revised time synchronization.\n"
                "R0: initial, preliminary version."
            ),
        }
        for k, v in ncom.items():
            ict.normalComments.keywords[k].append(v)

        # we can check if nlines method of normalComments class works
        self.assertEqual(ict.normalComments.nlines, 21)

        # two freeform lines must raise the line count by two
        ict.normalComments.freeform.append("free comment line 1")
        ict.normalComments.freeform.append("free comment line 2")
        self.assertEqual(ict.normalComments.nlines, 23)

        ict.endDefineMode()

        # we have not added data yet, so data must be None
        self.assertIsNone(ict.data[:])
        # and times must be NaT
        self.assertTrue(np.isnat(ict.times))

        # add data in three ways: keyword arguments, unpacked dict, bulk array
        ict.data.add(Time_Start=12.3, Time_Stop=12.5, Payload=23789423.2e5)
        mydict = {"Time_Start": 12.6, "Time_Stop": 13.1, "Payload": 324235644.1e5}
        ict.data.add(**mydict)
        data = np.array([(13.4, 14.0, 2348925e5), (14.1, 14.9, 23425634e5)])
        ict.data.addBulk(data)

        # elements of the time array must be equal to our input:
        # t0 is midnight of the collection date, offsets are compared in ns
        t0 = np.datetime64(datetime.datetime(*now.timetuple()[:3]), "ns")
        for have, want in zip(ict.times, (12.3, 12.6, 13.4, 14.1)):
            self.assertEqual(int(have - t0), int(want * 10**9))

        # writing the assembled dataset must not raise
        strOut = io.StringIO()
        ict.write(f=strOut)


if __name__ == "__main__":  # pragma: no cover
    # Run the suite directly, with UserWarning suppressed for the duration of
    # the run; the previous warning filters are restored on exit.
    from warnings import catch_warnings, simplefilter

    with catch_warnings():
        simplefilter("ignore", category=UserWarning)
        unittest.main()