Source code for linesep.splitters

from __future__ import annotations
from abc import ABC, abstractmethod
from collections import deque
from collections.abc import AsyncIterable, AsyncIterator, Iterable, Iterator
from dataclasses import dataclass
import re
from typing import AnyStr, Generic, Optional


[docs]class Splitter(ABC, Generic[AnyStr]): """ .. versionadded:: 0.4.0 Abstract base class for all splitters. The abstract methods are an implementation detail; this class is exported only for `isinstance()` and typing purposes and should not be subclassed by users. `Splitter` and its subclasses are generic in `AnyStr`; i.e., they should be written in type annotations as ``SplitterClass[AnyStr]``, ``SplitterClass[str]``, or ``SplitterClass[bytes]``, as appropriate. """ def __init__(self) -> None: #: The output queue self._items: deque[AnyStr] = deque() #: The buffer of unsplit input data self._buff: Optional[AnyStr] = None #: A "hold space" for holding intermediate states of compound output #: items self._hold: Optional[AnyStr] = None #: Whether `close()` has been called self._closed: bool = False #: Whether we've handled the first split segment yet self._first: bool = True @abstractmethod def _find_separator(self, data: AnyStr) -> Optional[tuple[int, int]]: """ Find the first occurrence of a separator in ``data`` and return the separator's starting and ending indices; if no separator is found, return `None`. """ ... @abstractmethod def _handle_segment( self, item: AnyStr, first: bool = False, last: bool = False ) -> None: """ Process split segment ``item``. :param bool first: Whether this is the first segment in the data stream :param bool last: Whether this is the last segment in the data stream """ ... @abstractmethod def _handle_separator(self, item: AnyStr) -> None: """Process split separator ``item``""" ... def _output(self, item: AnyStr) -> None: """Append ``item`` to the output queue""" self._items.append(item) def _split(self) -> None: """Split up the current contents of `_buff`""" while self._buff: span = self._find_separator(self._buff) if span is None: break start, end = span self._handle_segment(self._buff[:start], first=self._first) self._first = False self._handle_separator(self._buff[start:end]) self._buff = self._buff[end:] if self._closed and self._buff is not None: self._handle_segment(self._buff, first=self._first, last=True) self._buff = None
[docs] def feed(self, data: AnyStr) -> None: """ Split input ``data``. Any segments or separators extracted can afterwards be retrieved by calling `get()` or `getall()`. :raises SplitterClosedError: if `close()` has already been called on this splitter """ if self._closed: raise SplitterClosedError("Cannot feed data to closed splitter") if self._buff is None: self._buff = data else: self._buff += data self._split()
[docs] def get(self) -> AnyStr: """ Retrieve the next unfetched item that has been split from the input. :raises SplitterEmptyError: if there are no items currently available """ try: return self._items.popleft() except IndexError: raise SplitterEmptyError("No items available in splitter")
@property def nonempty(self) -> bool: """Whether a subsequent call to `get()` would return an item""" return bool(self._items)
[docs] def getall(self) -> list[AnyStr]: """Retrieve all unfetched items that have been split from the input""" items = list(self._items) self._items.clear() return items
[docs] def split(self, data: AnyStr, final: bool = False) -> list[AnyStr]: """ Split input ``data`` and return all items thus extracted. Set ``final`` to `True` if this is the last chunk of input. Note that, if a previous call to `feed()` was not followed by enough calls to `get()` to retrieve all items, any items left over from the previous round of input will be prepended to the list returned by this method. :raises SplitterClosedError: if `close()` has already been called on this splitter """ self.feed(data) if final: self.close() return self.getall()
[docs] def close(self) -> None: """ Indicate to the splitter that the end of input has been reached. No further calls to `feed()` or `split()` may be made after calling this method unless `reset()` or `setstate()` is called in between. Depending on the internal state, calling this method may cause more segments or separators to be split from unprocessed input; be sure to fetch them with `get()` or `getall()`. """ self._closed = True if self._buff is not None: self._split()
@property def closed(self) -> bool: """Whether `close()` has been called on this splitter""" return self._closed
[docs] def reset(self) -> None: """ Reset the splitter to its initial state, as though a new instance with the same parameters were constructed """ self._items.clear() self._buff = None self._hold = None self._closed = False self._first = True
[docs] def getstate(self) -> SplitterState[AnyStr]: """Retrieve a representation of the splitter's current state""" return SplitterState( items=list(self._items), buff=self._buff, hold=self._hold, closed=self._closed, first=self._first, )
[docs] def setstate(self, state: SplitterState[AnyStr]) -> None: """ Restore the state of the splitter to the what it was when the corresponding `getstate()` call was made """ self._items.clear() self._items.extend(state.items) self._buff = state.buff self._hold = state.hold self._closed = state.closed self._first = state.first
[docs] def itersplit(self, iterable: Iterable[AnyStr]) -> Iterator[AnyStr]: """ Feed each element of ``iterable`` as input to the splitter and yield each item produced. None of the splitter's other methods should be called while iterating over the yielded values. The splitter's state is saved & reset before processing the iterable, and the saved state is restored at the end. If you break out of the resulting iterator early, the splitter will be in an undefined state unless & until you reset it. """ st = self.getstate() self.reset() try: for s in iterable: yield from self.split(s) self.close() yield from self.getall() finally: self.setstate(st)
[docs] async def aitersplit( self, aiterable: AsyncIterable[AnyStr] ) -> AsyncIterator[AnyStr]: """Like `itersplit()`, but for asynchronous iterators""" st = self.getstate() self.reset() try: async for s in aiterable: for t in self.split(s): yield t self.close() for t in self.getall(): yield t finally: self.setstate(st)
class ConstantSplitter(Splitter[AnyStr]): """ .. versionadded:: 0.4.0 A splitter that uses a single, fixed string as the separator """ def __init__(self, separator: AnyStr, retain: bool = False) -> None: """ :param AnyStr separator: The string to split the input on :param bool retain: Whether to include the separators in split items (`True`) or discard them (`False`, default) :raises ValueError: if ``separator`` is an empty string """ if not separator: raise ValueError("Separator cannot be empty") super().__init__() self._separator: AnyStr = separator self._retain: bool = retain def _find_separator(self, data: AnyStr) -> Optional[tuple[int, int]]: try: i = data.index(self._separator) except ValueError: return None else: return (i, i + len(self._separator))
[docs]class TerminatedSplitter(ConstantSplitter[AnyStr]): """ .. versionadded:: 0.4.0 A splitter that splits segments terminated by a given string. A separator at the beginning of the input creates an empty leading segment, and a separator at the end of the input simply terminates the last segment. Two adjacent separators always create an empty segment between them. """ def _handle_segment( self, item: AnyStr, first: bool = False, last: bool = False # noqa: U100 ) -> None: if not last or item: if self._retain and not last: assert self._hold is None self._hold = item else: self._output(item) def _handle_separator(self, item: AnyStr) -> None: if self._retain: assert self._hold is not None self._output(self._hold + item) self._hold = None
[docs]class SeparatedSplitter(ConstantSplitter[AnyStr]): """ .. versionadded:: 0.4.0 A splitter that splits segments separated by a given string. A separator at the beginning of the input creates an empty leading segment, and a separator at the end of the input creates an empty trailing segment. Two adjacent separators always create an empty segment between them. Note that, when ``retain`` is true, separators are returned as separate items, alternating with segments (unlike `TerminatedSplitter` and `PrecededSplitter`, where separators are appended/prepended to the segments). In a list returned by `split()` or `getall()`, the segments will be the items at the even indices (starting at 0), and the separators will be at the odd indices (assuming you're calling `get()` the right amount of times and not leaving any output unfetched). """ def _handle_segment( self, item: AnyStr, first: bool = False, last: bool = False # noqa: U100 ) -> None: self._output(item) def _handle_separator(self, item: AnyStr) -> None: if self._retain: self._output(item)
[docs]class PrecededSplitter(ConstantSplitter[AnyStr]): """ .. versionadded:: 0.4.0 A splitter that splits segments preceded by a given string. A separator at the beginning of the input simply starts the first segment, and a separator at the end of the input creates an empty trailing segment. Two adjacent separators always create an empty segment between them. """ def _handle_segment( self, item: AnyStr, first: bool = False, last: bool = False # noqa: U100 ) -> None: if first: if item: self._output(item) elif self._retain: assert self._hold is not None self._output(self._hold + item) self._hold = None else: self._output(item) def _handle_separator(self, item: AnyStr) -> None: if self._retain: assert self._hold is None self._hold = item
[docs]class UniversalNewlineSplitter(Splitter[AnyStr]): """ .. versionadded:: 0.4.0 A splitter that splits segments terminated by the ASCII newline sequences ``"\\n"``, ``"\\r\\n"``, and ``"\\r"``. """ def __init__(self, retain: bool = False, translate: bool = True) -> None: """ :param bool retain: Whether to include the newlines in split items (`True`) or discard them (`False`, default) :param bool translate: Whether to convert all retained newlines to ``"\\n"`` (`True`, default) or leave them as-is (`False`) """ super().__init__() self._retain = retain self._translate = translate self._strs: Optional[NewlineStrs[AnyStr]] = None def _find_separator(self, data: AnyStr) -> Optional[tuple[int, int]]: if self._strs is None: self._strs = NewlineStrs.for_type(data) return self._strs.search(data, self.closed) def _handle_segment( self, item: AnyStr, first: bool = False, last: bool = False # noqa: U100 ) -> None: if not last or item: if self._retain and not last: assert self._hold is None self._hold = item else: self._output(item) def _handle_separator(self, item: AnyStr) -> None: if self._retain: if self._translate: assert self._strs is not None item = self._strs.LF assert self._hold is not None self._output(self._hold + item) self._hold = None
[docs]class UnicodeNewlineSplitter(Splitter[str]): """ .. versionadded:: 0.5.0 A splitter that splits segments terminated by the same set of line endings as recognized by the `str.splitlines()` method. Note that, unlike other splitters, this class is not generic and is only usable on `str` values, not `bytes`. """ SEP_RGX = re.compile(r"\r\n?|[\n\v\f\x1C-\x1E\x85\u2028\u2029]") def __init__(self, retain: bool = False, translate: bool = True) -> None: """ :param bool retain: Whether to include the newlines in split items (`True`) or discard them (`False`, default) :param bool translate: Whether to convert all retained newlines to ``"\\n"`` (`True`, default) or leave them as-is (`False`) """ super().__init__() self._retain = retain self._translate = translate def _find_separator(self, data: str) -> Optional[tuple[int, int]]: m = self.SEP_RGX.search(data) if m and not (m.group() == "\r" and m.end() == len(data) and not self.closed): return m.span() else: return None def _handle_segment( self, item: str, first: bool = False, last: bool = False # noqa: U100 ) -> None: if not last or item: if self._retain and not last: assert self._hold is None self._hold = item else: self._output(item) def _handle_separator(self, item: str) -> None: if self._retain: if self._translate: item = "\n" assert self._hold is not None self._output(self._hold + item) self._hold = None
[docs]class ParagraphSplitter(Splitter[AnyStr]): """ .. versionadded:: 0.5.0 A splitter that splits segments terminated by one or more blank lines (i.e., lines containing only a line ending), where lines are terminated by the ASCII newline sequences ``"\\n"``, ``"\\r\\n"``, and ``"\\r"``. """ def __init__(self, retain: bool = False, translate: bool = True) -> None: """ :param bool retain: Whether to include the trailing newlines in split items (`True`) or discard them (`False`, default) :param bool translate: Whether to convert all newlines (both trailing and internal) to ``"\\n"`` (`True`, default) or leave them as-is (`False`) """ super().__init__() self._retain = retain self._translate = translate self._strs: Optional[NewlineStrs[AnyStr]] = None def _split(self) -> None: if self._strs is None and self._buff is not None: self._strs = NewlineStrs.for_type(self._buff) pos = 0 while self._buff: assert self._strs is not None if self._hold is None: span = self._strs.search(self._buff, self.closed, pos=pos) if span is None: break start, end = span if (self._first and start == 0) or self._strs.match( self._buff, pos=end, closed=self.closed ) is not None: if self._retain: self._hold = self._buff[:start] else: self._output(self._buff[:start]) self._hold = self._buff[0:0] self._handle_separator(self._buff[start:end]) self._first = False self._buff = self._buff[end:] else: if self._translate and self._buff[start:end] != self._strs.LF: self._buff = ( self._buff[:start] + self._strs.LF + self._buff[end:] ) end = start + 1 pos = end else: end2 = self._strs.match(self._buff, self.closed) if end2 is None: if self._retain: self._output(self._hold) self._hold = None elif end2 == -1: break else: self._handle_separator(self._buff[:end2]) self._buff = self._buff[end2:] pos = 0 if self._closed and self._buff is not None: assert self._strs is not None if self._buff: if not self._retain: length = self._strs.endmatch(self._buff) if length is not None: self._buff = self._buff[:-length] self._output(self._buff) elif self._retain and self._hold is not None: self._output(self._hold) self._buff = None def _find_separator(self, data: AnyStr) -> Optional[tuple[int, int]]: raise NotImplementedError("Not used by this subclass") # pragma: no cover def _handle_segment( self, item: AnyStr, first: bool = False, last: bool = False ) -> None: raise NotImplementedError("Not used by this subclass") # pragma: no cover def _handle_separator(self, item: AnyStr) -> None: if self._translate: assert self._strs is not None item = self._strs.LF assert self._hold is not None self._hold += item
[docs]def get_newline_splitter( newline: Optional[str] = None, retain: bool = False ) -> Splitter[str]: """ .. versionadded:: 0.4.0 Return a splitter for splitting on newlines following the same rules as the ``newline`` option to `open()`. Specifically: - If ``newline`` is `None`, a splitter that splits on all ASCII newlines and converts them to ``"\\n"`` is returned. - If ``newline`` is ``""`` (the empty string), a splitter that splits on all ASCII newlines and leaves them as-is is returned. - If ``newline`` is ``"\\n"``, ``"\\r\\n"``, or ``"\\r"``, a splitter that splits on the given string is returned. - If ``newline`` is any other value, a `ValueError` is raised. Note that this function is limited to splitting on `str`\\s and does not support `bytes`. :param bool retain: Whether the returned splitter should include the newlines in split items (`True`) or discard them (`False`, default) """ if newline is None: return UniversalNewlineSplitter(retain=retain, translate=True) elif newline == "": return UniversalNewlineSplitter(retain=retain, translate=False) elif newline in ("\n", "\r\n", "\r"): return TerminatedSplitter(newline, retain=retain) else: raise ValueError(f"Invalid 'newline' value: {newline!r}")
[docs]class SplitterClosedError(ValueError): """ .. versionadded:: 0.4.0 Raised when `~Splitter.feed()` or `~Splitter.split()` is called on a splitter after its `~Splitter.close()` method is called """ pass
[docs]class SplitterEmptyError(Exception): """ .. versionadded:: 0.4.0 Raised when `~Splitter.get()` is called on a splitter that does not have any unfetched items to return """ pass
@dataclass class NewlineStrs(Generic[AnyStr]): regex: re.Pattern[AnyStr] CR: AnyStr LF: AnyStr @property def CRLF(self) -> AnyStr: return self.CR + self.LF @classmethod def for_type(cls, data: AnyStr) -> NewlineStrs[AnyStr]: if isinstance(data, str): return cls(regex=re.compile(r"\r\n?|\n"), CR="\r", LF="\n") else: return cls(regex=re.compile(rb"\r\n?|\n"), CR=b"\r", LF=b"\n") def search( self, data: AnyStr, closed: bool, pos: int = 0 ) -> Optional[tuple[int, int]]: m = self.regex.search(data, pos=pos) if m and not (m.group() == self.CR and m.end() == len(data) and not closed): return m.span() else: return None def match(self, data: AnyStr, closed: bool, pos: int = 0) -> Optional[int]: # A return value of -1 means that that ``data[pos:] == CR`` and # `closed` is False m = self.regex.match(data, pos=pos) if not m: return None elif m.group() == self.CR and m.end() == len(data) and not closed: return -1 else: return len(m.group()) def endmatch(self, data: AnyStr) -> Optional[int]: if data.endswith(self.CRLF): return 2 elif data.endswith((self.CR, self.LF)): return 1 else: return None
[docs]@dataclass(repr=False) class SplitterState(Generic[AnyStr]): """ .. versionadded:: 0.4.0 A representation of the internal state of a splitter, returned by `~Splitter.getstate()`. This can be passed to `~Splitter.setstate()` to restore the spitter's internal state to what it was previously. A given `SplitterState` should only be passed to the `~Splitter.setstate()` method of a splitter of the same class and with the same constructor arguments as the splitter that produced the `SplitterState`; otherwise, the behavior is undefined. Instances of this class should be treated as opaque objects and should not be inspected, nor should any observed property be relied upon to be the same in future library versions. """ items: list[AnyStr] buff: Optional[AnyStr] hold: Optional[AnyStr] closed: bool first: bool