Skip to content

Tokenization

Text detokenization, token alignment and sentence splitting.

Detokenizer

Detokenizes tokenized text into a string with aligned entity spans.

Handles language-specific detokenization rules, including character-level languages (Chinese, Japanese, etc.) and special modes such as WikiANN hash replacement. Currently, text in most languages is detokenized by simply joining tokens with whitespace.

Parameters:

Name Type Description Default
language str

The language of the tokens for selecting the appropriate detokenization strategy.

required
detokenizer_type DetokenizerType

The type of detokenization to perform. Options: - "whitespace" to join by whitespace - "concatenate" to concatenate tokens without a delimiter - "wikiann" for WikiANN specific preprocessing

'whitespace'
Source code in meld/tokenization.py
class Detokenizer:
    """
    Turns token sequences back into plain text and records where each token lands.

    Tokens are joined with a single space, unless `detokenizer_type` is
    `"concatenate"` or the language is listed in `_CHARACTER_LEVEL`
    (character-level languages such as Chinese or Japanese), in which case they
    are joined without any delimiter. When `detokenizer_type` is `"wikiann"` and
    the language is listed in `_WIKIANN_HASHES`, `"#"` tokens are turned into
    spaces before joining.

    Args:
        language: The language of the tokens, used to pick the joining
            strategy.
        detokenizer_type: The kind of detokenization to apply. Options:
            - `"whitespace"` joins tokens with a space
            - `"concatenate"` joins tokens without a delimiter
            - `"wikiann"` additionally enables the WikiANN preprocessing
    """

    def __init__(self, language: str, detokenizer_type: DetokenizerType = "whitespace") -> None:
        joins_without_delimiter = detokenizer_type == "concatenate" or language in _CHARACTER_LEVEL
        hash_replacement = detokenizer_type == "wikiann" and language in _WIKIANN_HASHES

        self.language = language
        self.separator = "" if joins_without_delimiter else " "
        self.detokenizer_type = detokenizer_type
        self._wikiann_hash_replacement = hash_replacement

    def _whitespace_detokenize(self, tokens: list[str]) -> tuple[str, list[Span]]:
        """
        Join the tokens with the configured separator and compute their spans.

        Args:
            tokens: The tokens to join.

        Returns:
            The joined text together with one span per token, giving the
            position of that token inside the joined text.
        """

        gap = len(self.separator)
        spans = []
        cursor = 0
        for token in tokens:
            stop = cursor + len(token)
            spans.append(Span(cursor, stop))
            # The next token starts after this one plus the separator
            cursor = stop + gap

        return self.separator.join(tokens), spans

    def _preprocess_tokens(self, tokens: list[str]) -> list[str]:
        """
        Apply token-level fixes ahead of detokenization.

        The only fix currently implemented is the WikiANN hash replacement, and
        only when it has been enabled for this instance.

        Args:
            tokens: The tokens to preprocess.

        Returns:
            The given list itself if nothing is enabled, otherwise a new list
            in which every `"#"` token is replaced by a single space.
        """

        if not self._wikiann_hash_replacement:
            return tokens

        # In WikiANN a lone hash token stands in for a space
        return [" " if token == "#" else token for token in tokens]

    def detokenize(self, tokens: list[str]) -> tuple[str, list[Span]]:
        """
        Convert a token list into text plus the span of every token.

        Args:
            tokens: The tokens to detokenize.

        Returns:
            The detokenized text and the spans of the tokens within it.
        """

        return self._whitespace_detokenize(self._preprocess_tokens(tokens))

    def detokenize_bio(
        self, document: list[LabeledTokens], original_text: list[str] | None = None
    ) -> list[LabeledText]:
        """
        Convert BIO-labeled token sequences into text with entity spans.

        The tokens of every sentence are preprocessed and written back to the
        sentence. If `original_text` is given, the tokens are aligned against
        the text at the same index; when that alignment fails, a warning is
        logged and the sentence is detokenized by joining its tokens instead.

        Args:
            document: The `LabeledTokens` to detokenize.
            original_text: Optional source texts, one per entry of
                `document`, to align the tokens against.

        Returns:
            One `LabeledText` per sentence, holding the text and the entity
            spans for each tagset.
        """

        results = []
        for index, sentence in enumerate(document):
            sentence.tokens = self._preprocess_tokens(sentence.tokens)
            tokens = sentence.tokens

            token_spans = None
            if original_text is not None:
                text = original_text[index]
                try:
                    token_spans = align_tokens_with_text(tokens, text)
                except ValueError as error:
                    logger.warning(
                        f"Could not align text with tokens, falling back to whitespace tokenization: {error}"
                    )

            # Either no source text was supplied or the alignment above failed
            if token_spans is None:
                text, token_spans = self._whitespace_detokenize(tokens)

            annotations = {}
            for tagset, labels in sentence.labels.items():
                annotations[tagset] = bio_to_spans(labels, token_spans)

            results.append(LabeledText(text, annotations))

        return results

    def tokens_to_document(
        self, labeled_tokens: list[LabeledTokens], original_text: list[str] | None = None
    ) -> NERDocument:
        """
        Build a `NERDocument` from labeled tokens.

        Args:
            labeled_tokens: The `LabeledTokens` to convert.
            original_text: Optional source texts to align the tokens against.

        Returns:
            A `NERDocument` made of the detokenized text with its entity
            spans and the labeled tokens themselves.
        """

        labeled_text = self.detokenize_bio(labeled_tokens, original_text)
        return NERDocument(labeled_text, labeled_tokens)

detokenize(tokens)

Detokenize a list of tokens into text with aligned spans.

Parameters:

Name Type Description Default
tokens list[str]

List of tokens to detokenize.

required

Returns:

Type Description
tuple[str, list[Span]]

The detokenized text and spans for each token within it.

Source code in meld/tokenization.py
def detokenize(self, tokens: list[str]) -> tuple[str, list[Span]]:
    """
    Convert a token list into text plus the span of every token.

    Args:
        tokens: The tokens to detokenize.

    Returns:
        The detokenized text and the spans of the tokens within it.
    """

    # Preprocessing runs first so the spans refer to the final tokens
    prepared = self._preprocess_tokens(tokens)
    return self._whitespace_detokenize(prepared)

detokenize_bio(document, original_text=None)

Detokenize a BIO-formatted document. If an original_text is given, the detokenizer will simply align tokens with the given source text.

Parameters:

Name Type Description Default
document list[LabeledTokens]

The LabeledTokens to detokenize.

required
original_text list[str] | None

Optional original text for alignment.

None

Returns:

Type Description
list[LabeledText]

A list of LabeledText objects with detokenized text and spans.

Raises:

Type Description
ValueError

If tokens cannot be aligned and fallback to whitespace tokenization fails.

Source code in meld/tokenization.py
def detokenize_bio(
    self, document: list[LabeledTokens], original_text: list[str] | None = None
) -> list[LabeledText]:
    """
    Convert BIO-labeled token sequences into text with entity spans.

    The tokens of every sentence are preprocessed and written back to the
    sentence. If `original_text` is given, the tokens are aligned against the
    text at the same index; when that alignment fails, a warning is logged and
    the sentence is detokenized by joining its tokens instead.

    Args:
        document: The `LabeledTokens` to detokenize.
        original_text: Optional source texts, one per entry of `document`,
            to align the tokens against.

    Returns:
        One `LabeledText` per sentence, holding the text and the entity
        spans for each tagset.
    """

    results = []
    for index, sentence in enumerate(document):
        sentence.tokens = self._preprocess_tokens(sentence.tokens)
        tokens = sentence.tokens

        token_spans = None
        if original_text is not None:
            text = original_text[index]
            try:
                token_spans = align_tokens_with_text(tokens, text)
            except ValueError as error:
                logger.warning(
                    f"Could not align text with tokens, falling back to whitespace tokenization: {error}"
                )

        # Either no source text was supplied or the alignment above failed
        if token_spans is None:
            text, token_spans = self._whitespace_detokenize(tokens)

        annotations = {}
        for tagset, labels in sentence.labels.items():
            annotations[tagset] = bio_to_spans(labels, token_spans)

        results.append(LabeledText(text, annotations))

    return results

tokens_to_document(labeled_tokens, original_text=None)

Convert labeled tokens to a NERDocument.

Parameters:

Name Type Description Default
labeled_tokens list[LabeledTokens]

The list of LabeledTokens to convert.

required
original_text list[str] | None

Optional original text for alignment.

None

Returns:

Type Description
NERDocument

A NERDocument containing both the labeled tokens and detokenized text with aligned entity spans.

Source code in meld/tokenization.py
def tokens_to_document(
    self, labeled_tokens: list[LabeledTokens], original_text: list[str] | None = None
) -> NERDocument:
    """
    Build a `NERDocument` from labeled tokens.

    Args:
        labeled_tokens: The `LabeledTokens` to convert.
        original_text: Optional source texts to align the tokens against.

    Returns:
        A `NERDocument` made of the detokenized text with its entity spans
        and the labeled tokens themselves.
    """

    labeled_text = self.detokenize_bio(labeled_tokens, original_text)
    return NERDocument(labeled_text, labeled_tokens)

SentenceSplitter

Sentence tokenizer that splits documents into sentences using either a Segment Any Text (SAT) [1] model or pre-computed sentence boundaries from Parquet files.

Requires the optional wtpsplit dependency to be installed unless read_spans is True (e.g. by installing with the "sentence-segmentation" extra enabled).

References

[1] Markus Frohmann, Igor Sterner, Ivan Vulić, Benjamin Minixhofer, and Markus Schedl. 2024. Segment Any Text: A Universal Approach for Robust, Efficient and Adaptable Sentence Segmentation. In Yaser Al-Onaizan, Mohit Bansal, and Yun-Nung Chen, editors, Proceedings of the 2024 Conference on Empirical Methods in Natural Language Processing, pages 11908–11941, Miami, Florida, USA, November. Association for Computational Linguistics.

Parameters:

Name Type Description Default
sentence_boundaries SentenceBoundaryType

Type of sentence boundaries.

required
sentence_span_file Path | Traversable | None

Optional path to a Parquet file with pre-computed sentence spans used if read_spans is True.

None
sat_model str

Model name for wtpsplit for segmenting text if read_spans is False.

'sat-12l-sm'
read_spans bool

Whether to read from pre-computed span file instead of splitting.

True

Raises:

Type Description
ValueError

When attempting to tokenize when full sentence boundaries already exist (sentence_boundaries is set to "full").

ValueError

When read_spans is set to True but no sentence_span_file is provided.

ImportError

If the optional wtpsplit dependency is not installed.

Source code in meld/tokenization.py
class SentenceSplitter:
    """
    Sentence tokenizer that splits documents into sentences using either a Segment Any Text (SAT) [1] model
    or pre-computed sentence boundaries from Parquet files.

    Requires the optional `wtpsplit` dependency to be installed unless `read_spans` is `True` (e.g. by installing with the "sentence-segmentation" extra enabled).

    The instance is meant to be used as a context manager: entering opens the
    span file (for reading the stored boundaries, or for writing newly
    computed ones), exiting flushes any buffered rows and closes it.

    # References

    [1] Markus Frohmann, Igor Sterner, Ivan Vulić, Benjamin Minixhofer, and Markus Schedl. 2024. [Segment Any Text: A Universal Approach for Robust, Efficient and Adaptable Sentence Segmentation](https://aclanthology.org/2024.emnlp-main.665/). In Yaser Al-Onaizan, Mohit Bansal, and Yun-Nung Chen, editors, Proceedings of the 2024 Conference on Empirical Methods in Natural Language Processing, pages 11908–11941, Miami, Florida, USA, November. Association for Computational Linguistics.

    Args:
        sentence_boundaries: Type of sentence boundaries.
        sentence_span_file: Optional path to a Parquet file with
            pre-computed sentence spans used if `read_spans` is
            `True`. If `read_spans` is `False` and a path is given,
            the computed spans are written to it instead.
        sat_model: Model name for `wtpsplit` for segmenting text if
            `read_spans` is `False`.
        read_spans: Whether to read from pre-computed span file
            instead of splitting.

    Raises:
        ValueError: When attempting to tokenize when full sentence
            boundaries already exist (`sentence_boundaries` is set
            to "full").
        ValueError: When `read_spans` is set to `True` but no
            `sentence_span_file` is provided.
        ImportError: If the optional `wtpsplit` dependency is not
            installed.
    """

    # Buffered rows are handed to the Parquet writer once more than this many have accumulated
    _BUFFER_SIZE = 256

    def __init__(
        self,
        sentence_boundaries: SentenceBoundaryType,
        sentence_span_file: Path | Traversable | None = None,
        sat_model: str = "sat-12l-sm",
        read_spans: bool = True,
    ) -> None:
        if sentence_boundaries == "full":
            raise ValueError("Sentence tokenization only supports splits that are not already segmented into sentences")

        # The encoded file name is the prefix of the names hashed into sequence UUIDs
        self._split_prefix = None if sentence_span_file is None else sentence_span_file.name.encode()
        self._sentence_span_path = sentence_span_file
        self._sentence_span_file = None
        self._read_spans = read_spans

        self._boundaries = None
        self._writer = None

        self._sequence_index = 0

        # Created in both modes so that no code path can trip over a missing attribute
        self._parquet_batch = {
            "sequence_id": [],
            "sentence_offsets": [],
        }

        if read_spans:
            if self._sentence_span_path is None:
                raise ValueError("`sentence_span_file` must not be `None` if `read_spans` is `True`")

            # No model is needed when the boundaries come from the span file
            self._sat = None
            return

        if SaT is None or torch is None:
            raise ImportError('Sentence splitting requires optional dependency "wtpsplit" to be installed')

        self._sat = SaT(sat_model)
        if torch.cuda.is_available():
            self._sat.to("cuda")

    def __enter__(self) -> Self:
        """
        Open the span file, if one was configured.

        In read mode the whole table of stored boundaries is loaded; otherwise
        a Parquet writer for newly computed boundaries is created.

        Returns:
            This instance.

        Raises:
            ValueError: If spans are to be written but the configured path
                is not a `Path`.
        """

        if self._sentence_span_path is not None:
            if self._read_spans:
                self._sentence_span_file = self._sentence_span_path.open("rb")
                self._boundaries = parquet.read_table(self._sentence_span_file)
            else:
                if not isinstance(self._sentence_span_path, Path):
                    raise ValueError("sentence_span_path must be a Path object for writing")
                self._sentence_span_file = self._sentence_span_path.open("wb")
                self._writer = ParquetWriter(self._sentence_span_file, SENTENCE_SPLIT_SCHEMA, compression="zstd")

        return self

    def __exit__(
        self, exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: TracebackType | None
    ) -> None:
        """
        Flush the rows that are still buffered and release the span file.

        The writer and the file are closed even if writing the final batch
        fails, so a failed flush does not leak the open file handle.
        """

        try:
            # Write final batch if necessary
            if self._writer is not None and self._parquet_batch["sequence_id"]:
                self._writer.write_batch(RecordBatch.from_pydict(self._parquet_batch, SENTENCE_SPLIT_SCHEMA))
        finally:
            try:
                # The writer has to be closed before the file it writes into
                if self._writer is not None:
                    self._writer.close()
            finally:
                if self._sentence_span_file is not None:
                    self._sentence_span_file.close()

    def _current_uuid(self) -> bytes:
        """
        Generate a reproducible UUID v5 for the current sequence.

        The UUID name is the split prefix followed by the current sequence
        index encoded as 64 little-endian bytes.

        Returns:
            The generated UUID as bytes.

        Raises:
            ValueError: If no split prefix is available, i.e. no span file
                was configured.
        """

        if self._split_prefix is None:
            raise ValueError("A split prefix needs to be defined for generating UUIDs for the sample")

        return uuid.uuid5(
            _SENTENCE_SPLIT_UUID_NAMESPACE, self._split_prefix + self._sequence_index.to_bytes(64, "little")
        ).bytes

    def sentence_tokenize(self, document: NERDocument) -> NERDocument:
        """
        Tokenize the document into sentences and update its spans accordingly.

        In read mode the boundaries for the next `len(document.spans)`
        sequences are taken from the loaded span table. Otherwise the SaT model
        splits the text of every span, and, if a writer is open, the resulting
        offsets are buffered for writing to the span file.

        Args:
            document: The document to sentence-tokenize.

        Returns:
            The document with spans split into sentences.
        """

        if self._sat is None:
            assert self._boundaries is not None
            first = self._sequence_index
            last = first + len(document.spans)
            stored = self._boundaries.column("sentence_offsets")[first:last]
            boundaries = [[Span(*entry.values()) for entry in row.as_py()] for row in stored]
            document, _ = _split_document(document, boundaries)
            # The next document continues where this one stopped
            self._sequence_index = last
            return document

        batch_ids = self._parquet_batch["sequence_id"]
        batch_offsets = self._parquet_batch["sentence_offsets"]

        texts = [annotated_text.text for annotated_text in document.spans]
        predicted = self._sat.split(texts, strip_whitespace=True, split_on_input_newlines=False)

        aligned = []
        for sentences, text in zip(predicted, texts):
            assert text is not None, "Text field must not be None"
            aligned.append(align_tokens_with_text(sentences, text))
            self._sequence_index += 1

        document, aligned = _split_document(document, aligned)
        if self._writer is None:
            return document

        batch_offsets.extend([list(map(dataclasses.asdict, spans)) for spans in aligned])
        # NOTE(review): all sequences added here share one UUID, computed from the
        # sequence index after the loop above - confirm that this is intended
        batch_ids.extend([self._current_uuid()] * len(aligned))

        if len(batch_offsets) > self._BUFFER_SIZE:
            self._writer.write_batch(RecordBatch.from_pydict(self._parquet_batch, SENTENCE_SPLIT_SCHEMA))
            # Start a fresh buffer; the old lists have just been written
            self._parquet_batch = {
                "sequence_id": [],
                "sentence_offsets": [],
            }

        return document

sentence_tokenize(document)

Tokenize the document into sentences and update its spans accordingly.

Parameters:

Name Type Description Default
document NERDocument

The document to sentence-tokenize.

required

Returns:

Type Description
NERDocument

The document with spans split into sentences.

Source code in meld/tokenization.py
def sentence_tokenize(self, document: NERDocument) -> NERDocument:
    """
    Split the spans of a document into sentences.

    Without a SaT model, the boundaries for the next `len(document.spans)`
    sequences are taken from the loaded span table. With a model, the text of
    every span is split by the model and, if a writer is open, the resulting
    offsets are buffered for writing.

    Args:
        document: The document to sentence-tokenize.

    Returns:
        The document with spans split into sentences.
    """

    if self._sat is None:
        assert self._boundaries is not None
        first = self._sequence_index
        last = first + len(document.spans)
        stored = self._boundaries.column("sentence_offsets")[first:last]

        boundaries = []
        for row in stored:
            boundaries.append([Span(*entry.values()) for entry in row.as_py()])

        document, _ = _split_document(document, boundaries)
        # The next document continues where this one stopped
        self._sequence_index = last
        return document

    batch_ids = self._parquet_batch["sequence_id"]
    batch_offsets = self._parquet_batch["sentence_offsets"]

    texts = [annotated_text.text for annotated_text in document.spans]
    predicted = self._sat.split(texts, strip_whitespace=True, split_on_input_newlines=False)

    aligned = []
    for sentences, text in zip(predicted, texts):
        assert text is not None, "Text field must not be None"
        aligned.append(align_tokens_with_text(sentences, text))
        self._sequence_index += 1

    document, aligned = _split_document(document, aligned)

    if self._writer is not None:
        batch_offsets.extend([[dataclasses.asdict(span) for span in spans] for spans in aligned])
        batch_ids.extend([self._current_uuid()] * len(aligned))

        if len(batch_offsets) > self._BUFFER_SIZE:
            self._writer.write_batch(RecordBatch.from_pydict(self._parquet_batch, SENTENCE_SPLIT_SCHEMA))
            # Start a fresh buffer; the old lists have just been written
            self._parquet_batch = {
                "sequence_id": [],
                "sentence_offsets": [],
            }

    return document

align_tokens_with_text(tokens, text)

Align token spans with the original text.

Parameters:

Name Type Description Default
tokens list[str]

List of tokens to align.

required
text str

The original text to align tokens against.

required

Returns:

Type Description
list[Span]

List of spans of each token's position in the text.

Raises:

Type Description
ValueError

If tokens cannot be aligned with the text.

Source code in meld/tokenization.py
def align_tokens_with_text(tokens: list[str], text: str) -> list[Span]:
    """
    Align token spans with the original text.

    The tokens have to occur in the text in the given order and may only be
    separated by whitespace.

    Args:
        tokens: List of tokens to align.
        text: The original text to align tokens against.

    Returns:
        List of spans of each token's position in the text.

    Raises:
        ValueError: If tokens cannot be aligned with the text, including
            the case where the text ends before all tokens are consumed.
    """

    spans = []
    position = 0
    text_length = len(text)
    for token in tokens:
        # Consume whitespace between tokens. The bound keeps the scan inside
        # the text, so running out of text surfaces as the documented
        # ValueError below rather than an IndexError
        while position < text_length and text[position].isspace():
            position += 1

        end = position + len(token)
        if text[position:end] != token:
            raise ValueError(f"Failed to align tokens {tokens} with text {text!r}")
        spans.append(Span(position, end))
        position = end

    return spans

bio_to_spans(bio_labels, token_spans)

Convert BIO-formatted labels to annotated spans based on their position in the untokenized document.

Parameters:

Name Type Description Default
bio_labels list[BIO]

List of BIO tags.

required
token_spans list[Span]

List of token spans corresponding to the BIO labels.

required

Returns:

Type Description
list[Annotation]

List of annotations with labels and their spans.

Raises:

Type Description
ValueError

If a BIO tag with position "B" or "I" is encountered without an entity type.

Source code in meld/tokenization.py
def bio_to_spans(bio_labels: list[BIO], token_spans: list[Span]) -> list[Annotation]:
    """
    Merge BIO-tagged tokens into entity annotations over the untokenized text.

    Consecutive "B"/"I" tokens are merged into one span that runs from the
    start of the first token to the stop of the last one. An entity ends at an
    "O" tag, at a "B" tag, or when the entity type differs from the type of the
    preceding tag.

    Args:
        bio_labels: List of BIO tags.
        token_spans: List of token spans, paired with the BIO tags by
            position.

    Returns:
        List of annotations, each holding a label and its single span.

    Raises:
        ValueError: If a tag with position "B" or "I" starts an entity but
            has no entity type.
    """

    finished: list[tuple[str, Span]] = []
    active: tuple[str, Span] | None = None
    last_type = BIO("O").entity_type

    for tag, token_span in zip(bio_labels, token_spans):
        # Close the open entity if this tag does not continue it
        if active is not None and (tag.position in {"O", "B"} or tag.entity_type != last_type):
            finished.append(active)
            active = None

        if tag.position in {"B", "I"}:
            if active is not None:
                # Stretch the open entity to the end of this token
                label, covered = active
                active = (label, Span(covered.start, token_span.stop))
            elif tag.entity_type is None:
                raise ValueError(f"BIO tag without type encountered: {tag}")
            else:
                active = (tag.entity_type, token_span)

        last_type = tag.entity_type

    # An entity may still be open at the end of the sequence
    if active is not None:
        finished.append(active)

    return [Annotation(label, (covered,)) for label, covered in finished]