| name | data-streaming-aggregations |
| description | Windowing, sessionization, time-series aggregation, and late data handling for streaming systems |
Streaming Aggregations
Scope: Windowing strategies, sessionization, time-series aggregation, watermarks, late data handling
Lines: 385
Last Updated: 2025-10-27
Format Version: 1.0 (Atomic)
When to Use This Skill
Use this skill when:
- Aggregating unbounded streams (counts, sums, averages)
- Implementing time-based windows (tumbling, sliding, session)
- Building real-time dashboards and metrics
- Detecting patterns in time-series data
- Handling out-of-order and late-arriving data
- Computing sessionized analytics (user sessions, click streams)
- Building real-time anomaly detection
- Creating streaming leaderboards and rankings
Core Concepts
Window Types
Tumbling Windows
- Fixed size, non-overlapping
- Example: Count events per 5 minutes
- Use: Periodic reports, batched processing
- Memory: O(window_size)
Sliding Windows
- Fixed size, overlapping
- Example: Moving average over last 10 minutes
- Use: Continuous metrics, smoothed trends
- Memory: O(window_size * slide_factor), where slide_factor = window_size / slide_interval (the number of windows each event belongs to)
Session Windows
- Dynamic size, gap-based
- Example: User sessions with 30-min inactivity
- Use: User behavior, conversation threads
- Memory: O(active_sessions)
Global Windows
- Unbounded, single window
- Example: All-time counts
- Use: Stateful processing without time bounds
- Memory: O(cardinality)
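The difference between tumbling and sliding windows is easiest to see in window assignment. The sketch below is a minimal illustration, assuming integer timestamps and windows aligned to timestamp 0; the function names `tumbling_window` and `sliding_windows` are hypothetical and not taken from any library.

```python
from typing import List, Tuple


def tumbling_window(ts: int, size: int) -> Tuple[int, int]:
    """Return the single [start, end) tumbling window that contains ts."""
    start = (ts // size) * size
    return (start, start + size)


def sliding_windows(ts: int, size: int, slide: int) -> List[Tuple[int, int]]:
    """Return every [start, end) sliding window that contains ts, oldest first."""
    start = (ts // slide) * slide  # latest window start that is <= ts
    windows = []
    while start > ts - size:       # the window still covers ts
        windows.append((start, start + size))
        start -= slide
    return list(reversed(windows))


print(tumbling_window(7, 5))       # (5, 10)
print(sliding_windows(12, 10, 5))  # [(5, 15), (10, 20)]
```

An event lands in exactly one tumbling window but in roughly size / slide sliding windows, which is where the extra memory cost of sliding windows comes from.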
Time Semantics
Event Time
- Time when event occurred
- Requires: Timestamps in data
- Accurate but complex (late data)
- Use: Financial, billing, analytics
Processing Time
- Time when event processed
- Simple, low latency
- Inaccurate for time-based logic
- Use: Monitoring, system metrics
Ingestion Time
- Time when event entered system
- Middle ground
- Use: Approximation when no event time
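To make the accuracy gap concrete, the sketch below counts the same events per 60-second window twice: once by event time and once by processing time. The event data is made up for illustration, and `count_per_window` is a hypothetical helper.

```python
from collections import Counter

WINDOW = 60  # window size in seconds

# (event_time, processing_time) pairs; the third event was delayed in transit.
events = [(10, 11), (50, 52), (55, 65), (70, 71)]


def count_per_window(timestamps, size=WINDOW):
    """Count timestamps per tumbling window, keyed by window start."""
    return dict(Counter((t // size) * size for t in timestamps))


by_event_time = count_per_window(e for e, _ in events)
by_processing_time = count_per_window(p for _, p in events)

print(by_event_time)       # {0: 3, 60: 1}
print(by_processing_time)  # {0: 2, 60: 2}
```

The delayed event is counted in the wrong window under processing time, so the result depends on delivery delay rather than on when the event happened.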
Watermarks
Watermark(T)
- "All events before T have arrived"
- Heuristic, not guarantee
- Triggers window computation
Strategies:
- Perfect: Wait forever (impractical)
- Bounded delay: T = max_timestamp - delay
- Percentile: Allow X% late data
- Punctuation: Explicit markers in stream
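The bounded-delay strategy can be sketched in a few lines. This is an illustrative generator, not an API from any streaming framework; the class name `BoundedDelayWatermark` and its `observe` method are assumptions. Because it tracks the maximum timestamp seen, the watermark never moves backwards when an out-of-order event arrives.

```python
from typing import Optional


class BoundedDelayWatermark:
    """Watermark = highest event timestamp seen so far minus a fixed delay."""

    def __init__(self, max_delay: int):
        self.max_delay = max_delay
        self.max_timestamp: Optional[int] = None

    def observe(self, timestamp: int) -> int:
        """Record an event timestamp and return the current watermark."""
        if self.max_timestamp is None or timestamp > self.max_timestamp:
            self.max_timestamp = timestamp
        return self.max_timestamp - self.max_delay


wm = BoundedDelayWatermark(max_delay=5)
print([wm.observe(t) for t in [10, 12, 8, 20]])  # [5, 7, 7, 15]
```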
Late Data Handling
Strategies:
- Drop: Ignore late data (simplest)
- Update: Recompute window (expensive)
- Side output: Route to separate stream
- Allowed lateness: Accept within window
Trade-offs:
- Accuracy vs Latency
- Completeness vs Timeliness
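One way to combine these strategies is to classify each event against the watermark and the end of its window. The sketch below is a hedged illustration: the `Disposition` names and the boundary convention (a window counts as passed only when the watermark is strictly greater than the bound) are assumptions, and real systems differ on the exact comparison.

```python
from enum import Enum


class Disposition(Enum):
    ON_TIME = "on_time"          # window has not fired yet
    LATE_UPDATE = "late_update"  # window fired, still within allowed lateness
    SIDE_OUTPUT = "side_output"  # too late: route to a separate stream


def classify(window_end: int, watermark: int, allowed_lateness: int) -> Disposition:
    """Decide how to treat an event whose window ends at window_end."""
    if watermark <= window_end:
        return Disposition.ON_TIME
    if watermark <= window_end + allowed_lateness:
        return Disposition.LATE_UPDATE
    return Disposition.SIDE_OUTPUT


print(classify(window_end=10, watermark=8, allowed_lateness=5))   # Disposition.ON_TIME
print(classify(window_end=10, watermark=12, allowed_lateness=5))  # Disposition.LATE_UPDATE
print(classify(window_end=10, watermark=16, allowed_lateness=5))  # Disposition.SIDE_OUTPUT
```

Setting `allowed_lateness` to 0 collapses the middle case, which is the plain "drop or side-output" policy.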
Patterns
Pattern 1: Tumbling Window Aggregation (Rust/Timely)
use differential_dataflow::input::Input;
use differential_dataflow::operators::Reduce;
use timely::dataflow::ProbeHandle;

#[derive(Clone, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
struct Event {
    user_id: u32,
    value: i32,
    timestamp: u64, // event time in milliseconds
}

fn main() {
    timely::execute_from_args(std::env::args(), |worker| {
        let mut probe = ProbeHandle::new();

        let mut input = worker.dataflow::<u64, _, _>(|scope| {
            let (input, events) = scope.new_collection::<Event, isize>();
            let window_size = 300_000u64; // 5 minutes in milliseconds

            events
                // Key each event by (user, tumbling window index).
                .map(move |event| {
                    let window_id = event.timestamp / window_size;
                    ((event.user_id, window_id), event.value)
                })
                // `values` holds (&value, multiplicity) pairs for one key.
                .reduce(|_key, values, output| {
                    let sum: isize = values.iter()
                        .map(|(value, diff)| (**value as isize) * *diff)
                        .sum();
                    let count: isize = values.iter()
                        .map(|(_, diff)| *diff)
                        .sum();
                    output.push(((sum, count), 1));
                })
                // Collections are inspected as (data, time, diff) triples.
                .inspect(|(((user_id, window_id), (sum, count)), _time, _diff)| {
                    println!("User {}, Window {}: sum={}, count={}",
                             user_id, window_id, sum, count);
                })
                .probe_with(&mut probe);

            input
        });

        // Insert from one worker only so events are not duplicated.
        if worker.index() == 0 {
            input.insert(Event { user_id: 1, value: 10, timestamp: 100_000 });
            input.insert(Event { user_id: 1, value: 20, timestamp: 150_000 });
            input.insert(Event { user_id: 1, value: 30, timestamp: 350_000 });
        }
        input.advance_to(1);
        input.flush();

        // Step until every update before the input's current time is processed.
        worker.step_while(|| probe.less_than(input.time()));
    }).expect("Execution failed");
}
Pattern 2: Sliding Window (Python)
import threading
from collections import deque
from dataclasses import dataclass
from typing import Deque, Dict


@dataclass
class Event:
    timestamp: float
    user_id: str
    value: float


class SlidingWindowAggregator:
    """Sliding window with event-time semantics (assumes in-order events per user)"""

    def __init__(self, window_size: float, slide_interval: float):
        self.window_size = window_size
        # Stored for callers that emit on a fixed cadence; this sketch
        # recomputes the trailing window on every event instead.
        self.slide_interval = slide_interval
        self.buffers: Dict[str, Deque[Event]] = {}
        self.lock = threading.Lock()

    def process(self, event: Event):
        """Add an event and return the aggregate over the user's trailing window"""
        with self.lock:
            if event.user_id not in self.buffers:
                self.buffers[event.user_id] = deque()
            buffer = self.buffers[event.user_id]
            buffer.append(event)

            # Evict events older than the window; the window is the closed
            # interval [cutoff, event.timestamp].
            cutoff = event.timestamp - self.window_size
            while buffer and buffer[0].timestamp < cutoff:
                buffer.popleft()

            total = sum(e.value for e in buffer)
            count = len(buffer)
            avg = total / count if count > 0 else 0.0
            return {
                'user_id': event.user_id,
                'window_start': cutoff,
                'window_end': event.timestamp,
                'sum': total,
                'count': count,
                'avg': avg
            }


def main():
    aggregator = SlidingWindowAggregator(
        window_size=10.0,
        slide_interval=2.0
    )
    events = [
        Event(timestamp=1.0, user_id='alice', value=10),
        Event(timestamp=2.0, user_id='alice', value=20),
        Event(timestamp=5.0, user_id='alice', value=30),
        Event(timestamp=12.0, user_id='alice', value=40),
    ]
    for event in events:
        result = aggregator.process(event)
        print(f"Window result: {result}")


if __name__ == '__main__':
    main()
Pattern 3: Session Windows (Go)
package main

import (
	"fmt"
	"time"
)

type Event struct {
	UserID    string
	Timestamp time.Time
	Value     int
}

type Session struct {
	UserID string
	Start  time.Time
	End    time.Time
	Events []Event
	Sum    int
}

type SessionWindowAggregator struct {
	inactivityGap time.Duration
	sessions      map[string]*Session
}

func NewSessionWindowAggregator(gap time.Duration) *SessionWindowAggregator {
	return &SessionWindowAggregator{
		inactivityGap: gap,
		sessions:      make(map[string]*Session),
	}
}

// Process adds an event to the user's session. It returns the previous
// session when the event closes it, and nil otherwise.
func (swa *SessionWindowAggregator) Process(event Event) *Session {
	session, exists := swa.sessions[event.UserID]
	if exists && event.Timestamp.Sub(session.End) <= swa.inactivityGap {
		// Within the inactivity gap: extend the current session.
		session.End = event.Timestamp
		session.Events = append(session.Events, event)
		session.Sum += event.Value
		return nil
	}
	// First event for this user, or the gap was exceeded: start a new session.
	completed := session // nil when no previous session exists
	swa.sessions[event.UserID] = &Session{
		UserID: event.UserID,
		Start:  event.Timestamp,
		End:    event.Timestamp,
		Events: []Event{event},
		Sum:    event.Value,
	}
	return completed
}

// FlushExpired closes every session idle for longer than the gap as of
// currentTime. Call it periodically so sessions do not stay open forever.
func (swa *SessionWindowAggregator) FlushExpired(currentTime time.Time) []*Session {
	var expired []*Session
	for userID, session := range swa.sessions {
		if currentTime.Sub(session.End) > swa.inactivityGap {
			expired = append(expired, session)
			delete(swa.sessions, userID)
		}
	}
	return expired
}

func main() {
	aggregator := NewSessionWindowAggregator(5 * time.Minute)
	base := time.Now() // single reference point so offsets are exact
	events := []Event{
		{UserID: "alice", Timestamp: base, Value: 10},
		{UserID: "alice", Timestamp: base.Add(1 * time.Minute), Value: 20},
		{UserID: "alice", Timestamp: base.Add(10 * time.Minute), Value: 30},
		{UserID: "bob", Timestamp: base.Add(2 * time.Minute), Value: 15},
	}
	for _, event := range events {
		if completed := aggregator.Process(event); completed != nil {
			fmt.Printf("Session complete: %+v\n", completed)
		}
	}
	expired := aggregator.FlushExpired(base.Add(20 * time.Minute))
	for _, session := range expired {
		fmt.Printf("Expired session: %+v\n", session)
	}
}
Pattern 4: Watermark-Based Late Data Handling (Python)
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass
class Event:
    timestamp: int
    key: str
    value: int


@dataclass
class Window:
    start: int
    end: int
    key: str
    sum: int = 0
    count: int = 0


class WatermarkAggregator:
    """Window aggregator with watermark-based triggering"""

    def __init__(
        self,
        window_size: int,
        allowed_lateness: int,
        watermark_delay: int
    ):
        self.window_size = window_size
        self.allowed_lateness = allowed_lateness
        self.watermark_delay = watermark_delay
        self.windows: Dict[Tuple[str, int], Window] = {}
        self.max_timestamp = 0
        self.watermark = 0
        self.late_events: List[Event] = []  # side output for too-late events

    def process(self, event: Event) -> List[Window]:
        """Process event and emit completed windows"""
        # Bounded-delay watermark: trails the highest timestamp seen so far.
        self.max_timestamp = max(self.max_timestamp, event.timestamp)
        self.watermark = self.max_timestamp - self.watermark_delay

        window_start = (event.timestamp // self.window_size) * self.window_size
        window_end = window_start + self.window_size
        window_key = (event.key, window_start)

        # An event is too late when its window has already been closed.
        # This mirrors the emit condition in _emit_completed_windows.
        if self.watermark > window_end + self.allowed_lateness:
            self.late_events.append(event)
            print(f"Late event routed to side output: {event}")
            return []

        if window_key not in self.windows:
            self.windows[window_key] = Window(
                start=window_start,
                end=window_end,
                key=event.key
            )
        window = self.windows[window_key]
        window.sum += event.value
        window.count += 1
        return self._emit_completed_windows()

    def _emit_completed_windows(self) -> List[Window]:
        """Emit windows that the watermark has passed by more than the allowed lateness"""
        completed = []
        to_remove = []
        for (key, window_start), window in self.windows.items():
            if self.watermark > window.end + self.allowed_lateness:
                completed.append(window)
                to_remove.append((key, window_start))
        for window_key in to_remove:
            del self.windows[window_key]
        return completed


def main():
    aggregator = WatermarkAggregator(
        window_size=10,
        allowed_lateness=5,
        watermark_delay=3
    )
    events = [
        Event(timestamp=5, key='user1', value=10),
        Event(timestamp=15, key='user1', value=20),
        Event(timestamp=8, key='user1', value=15),   # out of order, still accepted
        Event(timestamp=25, key='user1', value=30),  # closes window [0, 10)
        Event(timestamp=3, key='user1', value=5),    # too late
    ]
    for event in events:
        completed = aggregator.process(event)
        for window in completed:
            print(f"Window complete: {window}")


if __name__ == '__main__':
    main()
Pattern 5: Time-Series Aggregation with Downsampling (Python)
import math
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class TimeSeriesPoint:
    timestamp: int
    metric: str
    value: float


class TimeSeriesAggregator:
    """Multi-resolution time-series aggregation (assumes in-order timestamps per metric)"""

    def __init__(self):
        self.raw: Dict[str, List[TimeSeriesPoint]] = defaultdict(list)
        # Each bucket is a tuple: (bucket_start, sum, min, max, count)
        self.minute: Dict[str, List[tuple]] = defaultdict(list)
        self.hour: Dict[str, List[tuple]] = defaultdict(list)

    def ingest(self, point: TimeSeriesPoint):
        """Ingest raw point and update aggregations"""
        self.raw[point.metric].append(point)
        minute_bucket = (point.timestamp // 60) * 60
        self._update_aggregation(point.metric, minute_bucket, point.value, self.minute)
        hour_bucket = (point.timestamp // 3600) * 3600
        self._update_aggregation(point.metric, hour_bucket, point.value, self.hour)

    def _update_aggregation(
        self,
        metric: str,
        bucket: int,
        value: float,
        storage: Dict[str, List[tuple]]
    ):
        """Update aggregation bucket"""
        buckets = storage[metric]
        if not buckets or buckets[-1][0] != bucket:
            # New bucket: sum, min, and max all start at the first value.
            buckets.append((bucket, value, value, value, 1))
        else:
            old_time, old_sum, old_min, old_max, old_count = buckets[-1]
            buckets[-1] = (
                old_time,
                old_sum + value,
                min(old_min, value),
                max(old_max, value),
                old_count + 1
            )

    def query(
        self,
        metric: str,
        start: int,
        end: int,
        resolution: str = 'minute'
    ) -> List[tuple]:
        """Query aggregated data at specified resolution"""
        storage = {
            'minute': self.minute,
            'hour': self.hour,
            'raw': self.raw
        }[resolution]
        if resolution == 'raw':
            points = storage[metric]
            filtered = [p for p in points if start <= p.timestamp <= end]
            return [(p.timestamp, p.value) for p in filtered]
        buckets = storage[metric]
        # Return (bucket_start, avg, min, max) for buckets starting in [start, end].
        filtered = [
            (timestamp, total / count, min_val, max_val)
            for timestamp, total, min_val, max_val, count in buckets
            if start <= timestamp <= end
        ]
        return filtered


def main():
    agg = TimeSeriesAggregator()
    # Two hours of one-second samples of a synthetic sine-wave metric.
    for i in range(7200):
        point = TimeSeriesPoint(
            timestamp=i,
            metric='cpu_usage',
            value=50 + 10 * math.sin(i / 60.0)
        )
        agg.ingest(point)
    minute_data = agg.query('cpu_usage', 0, 3600, resolution='minute')
    print(f"Minute-level aggregation: {len(minute_data)} points")
    hour_data = agg.query('cpu_usage', 0, 7200, resolution='hour')
    print(f"Hour-level aggregation: {len(hour_data)} points")
    for timestamp, avg, min_val, max_val in hour_data:
        print(f"Hour {timestamp//3600}: avg={avg:.2f}, min={min_val:.2f}, max={max_val:.2f}")


if __name__ == '__main__':
    main()
Pattern 6: Top-K Streaming Aggregation (Rust)
use std::cmp::Reverse;
use std::collections::{BinaryHeap, HashMap};

#[derive(Debug, Clone, Eq, PartialEq)]
struct Item {
    key: String,
    count: usize,
}

// Order items by count only, so the heap compares frequencies.
impl Ord for Item {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.count.cmp(&other.count)
    }
}

impl PartialOrd for Item {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

struct TopKAggregator {
    k: usize,
    counts: HashMap<String, usize>,
    // `Reverse` turns the max-heap into a min-heap: the root is the
    // smallest of the current top-k candidates.
    heap: BinaryHeap<Reverse<Item>>,
}

impl TopKAggregator {
    fn new(k: usize) -> Self {
        Self {
            k,
            counts: HashMap::new(),
            heap: BinaryHeap::new(),
        }
    }

    fn update(&mut self, key: String) {
        let count = self.counts.entry(key).or_insert(0);
        *count += 1;
        // Simple but O(n log k) per update; fine for small key sets.
        self.rebuild_heap();
    }

    fn rebuild_heap(&mut self) {
        self.heap.clear();
        for (key, count) in &self.counts {
            let item = Item {
                key: key.clone(),
                count: *count,
            };
            if self.heap.len() < self.k {
                self.heap.push(Reverse(item));
            } else if let Some(Reverse(min)) = self.heap.peek() {
                // Replace the smallest candidate when a larger count appears.
                if item.count > min.count {
                    self.heap.pop();
                    self.heap.push(Reverse(item));
                }
            }
        }
    }

    // Returns the top-k items sorted by descending count. Ties are broken
    // arbitrarily because HashMap iteration order is unspecified.
    fn top_k(&self) -> Vec<Item> {
        let mut items: Vec<_> = self.heap.iter()
            .map(|Reverse(item)| item.clone())
            .collect();
        items.sort_by(|a, b| b.count.cmp(&a.count));
        items
    }
}

fn main() {
    let mut aggregator = TopKAggregator::new(3);
    let events = vec!["apple", "banana", "apple", "cherry", "apple", "banana", "date"];
    for event in events {
        aggregator.update(event.to_string());
    }
    println!("Top 3 items:");
    for item in aggregator.top_k() {
        println!("  {}: {}", item.key, item.count);
    }
}
Quick Reference
Window Selection Guide
Use Tumbling: Periodic reports, non-overlapping batches
Use Sliding: Moving averages, continuous metrics
Use Session: User behavior, conversation analysis
Use Global: Unbounded state, all-time aggregations
Watermark Strategies
Bounded delay: watermark = max_timestamp - fixed_delay
Percentile: watermark = max_timestamp - percentile(observed_lateness, 99)
Adaptive: watermark = max_timestamp - 2 * stddev(inter_arrival_time)
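A percentile-based watermark is usually derived from observed lateness (how far behind the newest timestamp each event arrived) rather than from raw timestamps. The sketch below assumes a nearest-rank percentile over a list of lateness samples; `percentile_delay` and `percentile_watermark` are hypothetical helper names.

```python
import math
from typing import List


def percentile_delay(lateness_samples: List[float], pct: float) -> float:
    """Nearest-rank percentile of observed lateness (0 < pct <= 100)."""
    if not lateness_samples:
        return 0.0
    ordered = sorted(lateness_samples)
    rank = math.ceil(pct * len(ordered) / 100.0)
    rank = min(max(rank, 1), len(ordered))
    return ordered[rank - 1]


def percentile_watermark(max_timestamp: float,
                         lateness_samples: List[float],
                         pct: float = 99.0) -> float:
    """Watermark that tolerates roughly (100 - pct)% of events arriving late."""
    return max_timestamp - percentile_delay(lateness_samples, pct)


samples = [0, 1, 1, 2, 3, 5, 8, 13, 21, 34]
print(percentile_delay(samples, 90))           # 21
print(percentile_watermark(100, samples, 90))  # 79
```

A higher percentile holds windows open longer and drops fewer events; a lower one emits sooner at the cost of more late data.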
Time Extraction
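// Event time: read the timestamp carried by the event
let event_time = |event: &Event| event.timestamp;
// Tumbling window assignment
let window_id = event.timestamp / window_size;
// Session boundary check
if current_time - last_event_time > session_gap {
    // Gap exceeded: close the current session and start a new one
}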
Performance Optimization
Reduce State Size
- Compact old windows
- Use approximate algorithms (HyperLogLog, Count-Min Sketch)
- Expire inactive keys
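As an example of trading accuracy for bounded state, here is a minimal Count-Min Sketch. It is an illustrative sketch only: the default `width` and `depth` and the SHA-256 row hashing are assumptions chosen for simplicity, not tuned values. Estimates never undercount; they can overcount when keys collide.

```python
import hashlib


class CountMinSketch:
    """Fixed-size frequency estimator: memory is width * depth counters."""

    def __init__(self, width: int = 1024, depth: int = 4):
        self.width = width
        self.depth = depth
        self.table = [[0] * width for _ in range(depth)]

    def _index(self, key: str, row: int) -> int:
        # Derive one hash per row by prefixing the key with the row number.
        digest = hashlib.sha256(f"{row}:{key}".encode("utf-8")).digest()
        return int.from_bytes(digest[:8], "little") % self.width

    def add(self, key: str, count: int = 1) -> None:
        for row in range(self.depth):
            self.table[row][self._index(key, row)] += count

    def estimate(self, key: str) -> int:
        # The minimum across rows is the least-contaminated counter.
        return min(self.table[row][self._index(key, row)]
                   for row in range(self.depth))


sketch = CountMinSketch()
for word in ["apple", "banana", "apple", "cherry", "apple"]:
    sketch.add(word)
print(sketch.estimate("apple"))  # at least 3
```

State stays at width * depth counters no matter how many distinct keys the stream contains.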
Batch Processing
- Buffer events before aggregating
- Periodic window evaluation
Incremental Updates
- Use differential dataflow for efficient re-aggregation
- Maintain summary statistics (sum, count) instead of raw data
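Summary statistics are most useful when they can be merged, because a coarse window can then be built from finer ones without revisiting raw events. A minimal sketch, assuming a hypothetical `Summary` type that tracks count, sum, min, and max:

```python
from dataclasses import dataclass


@dataclass
class Summary:
    """Constant-size aggregate that can be updated and merged incrementally."""
    count: int = 0
    total: float = 0.0
    minimum: float = float("inf")
    maximum: float = float("-inf")

    def add(self, value: float) -> None:
        self.count += 1
        self.total += value
        self.minimum = min(self.minimum, value)
        self.maximum = max(self.maximum, value)

    def merge(self, other: "Summary") -> "Summary":
        """Combine two summaries, e.g. two minute buckets into a larger one."""
        return Summary(
            count=self.count + other.count,
            total=self.total + other.total,
            minimum=min(self.minimum, other.minimum),
            maximum=max(self.maximum, other.maximum),
        )

    @property
    def mean(self) -> float:
        return self.total / self.count if self.count else 0.0


first, second = Summary(), Summary()
for v in (1, 2, 3):
    first.add(v)
second.add(10)
combined = first.merge(second)
print(combined.count, combined.mean)  # 4 4.0
```

A late event only needs one `add` on the affected bucket, which avoids recomputing the window from raw data.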
Anti-Patterns
- NEVER: Use processing time for event-time logic
  - Results depend on processing speed, not actual event timing
- NEVER: Wait indefinitely for late data
  - Set allowed lateness bounds
- NEVER: Store unbounded state in global windows
  - Use approximate algorithms or periodic cleanup
- NEVER: Ignore watermarks
  - Windows never complete, state grows unbounded
- NEVER: Use sliding windows with small slide interval on high-volume streams
  - Creates many overlapping windows, high memory usage
- NEVER: Recompute entire window on late data
  - Use incremental updates
- NEVER: Assume events arrive in order
  - Always design for out-of-order delivery
- NEVER: Use session windows without timeouts
  - Sessions never close, memory leak
- NEVER: Emit window before watermark passes
  - Incomplete results
- NEVER: Drop late data without logging
  - Monitor late data rates for tuning
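Monitoring late data can be as simple as counting how many events arrive behind the watermark. The sketch below is illustrative; `LateDataMonitor` is a hypothetical name, and a production system would normally export these counters to its metrics pipeline rather than hold them in memory.

```python
class LateDataMonitor:
    """Tracks the fraction of events that arrive behind the watermark."""

    def __init__(self):
        self.total = 0
        self.late = 0

    def record(self, event_time: int, watermark: int) -> bool:
        """Count one event and return True when it is late."""
        is_late = event_time < watermark
        self.total += 1
        if is_late:
            self.late += 1
        return is_late

    @property
    def late_rate(self) -> float:
        return self.late / self.total if self.total else 0.0


monitor = LateDataMonitor()
for event_time in (10, 3, 7, 4):
    monitor.record(event_time, watermark=5)
print(monitor.late_rate)  # 0.5
```

A late rate that stays high suggests the watermark delay is too small; a rate near zero suggests it could be tightened to reduce latency.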
Related Skills
timely-dataflow.md - Foundation for windowing with progress tracking
differential-dataflow.md - Incremental window updates
dataflow-coordination.md - Watermarks and coordination
stream-processing.md - High-level stream processing with Kafka