# test_dataset.py -- unit tests for the icartt package
import unittest
import pathlib
import io
import datetime
import numpy as np
import icartt

# Working directory (the directory containing this file) and the example
# files found below it.
wd = pathlib.Path(__file__).parent
# Path.glob() returns a one-shot generator. Several test methods iterate over
# `fns`, so it is materialised into a list (sorted for a deterministic order);
# otherwise only the first method to run would actually see any files.
fns = sorted((wd / "examples").glob("*.ict"))

def compareFiles(fn, strIn, strOut, skiplines=0, nlines=-1):  # pragma: no cover
    """Compare two text streams line by line, tolerating formatting differences.

    Both streams are rewound, read completely and then closed. Lines are
    compared after stripping surrounding whitespace and removing all spaces.
    Two lines that still differ are nevertheless accepted when either

    * every comma-separated field of both lines parses as a float and the
      resulting number lists are equal (same values, different formatting), or
    * the input line has two fields and the output line has three, where the
      first two fields agree and the third output field repeats the second.

    Parameters
    ----------
    fn : object
        Name of the file under test; only used (via ``str``) in the
        diagnostic message.
    strIn, strOut : file-like
        Seekable, readable text streams holding the reference and the
        generated content. Both are closed by this function.
    skiplines : int
        Number of leading lines to ignore in both streams.
    nlines : int
        Number of lines to compare after ``skiplines``; any value ``<= 0``
        means "all remaining lines".

    Returns
    -------
    bool
        True if the selected lines match, False otherwise (a short
        description of the first mismatch is printed).
    """
    strOut.seek(0)
    strIn.seek(0)
    content_in = strIn.readlines()
    content_out = strOut.readlines()
    strIn.close()
    strOut.close()

    if nlines > 0:
        stop = skiplines + nlines
        content_in = content_in[skiplines:stop]
        content_out = content_out[skiplines:stop]
    else:
        content_in = content_in[skiplines:]
        content_out = content_out[skiplines:]

    if len(content_in) != len(content_out):
        return False

    # lineno is the 1-based line number within the original streams
    numbered = enumerate(zip(content_in, content_out), start=skiplines + 1)
    for lineno, (inline, outline) in numbered:
        inline = inline.strip().replace(" ", "")
        outline = outline.strip().replace(" ", "")
        if inline == outline:
            continue

        # maybe this is a data line in which we only have different number
        # formatting? compare as floats; a non-numeric field simply means
        # this is not a data line.
        try:
            innums = [float(x) for x in inline.split(",")]
            outnums = [float(x) for x in outline.split(",")]
        except ValueError:
            valid_data_line = False
        else:
            valid_data_line = innums == outnums

        # maybe this is a two-field line that was written out with three
        # fields, the last one repeating the second?
        insteps = [x.strip() for x in inline.split(",")]
        outsteps = [x.strip() for x in outline.split(",")]
        valid_var_line = (
            len(insteps) == 2
            and len(outsteps) == 3
            and insteps[0] == outsteps[0]
            and insteps[1] == outsteps[1]
            and insteps[1] == outsteps[2]
        )

        if not valid_data_line and not valid_var_line:
            print(f"{str(fn)}: line {lineno:d} differs:")
            print(f"  input: {inline}")
            print(f" output: {outline}")
            return False

    return True

class Simple1001TestCase(unittest.TestCase):
    """Read, inspect and re-write a single FFI 1001 example file."""

    def setUp(self):
        """Select the example file and the header length expected for it."""
        self.fn = wd / "examples/NOx_RHBrown_20040830_R0.ict"
        self.nHeader = 41

    def tearDown(self):
        """Nothing to clean up; every test creates its own objects."""

    def testOpen(self):
        """The file can be opened without loading the data section."""
        ict = icartt.Dataset(self.fn, loadData=False)
        self.assertEqual(type(ict), icartt.Dataset)

    def testFormat(self):
        """The file format index is detected as FFI 1001."""
        ict = icartt.Dataset(self.fn, loadData=False)
        self.assertEqual(ict.format, icartt.Formats.FFI1001)

    def testN(self):
        """Header length and the numbers of variables/comments are as expected."""
        ict = icartt.Dataset(self.fn, loadData=False)
        self.assertEqual(ict.nHeader, self.nHeader)
        self.assertEqual(len(ict.dependentVariables), 9)
        self.assertEqual(len(ict.normalComments), 18)
        self.assertEqual(len(ict.specialComments), 0)

    def testIvar(self):
        """The independent variable carries the expected attributes."""
        ict = icartt.Dataset(self.fn, loadData=False)
        self.assertEqual(ict.independentVariable.shortname, "Start_UTC")
        self.assertEqual(ict.independentVariable.units, "seconds")
        self.assertEqual(
            ict.independentVariable.standardname, "number_of_seconds_from_0000_UTC"
        )
        self.assertIsNone(ict.independentVariable.longname)
        self.assertEqual(ict.independentVariable.scale, 1.0)
        self.assertEqual(ict.independentVariable.miss, -99999.0)

    def testDvar(self):
        """The dependent variables carry the expected attributes, in order."""
        ict = icartt.Dataset(self.fn, loadData=False)

        self.assertEqual(
            [DVAR.shortname for DVAR in ict.dependentVariables.values()],
            [
                "Stop_UTC",
                "Mid_UTC",
                "DLat",
                "DLon",
                "Elev",
                "NO_ppbv",
                "NO_1sig",
                "NO2_ppbv",
                "NO2_1sig",
            ],
        )

        self.assertEqual(
            [DVAR.units for DVAR in ict.dependentVariables.values()],
            [
                "seconds",
                "seconds",
                "deg_N",
                "deg_E",
                "meters",
                "ppbv",
                "ppbv",
                "ppbv",
                "ppbv",
            ],
        )

        self.assertEqual(
            [DVAR.standardname for DVAR in ict.dependentVariables.values()],
            [None, None, None, None, None, None, None, None, None],
        )

        self.assertEqual(
            [DVAR.longname for DVAR in ict.dependentVariables.values()],
            [None, None, None, None, None, None, None, None, None],
        )

        self.assertEqual(
            [DVAR.scale for DVAR in ict.dependentVariables.values()],
            ["1", "1", "1", "1", "1", "1", "1", "1", "1"],
        )

        self.assertEqual(
            [DVAR.miss for DVAR in ict.dependentVariables.values()],
            [
                "-9999",
                "-9999",
                "-9999",
                "-9999",
                "-9999",
                "-9999",
                "-9999",
                "-9999",
                "-9999",
            ],
        )

    def testNCOM(self):
        """The keyword entries of the normal comments are parsed as expected."""
        ict = icartt.Dataset(self.fn, loadData=False)

        self.assertEqual(
            ict.normalComments.keywords["PI_CONTACT_INFO"].data,
            [
                "325 Broadway, Boulder, CO 80305; 303-497-3226; email:eric.j.williams@noaa.gov"
            ],
        )
        self.assertEqual(
            ict.normalComments.keywords["PLATFORM"].data,
            ["NOAA research vessel Ronald H. Brown"],
        )
        self.assertEqual(
            ict.normalComments.keywords["LOCATION"].data,
            ["Latitude, longitude and elevation data are included in the data records"],
        )
        self.assertEqual(ict.normalComments.keywords["ASSOCIATED_DATA"].data, ["N/A"])
        self.assertEqual(
            ict.normalComments.keywords["INSTRUMENT_INFO"].data,
            ["NO: chemiluminescence; NO2: narrow-band photolysis/chemiluminescence"],
        )
        self.assertEqual(
            ict.normalComments.keywords["DATA_INFO"].data,
            [
                "All data with the exception of the location data are in ppbv. All oneminute averages contain at least 35 seconds of data, otherwise missing."
            ],
        )
        self.assertEqual(
            ict.normalComments.keywords["UNCERTAINTY"].data,
            ["included in the data records as variables with a _1sig suffix"],
        )
        self.assertEqual(ict.normalComments.keywords["ULOD_FLAG"].data, ["-7777"])
        self.assertEqual(ict.normalComments.keywords["ULOD_VALUE"].data, ["N/A"])
        self.assertEqual(ict.normalComments.keywords["LLOD_FLAG"].data, ["-8888"])
        self.assertEqual(
            ict.normalComments.keywords["LLOD_VALUE"].data,
            ["N/A, N/A, N/A, N/A, N/A, 0.005, N/A, 0.025, N/A"],
        )
        self.assertEqual(ict.normalComments.keywords["DM_CONTACT_INFO"].data, ["N/A"])
        self.assertEqual(
            ict.normalComments.keywords["PROJECT_INFO"].data,
            [
                "ICARTT study; 1 July-15 August 2004; Gulf of Maine and North Atlantic Ocean"
            ],
        )
        self.assertEqual(
            ict.normalComments.keywords["STIPULATIONS_ON_USE"].data,
            ["Use of these data requires PRIOR OK from the PI"],
        )
        self.assertEqual(ict.normalComments.keywords["OTHER_COMMENTS"].data, ["N/A"])

    def testReadData(self):
        """The file can be opened with the data section loaded."""
        ict = icartt.Dataset(self.fn, loadData=True)
        self.assertEqual(type(ict), icartt.Dataset)

    def testWriteHeader(self):
        """The re-written header matches the header of the input file."""
        ict = icartt.Dataset(self.fn, loadData=False)

        # The context manager guarantees the input file is closed even if
        # writing fails before compareFiles gets to close it.
        with open(self.fn) as strIn:
            strOut = io.StringIO()
            ict.writeHeader(f=strOut)
            self.assertTrue(compareFiles(self.fn, strIn, strOut, nlines=self.nHeader))

    def testWriteData(self):
        """The re-written data section matches the data of the input file."""
        ict = icartt.Dataset(self.fn, loadData=True)

        with open(self.fn) as strIn:
            strOut = io.StringIO()
            ict.write(f=strOut)
            self.assertTrue(
                compareFiles(self.fn, strIn, strOut, skiplines=self.nHeader)
            )

    def testWrite(self):
        """The complete re-written file matches the input file."""
        ict = icartt.Dataset(self.fn, loadData=True)

        with open(self.fn) as strIn:
            strOut = io.StringIO()
            ict.write(f=strOut)
            self.assertTrue(compareFiles(self.fn, strIn, strOut))


class Create1001TestCase(unittest.TestCase):
    """Build an FFI 1001 dataset from scratch and write it to a text buffer."""

    def testCreateDs(self):
        """Define metadata and variables, add data rows, then write everything."""
        dataset = icartt.Dataset(format=icartt.Formats.FFI1001)

        # --- file-level metadata ---
        dataset.PIName = "Knote, Christoph"
        dataset.PIAffiliation = "Faculty of Medicine, University Augsburg, Germany"
        dataset.dataSourceDescription = "Example data"
        dataset.missionName = "MBEES"
        dataset.dateOfCollection = datetime.datetime.today()
        dataset.dateOfRevision = datetime.datetime.today()

        dataset.dataIntervalCode = [0]

        # --- variable definitions ---
        time_units = "seconds_from_0_hours_on_valid_date"
        missing_value = -9999999

        dataset.independentVariable = icartt.Variable(
            "Time_Start",
            time_units,
            "Time_Start",
            "Time_Start",
            vartype=icartt.VariableType.IndependentVariable,
            scale=1.0,
            miss=missing_value,
        )

        dataset.dependentVariables["Time_Stop"] = icartt.Variable(
            "Time_Stop",
            time_units,
            "Time_Stop",
            "Time_Stop",
            scale=1.0,
            miss=missing_value,
        )

        dataset.dependentVariables["Payload"] = icartt.Variable(
            "Payload",
            "some_units",
            "Payload",
            "Payload",
            scale=1.0,
            miss=missing_value,
        )

        # --- special comments ---
        for comment in (
            "Some comments on this dataset:",
            "They are just examples!",
            "Adapt as needed.",
        ):
            dataset.specialComments.append(comment)

        dataset.endDefineMode()

        # --- data: one row via keywords, one via an unpacked dict, two in bulk ---
        dataset.data.add(Time_Start=12.3, Time_Stop=12.5, Payload=23789423.2e5)

        row_as_dict = {"Time_Start": 12.6, "Time_Stop": 13.1, "Payload": 324235644.1e5}
        dataset.data.add(**row_as_dict)

        bulk_rows = np.array([(13.4, 14.0, 2348925e5), (14.1, 14.9, 23425634e5)])
        dataset.data.addBulk(bulk_rows)

        # --- write the whole dataset into an in-memory buffer ---
        buffer = io.StringIO()
        dataset.write(f=buffer)

# Alternative ways of selecting the example files, kept for reference:
# fns        = [ os.path.join("tests", "examples", fn) for fn in os.listdir(os.path.join("tests", "examples")) if fn.endswith(".ict")]
# fns = [ "tests/examples/AROTAL-RAY_DC8_20040715_R1.ict" ]


class BulkIOTestCase(unittest.TestCase):
    """Run the open / read / write round-trip checks over every example file.

    Each file is handled in its own ``subTest`` so that one failing file does
    not hide the results for the others. All methods iterate the module-level
    ``fns``, which therefore has to be re-iterable.
    """

    def testOpen(self):
        """Every example file can be opened without loading its data."""
        for fn in fns:
            with self.subTest(msg=f"Opening test file {str(fn)}"):
                ict = icartt.Dataset(fn, loadData=False)
                self.assertEqual(type(ict), icartt.Dataset)

    def testReadData(self):
        """Every example file can be opened with its data loaded."""
        for fn in fns:
            with self.subTest(msg=f"Reading data from test file {str(fn)}"):
                ict = icartt.Dataset(fn, loadData=True)
                self.assertEqual(type(ict), icartt.Dataset)

    def testWriteHeader(self):
        """The re-written header of every example file matches its input."""
        for fn in fns:
            with self.subTest(msg=f"Writing header for test file {str(fn)}"):
                ict = icartt.Dataset(fn, loadData=False)
                # subTest swallows errors and continues with the next file,
                # so make sure the handle is released even when writing fails.
                with open(fn) as strIn:
                    strOut = io.StringIO()
                    ict.writeHeader(f=strOut)
                    self.assertTrue(
                        compareFiles(fn, strIn, strOut, nlines=ict.nHeader)
                    )

    def testWrite(self):
        """The complete re-written content of every example file matches its input."""
        for fn in fns:
            with self.subTest(msg=f"Writing data for test file {str(fn)}"):
                ict = icartt.Dataset(fn, loadData=True)
                with open(fn) as strIn:
                    strOut = io.StringIO()
                    ict.write(f=strOut)
                    self.assertTrue(compareFiles(fn, strIn, strOut))


# Run all test cases in this module when the file is executed as a script.
if __name__ == "__main__":  # pragma: no cover
    unittest.main()