Skip to content
test_bulkIO.py 6.14 KiB
Newer Older
Florian Obersteiner's avatar
Florian Obersteiner committed
import unittest
import pathlib
import io
import icartt

# working directory, example files
Florian Obersteiner's avatar
Florian Obersteiner committed
wd = pathlib.Path(__file__).parent / "example_data"
Florian Obersteiner's avatar
Florian Obersteiner committed
# file : (ffi, nlscom, nlncom, nHeaderLines, exception) <- want
Florian Obersteiner's avatar
Florian Obersteiner committed
     # warns # not imported correctly!
     'AROTAL-RAY_DC8_20040715_R1.ict': (2110, 1, 19, 68, None),

     'AR_DC8_20050203_R0.ict': (2110, 0, 18, 54, None), # warns

     'BB-FLUX_CU-SOF_20180808_R2.ict': (1001, 0, 18, 38, None), # ok

     'DC8-20160517.ict': (1001, 0, 18, 36, None), # ok

     'discoveraq-CO2_p3b_20140721_R0.ict': (1001, 1, 18, 37, None), # ok

     # warns # not imported correctly!
     'DISCOVERAQ-NOXYO3_P3B_20140720_R0.ict': (1001, 0, 27, 47, None),

     'Dongdaemun_NIER_20160520_RA.ict': (1001, 0, 18, 36, None), # warns

     'HOX_DC8_20040712_R0.ict': (1001, 0, 18, 36, None), # ok

     # warns
     'korusaq-flexpart-dc8_trajectory_20160529_R2.ict': (2110, 27, 20, 101, None),

     # ok
     'korusaq-mrg01-HANSEO-KING-AIR_merge_20160507_RA.ict': (1001, 0, 18, 45, None),

     # error: 2310 not implemented
     'LIDARO3_WP3_20040830_R0.ict': (2310, 0, 18, 46, NotImplementedError),

     'NOx_RHBrown_20040830_R0.ict': (1001, 0, 18, 41, None), # ok

     # error: invalid number of variables / columns
     'output.ict': (1001, 8, 17, 41, Exception),

     'PAVE-AR_DC8_20050203_R0.ict': (2110, 1, 18, 55, None), #  warns

     # warns # not imported correctly!
     'SEAC4RS-PTRMS-acetaldehyde_DC8_20130806_R1.ict': (1001, 0, 26, 44, None),

     'bt_Munich_2020061000_72.ict.txt': (1001, 29, 18, 91, None), # warns

     # large file, needs improved reader
     'korusaq-mrg10-dc8_merge_20160510_R4.ict': (1001, 0, 29, 397, None),
     }

# TODO: dataset -> close file pointer after read ?!
Florian Obersteiner's avatar
Florian Obersteiner committed


def compareFiles(fn, strIn, strOut, skiplines=0, nlines=-1):  # pragma: no cover
    strOut.seek(0)
    strIn.seek(0)
    content_in = strIn.readlines()
    content_out = strOut.readlines()
    strIn.close()
    strOut.close()

    if nlines > 0:
        content_in = content_in[skiplines : (skiplines + nlines)]
        content_out = content_out[skiplines : (skiplines + nlines)]
    else:
        content_in = content_in[skiplines:]
        content_out = content_out[skiplines:]

    if not len(content_in) == len(content_out):
        return False

    for inline, outline in zip(content_in, content_out):
        inline = inline.strip().replace(" ", "")
        outline = outline.strip().replace(" ", "")
        if not inline == outline:
            valid_data_line = False
            # maybe this is a data line in which we only have different number formatting?
            # compare as floats
            insteps = [float(x) for x in inline.split(",")]
            outsteps = [float(x) for x in outline.split(",")]
            if len(insteps) == len(outsteps):
                valid_data_line = True
                for i in range(len(insteps)):
                    valid_data_line = valid_data_line and insteps[i] == outsteps[i]

            valid_var_line = False
Florian Obersteiner's avatar
Florian Obersteiner committed

Florian Obersteiner's avatar
Florian Obersteiner committed
            insteps = [x.strip() for x in inline.split(",")]
            outsteps = [x.strip() for x in outline.split(",")]
            if len(insteps) == 2 and len(outsteps) == 3:
                valid_var_line = (
                    insteps[0] == outsteps[0]
                    and insteps[1] == outsteps[1]
                    and insteps[1] == outsteps[2]
                )

            if not valid_data_line and not valid_var_line:
                print(f"{str(fn)}: line {i:d} differs:")
                print(f"  input: {inline}")
                print(f" output: {outline}")

                return False

    return True


class BulkIOTestCase(unittest.TestCase):
Florian Obersteiner's avatar
Florian Obersteiner committed
        self.files_ok = list((wd / "expect_ok").glob("*.ict"))
Florian Obersteiner's avatar
Florian Obersteiner committed
            (wd / "expect_warn").glob("*.ict")
        ) + list((wd / "example_data" / "expect_warn").glob("*.txt"))
Florian Obersteiner's avatar
Florian Obersteiner committed
        self.files_fail = list((wd / "expect_fail").glob("*.ict"))
Florian Obersteiner's avatar
Florian Obersteiner committed

    def testOpen(self):
Florian Obersteiner's avatar
Florian Obersteiner committed
            with self.subTest(msg=f"Opening test file {str(fn)}"):
                ict = icartt.Dataset(fn, loadData=False)
                self.assertEqual(type(ict), icartt.Dataset)

Florian Obersteiner's avatar
Florian Obersteiner committed
                # assert that we have correct number of header lines
                self.assertEqual(
                    (len(ict.specialComments), ict.normalComments.nlines, ict.nHeader),
                    fileinfo[fn.name][1:4],
                )

Florian Obersteiner's avatar
Florian Obersteiner committed
    def testReadData(self):
Florian Obersteiner's avatar
Florian Obersteiner committed
            with self.subTest(msg=f"Reading data from test file {str(fn)}"):
                ict = icartt.Dataset(fn, loadData=True)
                self.assertEqual(type(ict), icartt.Dataset)

    def testWriteHeader(self):
Florian Obersteiner's avatar
Florian Obersteiner committed
            with self.subTest(msg=f"Writing header for test file {str(fn)}"):
                ict = icartt.Dataset(fn, loadData=False)
                strIn = open(fn)
                strOut = io.StringIO()
                ict.writeHeader(f=strOut)
                self.assertTrue(compareFiles(fn, strIn, strOut, nlines=ict.nHeader))

    def testWrite(self):
Florian Obersteiner's avatar
Florian Obersteiner committed
            with self.subTest(msg=f"Writing data for test file {str(fn)}"):
                ict = icartt.Dataset(fn, loadData=True)
                strIn = open(fn)
                strOut = io.StringIO()
                ict.write(f=strOut)
                self.assertTrue(compareFiles(fn, strIn, strOut))

    def testInvalid(self):
        for fn in self.files_fail:
            with self.subTest(msg=f"Opening invalid file {str(fn)}"):
                try:
Florian Obersteiner's avatar
Florian Obersteiner committed
                    _ = icartt.Dataset(fn, loadData=True)
                except fileinfo[fn.name][-1]:
                    pass
                else:
                    self.fail("expected to fail")

    def testWarnings(self):
        for fn in self.files_warn:
            with self.assertWarns(Warning):
Florian Obersteiner's avatar
Florian Obersteiner committed
                _ = icartt.Dataset(fn, loadData=True)
Florian Obersteiner's avatar
Florian Obersteiner committed

Florian Obersteiner's avatar
Florian Obersteiner committed
if __name__ == "__main__":  # pragma: no cover
Florian Obersteiner's avatar
Florian Obersteiner committed
    # unittest.main()
Florian Obersteiner's avatar
Florian Obersteiner committed
    import warnings
Florian Obersteiner's avatar
Florian Obersteiner committed
    with warnings.catch_warnings():
        warnings.simplefilter("ignore", category=UserWarning)
        unittest.main()