From d41b0ea04077b3dd6ff548473732510f01c6114b Mon Sep 17 00:00:00 2001 From: "Jens H. Nielsen" Date: Wed, 15 Nov 2023 13:39:09 +0100 Subject: [PATCH 1/3] chunk delayed export to limit the number of files --- src/qcodes/dataset/data_set.py | 30 +++++++++++++++++++++++++----- 1 file changed, 25 insertions(+), 5 deletions(-) diff --git a/src/qcodes/dataset/data_set.py b/src/qcodes/dataset/data_set.py index c58aef15ff7c..4103d00e5d90 100644 --- a/src/qcodes/dataset/data_set.py +++ b/src/qcodes/dataset/data_set.py @@ -14,7 +14,7 @@ from typing import TYPE_CHECKING, Any, Literal import numpy -from tqdm.auto import trange +from tqdm.auto import tqdm import qcodes from qcodes.dataset.data_set_protocol import ( @@ -246,7 +246,8 @@ def __init__( #: In memory representation of the data in the dataset. self._cache: DataSetCacheWithDBBackend = DataSetCacheWithDBBackend(self) self._results: list[dict[str, VALUE]] = [] - self._in_memory_cache = in_memory_cache + self._in_memory_cache: bool = in_memory_cache + self._max_num_files_export = 100 self._export_limit = 1000 if run_id is not None: @@ -1484,6 +1485,21 @@ def _set_export_info(self, export_info: ExportInfo) -> None: def _export_as_netcdf(self, path: Path, file_name: str) -> Path: """Export data as netcdf to a given path with file prefix""" + + def generate_steps(num_rows, max_num_steps) -> list[tuple[int, int]]: + if max_num_steps >= num_rows: + return [(i + 1, i + 1) for i in range(num_rows)] + + step_size, remainder = divmod(num_rows, max_num_steps) + limits = [ + (i * step_size + 1, (i + 1) * step_size) for i in range(max_num_steps) + ] + + if remainder > 0: + limits[-1] = (limits[-1][0], (step_size) * max_num_steps + remainder) + + return limits + import xarray as xr file_path = path / file_name @@ -1514,12 +1530,16 @@ def _export_as_netcdf(self, path: Path, file_name: str) -> Path: "temp_dir": temp_dir, }, ) - num_files = len(self) + num_rows = len(self) + steps = generate_steps(num_rows, self._max_num_files_export) + num_files = len(steps) num_digits = len(str(num_files)) file_name_template = f"ds_{{:0{num_digits}d}}.nc" - for i in trange(num_files, desc="Writing individual files"): + for i, (start, stop) in tqdm( + enumerate(steps), total=num_files, desc="Writing individual files" + ): xarray_to_h5netcdf_with_complex_numbers( - self.to_xarray_dataset(start=i + 1, end=i + 1), + self.to_xarray_dataset(start=start, end=stop), temp_path / file_name_template.format(i), ) files = tuple(temp_path.glob("*.nc")) From 1faf0d50559cc2df4d931ea8220da0ede75e364c Mon Sep 17 00:00:00 2001 From: "Jens H. Nielsen" Date: Thu, 16 Nov 2023 07:04:41 +0100 Subject: [PATCH 2/3] add hypothesis test for chunk size --- tests/dataset/test_dataset_export.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/tests/dataset/test_dataset_export.py b/tests/dataset/test_dataset_export.py index f9f00195aff0..d38c7cd86ad2 100644 --- a/tests/dataset/test_dataset_export.py +++ b/tests/dataset/test_dataset_export.py @@ -5,9 +5,11 @@ import os from pathlib import Path +import hypothesis.strategies as hst import numpy as np import pytest import xarray as xr +from hypothesis import HealthCheck, given, settings from numpy.testing import assert_allclose from pytest import LogCaptureFixture, TempPathFactory @@ -826,11 +828,17 @@ def test_export_dataset_small_no_delated( assert "Writing netcdf file directly" in caplog.records[0].msg +@settings( + deadline=None, + suppress_health_check=(HealthCheck.function_scoped_fixture,), +) +@given(max_num_files=hst.integers(min_value=1, max_value=55)) def test_export_dataset_delayed_numeric( - tmp_path_factory: TempPathFactory, mock_dataset_grid: DataSet, caplog + max_num_files, tmp_path_factory: TempPathFactory, mock_dataset_grid: DataSet, caplog ) -> None: - tmp_path = tmp_path_factory.mktemp("export_netcdf") + tmp_path = tmp_path_factory.mktemp(f"export_netcdf_{max_num_files}") mock_dataset_grid._export_limit = 0 + mock_dataset_grid._max_num_files_export = max_num_files with caplog.at_level(logging.INFO): mock_dataset_grid.export(export_type="netcdf", path=tmp_path, prefix="qcodes_") From 8e4861aba96ef09c551f2a64d57904563183b6fc Mon Sep 17 00:00:00 2001 From: "Jens H. Nielsen" Date: Wed, 20 Dec 2023 16:47:37 +0100 Subject: [PATCH 3/3] uncompress before trying to merge --- src/qcodes/dataset/data_set.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/qcodes/dataset/data_set.py b/src/qcodes/dataset/data_set.py index 4103d00e5d90..0bd0d101dc40 100644 --- a/src/qcodes/dataset/data_set.py +++ b/src/qcodes/dataset/data_set.py @@ -13,6 +13,7 @@ from threading import Thread from typing import TYPE_CHECKING, Any, Literal +import cf_xarray as cfxr import numpy from tqdm.auto import tqdm @@ -1543,7 +1544,9 @@ def generate_steps(num_rows, max_num_steps) -> list[tuple[int, int]]: temp_path / file_name_template.format(i), ) files = tuple(temp_path.glob("*.nc")) - data = xr.open_mfdataset(files) + data = xr.open_mfdataset( + files, preprocess=cfxr.coding.decode_compress_to_multi_index + ) try: log.info( "Combining temp files into one file.",