diff --git a/pyproject.toml b/pyproject.toml index 45a6581..46a957d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -62,7 +62,8 @@ vad = [ eval = [ "unbabel-comet==2.2.6", "mweralign", - "sacrebleu" + "sacrebleu", + "mosestokenizer", ] [tool.setuptools.dynamic] diff --git a/simulstream/metrics/scorers/latency/long_yaal.py b/simulstream/metrics/scorers/latency/long_yaal.py new file mode 100644 index 0000000..aca3a3c --- /dev/null +++ b/simulstream/metrics/scorers/latency/long_yaal.py @@ -0,0 +1,195 @@ +# Copyright 2026 FBK + +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License + +import argparse +import logging +import statistics +from typing import List, Optional + +from simulstream.metrics.readers import text_items +from simulstream.metrics.scorers.latency import register_latency_scorer, LatencyScores +from simulstream.metrics.scorers.latency.softsegmenter import ( + SoftSegmenterBasedLatencyScorer, + ResegmentedLatencyScoringSample, +) + + +LOGGER = logging.getLogger("simulstream.metrics.scorers.latency.long_yaal") + + +@register_latency_scorer("long_yaal") +class LongYAAL(SoftSegmenterBasedLatencyScorer): + """ + Computes Long-form Yet Another Average Lagging (LongYAAL) as proposed in + `Better Late Than Never: Evaluation of Latency Metrics for Simultaneous Speech-to-Text + Translation `_. + + This metric uses SoftSegmenter alignment to realign system outputs to reference segments + before computing latency, making it more robust for long-form speech translation evaluation. + + The key difference from StreamLAAL is the use of SoftSegmenter's more sophisticated + alignment algorithm that handles long-form audio better. Additionally, LongYAAL considers + all output tokens up until the end of the recording. StreamLAAL ignores any output tokens + emitted after the end of the reference segments. + """ + + @staticmethod + def _sentence_level_yaal( + delays: List[float], + source_length: float, + target_length: int, + recording_end: float = float("inf"), + is_longform: bool = True, + ) -> Optional[float]: + """ + Compute Yet Another Average Lagging (YAAL) on one sentence. + + Args: + delays (List[float]): Sequence of delays for each output token. + source_length (float): Length of the source audio segment in milliseconds. + target_length (int): Length of the target reference in tokens/characters. + recording_end (float): End time of the recording (for long-form audio). + is_longform (bool): Whether to treat as long-form audio (allows delays past + source_length). + + Returns: + Optional[float]: The YAAL score for the sentence, or None if computation is + not possible. 
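+
+        Example (illustrative values):
+            With ``delays=[1000, 2000, 3000]`` (ms), ``source_length=3000`` and
+            ``target_length=3``, ``gamma = 3 / 3000`` and
+            ``YAAL = ((1000 - 0) + (2000 - 1000) + (3000 - 2000)) / 3 = 1000``.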
+        """
+        if len(delays) == 0:
+            return None
+
+        # If the first delay is already past the end, we cannot compute YAAL
+        if (delays[0] >= source_length and not is_longform) or (
+            delays[0] >= recording_end
+        ):
+            return None
+
+        yaal = 0.0
+        gamma = max(len(delays), target_length) / source_length
+        tau = 0
+
+        for t_minus_1, d in enumerate(delays):
+            # Stop if we have exceeded the source length (for non-longform)
+            # or the recording end (for longform)
+            if (d >= source_length and not is_longform) or (d >= recording_end):
+                break
+
+            yaal += d - t_minus_1 / gamma
+            tau = t_minus_1 + 1
+
+        if tau == 0:
+            return None
+
+        return yaal / tau
+
+    def _do_score(
+        self, samples: List[ResegmentedLatencyScoringSample]
+    ) -> LatencyScores:
+        sentence_level_ideal_scores = []
+        sentence_level_ca_scores = []
+        skipped_sentences = 0
+
+        for sample in samples:
+            # Compute the total recording length (end time of the last reference segment)
+            if sample.reference:
+                recording_length = max(
+                    ref.start_time + ref.duration for ref in sample.reference
+                )
+            else:
+                LOGGER.warning(
+                    f"Sample {sample.audio_name} has no reference segments; treating recording"
+                    " length as infinite"
+                )
+                recording_length = float("inf")
+
+            for sentence_output, sentence_reference in zip(
+                sample.hypothesis, sample.reference
+            ):
+                # Note: unlike MWERSegmenter, the SoftSegmenter alignment has already
+                # offset the delays in sentence_output relative to
+                # sentence_reference.start_time
+                ideal_delays = sentence_output.ideal_delays
+                ca_delays = sentence_output.computational_aware_delays
+
+                assert len(ideal_delays) == len(
+                    ca_delays
+                ), f"Mismatch in delay counts: {len(ideal_delays)} vs {len(ca_delays)}"
+
+                target_length = len(
+                    text_items(sentence_reference.content, self.latency_unit)
+                )
+
+                if len(ideal_delays) == 0:
+                    # Empty resegmented output: nothing to score for this sentence
+                    skipped_sentences += 1
+                    continue
+
+                # Compute the recording end time relative to the sentence start.
+                # Considering the entire recording, not just this segment, allows
+                # LongYAAL to account for outputs emitted after the reference segment
+                # ends but before the recording ends (key difference from StreamLAAL).
+                recording_end = recording_length - sentence_reference.start_time
+
+                ideal_score = self._sentence_level_yaal(
+                    ideal_delays,
+                    sentence_reference.duration,
+                    target_length,
+                    recording_end=recording_end,
+                    is_longform=True,
+                )
+
+                ca_score = self._sentence_level_yaal(
+                    ca_delays,
+                    sentence_reference.duration,
+                    target_length,
+                    recording_end=recording_end,
+                    is_longform=True,
+                )
+
+                if ideal_score is not None:
+                    sentence_level_ideal_scores.append(ideal_score)
+                if ca_score is not None:
+                    sentence_level_ca_scores.append(ca_score)
+                if ideal_score is None or ca_score is None:
+                    skipped_sentences += 1
+
+        if skipped_sentences > 0:
+            LOGGER.warning(
+                f"{skipped_sentences} sentences have been skipped in the LongYAAL "
+                f"computation as they were empty or could not be scored"
+            )
+
+        if len(sentence_level_ideal_scores) == 0:
+            LOGGER.error("No sentences could be scored for LongYAAL")
+            return LatencyScores(float("nan"), float("nan"))
+
+        return LatencyScores(
+            statistics.mean(sentence_level_ideal_scores),
+            (
+                statistics.mean(sentence_level_ca_scores)
+                if len(sentence_level_ca_scores) > 0
+                else float("nan")
+            ),
+        )
+
+    @classmethod
+    def add_arguments(cls, parser: argparse.ArgumentParser) -> None:
+        parser.add_argument(
+            "--moses-lang",
+            type=str,
+            default=None,
+            help='Language code for the Moses tokenizer (e.g., "en", "de"). '
+                 "Omit it (or pass 'zh'/'ja') to skip tokenization, as recommended "
+                 "for Chinese/Japanese.",
+        )
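+
+
+# Usage sketch (illustrative; the exact wiring depends on how the scorer registry
+# resolves entries registered via @register_latency_scorer):
+#
+#     scorer = LongYAAL(args)  # args must provide latency_unit and moses_lang
+#     scores = scorer.score(samples)  # -> LatencyScores(ideal, computational_aware)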
diff --git a/simulstream/metrics/scorers/latency/softsegmenter.py b/simulstream/metrics/scorers/latency/softsegmenter.py
new file mode 100644
index 0000000..d384950
--- /dev/null
+++ b/simulstream/metrics/scorers/latency/softsegmenter.py
@@ -0,0 +1,512 @@
+# Copyright 2026 FBK

+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at

+# http://www.apache.org/licenses/LICENSE-2.0

+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License
+
+import logging
+import unicodedata
+from abc import abstractmethod
+from dataclasses import dataclass
+from multiprocessing import Pool
+from typing import Callable, List, Optional, Tuple
+
+try:
+    from mosestokenizer import MosesTokenizer  # type: ignore
+except ImportError:
+    MosesTokenizer = None
+
+from simulstream.metrics.readers import ReferenceSentenceDefinition, OutputWithDelays, text_items
+from simulstream.metrics.scorers.latency import LatencyScorer, LatencyScoringSample, LatencyScores
+from simulstream.metrics.scorers.latency.mwersegmenter import ResegmentedLatencyScoringSample
+
+LOGGER = logging.getLogger("simulstream.metrics.scorers.latency.softsegmenter")
+
+INF = float("inf")
+PUNCT = {".", "!", "?", ",", ";", ":", "-", "(", ")"}
+CHINESE_PUNCT = {"。", "!", "?", ",", ";", ":", "—", "(", ")"}
+JAPAN_PUNCT = {"。", "!", "?", ",", ";", ":", "ー", "(", ")"}
+ALL_PUNCT = PUNCT.union(CHINESE_PUNCT).union(JAPAN_PUNCT)
+
+
+class Match:
+    """Integer constants for the alignment operations."""
+    MATCH = 0
+    DELETE = 1
+    INSERT = 2
+    NONE = 3
+
+
+@dataclass
+class Word:
+    """
+    Represents a word with associated timing information.
+
+    Attributes:
+        text (str): The word text.
+        delay (float): The delay timestamp.
+        seq_id (Optional[int]): Sequence identifier for alignment.
+        elapsed (Optional[float]): Elapsed time (for computation-aware delays).
+        main (bool): Whether this is a main word (not a subtoken).
+        original (Optional[str]): The original word before tokenization.
+        recording_length (Optional[float]): Total recording length.
+    """
+    text: str
+    delay: float
+    seq_id: Optional[int] = None
+    elapsed: Optional[float] = None
+    main: bool = True
+    original: Optional[str] = None
+    recording_length: Optional[float] = None
+
+    def __repr__(self):
+        return (f"Word(text={self.text}, delay={self.delay}, elapsed={self.elapsed}, "
+                f"seq_id={self.seq_id}, main={self.main}, original={self.original}, "
+                f"recording_length={self.recording_length})")
+
+
+def unicode_normalize(text: str) -> str:
+    """Normalize Unicode text to NFKC form."""
+    return unicodedata.normalize("NFKC", text)
+
+
+def compute_similarity_score(ref_word: Word, hyp_word: Word, char_level: bool) -> float:
+    """
+    Compute the similarity score between two words.
+
+    Args:
+        ref_word (Word): Reference word.
+        hyp_word (Word): Hypothesis word.
+        char_level (bool): Whether to use character-level comparison.
+
+    Returns:
+        float: Similarity score between the words.
+    """
+    ref_text = ref_word.text
+    hyp_text = hyp_word.text
+
+    # If one text is punctuation and the other is not, return a negative score.
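+    # Otherwise, the score is an exact-match indicator (0/1) at the character level,
+    # or the Jaccard similarity of the two character sets at the word level, e.g.
+    # for "cat" vs "cart": |{c,a,t} & {c,a,r,t}| / |{c,a,t} | {c,a,r,t}| = 3/4 = 0.75.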
+    ref_is_punct = ref_text in ALL_PUNCT
+    hyp_is_punct = hyp_text in ALL_PUNCT
+    # Forbid aligning punctuation with a regular word
+    if ref_is_punct ^ hyp_is_punct:
+        return -INF
+
+    # At the character level, compare the (already normalized and lowercased)
+    # texts directly
+    if char_level:
+        return float(ref_text == hyp_text)
+
+    ref_set = set(ref_text)
+    hyp_set = set(hyp_text)
+    inter = len(ref_set & hyp_set)
+    union = len(ref_set) + len(hyp_set) - inter
+
+    return (inter / union) if union else 0.0
+
+
+def _align_sequences(
+    seq1: List[Word], seq2: List[Word], char_level: bool
+) -> Tuple[List[Optional[Word]], List[Optional[Word]]]:
+    """
+    Align two sequences maximizing the similarity score.
+
+    Args:
+        seq1 (List[Word]): First sequence (typically reference).
+        seq2 (List[Word]): Second sequence (typically hypothesis).
+        char_level (bool): Whether to use character-level comparison.
+
+    Returns:
+        Tuple[List[Optional[Word]], List[Optional[Word]]]: The two aligned sequences,
+            with None for gaps.
+    """
+    # Initialize the alignment matrix
+    n = len(seq1) + 1
+    m = len(seq2) + 1
+    dp = [[0.0] * m for _ in range(n)]
+    dp_back = [[Match.NONE] * m for _ in range(n)]
+
+    # Fill the first row and column of the matrix
+    for i in range(n):
+        dp[i][0] = 0.0
+        dp_back[i][0] = Match.DELETE
+    for j in range(m):
+        dp[0][j] = 0.0
+        dp_back[0][j] = Match.INSERT
+    dp[0][0] = 0.0
+    dp_back[0][0] = Match.MATCH
+
+    # Fill the alignment matrix
+    for i in range(1, n):
+        for j in range(1, m):
+            match = dp[i - 1][j - 1] + \
+                compute_similarity_score(seq1[i - 1], seq2[j - 1], char_level)
+            delete = dp[i - 1][j]
+            insert = dp[i][j - 1]
+            dp[i][j] = max(match, delete, insert)
+            if dp[i][j] == match:
+                dp_back[i][j] = Match.MATCH
+            elif dp[i][j] == delete:
+                dp_back[i][j] = Match.DELETE
+            else:
+                dp_back[i][j] = Match.INSERT
+
+    # Backtrack to find the alignment
+    aligned_seq1 = []
+    aligned_seq2 = []
+    i, j = n - 1, m - 1
+    while i > 0 or j > 0:
+        if dp_back[i][j] == Match.MATCH:
+            aligned_seq1.append(seq1[i - 1])
+            aligned_seq2.append(seq2[j - 1])
+            i -= 1
+            j -= 1
+        elif dp_back[i][j] == Match.DELETE:
+            aligned_seq1.append(seq1[i - 1])
+            aligned_seq2.append(None)
+            i -= 1
+        elif dp_back[i][j] == Match.INSERT:
+            aligned_seq1.append(None)
+            aligned_seq2.append(seq2[j - 1])
+            j -= 1
+        else:
+            break
+    aligned_seq1.reverse()
+    aligned_seq2.reverse()
+    return aligned_seq1, aligned_seq2
+
+
+def _process_alignment(
+        ref_words: List[Optional[Word]],
+        hyp_words: List[Optional[Word]],
+        char_level: bool) -> List[Word]:
+    """
+    Process the alignment to assign sequence IDs to hypothesis words.
+
+    Args:
+        ref_words (List[Optional[Word]]): Aligned reference words (with None for gaps).
+        hyp_words (List[Optional[Word]]): Aligned hypothesis words (with None for gaps).
+        char_level (bool): Whether using character-level alignment.
+
+    Returns:
+        List[Word]: Processed hypothesis words with assigned sequence IDs.
+    """
+    assert len(ref_words) == len(hyp_words), \
+        "Number of reference and hypothesis words do not match."
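+
+    # Example: hypothesis words aligned to gaps (ref is None) inherit the seq_id of
+    # the most recent reference word, unless their delay already falls at or past the
+    # next reference word, in which case they are assigned to that later segment.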
+
+    def get_next_non_none_ref(i: int) -> Optional[Tuple[int, Word]]:
+        while i < len(ref_words) and ref_words[i] is None:
+            i += 1
+        if i == len(ref_words):
+            return None
+        return i, ref_words[i]
+
+    new_hyp_words = []
+    last_ref = None
+    next_i = 0
+    for i, (ref, hyp) in enumerate(zip(ref_words, hyp_words)):
+        if ref is None and i >= next_i:
+            if hyp is not None:
+                # To avoid non-monotonic seq_ids, advance last_ref to the next
+                # reference word when the hypothesis word's delay already falls
+                # at or past it
+                next_ref_info = get_next_non_none_ref(i)
+                if next_ref_info is not None:
+                    next_i, next_ref = next_ref_info
+                    if hyp.delay >= next_ref.delay:
+                        last_ref = next_ref
+
+        if ref is not None and i >= next_i:
+            last_ref = ref
+        if hyp is not None:
+            if ref is None:
+                hyp.seq_id = last_ref.seq_id if last_ref is not None else 0
+            else:
+                hyp.seq_id = ref.seq_id
+            new_hyp_words.append(hyp)
+
+    return new_hyp_words
+
+
+def _process_sample_alignment(
+    args: Tuple[int, List[Word], List[Word], bool]
+) -> Tuple[int, List[Word]]:
+    """
+    Process alignment for a single sample. Top-level function for multiprocessing.
+
+    Args:
+        args: Tuple of (sample_index, ref_words, hyp_words, char_level).
+
+    Returns:
+        Tuple of (sample_index, processed_hyp_words).
+    """
+    i, ref_words, hyp_words, char_level = args
+    aligned_ref, aligned_hyp = _align_sequences(ref_words, hyp_words, char_level)
+    processed_hyp = _process_alignment(aligned_ref, aligned_hyp, char_level)
+    return i, processed_hyp
+
+
+class SoftSegmenterBasedLatencyScorer(LatencyScorer):
+    """
+    Abstract base class for scorers that require system outputs and references aligned
+    through the SoftSegmenter alignment.
+
+    This class wraps a latency scorer and applies the SoftSegmenter alignment algorithm from
+    `"Better Late Than Never: Evaluation of Latency Metrics for Simultaneous Speech-to-Text
+    Translation" `_ to hypotheses before scoring.
+
+    Subclasses must implement :meth:`_do_score`, which operates on
+    :class:`ResegmentedLatencyScoringSample` instances where hypotheses and references are aligned.
+
+    Args:
+        args: Parsed arguments containing latency_unit and optionally moses_lang.
+
+    Example:
+        >>> class CustomLatencyScorer(SoftSegmenterBasedLatencyScorer):
+        ...     def _do_score(self, samples):
+        ...         # Compute a custom latency score
+        ...         return LatencyScores(...)
+    """
+
+    def __init__(self, args):
+        super().__init__(args)
+        self.latency_unit = args.latency_unit
+        self.moses_lang = getattr(args, 'moses_lang', None)
+
+    def requires_reference(self) -> bool:
+        return True
+
+    @abstractmethod
+    def _do_score(self, samples: List[ResegmentedLatencyScoringSample]) -> LatencyScores:
+        """
+        Compute latency scores on resegmented samples.
+
+        Subclasses must override this method.
+
+        Args:
+            samples (List[ResegmentedLatencyScoringSample]): Aligned
+                hypothesis–reference pairs with delay information.
+
+        Returns:
+            LatencyScores: The computed latency metrics.
+        """
+        ...
+
+    def _split_delays_by_segmented_text(
+            self, delays: List[float], segmented_text: List[str]) -> List[List[float]]:
+        """
+        Assign delay values to the corresponding segmented hypotheses.
+
+        Args:
+            delays (List[float]): Delay values (per token or per char).
+            segmented_text (List[str]): Segmented hypothesis strings.
+
+        Returns:
+            List[List[float]]: Delays split per segment.
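+
+        Example:
+            With word-level latency units, ``delays=[d0, d1, d2]`` and
+            ``segmented_text=["a b", "c"]`` yield ``[[d0, d1], [d2]]``.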
+ """ + segmented_delays = [] + index = 0 + + for segment in segmented_text: + segment_len = len(text_items(segment, self.latency_unit)) + segmented_delays.append(delays[index:index + segment_len]) + index += segment_len + assert len(delays) == index, \ + f"Index {index} should have reached end of delays ({len(delays)})" + return segmented_delays + + def _create_words_from_output( + self, + output: OutputWithDelays, + char_level: bool, + recording_length: float) -> List[Word]: + """ + Convert OutputWithDelays to a list of Word objects. + + Args: + output (OutputWithDelays): The output with delays. + char_level (bool): Whether to use character-level units. + recording_length (float): Total length of the recording. + + Returns: + List[Word]: List of Word objects. + """ + units = text_items(output.final_text, "char" if char_level else "word") + assert len(units) == len(output.ideal_delays), \ + f"Number of units ({len(units)}) and delays ({len(output.ideal_delays)}) do not match" + lu = len(units) + l_ca = len(output.computational_aware_delays) + assert lu == l_ca, f"Number of units ({lu}) and CA delays ({l_ca}) do not match" + + words = [] + for unit, delay, elapsed in zip(units, output.ideal_delays, + output.computational_aware_delays): + words.append(Word( + text=unit, + delay=delay, + elapsed=elapsed, + recording_length=recording_length + )) + return words + + def _create_words_from_references( + self, + references: List[ReferenceSentenceDefinition], + char_level: bool) -> List[Word]: + """ + Convert reference sentences to a list of Word objects. + + Args: + references (List[ReferenceSentenceDefinition]): Reference sentences. + char_level (bool): Whether to use character-level units. + + Returns: + List[Word]: List of Word objects with sequence IDs. + """ + words = [] + for i, ref in enumerate(references): + ref_text = unicode_normalize(ref.content).lower() + units = text_items(ref_text, "char" if char_level else "word") + delay = ref.start_time + for unit in units: + words.append(Word(text=unit, delay=delay, seq_id=i)) + return words + + def tokenize_words(self, words: List[Word], tokenizer: Callable) -> List[Word]: + """ + Tokenize words using Moses tokenizer. + + Args: + words (List[Word]): List of words to tokenize. + tokenizer (callable): Callable that tokenizes a string. + + Returns: + List[Word]: Tokenized words with subtokens marked. + """ + + tokenized_words: List[Word] = [] + for word in words: + text = unicode_normalize(word.text).lower() + result = tokenizer(text) # type: ignore + # Ensure result is a list + if isinstance(result, str): + tokens: List[str] = [result] + else: + tokens = list(result) if result else [text] # type: ignore + main = True + for token in tokens: + tokenized_words.append(Word( + text=token, + delay=word.delay, + seq_id=word.seq_id, + elapsed=word.elapsed, + main=main, + original=word.text if main else None, + recording_length=word.recording_length + )) + main = False + return tokenized_words + + def score(self, samples: List[LatencyScoringSample]) -> LatencyScores: + char_level = self.latency_unit == "char" + + # Prepare alignment arguments for all samples + alignment_args = [] + sample_metadata = [] # Store per-sample metadata for post-processing + + if self.moses_lang is None: + if not (self.moses_lang == "zh" or self.moses_lang == "ja"): + LOGGER.warning( + "moses_lang not specified; defaulting to character-level tokenization. 
" + "This is recommended for Chinese/Japanese, but for other languages it is " + "recommended to specify a moses_lang for proper tokenization. Set " + "--moses-lang to the appropriate language code (e.g., 'en', 'de') " + "or to 'zh'/'ja' to skip tokenization.") + + else: + raise ValueError( + "moses_lang must be specified for non-Chinese/Japanese languages when " + "using softsegmenter. Set --moses-lang to the appropriate language code " + "(e.g., 'en', 'de') or to 'zh'/'ja' to skip tokenization.") + else: + if MosesTokenizer is None: + raise ImportError( + "mosestokenizer is required for softsegmenter. " + "Install it with: pip install mosestokenizer") + tokenizer = MosesTokenizer(lang=self.moses_lang, no_escape=True) + + for idx, sample in enumerate(samples): + assert sample.reference is not None, \ + "Cannot realign hypothesis to missing reference" + + # Calculate total recording length + recording_length = max( + ref.start_time + ref.duration for ref in sample.reference + ) + + # Convert references to Word objects + ref_words = self._create_words_from_references(sample.reference, char_level) + ref_words = self.tokenize_words(ref_words, tokenizer) + + # Convert hypothesis to Word objects + hyp_words = self._create_words_from_output( + sample.hypothesis, char_level, recording_length) + hyp_words = self.tokenize_words(hyp_words, tokenizer) + + alignment_args.append((idx, ref_words, hyp_words, char_level)) + sample_metadata.append(sample) + + # Parallelize alignment computation using multiprocessing Pool + with Pool() as pool: + results = pool.map(_process_sample_alignment, alignment_args) + + # Sort results by sample index to maintain original order + results.sort(key=lambda x: x[0]) + + # Post-process alignment results into resegmented samples + resegmented_samples = [] + for sample_idx, processed_hyp in results: + sample = sample_metadata[sample_idx] + + # Group hypothesis words by sequence ID + new_segmentation = {} + for i in range(len(sample.reference)): + new_segmentation[i] = [] + + for word in processed_hyp: + if word.main and word.seq_id is not None: + new_segmentation[word.seq_id].append(word) + + # Create OutputWithDelays for each segment + resegmented_hypos_with_delays = [] + for i, ref in enumerate(sample.reference): + segment_words = new_segmentation.get(i, []) + if len(segment_words) == 0: + # Empty segment + resegmented_hypos_with_delays.append( + OutputWithDelays("", [], []) + ) + else: + # Reconstruct text from main words + text_parts = [w.original if w.original else w.text for w in segment_words] + if char_level: + text = "".join(text_parts) + else: + text = " ".join(text_parts) + + # Offset delays relative to segment start + ideal_delays = [w.delay - ref.start_time for w in segment_words] + ca_delays = [w.elapsed - ref.start_time for w in segment_words] + + resegmented_hypos_with_delays.append( + OutputWithDelays(text, ideal_delays, ca_delays) + ) + + resegmented_samples.append(ResegmentedLatencyScoringSample( + sample.audio_name, + resegmented_hypos_with_delays, + sample.reference, + )) + + return self._do_score(resegmented_samples)