โ† run

py-13-windowed-aggregator

1.000
13/13 testsยท architecture
Challenge ยท difficulty 5/5
# Windowed streaming aggregator

Implement a file **`solution.py`** containing a class `WindowedAggregator` that ingests
timestamped, grouped events and computes aggregate statistics over a sliding time window.

```python
class WindowedAggregator:
    def __init__(self, window: float):
        """window = length of the time window (same units as timestamps)."""

    def add(self, ts: float, group: str, value: float) -> None:
        """Record an event. Events may arrive OUT OF ORDER (ts not monotonic)."""

    def stats(self, group: str, at: float) -> dict:
        """Aggregate the events of `group` whose timestamp is in the half-open
        window (at - window, at] โ€” i.e. at-window < ts <= at."""
```

## Semantics

- The window is **half-open**: an event whose timestamp is exactly `at - window` is **excluded**,
  and an event whose timestamp is exactly `at` is **included**. Formally, an event with timestamp
  `ts` is in the window iff `at - window < ts <= at`.
- **Groups are independent.** `stats` for a group must never observe events recorded under any other
  group.
- **Out-of-order ingestion is allowed.** `add` may be called with timestamps in any order. `stats`
  considers every event added so far regardless of insertion order; calling `add` with the same
  events in a different order yields identical `stats` results.
- **Duplicates count.** Adding the same `(ts, group)` pair (or even the same `(ts, group, value)`)
  more than once records multiple independent events, all of which contribute to the aggregates.
- `stats` returns a dict with **exactly** these keys:
  - `"count"`: `int` โ€” number of events in the window.
  - `"sum"`: `float` โ€” sum of their values (`0.0` if there are none).
  - `"min"`: `float` or `None` โ€” minimum value, or `None` if there are no events.
  - `"max"`: `float` or `None` โ€” maximum value, or `None` if there are no events.
  - `"mean"`: `float` or `None` โ€” `sum / count`, or `None` if there are no events.
- An **unknown group**, or a window that **contains no events**, returns
  `{"count": 0, "sum": 0.0, "min": None, "max": None, "mean": None}`.

## Example

```python
agg = WindowedAggregator(window=10.0)
agg.add(100.0, "cpu", 5.0)
agg.add(95.0,  "cpu", 1.0)
agg.add(105.0, "cpu", 3.0)
agg.add(100.0, "mem", 50.0)   # different group, ignored by "cpu" stats

# Window for at=105, window=10 is (95, 105]:
#   ts=95  is excluded (boundary at-window)
#   ts=100 is included
#   ts=105 is included
s = agg.stats("cpu", at=105.0)
assert s == {"count": 2, "sum": 8.0, "min": 3.0, "max": 5.0, "mean": 4.0}

# Advance `at` so the window (100, 110] only contains ts=105:
s2 = agg.stats("cpu", at=110.0)
assert s2 == {"count": 1, "sum": 3.0, "min": 3.0, "max": 3.0, "mean": 3.0}

# Unknown group:
assert agg.stats("disk", at=105.0) == {
    "count": 0, "sum": 0.0, "min": None, "max": None, "mean": None,
}
```
tests/test_windowed_aggregator.py
import random

from solution import WindowedAggregator

ZERO = {"count": 0, "sum": 0.0, "min": None, "max": None, "mean": None}


def test_empty_and_unknown_group_returns_zero_dict():
    agg = WindowedAggregator(window=10.0)
    assert agg.stats("nope", at=0.0) == ZERO
    agg.add(5.0, "g", 1.0)
    # group exists, but window (90, 100] contains nothing
    assert agg.stats("g", at=100.0) == ZERO
    # still-unknown group
    assert agg.stats("other", at=5.0) == ZERO


def test_basic_single_event_stats():
    agg = WindowedAggregator(window=10.0)
    agg.add(100.0, "g", 4.0)
    s = agg.stats("g", at=100.0)
    assert s == {"count": 1, "sum": 4.0, "min": 4.0, "max": 4.0, "mean": 4.0}


def test_inclusion_at_upper_bound():
    # event exactly at `at` is INCLUDED
    agg = WindowedAggregator(window=10.0)
    agg.add(105.0, "g", 7.0)
    s = agg.stats("g", at=105.0)
    assert s["count"] == 1
    assert s["sum"] == 7.0


def test_exclusion_at_lower_bound():
    # event exactly at at-window is EXCLUDED (half-open lower bound)
    agg = WindowedAggregator(window=10.0)
    agg.add(95.0, "g", 7.0)
    s = agg.stats("g", at=105.0)  # window (95, 105]
    assert s == ZERO


def test_boundary_pair_together():
    agg = WindowedAggregator(window=10.0)
    agg.add(95.0, "g", 1.0)   # excluded
    agg.add(105.0, "g", 3.0)  # included
    s = agg.stats("g", at=105.0)
    assert s == {"count": 1, "sum": 3.0, "min": 3.0, "max": 3.0, "mean": 3.0}


def test_min_max_mean_sum_count_correctness():
    agg = WindowedAggregator(window=100.0)
    values = [3.0, -2.0, 10.5, 4.0, 0.0]
    for i, v in enumerate(values):
        agg.add(float(i), "g", v)
    # timestamps are 0..4; window (4-100, 4] = (-96, 4] covers all of them
    s = agg.stats("g", at=4.0)
    assert s["count"] == 5
    assert s["sum"] == sum(values)
    assert s["min"] == min(values)
    assert s["max"] == max(values)
    assert s["mean"] == sum(values) / len(values)


def test_group_isolation():
    agg = WindowedAggregator(window=10.0)
    agg.add(100.0, "a", 1.0)
    agg.add(100.0, "a", 2.0)
    agg.add(100.0, "b", 100.0)
    sa = agg.stats("a", at=100.0)
    sb = agg.stats("b", at=100.0)
    assert sa == {"count": 2, "sum": 3.0, "min": 1.0, "max": 2.0, "mean": 1.5}
    assert sb == {"count": 1, "sum": 100.0, "min": 100.0, "max": 100.0, "mean": 100.0}


def test_out_of_order_matches_in_order():
    events = [(100.0, 5.0), (95.0, 1.0), (105.0, 3.0), (102.0, 9.0), (98.0, 2.0)]

    ordered = WindowedAggregator(window=10.0)
    for ts, v in sorted(events):
        ordered.add(ts, "g", v)

    shuffled = WindowedAggregator(window=10.0)
    rnd = list(events)
    random.Random(1234).shuffle(rnd)
    for ts, v in rnd:
        shuffled.add(ts, "g", v)

    for at in (95.0, 100.0, 105.0, 110.0, 120.0):
        assert ordered.stats("g", at=at) == shuffled.stats("g", at=at)


def test_duplicate_ts_group_events_all_count():
    agg = WindowedAggregator(window=10.0)
    agg.add(100.0, "g", 2.0)
    agg.add(100.0, "g", 2.0)  # identical (ts, group, value)
    agg.add(100.0, "g", 5.0)  # same (ts, group), different value
    s = agg.stats("g", at=100.0)
    assert s["count"] == 3
    assert s["sum"] == 9.0
    assert s["min"] == 2.0
    assert s["max"] == 5.0
    assert s["mean"] == 3.0


def test_moving_window_includes_and_excludes():
    agg = WindowedAggregator(window=10.0)
    for ts in (90.0, 95.0, 100.0, 105.0, 110.0):
        agg.add(ts, "g", ts)  # value == ts for easy checking

    # at=100 -> window (90, 100]: ts in {95, 100}
    s = agg.stats("g", at=100.0)
    assert s["count"] == 2
    assert s["sum"] == 95.0 + 100.0
    assert s["min"] == 95.0 and s["max"] == 100.0

    # at=105 -> window (95, 105]: ts in {100, 105}
    s = agg.stats("g", at=105.0)
    assert s["count"] == 2
    assert s["min"] == 100.0 and s["max"] == 105.0

    # at=110 -> window (100, 110]: ts in {105, 110}
    s = agg.stats("g", at=110.0)
    assert s["count"] == 2
    assert s["min"] == 105.0 and s["max"] == 110.0

    # at far in the future -> window past all events
    assert agg.stats("g", at=1000.0) == ZERO


def test_stats_is_non_destructive_and_repeatable():
    agg = WindowedAggregator(window=10.0)
    agg.add(100.0, "g", 1.0)
    agg.add(105.0, "g", 3.0)
    first = agg.stats("g", at=105.0)
    second = agg.stats("g", at=105.0)
    assert first == second  # querying must not consume events


def test_exact_keys_present():
    agg = WindowedAggregator(window=10.0)
    agg.add(100.0, "g", 1.0)
    s = agg.stats("g", at=100.0)
    assert set(s.keys()) == {"count", "sum", "min", "max", "mean"}
    assert isinstance(s["count"], int)
    # empty case keys too
    e = agg.stats("g", at=1000.0)
    assert set(e.keys()) == {"count", "sum", "min", "max", "mean"}


def test_float_window_and_timestamps():
    agg = WindowedAggregator(window=1.5)
    agg.add(10.0, "g", 1.0)
    agg.add(10.4, "g", 2.0)
    agg.add(10.5, "g", 3.0)
    # at=11.0 -> window (9.5, 11.0]: all three included
    s = agg.stats("g", at=11.0)
    assert s["count"] == 3
    # at=11.5 -> window (10.0, 11.5]: ts=10.0 excluded (lower bound)
    s2 = agg.stats("g", at=11.5)
    assert s2["count"] == 2
    assert s2["min"] == 2.0 and s2["max"] == 3.0
System prompt
You are an expert programmer. Solve the task exactly as specified. Output your solution as fenced code blocks using the required file name(s) and the exact function/type signatures requested. Prefer correctness; do not include prose outside code unless asked.
Proposed solution
Looking at the requirements, I need to implement a `WindowedAggregator` class that:

1. Stores timestamped events grouped by group name
2. Handles out-of-order events
3. Computes statistics over a sliding time window (half-open: (at-window, at])
4. Returns count, sum, min, max, and mean for the specified group in the window

Let me break down the implementation:

- I'll use a dictionary to store events per group, where each group maps to a list of (timestamp, value) tuples
- For `add`, I'll simply append the event to the appropriate group's list
- For `stats`, I'll filter events in the window and compute the required statistics

```python
class WindowedAggregator:
    def __init__(self, window: float):
        """window = length of the time window (same units as timestamps)."""
        self.window = window
        self.events = {}  # group -> list of (timestamp, value) tuples
    
    def add(self, ts: float, group: str, value: float) -> None:
        """Record an event. Events may arrive OUT OF ORDER (ts not monotonic)."""
        if group not in self.events:
            self.events[group] = []
        self.events[group].append((ts, value))
    
    def stats(self, group: str, at: float) -> dict:
        """Aggregate the events of `group` whose timestamp is in the half-open
        window (at - window, at] โ€” i.e. at-window < ts <= at."""
        # Get events for the group, or empty list if group doesn't exist
        group_events = self.events.get(group, [])
        
        # Filter events in the window: at - window < ts <= at
        window_start = at - self.window
        filtered_values = [value for ts, value in group_events if window_start < ts <= at]
        
        count = len(filtered_values)
        
        if count == 0:
            return {
                "count": 0,
                "sum": 0.0,
                "min": None,
                "max": None,
                "mean": None
            }
        
        total_sum = sum(filtered_values)
        min_val = min(filtered_values)
        max_val = max(filtered_values)
        mean_val = total_sum / count
        
        return {
            "count": count,
            "sum": total_sum,
            "min": min_val,
            "max": max_val,
            "mean": mean_val
        }
```

Test output (stdout)

.............                                                            [100%]
13 passed in 0.02s