Skip to content
test_bulkIO.py 6.1 KiB
Newer Older
Florian Obersteiner's avatar
Florian Obersteiner committed
import unittest
import pathlib
import io
import icartt

# working directory, example files
wd = pathlib.Path(__file__).parent

#    file name : want (ffi nlscom, nlncom, nHeaderLines)
fileinfo = {
    "AROTAL-RAY_DC8_20040715_R1.ict": (
        2110,
        1,
        19,
        68,
    ),  # warns # not imported correctly!
    "AR_DC8_20050203_R0.ict": (2110, 0, 18, 54),  # warns
    "BB-FLUX_CU-SOF_20180808_R2.ict": (1001, 0, 18, 38),  # ok
    "DC8-20160517.ict": (1001, 0, 18, 36),  # ok
    "discoveraq-CO2_p3b_20140721_R0.ict": (1001, 1, 18, 37),  # ok
    "DISCOVERAQ-NOXYO3_P3B_20140720_R0.ict": (
        1001,
        0,
        27,
        47,
    ),  # warns # not imported correctly!
    "Dongdaemun_NIER_20160520_RA.ict": (1001, 0, 18, 36),  # warns
    "HOX_DC8_20040712_R0.ict": (1001, 0, 18, 36),  # ok
    "korusaq-flexpart-dc8_trajectory_20160529_R2.ict": (2110, 27, 20, 101),  # warns
    "korusaq-mrg01-HANSEO-KING-AIR_merge_20160507_RA.ict": (1001, 0, 18, 45),  # ok
    "LIDARO3_WP3_20040830_R0.ict": (
        2310,
        0,
        18,
        46,
        NotImplementedError,
    ),  # error: 2310 not implemented
    "NOx_RHBrown_20040830_R0.ict": (1001, 0, 18, 41),  # ok
    "output.ict": (
        1001,
        8,
        17,
        41,
        Exception,
    ),  # error: invalid number of variables / columns
    "PAVE-AR_DC8_20050203_R0.ict": (2110, 1, 18, 55),  #  warns
    "SEAC4RS-PTRMS-acetaldehyde_DC8_20130806_R1.ict": (
        1001,
        0,
        26,
        44,
    ),  # warns # not imported correctly!
    "bt_Munich_2020061000_72.ict.txt": (1001, 29, 18, 91),  # warns
    "korusaq-mrg10-dc8_merge_20160510_R4.ict": (
        1001,
        0,
        29,
        397,
    ),  # large file, needs improved reader
}
Florian Obersteiner's avatar
Florian Obersteiner committed


def compareFiles(fn, strIn, strOut, skiplines=0, nlines=-1):  # pragma: no cover
    strOut.seek(0)
    strIn.seek(0)
    content_in = strIn.readlines()
    content_out = strOut.readlines()
    strIn.close()
    strOut.close()

    if nlines > 0:
        content_in = content_in[skiplines : (skiplines + nlines)]
        content_out = content_out[skiplines : (skiplines + nlines)]
    else:
        content_in = content_in[skiplines:]
        content_out = content_out[skiplines:]

    if not len(content_in) == len(content_out):
        return False

    for inline, outline in zip(content_in, content_out):
        inline = inline.strip().replace(" ", "")
        outline = outline.strip().replace(" ", "")
        if not inline == outline:
            valid_data_line = False
            # maybe this is a data line in which we only have different number formatting?
            # compare as floats
            # try:
            insteps = [float(x) for x in inline.split(",")]
            outsteps = [float(x) for x in outline.split(",")]
            if len(insteps) == len(outsteps):
                valid_data_line = True
                for i in range(len(insteps)):
                    valid_data_line = valid_data_line and insteps[i] == outsteps[i]
            # except:
            #     pass

            valid_var_line = False
            # try:
            insteps = [x.strip() for x in inline.split(",")]
            outsteps = [x.strip() for x in outline.split(",")]
            if len(insteps) == 2 and len(outsteps) == 3:
                valid_var_line = (
                    insteps[0] == outsteps[0]
                    and insteps[1] == outsteps[1]
                    and insteps[1] == outsteps[2]
                )
            # except:
            #     pass

            if not valid_data_line and not valid_var_line:
                print(f"{str(fn)}: line {i:d} differs:")
                print(f"  input: {inline}")
                print(f" output: {outline}")

                return False

    return True


class BulkIOTestCase(unittest.TestCase):
    def setUp(self):
        self.files_ok = list((wd / "example_data" / "expect_ok").glob("*.ict"))
        self.files_warn = list(
            (wd / "example_data" / "expect_warn").glob("*.ict")
        ) + list((wd / "example_data" / "expect_warn").glob("*.txt"))
        self.files_fail = list((wd / "example_data" / "expect_fail").glob("*.ict"))

    def tearDown(self):
        pass
Florian Obersteiner's avatar
Florian Obersteiner committed

    def testOpen(self):
Florian Obersteiner's avatar
Florian Obersteiner committed
            with self.subTest(msg=f"Opening test file {str(fn)}"):
                ict = icartt.Dataset(fn, loadData=False)
                self.assertEqual(type(ict), icartt.Dataset)

    def testReadData(self):
Florian Obersteiner's avatar
Florian Obersteiner committed
            with self.subTest(msg=f"Reading data from test file {str(fn)}"):
                ict = icartt.Dataset(fn, loadData=True)
                self.assertEqual(type(ict), icartt.Dataset)

    def testWriteHeader(self):
Florian Obersteiner's avatar
Florian Obersteiner committed
            with self.subTest(msg=f"Writing header for test file {str(fn)}"):
                ict = icartt.Dataset(fn, loadData=False)
                strIn = open(fn)
                strOut = io.StringIO()
                ict.writeHeader(f=strOut)
                self.assertTrue(compareFiles(fn, strIn, strOut, nlines=ict.nHeader))

    def testWrite(self):
Florian Obersteiner's avatar
Florian Obersteiner committed
            with self.subTest(msg=f"Writing data for test file {str(fn)}"):
                ict = icartt.Dataset(fn, loadData=True)
                strIn = open(fn)
                strOut = io.StringIO()
                ict.write(f=strOut)
                self.assertTrue(compareFiles(fn, strIn, strOut))

    def testInvalid(self):
        for fn in self.files_fail:
            with self.subTest(msg=f"Opening invalid file {str(fn)}"):
                try:
                    _ = icartt.Dataset(fn, loadData=False)
                except fileinfo[fn.name][-1]:
                    pass
                else:
                    self.fail("expected to fail")

    def testWarnings(self):
        for fn in self.files_warn:
            with self.assertWarns(Warning):
                _ = icartt.Dataset(fn, loadData=False)

Florian Obersteiner's avatar
Florian Obersteiner committed

Florian Obersteiner's avatar
Florian Obersteiner committed
if __name__ == "__main__":  # pragma: no cover
    # with warnings.catch_warnings():
    #     warnings.simplefilter("ignore", category=UserWarning)
    #     unittest.main()