oca-ocb-core/odoo-bringout-oca-ocb-base/odoo/tools/intervals.py
Ernad Husremovic 2d3ee4855a 19.0 vanilla
2026-03-09 09:30:27 +01:00

158 lines
5.9 KiB
Python

# Part of Odoo. See LICENSE file for full copyright and licensing details.
from __future__ import annotations
import itertools
import typing
import warnings
if typing.TYPE_CHECKING:
from collections.abc import Iterable, Iterator
from collections.abc import Set as AbstractSet
# Type variable for interval bounds: the ``start`` and ``stop`` values of the
# tuples handled in this module are annotated with it.
T = typing.TypeVar('T')
def _boundaries(intervals: Intervals[T] | Iterable[tuple[T, T, AbstractSet]], opening: str, closing: str) -> Iterator[tuple[T, str, AbstractSet]]:
""" Iterate on the boundaries of intervals. """
for start, stop, recs in intervals:
if start < stop:
yield (start, opening, recs)
yield (stop, closing, recs)
class Intervals(typing.Generic[T]):
    """ Collection of ordered disjoint intervals with some associated records.

    Each interval is a triple ``(start, stop, records)``, where ``records``
    is a recordset.

    By default, adjacent intervals are merged (1, 3, a) and (3, 5, b) become
    (1, 5, a | b). This behaviour can be prevented by setting
    `keep_distinct=True`.
    """

    def __init__(self, intervals: Iterable[tuple[T, T, AbstractSet]] | None = None, *, keep_distinct: bool = False):
        """ Normalize ``intervals`` into a sorted list of disjoint triples.

        :param intervals: ``(start, stop, records)`` triples; triples that do
            not satisfy ``start < stop`` are ignored
        :param keep_distinct: when true, intervals that only touch are kept
            separate instead of being merged
        """
        self._items: list[tuple[T, T, AbstractSet]] = []
        self._keep_distinct = keep_distinct
        if intervals:
            # normalize the representation of intervals
            append = self._items.append
            # `starts` holds the start values of the intervals currently open;
            # an output interval is emitted when it becomes empty again
            starts: list[T] = []
            items: AbstractSet | None = None
            if self._keep_distinct:
                # Sort on the value only. The sort is stable and the intervals
                # were sorted first, so at an equal value the 'stop' of an
                # earlier interval stays before the 'start' of the next one,
                # which keeps touching intervals apart.
                boundaries = sorted(_boundaries(sorted(intervals), 'start', 'stop'), key=lambda i: i[0])
            else:
                # Sort on the whole tuple: at an equal value 'start' sorts
                # before 'stop', so touching intervals are merged.
                boundaries = sorted(_boundaries(intervals, 'start', 'stop'))
            for value, flag, value_items in boundaries:
                if flag == 'start':
                    starts.append(value)
                    if items is None:
                        items = value_items
                    else:
                        items = items.union(value_items)
                else:
                    start = starts.pop()
                    if not starts:
                        # the last pop returns the earliest start of the group
                        append((start, value, items))
                        items = None

    def __bool__(self) -> bool:
        return bool(self._items)

    def __len__(self) -> int:
        return len(self._items)

    def __iter__(self) -> Iterator[tuple[T, T, AbstractSet]]:
        return iter(self._items)

    def __reversed__(self) -> Iterator[tuple[T, T, AbstractSet]]:
        return reversed(self._items)

    def __or__(self, other):
        """ Return the union of two sets of intervals.

        ``other`` must expose ``_items`` (i.e. be another ``Intervals``).
        """
        return Intervals(itertools.chain(self._items, other._items), keep_distinct=self._keep_distinct)

    def __and__(self, other):
        """ Return the intersection of two sets of intervals. """
        return self._merge(other, False)

    def __sub__(self, other):
        """ Return the difference of two sets of intervals. """
        return self._merge(other, True)

    def _merge(self, other: Intervals | Iterable[tuple[T, T, AbstractSet]], difference: bool) -> Intervals:
        """ Return the difference or intersection of two sets of intervals.

        The boundaries of ``self`` ('start'/'stop') and of ``other``
        ('switch') are walked in sorted order. ``enabled`` tells whether the
        current position belongs to the result as far as ``other`` is
        concerned; it is flipped by every boundary of ``other``. The records
        of the result always come from ``self``.

        :param other: intervals to intersect with or to subtract
        :param difference: true for ``self - other``, false for ``self & other``
        """
        result = Intervals(keep_distinct=self._keep_distinct)
        append = result._items.append

        # using 'self' and 'other' below forces normalization
        bounds1 = _boundaries(self, 'start', 'stop')
        bounds2 = _boundaries(Intervals(other, keep_distinct=self._keep_distinct), 'switch', 'switch')

        start = None                    # set by start/stop
        recs1 = None                    # set by start
        enabled = difference            # changed by switch
        if self._keep_distinct:
            # stable sort on the value: at an equal value the boundaries of
            # 'self' are processed before the ones of 'other'
            bounds = sorted(itertools.chain(bounds1, bounds2), key=lambda i: i[0])
        else:
            bounds = sorted(itertools.chain(bounds1, bounds2))
        for value, flag, recs in bounds:
            if flag == 'start':
                start = value
                recs1 = recs
            elif flag == 'stop':
                if enabled and start < value:
                    append((start, value, recs1))
                start = None
            else:
                # boundary of 'other': either the kept part begins here...
                if not enabled and start is not None:
                    start = value
                # ... or it ends here and must be emitted
                if enabled and start is not None and start < value:
                    append((start, value, recs1))
                enabled = not enabled

        return result

    def remove(self, interval):
        """ Remove an interval from the set. """
        # stacklevel=2 attributes the warning to the caller, which is the code
        # that has to be updated
        warnings.warn("Deprecated since 19.0, do not mutate intervals", DeprecationWarning, stacklevel=2)
        self._items.remove(interval)

    def items(self):
        """ Return the intervals. """
        # stacklevel=2 attributes the warning to the caller, which is the code
        # that has to be updated
        warnings.warn("Deprecated since 19.0, just iterate over Intervals", DeprecationWarning, stacklevel=2)
        return self._items
def intervals_overlap(interval_a: tuple[T, T], interval_b: tuple[T, T]) -> bool:
    """Check whether two intervals intersect.

    Both comparisons are strict, so intervals that only touch at an endpoint
    are not reported as overlapping.

    :param interval_a: ``(start, stop)`` pair
    :param interval_b: ``(start, stop)`` pair
    :return: True if two non-zero intervals overlap
    """
    (a_start, a_stop), (b_start, b_stop) = interval_a, interval_b
    a_starts_before_b_stops = a_start < b_stop
    return a_starts_before_b_stops and a_stop > b_start
def invert_intervals(intervals: Iterable[tuple[T, T]], first_start: T, last_stop: T) -> list[tuple[T, T]]:
    """Return the intervals between the intervals that were passed in.

    The expected use case is to turn "available intervals" into "unavailable intervals".

    :examples:
    ([(1, 2), (4, 5)], 0, 10) -> [(0, 1), (2, 4), (5, 10)]

    :param intervals: ``(start, stop)`` pairs, in any order
    :param first_start: start of whole interval
    :param last_stop: stop of whole interval
    :return: the gaps as ``(start, stop)`` pairs sorted by start, contiguous
        or overlapping gaps being merged
    """
    def is_set(bound) -> bool:
        # Only ``None`` and ``False`` are treated as a missing bound. A plain
        # truthiness test would also discard legitimate bounds such as 0.
        return bound is not None and bound is not False

    gaps: list[tuple[T, T]] = []

    def add_gap(gap_start, gap_stop):
        # Gap starts never decrease (`prev_stop` only grows), so a new gap can
        # only touch or overlap the last one recorded.
        if gaps and gap_start <= gaps[-1][1]:
            merged_start, merged_stop = gaps[-1]
            gaps[-1] = (merged_start, max(merged_stop, gap_stop))
        else:
            gaps.append((gap_start, gap_stop))

    prev_stop = first_start
    for start, stop in sorted(intervals):
        # NOTE(review): an interval starting after `last_stop` is skipped
        # together with the gap that precedes it; confirm this is intended.
        if is_set(prev_stop) and prev_stop < start and start <= last_stop:
            add_gap(prev_stop, start)
        prev_stop = max(prev_stop, stop)

    if is_set(last_stop) and prev_stop < last_stop:
        add_gap(prev_stop, last_stop)

    return gaps