# Copyright 2008-2020 pydicom authors. See LICENSE file for details. """Use the `numpy `_ package to convert supported *Waveform Data* to a :class:`numpy.ndarray`. .. versionadded:: 2.1 **Supported data** The numpy handler supports the conversion of data in the (5400,0100) *Waveform Sequence* element to an :class:`~numpy.ndarray` provided the related :dcm:`Waveform` module elements have values given in the table below. +-------------+------------------------------+------+-------------------------+ | Element | Supported | +-------------+------------------------------+------+ values | | Tag | Keyword | Type | | +=============+==============================+======+=========================+ | (003A,0005) | NumberOfWaveformChannels | 1 | N > 0 | +-------------+------------------------------+------+-------------------------+ | (003A,0010) | NumberOfWaveformSamples | 1 | N > 0 | +-------------+------------------------------+------+-------------------------+ | (5400,1004) | WaveformBitsAllocated | 1 | 8, 16, 32, 64 | +-------------+------------------------------+------+-------------------------+ | (5400,1006) | WaveformSampleInterpretation | 1 | SB, UB, MB, AB, SS, US, | | | | | SL, UL, SV, UV | +-------------+------------------------------+------+-------------------------+ """ from typing import TYPE_CHECKING, Generator, cast, List try: import numpy as np HAVE_NP = True except ImportError: HAVE_NP = False if TYPE_CHECKING: # pragma: no cover from pydicom.dataset import Dataset HANDLER_NAME = 'Numpy Waveform' DEPENDENCIES = {'numpy': ('http://www.numpy.org/', 'NumPy')} WAVEFORM_DTYPES = { (8, 'SB'): 'int8', (8, 'UB'): 'uint8', (8, 'MB'): 'uint8', (8, 'AB'): 'uint8', (16, 'SS'): 'int16', (16, 'US'): 'uint16', (32, 'SL'): 'int32', (32, 'UL'): 'uint32', (64, 'SV'): 'int64', (64, 'UV'): 'uint64', } def is_available() -> bool: """Return ``True`` if the handler has its dependencies met. .. versionadded:: 2.1 """ return HAVE_NP def generate_multiplex( ds: "Dataset", as_raw: bool = True ) -> Generator["np.ndarray", None, None]: """Yield an :class:`~numpy.ndarray` for each multiplex group in the *Waveform Sequence*. .. versionadded:: 2.1 Parameters ---------- ds : pydicom.dataset.Dataset The :class:`Dataset` containing a :dcm:`Waveform ` module and the *Waveform Sequence* to be converted. as_raw : bool, optional If ``True`` (default), then yield the raw unitless waveform data. If ``False`` then attempt to convert the raw data for each channel to the quantity specified by the corresponding (003A,0210) *Channel Sensitivity* unit. Yields ------ np.ndarray The waveform data for a multiplex group as an :class:`~numpy.ndarray` with shape (samples, channels). """ if 'WaveformSequence' not in ds: raise AttributeError( "No (5400,0100) Waveform Sequence element found in the dataset" ) for ii, item in enumerate(cast(List["Dataset"], ds.WaveformSequence)): required_elements = [ 'NumberOfWaveformChannels', 'NumberOfWaveformSamples', 'WaveformBitsAllocated', 'WaveformSampleInterpretation', 'WaveformData' ] missing = [elem for elem in required_elements if elem not in item] if missing: raise AttributeError( f"Unable to convert the waveform multiplex group with index " f"{ii} as the following required elements are missing from " f"the sequence item: {', '.join(missing)}" ) # Determine the expected length of the data (without padding) bytes_per_sample = cast(int, item.WaveformBitsAllocated) // 8 nr_samples = cast(int, item.NumberOfWaveformSamples) nr_channels = cast(int, item.NumberOfWaveformChannels) bits_allocated = cast(int, item.WaveformBitsAllocated) sample_interpretation = cast(str, item.WaveformSampleInterpretation) expected_len = nr_samples * nr_channels * bytes_per_sample # Waveform Data is ordered as (C = channel, S = sample): # C1S1, C2S1, ..., CnS1, C1S2, ..., CnS2, ..., C1Sm, ..., CnSm dtype = WAVEFORM_DTYPES[(bits_allocated, sample_interpretation)] arr = np.frombuffer( cast(bytes, item.WaveformData)[:expected_len], dtype=dtype ) # Reshape to (samples, channels) and make writeable arr = np.copy(arr.reshape(nr_samples, nr_channels)) if not as_raw: # Apply correction factor (if possible) arr = arr.astype('float') seq = cast(List["Dataset"], item.ChannelDefinitionSequence) for jj, ch in enumerate(seq): baseline = ch.get("ChannelBaseline", 0.0) sensitivity = ch.get("ChannelSensitivity", 1.0) correction = ch.get("ChannelSensitivityCorrectionFactor", 1.0) arr[..., jj] = ( (arr[..., jj] + baseline) * sensitivity * correction ) yield arr def multiplex_array( ds: "Dataset", index: int, as_raw: bool = True ) -> "np.ndarray": """Return an :class:`~numpy.ndarray` for the multiplex group in the *Waveform Sequence* at `index`. .. versionadded:: 2.1 Parameters ---------- ds : pydicom.dataset.Dataset The :class:`Dataset` containing a :dcm:`Waveform ` module and the *Waveform Sequence* to be converted. index : int The index of the multiplex group to return. as_raw : bool, optional If ``True`` (default), then return the raw unitless waveform data. If ``False`` then attempt to convert the raw data for each channel to the quantity specified by the corresponding (003A,0210) *Channel Sensitivity* unit. Returns ------- np.ndarray The waveform data for a multiplex group as an :class:`~numpy.ndarray` with shape (samples, channels). """ if 'WaveformSequence' not in ds: raise AttributeError( "No (5400,0100) Waveform Sequence element found in the dataset" ) item = cast(List["Dataset"], ds.WaveformSequence)[index] required_elements = [ 'NumberOfWaveformChannels', 'NumberOfWaveformSamples', 'WaveformBitsAllocated', 'WaveformSampleInterpretation', 'WaveformData' ] missing = [elem for elem in required_elements if elem not in item] if missing: raise AttributeError( f"Unable to convert the waveform multiplex group with index " f"{index} as the following required elements are missing from " f"the sequence item: {', '.join(missing)}" ) # Determine the expected length of the data (without padding) bytes_per_sample = cast(int, item.WaveformBitsAllocated) // 8 nr_samples = cast(int, item.NumberOfWaveformSamples) nr_channels = cast(int, item.NumberOfWaveformChannels) bits_allocated = cast(int, item.WaveformBitsAllocated) sample_interpretation = cast(str, item.WaveformSampleInterpretation) expected_len = nr_samples * nr_channels * bytes_per_sample # Waveform Data is ordered as (C = channel, S = sample): # C1S1, C2S1, ..., CnS1, C1S2, ..., CnS2, ..., C1Sm, ..., CnSm dtype = WAVEFORM_DTYPES[(bits_allocated, sample_interpretation)] arr = np.frombuffer( cast(bytes, item.WaveformData)[:expected_len], dtype=dtype ) # Reshape to (samples, channels) and make writeable arr = np.copy(arr.reshape(nr_samples, nr_channels)) if not as_raw: # Apply correction factor (if possible) arr = arr.astype('float') seq = cast(List["Dataset"], item.ChannelDefinitionSequence) for jj, ch in enumerate(seq): baseline = ch.get("ChannelBaseline", 0.0) sensitivity = ch.get("ChannelSensitivity", 1.0) correction = ch.get("ChannelSensitivityCorrectionFactor", 1.0) arr[..., jj] = ( (arr[..., jj] + baseline) * sensitivity * correction ) return cast("np.ndarray", arr)