In this note, we provide code to compute useful statistics in a dataset in $O(1)$ space.

Sliding window max and min

from collections import deque
def sliding_window_maximum(nums, k):
    """
        nums: observed data stream
        k:    window size ( k <= len(nums) )
    """
    res = [] 
    q = deque()
    for i in range(k):
        num = nums[i]
        while q and nums[q[-1]] <= num:
            q.pop()
        q.append(i)
    res.append(nums[q[0]])
    for i in range(k, len(nums)):
        num = nums[i]
        # adjust window
        if q and q[0] <= i-k:
            q.popleft()
        while q and nums[q[-1]] <= num:
            q.pop()
        q.append(i)
        res.append(nums[q[0]])
    return res

# test
nums = [1, 1, 2, 3, -1, 3, 2]
k = 2 # => [1, 2, 3, 3, 3, 3]
print(sliding_window_maximum(nums, k))

def sliding_window_minimum(nums, k):
    nums_neg = [-num for num in nums]
    neg_res = sliding_window_maximum(nums_neg, k)
    return [-num for num in neg_res]

nums = [1, 1, 2, 3, -1, 3, 2]
k = 2  # => [1, 1, 2, -1, -1, 2]
print(sliding_window_minimum(nums, k))

The outputs are the following

[1, 2, 3, 3, 3, 3]
[1, 1, 2, -1, -1, 2]

$O(1)$ insertion, maximum, average and mode

from collections import defaultdict
class DataStream():
    def __init__(self):
        self.data = []
        # average
        self.n = 0
        self.curr_sum = 0
        # mode
        self.freq_map = defaultdict(int)
        self.max_freq = 0
        self.mode = None
        self.mode_count = defaultdict(int)

        # maximum
        self.curr_max = -float("inf")
        # minimum
        self.curr_min = float("inf")
    def push(self, x):
        self.data.append(x)
        # update maximum and minimum
        self.curr_max = max(self.curr_max, x)
        self.curr_min = min(self.curr_min, x)

        # update average
        self.curr_sum += x
        self.n += 1

        # update mode
        self.freq_map[x] += 1
        freq = self.freq_map[x]
        self.mode_count[freq] += 1
        if freq > self.max_freq:
            self.max_freq = freq
            self.mode = x
        elif freq == self.max_freq:
            self.mode = x
    def get_mode(self):
        return self.mode
    def get_average(self):
        return self.curr_sum / self.n
    def get_maximum(self):
        return self.curr_max
    def get_minimum(self):
        return self.curr_min

track = DataStream()
stream = [3, 3, 1, 2, 2, 2, 4]
for i in range(len(stream)):
    num = stream[i]
    track.push(num)
    print(
        track.get_mode(), track.get_average(), track.get_maximum(), track.get_minimum()
    )

The outputs:

3 3.0 3 3
3 3.0 3 3
3 2.3333333333333335 3 1
3 2.25 3 1
2 2.2 3 1
2 2.1666666666666665 3 1
2 2.4285714285714284 4 1

$O(1)$ popping, update max, average, mode


from collections import defaultdict
class DataStream():
    def __init__(self):
        # all data
        self.data = []
        # max (observation, max is nondecreasing, keep track of a
        # running max with history for each point in self.data)
        self.max_stack = []

        # average
        self.n = 0
        self.curr_sum = 0

        # mode (keep track of a frequency count)
        self.freq = defaultdict(int)
        self.mode_stack = []
        self.max_freq = 0
    
    def push(self, x):
        self.data.append(x)
        # update average
        self.curr_sum += x
        self.n += 1
        # update max
        if not self.max_stack or self.max_stack[-1] <= x:
            self.max_stack.append(x)
        else:
            # repeat the current maximum
            self.max_stack.append(self.max_stack[-1])
        # update mode
        self.freq[x] += 1
        freq_x = self.freq[x]
        if freq_x >= self.max_freq:
            self.max_freq = freq_x
            self.mode_stack.append(x)
        else:
            self.mode_stack.append(self.mode_stack[-1])
    
    def pop(self):
        val = self.data.pop()
        # update average
        self.curr_sum -= val
        self.n -= 1
        # update max
        self.max_stack.pop()
        # update mode
        self.mode_stack.pop()
        self.freq[val] -= 1
        return val
    
    def get_average(self):
        return round(self.curr_sum / self.n, 2)
    def get_max(self):
        if not self.max_stack:
            return None
        return self.max_stack[-1]
    def get_mode(self):
        if not self.mode_stack:
            return None
            
        return self.mode_stack[-1]

track = DataStream()
track.push(3)
print(track.get_average(), track.get_max(), track.get_mode())
track.push(3)
print(track.get_average(), track.get_max(), track.get_mode())
track.push(2)
print(track.get_average(), track.get_max(), track.get_mode())
track.pop()
print(track.get_average(), track.get_max(), track.get_mode())
track.push(4)
print(track.get_average(), track.get_max(), track.get_mode())
track.push(4)
print(track.get_average(), track.get_max(), track.get_mode())
track.pop()
print(track.get_average(), track.get_max(), track.get_mode())
track.pop()
print(track.get_average(), track.get_max(), track.get_mode())

Outputs:

3.0 3 3
3.0 3 3
2.67 3 3
3.0 3 3
3.33 4 3
3.5 4 4
3.33 4 3
3.0 3 3

$O(1)$ operations, last $k$ elements only


from collections import defaultdict, deque
class DataStream():
    def __init__(self, k):
        # size of window
        self.k = k
        # data is moving window
        self.data = deque()
        # avearge
        self.sum = 0
        # max
        self.max_data = deque()
    def push(self, x):
        # need to update deque if len is going to be larger than k
        if len(self.data) == k:
            # update data
            old = self.data.popleft()
            # update current sum 
            self.sum -= old

            # if old popped was max, update
            if self.max_data[0] == old:
                self.max_data.popleft()
        # append new value 
        self.data.append(x)

        # update sum 
        self.sum += x

        # update max
        while self.max_data and self.max_data[-1] < x:
            self.max_data.pop()
        self.max_data.append(x)
    
    def get_average(self):
        if len(self.data) < self.k:
            return round(self.sum / len(self.data), 2)
        return round(self.sum / self.k, 2)
    def get_max(self):
        return round(self.max_data[0], 2)

# testing
data_stream = [1, 2, 3, 3, 2, -1, 2]
k = 3
track = DataStream(k=3)
track.push(data_stream[0])
track.push(data_stream[1])
track.push(data_stream[2])# starts to be more than windowtrack.push(data_stream[3])
print(track.data, track.get_average(), track.get_max())
track.push(data_stream[4])
print(track.data)
track.push(data_stream[5])
print(track.data, track.get_average(), track.get_max())

Outputs:

deque([2, 3, 3]) 2.67 3
deque([3, 3, 2])
deque([3, 2, -1]) 1.33 3

We implement the get_mode functionality separately with a sliding window approach.

import heapq
from collections import deque, defaultdict
class ModeTracker():
    def __init__(self, k):
        self.k = k
        self.window = deque()
        self.freq_map = defaultdict(int)
        self.max_heap = []
        self.removed_elements = defaultdict(int)
    def push(self, num):
        if len(self.window) == self.k:
            old = self.window.popleft()
            self._remove_element(old)
        self.window.append(num)
        self._add_element(num)
    def _add_element(self, num):
        self.freq_map[num] += 1
        heapq.heappush(self.max_heap, (-self.freq_map[num], num))
    def _remove_element(self, num):
        self.freq_map[num] -= 1
        if self.freq_map[num] == 0:
            del self.freq_map[num]
        self.removed_elements[num] += 1
    def get_mode(self):
        while self.max_heap:
            freq, num = self.max_heap[0]
            freq = -freq
            if self.removed_elements[num] > 0:
                heapq.heappop(self.max_heap)
                self.removed_elements[num] -= 1
            else:
                return num
        return None

track = ModeTracker(k=3)
nums = [1, 2, 2, 3, 3, 4, 2, 2, 1, 1, 2, 2, 3]
for num in nums:
    track.push(num)
    print(list(track.window), track.get_mode())

Outputs:

[1] 1
[1, 2] 1
[1, 2, 2] 2
[2, 2, 3] 2
[2, 3, 3] 3
[3, 3, 4] 3
[3, 4, 2] 2
[4, 2, 2] 2
[2, 2, 1] 2
[2, 1, 1] 1
[1, 1, 2] 1
[1, 2, 2] 1
[2, 2, 3] 2
  1. $O(1)$ median (link)
  2. $O(1)$ median query supposing we only care the most recent $k$
  3. Top-$k$ elements