Added a datatrove based pipeline for filtering tokenized data using scores. #235
BlueCrescent wants to merge 20 commits into master from
Conversation
BlueCrescent
commented
Jul 25, 2025
- Included an example configuration file.
- Added datatrove and pydantic-settings to requirements.
- Note that modalities is also required for the pipeline to work, but it is not included in the requirements file.
Pull Request Overview
This PR implements a datatrove-based pipeline for filtering tokenized data based on scores. The pipeline processes JSONL files containing scores for data samples and filters the corresponding tokenized datasets based on configurable thresholds.
- Adds a complete datatrove-based filtering pipeline with score parsing and data filtering components
- Introduces configuration management using pydantic-settings for both local and Slurm execution environments
- Updates dependencies to include datatrove and pydantic-settings
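For orientation, a minimal sketch of what the threshold check can look like; the helper name, the score name, and the keep-if-at-least-threshold convention are assumptions for illustration, not the PR's actual implementation:

def keep_sample(scores: dict[str, float], thresholds: dict[str, float]) -> bool:
    # A sample is kept only if it meets or exceeds every configured threshold.
    return all(scores.get(name, float("-inf")) >= limit for name, limit in thresholds.items())

# Example: keep samples scoring at least 0.8 on "educational_value".
thresholds = {"educational_value": 0.8}
print(keep_sample({"educational_value": 0.92}, thresholds))  # True
print(keep_sample({"educational_value": 0.41}, thresholds))  # False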
Reviewed Changes
Copilot reviewed 5 out of 6 changed files in this pull request and generated 6 comments.
Summary per file:
| File | Description |
|---|---|
| src/ml_filter/data_processing/score_based_filtering/step_score_parsing.py | Implements ScoresParser class for reading JSONL score files and mapping to tokenized data |
| src/ml_filter/data_processing/score_based_filtering/step_data_filtering.py | Implements DataFiltering class for filtering datasets based on score thresholds |
| src/ml_filter/data_processing/score_based_filtering/filter_pipeline.py | Main pipeline orchestration with configuration management and execution settings |
| pyproject.toml | Adds datatrove and pydantic-settings dependencies |
| configs/data_processing/example_filter_pipeline_config.yaml | Example configuration file for the filtering pipeline |
Comments suppressed due to low confidence (1)
src/ml_filter/data_processing/score_based_filtering/filter_pipeline.py:241
- [nitpick] The error message could be more helpful by providing an example of how to use the FilterPipelineBuilder class directly or where to find documentation.
"and use the FilterPipelineBuilder class directly."
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
…g pipeline and adapted the codebase for new changes from main
… execution settings
…dle duplicates in score parsing
document = self.get_document_from_dict(doc_content, filepath, 0)
return [document]

def _parse_scores_jsonl_file(self, filepath: str) -> tuple[str, list[dict[str, float]]]:
The scores are emitted in lexicographic order of the document IDs. IDs such as sample1, sample2, sample10 will be reordered to sample1, sample10, sample2, so the thresholds get applied to the wrong rows in the packed dataset. Please preserve the original file order (e.g. rely on insertion order or track the original line index when deduplicating).
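A minimal sketch of an order-preserving deduplication, assuming each JSONL line carries an `id` field plus numeric score fields (the field names and the first-occurrence-wins policy are assumptions):

import json

def parse_scores_preserving_order(filepath: str) -> list[tuple[str, dict[str, float]]]:
    # Python dicts preserve insertion order, so a single pass over the file,
    # deduplicating by ID, keeps the original line order without any sorting.
    scores_by_id: dict[str, dict[str, float]] = {}
    with open(filepath, "r", encoding="utf-8") as f:
        for line in f:
            record = json.loads(line)
            doc_id = record["id"]
            if doc_id not in scores_by_id:  # first occurrence wins
                scores_by_id[doc_id] = {k: float(v) for k, v in record.items() if k != "id"}
    return list(scores_by_id.items())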
output_folder (Path): The folder where the filtered datasets will be saved.
thresholds (dict[str, float]): A dictionary where keys are score names and values are the
    thresholds to filter samples.
hash_to_base_file_mapping_csv (Path): A CSV file mapping base file hashes to their corresponding paths.
sbatch_args = values.get("sbatch_args") or {}
if isinstance(sbatch_args, _DictConfig):
    sbatch_args = OmegaConf.to_container(sbatch_args, resolve=True)  # type: ignore[arg-type]
Won't this throw an error unless you import OmegaConf?
Hmm, `from omegaconf import DictConfig as _DictConfig` does not import OmegaConf. I am not sure why the code is not throwing an error here; `from omegaconf import DictConfig as _DictConfig, OmegaConf` should be the way to go.
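A minimal sketch of the corrected import together with the conversion logic quoted above (the helper name is a placeholder, not the PR's actual validator):

from omegaconf import DictConfig as _DictConfig, OmegaConf

def _normalize_sbatch_args(values: dict) -> dict:
    # Convert an OmegaConf DictConfig into a plain dict so downstream validation sees builtin types.
    sbatch_args = values.get("sbatch_args") or {}
    if isinstance(sbatch_args, _DictConfig):
        sbatch_args = OmegaConf.to_container(sbatch_args, resolve=True)  # type: ignore[arg-type]
    return sbatch_args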
@@ -0,0 +1,173 @@
import json
import logging
Unused import, please remove it
| """ | ||
| Maps a base file path to the corresponding tokenized data path. | ||
| Args: | ||
| base_file_path (str): The path of the base file. |
Please update the docstrings to reflect the new changes
_TOKENIZER_CACHE: dict[str, Any] = {}

HEADER_SIZE = 64  # Mimics EmbeddedStreamData.HEADER_SIZE_IN_BYTES (simplified for tests)
DATA_SECTION_LEN_BYTES = 8
Unused constants: DATA_SECTION_LEN_BYTES and TOKEN_SIZE_DESC_LEN_BYTES.
    from modalities.dataloader.filter_packed_data import filter_dataset
except ImportError:
    logging.error("The filtering pipeline requires the 'modalities' package to be installed.")
    exit(1)
Using exit(1) is not ideal, since it terminates the interpreter and hides the original traceback from callers. I would say something like

try:
    from modalities.dataloader.filter_packed_data import filter_dataset
except ImportError as exc:
    raise ImportError(
        "The filtering pipeline requires the optional dependency 'modalities'. "
        "Install it via `pip install modalities` and try again."
    ) from exc

would be better.
| """ | ||
|
|
||
| name = "ScoresParser" | ||
| # type = "Parser" |
Please remove this line altogether
AbasKhan
left a comment
Apart from a minor change, the rest looks really good. Well done!
    ]
    return pipeline


if __name__ == "__main__":
Do we need this here? I think we should have an entry point in main.py instead.
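A minimal sketch of such an entry point in main.py, assuming a run_pipeline(config_path) helper were exposed from filter_pipeline.py (both the helper name and the CLI wiring are hypothetical):

import argparse
from pathlib import Path

# Hypothetical helper; the PR currently builds the pipeline via FilterPipelineBuilder.
from ml_filter.data_processing.score_based_filtering.filter_pipeline import run_pipeline

def main() -> None:
    parser = argparse.ArgumentParser(description="Filter tokenized data based on scores.")
    parser.add_argument("config", type=Path, help="Path to the filter pipeline YAML config.")
    args = parser.parse_args()
    run_pipeline(args.config)

if __name__ == "__main__":
    main()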
AbasKhan
left a comment
I think we can merge it, but I would suggest adding Mehdi or Max as a second reviewer.