Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 30 additions & 3 deletions livekit-rtc/livekit/rtc/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from collections import deque
import ctypes
import random
from typing import Callable, Generator, Generic, List, TypeVar
from typing import Callable, Generator, Generic, List, TypeVar, Union

logger = logging.getLogger("livekit")

Expand All @@ -40,8 +40,35 @@ def task_done_logger(task: asyncio.Task) -> None:
return


def get_address(mv: memoryview) -> int:
return ctypes.addressof(ctypes.c_char.from_buffer(mv))
def _buffer_supported_or_raise(
data: Union[bytes, bytearray, memoryview],
) -> None:
"""Validate a buffer for FFI use.

Raises clear errors for non-contiguous or sliced memoryviews.
"""
if isinstance(data, memoryview):
if not data.contiguous:
raise ValueError("memoryview must be contiguous")
if data.nbytes != len(data.obj): # type: ignore[arg-type]

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 _buffer_supported_or_raise incorrectly rejects valid non-sliced memoryviews backed by non-byte-sized element types

The sliced-memoryview check at line 53 compares data.nbytes (always in bytes) with len(data.obj) (element count for non-byte buffer types). This causes valid, non-sliced memoryviews backed by objects like array.array, ctypes arrays, or numpy arrays to be incorrectly rejected.

Root Cause and Impact

For bytes and bytearray, len() returns byte count, so the check works. But for other buffer-exporting objects, len() returns element count:

  • array.array('h', range(10)): len() = 10, but memoryview(...).nbytes = 20
  • numpy.zeros(10, dtype=np.int16): len() = 10, but memoryview(...).nbytes = 20
  • ctypes (c_int16 * 10)(): len() = 10, but memoryview(...).nbytes = 20

When a user passes memoryview(array.array('h', samples)) or memoryview(numpy_int16_array) as the data argument to AudioFrame() or VideoFrame(), _buffer_supported_or_raise raises ValueError("sliced memoryviews are not supported") even though the memoryview is perfectly valid and non-sliced.

Impact: Users passing memoryviews of non-byte-element buffer objects (common with numpy or array.array workflows) will get a spurious ValueError.

Suggested change
if data.nbytes != len(data.obj): # type: ignore[arg-type]
if data.nbytes != memoryview(data.obj).nbytes:
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

raise ValueError("sliced memoryviews are not supported")
elif not isinstance(data, (bytes, bytearray)):
raise TypeError(f"expected bytes, bytearray, or memoryview, got {type(data)}")


def get_address(data) -> int:
if isinstance(data, memoryview):
_buffer_supported_or_raise(data)
if not data.readonly:
return ctypes.addressof(ctypes.c_char.from_buffer(data))
data = data.obj
if isinstance(data, bytearray):
return ctypes.addressof(ctypes.c_char.from_buffer(data))
if isinstance(data, bytes):
addr = ctypes.cast(ctypes.c_char_p(data), ctypes.c_void_p).value
assert addr is not None
return addr
raise TypeError(f"expected bytes, bytearray, or memoryview, got {type(data)}")


T = TypeVar("T")
Expand Down
18 changes: 12 additions & 6 deletions livekit-rtc/livekit/rtc/apm.py

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 process_reverse_stream checks wrong response field, silently ignoring errors

In process_reverse_stream, the error check on line 93 reads resp.apm_process_stream.error instead of resp.apm_process_reverse_stream.error. This means any error returned by the FFI reverse-stream processing call is silently ignored.

Root Cause and Impact

The method sends a request via req.apm_process_reverse_stream but checks the response on the wrong oneof field resp.apm_process_stream.error (copy-paste from process_stream). Since protobuf oneofs default to empty/falsy when the wrong field is accessed, resp.apm_process_stream.error will always be empty after an apm_process_reverse_stream request, so the RuntimeError is never raised even when the FFI call fails.

Compare with the correct check in process_stream at apm.py:65-66:

if resp.apm_process_stream.error:
    raise RuntimeError(resp.apm_process_stream.error)

The process_reverse_stream method at apm.py:93-94 should mirror this but for the reverse stream:

if resp.apm_process_reverse_stream.error:  # NOT apm_process_stream
    raise RuntimeError(resp.apm_process_reverse_stream.error)

Impact: Errors during reverse stream processing (used for echo cancellation) are silently swallowed, making it very difficult to diagnose audio processing failures in full-duplex setups.

(Refers to lines 93-94)

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,15 @@ def process_stream(self, data: AudioFrame) -> None:
Important:
Audio frames must be exactly 10 ms in duration.
"""
bdata = data.data.cast("b")
if isinstance(data._data, bytes) or (
isinstance(data._data, memoryview) and data._data.readonly
):
data._data = bytearray(data._data)

req = proto_ffi.FfiRequest()
req.apm_process_stream.apm_handle = self._ffi_handle.handle
req.apm_process_stream.data_ptr = get_address(memoryview(bdata))
req.apm_process_stream.size = len(bdata)
req.apm_process_stream.data_ptr = get_address(data._data)
req.apm_process_stream.size = len(data._data)
req.apm_process_stream.sample_rate = data.sample_rate
req.apm_process_stream.num_channels = data.num_channels

Expand All @@ -73,12 +76,15 @@ def process_reverse_stream(self, data: AudioFrame) -> None:
Important:
Audio frames must be exactly 10 ms in duration.
"""
bdata = data.data.cast("b")
if isinstance(data._data, bytes) or (
isinstance(data._data, memoryview) and data._data.readonly
):
data._data = bytearray(data._data)

req = proto_ffi.FfiRequest()
req.apm_process_reverse_stream.apm_handle = self._ffi_handle.handle
req.apm_process_reverse_stream.data_ptr = get_address(memoryview(bdata))
req.apm_process_reverse_stream.size = len(bdata)
req.apm_process_reverse_stream.data_ptr = get_address(data._data)
req.apm_process_reverse_stream.size = len(data._data)
req.apm_process_reverse_stream.sample_rate = data.sample_rate
req.apm_process_reverse_stream.num_channels = data.num_channels

Expand Down
16 changes: 9 additions & 7 deletions livekit-rtc/livekit/rtc/audio_frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import ctypes
from ._ffi_client import FfiHandle
from ._proto import audio_frame_pb2 as proto_audio
from ._utils import get_address
from ._utils import _buffer_supported_or_raise, get_address
from typing import Any, Union


Expand Down Expand Up @@ -49,19 +49,21 @@ def __init__(
Raises:
ValueError: If the length of `data` is smaller than the required size.
"""
data = memoryview(data).cast("B")
_buffer_supported_or_raise(data)

if len(data) < num_channels * samples_per_channel * ctypes.sizeof(ctypes.c_int16):
min_size = num_channels * samples_per_channel * ctypes.sizeof(ctypes.c_int16)
data_len = len(data)

if data_len < min_size:
raise ValueError(
"data length must be >= num_channels * samples_per_channel * sizeof(int16)"
)

if len(data) % ctypes.sizeof(ctypes.c_int16) != 0:
if data_len % ctypes.sizeof(ctypes.c_int16) != 0:
# can happen if data is bigger than needed
raise ValueError("data length must be a multiple of sizeof(int16)")

n = len(data) // ctypes.sizeof(ctypes.c_int16)
self._data = (ctypes.c_int16 * n).from_buffer_copy(data)
self._data = data

self._sample_rate = sample_rate
self._num_channels = num_channels
Expand Down Expand Up @@ -97,7 +99,7 @@ def _from_owned_info(owned_info: proto_audio.OwnedAudioFrameBuffer) -> "AudioFra

def _proto_info(self) -> proto_audio.AudioFrameBufferInfo:
audio_info = proto_audio.AudioFrameBufferInfo()
audio_info.data_ptr = get_address(memoryview(self._data))
audio_info.data_ptr = get_address(self._data)
audio_info.sample_rate = self.sample_rate
audio_info.num_channels = self.num_channels
audio_info.samples_per_channel = self.samples_per_channel
Expand Down
6 changes: 4 additions & 2 deletions livekit-rtc/livekit/rtc/video_frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from ._proto import ffi_pb2 as proto
from typing import List, Optional
from ._ffi_client import FfiClient, FfiHandle
from ._utils import get_address
from ._utils import _buffer_supported_or_raise, get_address

from typing import Any

Expand Down Expand Up @@ -48,10 +48,12 @@ def __init__(
(e.g., RGBA, BGRA, RGB24, etc.).
data (Union[bytes, bytearray, memoryview]): The raw pixel data for the video frame.
"""
_buffer_supported_or_raise(data)

self._width = width
self._height = height
self._type = type
self._data = bytearray(data)
self._data = data

@property
def width(self) -> int:
Expand Down
Loading