py-13-windowed-aggregator
1.000
Challenge · difficulty 5/5
# Windowed streaming aggregator
Implement a file **`solution.py`** containing a class `WindowedAggregator` that ingests
timestamped, grouped events and computes aggregate statistics over a sliding time window.
```python
class WindowedAggregator:
def __init__(self, window: float):
"""window = length of the time window (same units as timestamps)."""
def add(self, ts: float, group: str, value: float) -> None:
"""Record an event. Events may arrive OUT OF ORDER (ts not monotonic)."""
def stats(self, group: str, at: float) -> dict:
"""Aggregate the events of `group` whose timestamp is in the half-open
window (at - window, at] — i.e. at-window < ts <= at."""
```
## Semantics
- The window is **half-open**: an event whose timestamp is exactly `at - window` is **excluded**,
and an event whose timestamp is exactly `at` is **included**. Formally, an event with timestamp
`ts` is in the window iff `at - window < ts <= at`.
- **Groups are independent.** `stats` for a group must never observe events recorded under any other
group.
- **Out-of-order ingestion is allowed.** `add` may be called with timestamps in any order. `stats`
considers every event added so far regardless of insertion order; calling `add` with the same
events in a different order yields identical `stats` results.
- **Duplicates count.** Adding the same `(ts, group)` pair (or even the same `(ts, group, value)`)
more than once records multiple independent events, all of which contribute to the aggregates.
- `stats` returns a dict with **exactly** these keys:
- `"count"`: `int` — number of events in the window.
- `"sum"`: `float` — sum of their values (`0.0` if there are none).
- `"min"`: `float` or `None` — minimum value, or `None` if there are no events.
- `"max"`: `float` or `None` — maximum value, or `None` if there are no events.
- `"mean"`: `float` or `None` — `sum / count`, or `None` if there are no events.
- An **unknown group**, or a window that **contains no events**, returns
`{"count": 0, "sum": 0.0, "min": None, "max": None, "mean": None}`.
## Example
```python
agg = WindowedAggregator(window=10.0)
agg.add(100.0, "cpu", 5.0)
agg.add(95.0, "cpu", 1.0)
agg.add(105.0, "cpu", 3.0)
agg.add(100.0, "mem", 50.0) # different group, ignored by "cpu" stats
# Window for at=105, window=10 is (95, 105]:
# ts=95 is excluded (boundary at-window)
# ts=100 is included
# ts=105 is included
s = agg.stats("cpu", at=105.0)
assert s == {"count": 2, "sum": 8.0, "min": 3.0, "max": 5.0, "mean": 4.0}
# Advance `at` so the window (100, 110] only contains ts=105:
s2 = agg.stats("cpu", at=110.0)
assert s2 == {"count": 1, "sum": 3.0, "min": 3.0, "max": 3.0, "mean": 3.0}
# Unknown group:
assert agg.stats("disk", at=105.0) == {
"count": 0, "sum": 0.0, "min": None, "max": None, "mean": None,
}
```
tests/test_windowed_aggregator.py
import random
from solution import WindowedAggregator
# Result the tests expect from stats() for an unknown group or an empty window.
ZERO = {"count": 0, "sum": 0.0, "min": None, "max": None, "mean": None}
def test_empty_and_unknown_group_returns_zero_dict():
    """Unknown groups and event-free windows both produce the zero dict."""
    aggregator = WindowedAggregator(window=10.0)

    # Nothing has been recorded yet, so every group is unknown.
    assert aggregator.stats("nope", at=0.0) == ZERO

    aggregator.add(5.0, "g", 1.0)

    # "g" now exists, but its only event lies outside the window (90, 100].
    known_but_empty = aggregator.stats("g", at=100.0)
    assert known_but_empty == ZERO

    # A group that was never added stays unknown.
    never_added = aggregator.stats("other", at=5.0)
    assert never_added == ZERO
def test_basic_single_event_stats():
    """A lone in-window event is its own sum, min, max and mean."""
    aggregator = WindowedAggregator(window=10.0)
    aggregator.add(100.0, "g", 4.0)

    expected = {"count": 1, "sum": 4.0, "min": 4.0, "max": 4.0, "mean": 4.0}
    result = aggregator.stats("g", at=100.0)
    assert result == expected
def test_inclusion_at_upper_bound():
    """An event whose timestamp equals `at` belongs to the window."""
    query_time = 105.0
    aggregator = WindowedAggregator(window=10.0)
    aggregator.add(query_time, "g", 7.0)

    result = aggregator.stats("g", at=query_time)
    assert result["count"] == 1
    assert result["sum"] == 7.0
def test_exclusion_at_lower_bound():
    """An event sitting exactly on at - window is outside the half-open window."""
    aggregator = WindowedAggregator(window=10.0)
    aggregator.add(95.0, "g", 7.0)

    # The queried window is (95, 105], so ts=95 must not be counted.
    result = aggregator.stats("g", at=105.0)
    assert result == ZERO
def test_boundary_pair_together():
    """With one event on each window edge, only the upper-edge one counts."""
    aggregator = WindowedAggregator(window=10.0)
    on_lower_edge, on_upper_edge = 95.0, 105.0
    aggregator.add(on_lower_edge, "g", 1.0)  # excluded
    aggregator.add(on_upper_edge, "g", 3.0)  # included

    expected = {"count": 1, "sum": 3.0, "min": 3.0, "max": 3.0, "mean": 3.0}
    assert aggregator.stats("g", at=105.0) == expected
def test_min_max_mean_sum_count_correctness():
    """Every aggregate agrees with the builtin computed over the same values."""
    samples = [3.0, -2.0, 10.5, 4.0, 0.0]
    aggregator = WindowedAggregator(window=100.0)
    for position, sample in enumerate(samples):
        aggregator.add(float(position), "g", sample)

    # Timestamps run 0..4 and the window (-96, 4] spans all of them.
    result = aggregator.stats("g", at=4.0)

    assert result["count"] == 5
    assert result["sum"] == sum(samples)
    assert result["min"] == min(samples)
    assert result["max"] == max(samples)
    assert result["mean"] == sum(samples) / len(samples)
def test_group_isolation():
    """Events recorded under one group never show up in another group's stats."""
    aggregator = WindowedAggregator(window=10.0)
    for group, value in (("a", 1.0), ("a", 2.0), ("b", 100.0)):
        aggregator.add(100.0, group, value)

    stats_a = aggregator.stats("a", at=100.0)
    stats_b = aggregator.stats("b", at=100.0)

    assert stats_a == {"count": 2, "sum": 3.0, "min": 1.0, "max": 2.0, "mean": 1.5}
    assert stats_b == {
        "count": 1,
        "sum": 100.0,
        "min": 100.0,
        "max": 100.0,
        "mean": 100.0,
    }
def test_out_of_order_matches_in_order():
    """Sorted and shuffled ingestion of the same events give identical stats."""
    events = [(100.0, 5.0), (95.0, 1.0), (105.0, 3.0), (102.0, 9.0), (98.0, 2.0)]

    in_order = WindowedAggregator(window=10.0)
    for stamp, value in sorted(events):
        in_order.add(stamp, "g", value)

    # Fixed seed keeps the shuffled order reproducible between runs.
    scrambled_events = list(events)
    random.Random(1234).shuffle(scrambled_events)
    scrambled = WindowedAggregator(window=10.0)
    for stamp, value in scrambled_events:
        scrambled.add(stamp, "g", value)

    query_times = (95.0, 100.0, 105.0, 110.0, 120.0)
    for at in query_times:
        assert in_order.stats("g", at=at) == scrambled.stats("g", at=at)
def test_duplicate_ts_group_events_all_count():
    """Repeated (ts, group) pairs are separate events and all contribute."""
    aggregator = WindowedAggregator(window=10.0)
    # Two fully identical events, then one sharing only (ts, group).
    for value in (2.0, 2.0, 5.0):
        aggregator.add(100.0, "g", value)

    result = aggregator.stats("g", at=100.0)

    assert result["count"] == 3
    assert result["sum"] == 9.0
    assert result["min"] == 2.0
    assert result["max"] == 5.0
    assert result["mean"] == 3.0
def test_moving_window_includes_and_excludes():
    """Sliding `at` forward drops old events and picks up newer ones."""
    aggregator = WindowedAggregator(window=10.0)
    # Each event's value equals its timestamp, so min/max reveal membership.
    for stamp in (90.0, 95.0, 100.0, 105.0, 110.0):
        aggregator.add(stamp, "g", stamp)

    # Window (90, 100] holds ts 95 and 100.
    result = aggregator.stats("g", at=100.0)
    assert result["count"] == 2
    assert result["sum"] == 95.0 + 100.0
    assert result["min"] == 95.0
    assert result["max"] == 100.0

    # Window (95, 105] holds ts 100 and 105.
    result = aggregator.stats("g", at=105.0)
    assert result["count"] == 2
    assert result["min"] == 100.0
    assert result["max"] == 105.0

    # Window (100, 110] holds ts 105 and 110.
    result = aggregator.stats("g", at=110.0)
    assert result["count"] == 2
    assert result["min"] == 105.0
    assert result["max"] == 110.0

    # A query far in the future looks past every recorded event.
    assert aggregator.stats("g", at=1000.0) == ZERO
def test_stats_is_non_destructive_and_repeatable():
    """Asking for the same stats twice returns the same answer."""
    aggregator = WindowedAggregator(window=10.0)
    for stamp, value in ((100.0, 1.0), (105.0, 3.0)):
        aggregator.add(stamp, "g", value)

    initial = aggregator.stats("g", at=105.0)
    repeated = aggregator.stats("g", at=105.0)

    # A query must not consume the events it reads.
    assert initial == repeated
def test_exact_keys_present():
    """Populated and empty results expose exactly the five documented keys."""
    required_keys = {"count", "sum", "min", "max", "mean"}
    aggregator = WindowedAggregator(window=10.0)
    aggregator.add(100.0, "g", 1.0)

    populated = aggregator.stats("g", at=100.0)
    assert set(populated.keys()) == required_keys
    assert isinstance(populated["count"], int)

    # The empty-window result must carry the same key set.
    empty = aggregator.stats("g", at=1000.0)
    assert set(empty.keys()) == required_keys
def test_float_window_and_timestamps():
    """Fractional window lengths and timestamps obey the same boundary rules."""
    aggregator = WindowedAggregator(window=1.5)
    for stamp, value in ((10.0, 1.0), (10.4, 2.0), (10.5, 3.0)):
        aggregator.add(stamp, "g", value)

    # Window (9.5, 11.0] covers all three events.
    wide = aggregator.stats("g", at=11.0)
    assert wide["count"] == 3

    # Window (10.0, 11.5] drops ts=10.0, which sits on the open lower bound.
    shifted = aggregator.stats("g", at=11.5)
    assert shifted["count"] == 2
    assert shifted["min"] == 2.0
    assert shifted["max"] == 3.0
System prompt
You are an expert programmer. Solve the task exactly as specified. Output your solution as fenced code blocks using the required file name(s) and the exact function/type signatures requested. Prefer correctness; do not include prose outside code unless asked.
Proposed solution
```python
import math
from collections import defaultdict
from typing import Dict, List, Optional
class WindowedAggregator:
    """Aggregate grouped, timestamped events over a half-open time window.

    Events are stored per group in insertion order. Each ``stats`` call
    filters that group's events, so queries never consume or reorder the
    stored data and out-of-order ingestion needs no special handling.
    """

    def __init__(self, window: float):
        """window = length of the time window (same units as timestamps)."""
        self.window = window
        # Map group name -> list of (timestamp, value) tuples.
        self._events: dict[str, list[tuple[float, float]]] = defaultdict(list)

    def add(self, ts: float, group: str, value: float) -> None:
        """Record an event. Events may arrive OUT OF ORDER (ts not monotonic)."""
        self._events[group].append((ts, value))

    def stats(self, group: str, at: float) -> dict:
        """Aggregate the events of `group` whose timestamp is in the half-open
        window (at - window, at] — i.e. at-window < ts <= at.

        Returns a dict with exactly the keys "count", "sum", "min", "max" and
        "mean". For an unknown group or a window without events the result is
        count 0, sum 0.0 and None for min, max and mean.
        """
        start = at - self.window
        # .get() rather than indexing: the store is a defaultdict, and indexing
        # would silently register an empty entry for a group never added.
        values = [
            val for ts, val in self._events.get(group, ()) if start < ts <= at
        ]

        if not values:
            return {
                "count": 0,
                "sum": 0.0,
                "min": None,
                "max": None,
                "mean": None,
            }

        # Naive left-to-right float addition depends on insertion order
        # (e.g. 0.1 + 0.2 + 0.3 vs 0.3 + 0.2 + 0.1), which would break the
        # guarantee that ingestion order never changes the result. fsum
        # returns the correctly rounded sum, which is order-independent.
        try:
            total = math.fsum(values)
        except (OverflowError, ValueError):
            # fsum refuses inputs such as inf + -inf or an overflowing sum;
            # fall back to plain addition so those cases still yield the
            # usual IEEE inf/nan instead of raising.
            total = sum(values)

        count = len(values)
        return {
            "count": count,
            "sum": total,
            "min": min(values),
            "max": max(values),
            "mean": total / count,
        }
```
Test output (stdout)
............. [100%] 13 passed in 0.02s