Skip to content

Data

Main entrypoint for data download and management.

available_datasets()

Lists all available datasets.

Returns:

Type Description
list[str]

A list containing the names of all datasets included in the package.

Source code in meld/data.py
def available_datasets() -> list[str]:
    """
    Returns the names of all datasets listed in the package manifest.

    Returns:
        The keys of the loaded manifest as a list, sorted in ascending
        order.
    """

    dataset_names = list(load_manifest().keys())
    dataset_names.sort()
    return dataset_names

bibliography_entries(datasets=None)

Collects a list of bibliography entries as bibtex strings for the given datasets or MELD.

Parameters:

Name Type Description Default
datasets list[str] | None

A list of datasets to collect bibliography entries for or None to return all dataset bibliography entries.

None

Returns:

Type Description
list[str]

A list of bibtex strings. If datasets is None, bibliography entries are returned for all datasets; if the passed list is empty, only the MELD entry is returned; otherwise, all entries for the given list of datasets are returned in order.

Source code in meld/data.py
def bibliography_entries(datasets: list[str] | None = None) -> list[str]:
    """
    Gathers bibtex strings from the bundled bibliography for the given datasets or for MELD itself.

    Args:
        datasets: Names of the datasets to collect bibliography entries for, or `None` to return every entry of the
            bundled bibliography.

    Returns:
        A list of bibtex strings. For `None`, the MELD entry comes first, followed by all remaining entries of the
        bundled bibliography. For an empty list, the MELD entry is returned followed by the entry stored under the
        default cite key, which is prefixed with a bibtex comment line. Otherwise, the entries referenced by the cite
        keys of each given dataset are returned, following the order of `datasets`.
    """

    bib_source = resources.files("meld.package_data").joinpath("dataset_references.bib").read_text()
    library = bibtexparser.parse_string(bib_source)  # pyright: ignore
    manifest = load_manifest()

    def raw_entry(cite_key):
        # Raw bibtex text of a single entry; `_raise_if_none` guards against a missing raw representation
        return _raise_if_none(library.entries_dict[cite_key].raw)

    if datasets is None:
        # Pull the MELD entry out of the library first so that it leads the result and is not listed twice
        meld_entry = raw_entry(_MELD_CITEKEY)
        library.remove(library.entries_dict[_MELD_CITEKEY])
        return [meld_entry, *(_raise_if_none(entry.raw) for entry in library.entries)]

    if not datasets:
        meld_entry = raw_entry(_MELD_CITEKEY)
        default_entry = raw_entry(_DEFAULT_CITEKEY)
        return [
            meld_entry,
            f"% When using the PhoNER COVID19 dataset, also cite:\n{default_entry}",
        ]

    return [raw_entry(cite_key) for dataset in datasets for cite_key in manifest[dataset].citekeys]

compute_word_counts(data_directory, output, append=False, workers=12)

Computes word counts using a word tokenizer for each dataset split and writes statistics to a parquet file.

Parameters:

Name Type Description Default
data_directory Path

Directory containing processed benchmark datasets.

required
output Path

Path to output parquet file for word count statistics.

required
append bool

Whether to append to an existing output file instead of overwriting.

False
workers int

Number of workers to use for parallel word tokenization. Note that a high worker count will increase memory consumption substantially for some tokenizers.

12
Source code in meld/data.py
def compute_word_counts(data_directory: Path, output: Path, append: bool = False, workers: int = 12) -> None:
    """
    Computes word counts using a word tokenizer for each dataset split and writes statistics to a parquet file.

    If there are no statistics to compute (e.g. `append` is set and every local dataset is already contained in
    `output`), a warning is logged and `output` is left untouched.

    Args:
        data_directory: Directory containing processed benchmark
            datasets.
        output: Path to output parquet file for word count statistics.
        append: Whether to append to an existing output file instead of
            overwriting. Datasets whose name already occurs in the
            `dataset_name` column of the existing file are skipped.
        workers: Number of workers to use for parallel word tokenization.
            Note that a high worker count will increase memory consumption substantially for some tokenizers.
    """

    if append:
        # The existing file is read eagerly because the writer below re-creates `output` from scratch
        previous_data = pl.read_parquet(output)
        processed_datasets = set(previous_data.select(pl.col("dataset_name").unique()).to_series())
    else:
        previous_data = None
        processed_datasets = set()

    # Lazy stream of per-split statistics; datasets are only tokenized once the generator is consumed
    statistics = (
        stats
        for dataset in local_datasets(data_directory)
        if dataset.metadata.name not in processed_datasets
        for stats in data_stats.word_tokenize_dataset(dataset, workers)
    )

    # The first row is needed to infer the parquet schema. Bail out before the writer is opened if there is
    # nothing to write, instead of failing with a bare StopIteration.
    first_row = next(statistics, None)
    if first_row is None:
        logger.warning(f'No word count statistics to compute, leaving "{output}" untouched')
        return

    first_sample = dataclasses.asdict(first_row)
    schema = pa.RecordBatch.from_pylist([first_sample]).schema

    # Pin the subset hierarchy column to a list of strings instead of relying on the type inferred from the first row
    subset_field = schema.get_field_index("subset_hierarchy")
    schema = schema.set(subset_field, schema.field(subset_field).with_type(pa.list_(pa.string())))

    batch = [first_sample]

    with ParquetWriter(output, schema=schema, compression="zstd") as writer, logging_redirect_tqdm():
        if previous_data is not None:
            writer.write_table(previous_data.to_arrow().cast(schema))

        for split_statistics in tqdm(statistics):
            batch.append(dataclasses.asdict(split_statistics))
            if len(batch) >= _WORD_COUNT_BATCH_SIZE:
                writer.write_batch(pa.RecordBatch.from_pylist(batch, schema=schema))
                batch = []

        # Write final batch
        if batch:
            writer.write_batch(pa.RecordBatch.from_pylist(batch, schema=schema))

download(data_directory, datasets=None, force_reprocess=False, meld_open_repo='kgnlp/meld-open', sentence_span_path=None)

Downloads NER datasets and processes them into the standardized benchmark format in the specified directory.

Parameters:

Name Type Description Default
data_directory Path

Directory where the datasets will be stored.

required
datasets Sequence[str] | None

List of dataset names and/or profiles to download. Dataset profiles and names may be mixed. For example, ["meld:open", "CoNLL-2003"] will download all datasets in the "meld:open" list and "CoNLL-2003". If None, all available datasets will be downloaded.

None
force_reprocess bool

Whether to reprocess the datasets even if they are already processed on disk.

False
meld_open_repo str | None

Repository ID on Huggingface Hub or path to preprocessed datasets in MELD format which will be loaded directly, bypassing processing from source for these datasets. If set to None, all datasets will be downloaded and processed from their original source.

'kgnlp/meld-open'
sentence_span_path Path | None

Reproduces the sentence tokenization bundled with the package and stores spans for each sentence in the given directory. Intended for full reproducibility and addition of new datasets.

None
Source code in meld/data.py
def download(
    data_directory: Path,
    datasets: Sequence[str] | None = None,
    force_reprocess: bool = False,
    meld_open_repo: str | None = "kgnlp/meld-open",
    sentence_span_path: Path | None = None,
) -> None:
    """
    Downloads NER datasets and processes them into the standardized benchmark format in the specified directory.

    Args:
        data_directory: Directory where the datasets will be stored. It is created, including missing parent
            directories, if it does not exist yet.
        datasets: List of dataset names and/or profiles to download. Dataset profiles and names may be mixed. For example, ["meld:open", "CoNLL-2003"] will download all datasets in the "meld:open" list and "CoNLL-2003". If `None`, all available datasets will be downloaded.

        force_reprocess: Whether to reprocess the datasets even if they are already processed on disk.
        meld_open_repo: Repository ID on Huggingface Hub or path to preprocessed datasets in MELD format which will be loaded directly, bypassing processing from source for these datasets.
            If set to `None`, all datasets will be downloaded and processed from their original source.
        sentence_span_path: Reproduces the sentence tokenization bundled
            with the package and stores spans for each sentence in the
            given directory. Intended for full reproducibility and
            addition of new datasets. The directory is created,
            including missing parent directories, if it does not exist
            yet.
    """
    manifest = load_manifest()
    requested_datasets = _resolve_profiles(datasets, manifest)

    # Resolve and create the cache directory (and any missing parents) if it doesn't already exist
    data_directory = data_directory.resolve()
    data_directory.mkdir(parents=True, exist_ok=True)
    shared_intermediate_directory = data_directory / ".shared_download"
    processed_root = data_directory / PROCESSED_DIRECTORY

    # NOTE(review): this set is never populated in this function, so the filter below currently excludes nothing.
    # Presumably `_download_preprocessed` was meant to report which datasets it handled - TODO confirm.
    # Datasets that are already on disk are still skipped by the metadata file check in the processing loop.
    preprocessed_datasets = set()
    if meld_open_repo is not None:
        _download_preprocessed(data_directory, meld_open_repo, requested_datasets)

    # Construct all data pipes in advance
    logger.info("Constructing data pipes")
    data_pipes = [
        (name, dataset_pipe(dataset), dataset.use_shared_cache)
        for name, dataset in manifest.items()
        if name in requested_datasets and name not in preprocessed_datasets
    ]
    logger.info(f'Downloading benchmark datasets to "{data_directory}"')

    if sentence_span_path is not None:
        sentence_span_path.mkdir(parents=True, exist_ok=True)

    with logging_redirect_tqdm():
        for name, data_pipe, shared_cache_directory in data_pipes:
            processed_path = processed_root / name
            # A metadata file in the target directory marks the dataset as already processed
            if not force_reprocess and (processed_path / METADATA_FILENAME).exists():
                logger.info(f"{name} was already processed, skipping")
                continue

            logger.info(f"Processing {name}")
            processed_root.mkdir(exist_ok=True)

            # Datasets either get their own temporary directory or use a named directory below the shared
            # download directory, depending on the `use_shared_cache` value from the manifest
            if shared_cache_directory is None:
                intermediate_directory = data_directory / _TEMP_DIRECTORY / name
            else:
                shared_intermediate_directory.mkdir(exist_ok=True)
                intermediate_directory = shared_intermediate_directory / shared_cache_directory

            data_pipe.run(name, intermediate_directory, processed_path, sentence_span_path)

main(args=None)

Main entry point for the MELD data management CLI.

Parameters:

Name Type Description Default
args Sequence[str] | None

Command line arguments. If None, arguments are parsed from sys.argv.

None
Source code in meld/data.py
def main(args: Sequence[str] | None = None) -> None:
    """
    Main entry point for the MELD data management CLI.

    Args:
        args: Command line arguments. If None, arguments are parsed from
            `sys.argv`.
    """

    if args is None:
        args = sys.argv[1:]

    arguments = _argument_parser().parse_args(args)
    log_levels = logging.getLevelNamesMapping()

    match arguments.mode:
        case "list":
            for dataset in available_datasets():
                print(dataset)
        case "cite":
            for entry in bibliography_entries(None if arguments.all else (arguments.datasets or [])):
                print(entry)
        case "download":
            logger.setLevel(log_levels[arguments.log_level])
            download(
                arguments.data_directory,
                arguments.datasets,
                arguments.force,
                arguments.meld_repo,
                arguments.sentence_span_path,
            )
        case "sample":
            sample_data(
                arguments.data_directory,
                arguments.language,
                arguments.subset_size,
                arguments.output,
                arguments.split,
                arguments.label_config,
                arguments.merge_documents,
                arguments.keep_documents_without_entities,
                arguments.keep_discontinuous_spans,
                arguments.merging_max_tokens,
                arguments.tokenizer,
            )
        case "merge":
            merge_data(arguments.data_directory, arguments.output, arguments.label_config, arguments.merge_documents)
        case "count-words":
            logger.setLevel(log_levels[arguments.log_level])
            compute_word_counts(arguments.data_directory, arguments.output, arguments.append, arguments.workers)
        case "hf":
            match arguments.normalize_labels:
                case "":
                    label_mapping = load_label_map()
                case None:
                    label_mapping = None
                case _:
                    label_mapping = load_label_map(arguments.normalize_labels)

            included_datasets = _resolve_profiles(arguments.datasets)

            with open(arguments.dataset_card_template) as file:
                dataset_card_template = file.read()

            convert_to_hf(
                arguments.meld_data_path,
                arguments.output_data_path,
                None if arguments.datasets is None else dataset_filter(included_datasets),
                label_mapping,
                dataset_card_template,
                arguments.pretty_name,
                arguments.extra_files,
            )

merge_data(data_directory, output=None, label_config=None, merge_documents=False)

Merges data from multiple datasets into a single parquet output.

Parameters:

Name Type Description Default
data_directory Path

Directory containing processed benchmark datasets.

required
output Path | IO[bytes] | None

Output path for merged data, or stdout if None.

None
label_config dict[str, str] | None

Configuration mapping dataset names to their tagsets for multi-tagset datasets.

None
merge_documents bool

Whether to merge multiple sentences/paragraphs into single documents.

False
Source code in meld/data.py
def merge_data(
    data_directory: Path,
    output: Path | IO[bytes] | None = None,
    label_config: dict[str, str] | None = None,
    merge_documents: bool = False,
) -> None:
    """
    Merges data from multiple datasets into a single parquet output.

    Every split of every subset is scanned lazily, annotated with `dataset`, `subset`, `split` and `language`
    columns and concatenated into one frame, which is then written with `sink_parquet`. If `label_config` is
    `None`, all tagset columns are dropped. Otherwise, for subsets with more than one tagset, the tagset selected
    for the dataset is renamed to the default tagset name and the remaining tagset columns are dropped.

    Args:
        data_directory: Directory containing processed benchmark
            datasets.
        output: Output path for merged data, or stdout if None.
        label_config: Configuration mapping dataset names to their
            tagsets for multi-tagset datasets.
        merge_documents: Whether to merge multiple sentences/paragraphs
            into single documents.

    Raises:
        KeyError: If `label_config` is given but contains no entry for a
            dataset with multiple tagsets.
    """

    data_splits: list[pl.LazyFrame] = []

    for dataset in formats.local_datasets(data_directory):
        dataset_name = dataset.metadata.name
        for subset in dataset:
            for split in subset.splits:
                tagsets = subset.metadata.tagsets
                columns_to_rename = {}

                # Provenance columns attached to every row of this split
                metadata_columns = {
                    "dataset": pl.lit(dataset_name),
                    "subset": pl.lit("/".join(subset.hierarchy)),
                    "split": pl.lit(split),
                    "language": pl.lit(subset.metadata.language),
                }

                if label_config is None:
                    columns_to_remove = tagsets
                elif len(tagsets) > 1:
                    try:
                        selected_tagset = label_config[dataset_name]
                    except KeyError:
                        # The lookup failure itself carries no extra information, so the context is suppressed
                        raise KeyError(
                            f"No tagset specified for {dataset_name}, which contains multiple tagsets ({tagsets})"
                        ) from None
                    columns_to_remove = [tagset for tagset in tagsets if tagset != selected_tagset]
                    columns_to_rename[selected_tagset] = DEFAULT_TAGSET
                    if subset.metadata.pre_tokenized:
                        columns_to_rename[f"{selected_tagset}_iob"] = f"{DEFAULT_TAGSET}_iob"
                else:
                    columns_to_remove = []

                reorder_tokens = False
                if subset.metadata.pre_tokenized:
                    columns_to_add = {}
                    if columns_to_remove:
                        # Copy first so that the tagset list of the subset metadata is not modified
                        columns_to_remove = columns_to_remove.copy()
                        # Build the suffixed names as a separate list before extending the list being iterated
                        columns_to_remove.extend([f"{tagset}_iob" for tagset in columns_to_remove])
                else:
                    # Subsets without tokens get empty placeholder columns so that all frames can be concatenated
                    columns_to_add = {"tokens": pl.lit(None, pl.List(pl.String()))}
                    if label_config is not None:
                        reorder_tokens = True
                        columns_to_add[f"{DEFAULT_TAGSET}_iob"] = pl.lit(None, pl.List(pl.String()))

                data = (
                    subset.scan_split(split)
                    .with_columns(**columns_to_add, **metadata_columns)
                    .drop(*columns_to_remove)
                    .rename(columns_to_rename)
                )

                # Note: Column order matters for strict concatenation in polars. Therefore, the "tokens" column is
                # swapped with the column directly in front of it (the "ner" tagset column) if empty tokens columns
                # are inserted
                if reorder_tokens:
                    columns = data.collect_schema().names()
                    tokens_index = columns.index("tokens")
                    # Explicit swap: the former reversed-slice assignment dropped both columns for index 1
                    if tokens_index > 0:
                        columns[tokens_index - 1], columns[tokens_index] = (
                            columns[tokens_index],
                            columns[tokens_index - 1],
                        )
                    data = data.select(*columns)

                if merge_documents:
                    # The provenance columns are attached again after merging
                    data = formats._merge_documents(data).with_columns(**metadata_columns)

                # Splits are kept as lazy frames here; they are only evaluated by the final sink below
                data_splits.append(data)

    # Limited chunk size to reduce memory usage
    with pl.Config(streaming_chunk_size=512):
        pl.concat(data_splits).sink_parquet(sys.stdout.buffer if output is None else output)

sample_data(data_directory, language, subset_size, output=None, split='train', tagset_config=None, merge_documents=False, keep_documents_without_entities=True, keep_discontinuous_spans=False, target_num_tokens=None, aggregation_tokenizer='google/gemma-3-27b-it')

Samples and processes data from a specified directory.

Parameters:

Name Type Description Default
data_directory Path

The path to the directory containing the benchmark data.

required
language str

The ISO 639-3 code of the target language to sample.

required
subset_size int

The number of samples to extract per dataset.

required
output Path | IO[bytes] | None

The destination for the output, either a file path or a writable IO object. Defaults to standard output.

None
split str

The dataset split to process, e.g., "train", "validation"

'train'
tagset_config dict[str, str] | None

Indicates which tagset to use for datasets with multiple tag sets for each sample. E.g. {"Few-NERD": "fine"} selects fine-grained tags from the Few-NERD dataset. This parameter is required if a dataset with multiple tagsets is encountered during sampling with the given configuration.

None
merge_documents bool

Whether to merge documents consisting of multiple sentences or paragraphs into a single sample.

False
keep_documents_without_entities bool

Whether to keep documents without entities.

True
keep_discontinuous_spans bool

Whether to keep discontinuous spans. By default, only continuous spans are kept and flattened into simplified span annotations.

False
target_num_tokens int | None

If merge_documents is true, attempts to merge sentences or passages into documents only if the given number of tokens is not exceeded.

None
aggregation_tokenizer str

Tokenizer used for counting tokens if target_num_tokens is set.

'google/gemma-3-27b-it'
Source code in meld/data.py
def sample_data(
    data_directory: Path,
    language: str,
    subset_size: int,
    output: Path | IO[bytes] | None = None,
    split: str = "train",
    tagset_config: dict[str, str] | None = None,
    merge_documents: bool = False,
    keep_documents_without_entities: bool = True,
    keep_discontinuous_spans: bool = False,
    target_num_tokens: int | None = None,
    aggregation_tokenizer: str = "google/gemma-3-27b-it",
) -> None:
    """
    Draws a monolingual sample from the benchmark data and writes it as parquet.

    Args:
        data_directory: The path to the directory containing the
            benchmark data.
        language: The ISO 639-3 code of the target language to sample.
        subset_size: The number of samples to extract per dataset.
        output: The destination for the output, either a file path or a
            writable IO object. Defaults to standard output.
        split: The dataset split to process, e.g., "train", "validation".
        tagset_config: Indicates which tagset to use for datasets with
            multiple tag sets for each sample. E.g. `{"Few-NERD":
            "fine"}` selects fine-grained tags from the `Few-NERD`
            dataset. This parameter is required if a dataset with
            multiple tagsets is encountered during sampling with the
            given configuration.
        merge_documents: Whether to merge documents consisting of
            multiple sentences or paragraphs into a single sample.
        keep_documents_without_entities: Whether to keep documents
            without entities.
        keep_discontinuous_spans: Whether to keep discontinuous spans.
            By default, only continuous spans are kept and flattened
            into simplified span annotations.
        target_num_tokens: If `merge_documents` is true, attempts to
            merge sentences or passages into documents only if the given
            number of tokens is not exceeded.
        aggregation_tokenizer: Tokenizer used for counting tokens if
            `target_num_tokens` is set.
    """

    sampled = read_monolingual_sample(
        data_directory,
        language,
        subset_size,
        split,
        tagset_config,
        merge_documents,
        keep_documents_without_entities,
        target_num_tokens=target_num_tokens,
        aggregation_tokenizer=aggregation_tokenizer,
    )

    # Discontinuous spans are only retained on explicit request
    if keep_discontinuous_spans:
        result = sampled
    else:
        result = drop_discontinuous_spans(sampled)

    # Fall back to binary standard output if no destination was given
    destination = sys.stdout.buffer if output is None else output
    result.sink_parquet(destination)