In this note, we provide code to compute useful statistics over a data stream in (amortized) $O(1)$ time per operation.
Sliding window max and min
from collections import deque
def sliding_window_maximum(nums, k):
    """
    Return the maximum of every length-k window of nums, left to right.

    nums: observed data stream (an indexable sequence of comparable values)
    k: window size, must be positive

    Returns a list with len(nums) - k + 1 entries, or an empty list when
    nums holds fewer than k values (no complete window exists).

    Raises ValueError if k is not positive.
    """
    if k <= 0:
        raise ValueError("window size k must be positive")
    res = []
    # q stores indices into nums. The values at those indices are strictly
    # decreasing from front to back, so q[0] indexes the window maximum.
    q = deque()
    for i, num in enumerate(nums):
        # adjust window: the front index has slid out of the last k positions
        if q and q[0] <= i - k:
            q.popleft()
        # values not larger than num can never be a window maximum again
        while q and nums[q[-1]] <= num:
            q.pop()
        q.append(i)
        # the first complete window ends at index k - 1
        if i >= k - 1:
            res.append(nums[q[0]])
    return res
# test: maxima of every window of size 2 => [1, 2, 3, 3, 3, 3]
nums = [1, 1, 2, 3, -1, 3, 2]
k = 2
print(sliding_window_maximum(nums=nums, k=k))
def sliding_window_minimum(nums, k):
    """
    Return the minimum of every length-k window of nums, left to right.

    Uses the identity min(w) == -max(-x for x in w): the stream is negated,
    handed to sliding_window_maximum, and the results are negated back.

    nums: observed data stream
    k: window size
    """
    negated_maxima = sliding_window_maximum([-value for value in nums], k)
    return [-value for value in negated_maxima]
# test: minima of every window of size 2 => [1, 1, 2, -1, -1, 2]
nums = [1, 1, 2, 3, -1, 3, 2]
k = 2
print(sliding_window_minimum(nums=nums, k=k))
The outputs are the following
[1, 2, 3, 3, 3, 3]
[1, 1, 2, -1, -1, 2]
$O(1)$ insertion, maximum, average and mode
from collections import defaultdict
class DataStream():
    """
    Append-only stream that refreshes its running statistics on every push,
    so the mode, average, maximum and minimum can each be read back without
    rescanning the stored data.
    """

    def __init__(self):
        # raw observations, in arrival order
        self.data = []
        # extremes start at the identity elements of max / min
        self.curr_max = -float("inf")
        self.curr_min = float("inf")
        # running total and count behind the average
        self.curr_sum = 0
        self.n = 0
        # value -> number of times it has been pushed
        self.freq_map = defaultdict(int)
        # occurrence count -> number of pushes that produced that count
        self.mode_count = defaultdict(int)
        # highest occurrence count so far and the value that holds it
        self.max_freq = 0
        self.mode = None

    def push(self, x):
        """Record x and bring every running statistic up to date."""
        self.data.append(x)

        # average
        self.n += 1
        self.curr_sum += x

        # maximum and minimum
        self.curr_max = x if x > self.curr_max else self.curr_max
        self.curr_min = x if x < self.curr_min else self.curr_min

        # mode: a value that reaches (or ties) the best count takes over,
        # so among tied values the most recent one to get there wins
        count = self.freq_map[x] + 1
        self.freq_map[x] = count
        self.mode_count[count] += 1
        if count >= self.max_freq:
            self.max_freq = count
            self.mode = x

    def get_mode(self):
        """Return the current mode, or None before the first push."""
        return self.mode

    def get_average(self):
        """Return the mean of all pushed values (needs at least one push)."""
        return self.curr_sum / self.n

    def get_maximum(self):
        """Return the largest value pushed so far (-inf before any push)."""
        return self.curr_max

    def get_minimum(self):
        """Return the smallest value pushed so far (+inf before any push)."""
        return self.curr_min
stream = [3, 3, 1, 2, 2, 2, 4]
track = DataStream()
# feed the stream one observation at a time and report after every push
for i, num in enumerate(stream):
    track.push(num)
    print(
        track.get_mode(),
        track.get_average(),
        track.get_maximum(),
        track.get_minimum(),
    )
The outputs:
3 3.0 3 3
3 3.0 3 3
3 2.3333333333333335 3 1
3 2.25 3 1
2 2.2 3 1
2 2.1666666666666665 3 1
2 2.4285714285714284 4 1
$O(1)$ popping, update max, average, mode
from collections import defaultdict
class DataStream():
    """
    Stack-like stream with O(1) push, pop, average, maximum and mode.

    Every statistic that cannot be "un-done" arithmetically (max, mode and
    the best frequency) keeps one history entry per stored value, so a pop
    simply discards the newest entry and the previous state is back on top.
    """

    def __init__(self):
        # all data
        self.data = []
        # max (observation: the running max is nondecreasing, so keep a
        # running max with history for each point in self.data)
        self.max_stack = []
        # average
        self.n = 0
        self.curr_sum = 0
        # mode (keep track of a frequency count)
        self.freq = defaultdict(int)
        self.mode_stack = []
        # best frequency after each push, parallel to mode_stack; needed so
        # that pop can restore max_freq instead of leaving it stale
        self.max_freq_stack = []
        self.max_freq = 0

    def push(self, x):
        """Append x and record the statistics that hold after this push."""
        self.data.append(x)
        # update average
        self.curr_sum += x
        self.n += 1
        # update max
        if not self.max_stack or self.max_stack[-1] <= x:
            self.max_stack.append(x)
        else:
            # repeat the current maximum
            self.max_stack.append(self.max_stack[-1])
        # update mode; on a tie the value that reached the count last wins
        self.freq[x] += 1
        freq_x = self.freq[x]
        if freq_x >= self.max_freq:
            self.max_freq = freq_x
            self.mode_stack.append(x)
        else:
            self.mode_stack.append(self.mode_stack[-1])
        self.max_freq_stack.append(self.max_freq)

    def pop(self):
        """
        Remove and return the most recently pushed value.

        Raises IndexError if the stream is empty.
        """
        val = self.data.pop()
        # update average
        self.curr_sum -= val
        self.n -= 1
        # update max
        self.max_stack.pop()
        # update mode: roll back to the state recorded by the previous push
        self.mode_stack.pop()
        self.max_freq_stack.pop()
        self.max_freq = self.max_freq_stack[-1] if self.max_freq_stack else 0
        self.freq[val] -= 1
        if self.freq[val] == 0:
            # do not let counts of values that left the stream pile up
            del self.freq[val]
        return val

    def get_average(self):
        """Return the mean rounded to 2 decimals, or None when empty."""
        if self.n == 0:
            return None
        return round(self.curr_sum / self.n, 2)

    def get_max(self):
        """Return the current maximum, or None when empty."""
        if not self.max_stack:
            return None
        return self.max_stack[-1]

    def get_mode(self):
        """Return the current mode, or None when empty."""
        if not self.mode_stack:
            return None
        return self.mode_stack[-1]
track = DataStream()
# (operation, argument) pairs replayed in order; pops take no argument
steps = [
    ("push", 3),
    ("push", 3),
    ("push", 2),
    ("pop", None),
    ("push", 4),
    ("push", 4),
    ("pop", None),
    ("pop", None),
]
for op, arg in steps:
    if op == "push":
        track.push(arg)
    else:
        track.pop()
    # report the statistics after every single operation
    print(track.get_average(), track.get_max(), track.get_mode())
Outputs:
3.0 3 3
3.0 3 3
2.67 3 3
3.0 3 3
3.33 4 3
3.5 4 4
3.33 4 3
3.0 3 3
$O(1)$ operations, last $k$ elements only
from collections import defaultdict, deque
class DataStream():
    """
    Tracks the average and maximum of the k most recently pushed values.

    k: size of the moving window, must be positive

    Raises ValueError from the constructor if k is not positive.
    """

    def __init__(self, k):
        if k <= 0:
            raise ValueError("window size k must be positive")
        # size of window
        self.k = k
        # data is moving window
        self.data = deque()
        # average
        self.sum = 0
        # max: candidates in non-increasing order, front is the window max
        self.max_data = deque()

    def push(self, x):
        """Add x to the window, evicting the oldest value once it is full."""
        # need to update deque if len is going to be larger than k
        # (compare against this instance's window size, not a global k)
        if len(self.data) == self.k:
            # update data
            old = self.data.popleft()
            # update current sum
            self.sum -= old
            # if old popped was max, update
            if self.max_data[0] == old:
                self.max_data.popleft()
        # append new value
        self.data.append(x)
        # update sum
        self.sum += x
        # update max: strictly smaller candidates can never win again;
        # equal ones are kept so each copy is evicted with its own value
        while self.max_data and self.max_data[-1] < x:
            self.max_data.pop()
        self.max_data.append(x)

    def get_average(self):
        """Return the window mean rounded to 2 decimals, or None if empty."""
        if not self.data:
            return None
        # the window never holds more than k values, so its length is the
        # right divisor both while filling up and once it is full
        return round(self.sum / len(self.data), 2)

    def get_max(self):
        """Return the window maximum rounded to 2 decimals, or None if empty."""
        if not self.max_data:
            return None
        return round(self.max_data[0], 2)
# testing
data_stream = [1, 2, 3, 3, 2, -1, 2]
k = 3
track = DataStream(k=k)
track.push(data_stream[0])
track.push(data_stream[1])
track.push(data_stream[2])
# starts to be more than window: this push evicts the oldest value
track.push(data_stream[3])
print(track.data, track.get_average(), track.get_max())  # deque([2, 3, 3]) 2.67 3
track.push(data_stream[4])
print(track.data)  # deque([3, 3, 2])
track.push(data_stream[5])
print(track.data, track.get_average(), track.get_max())  # deque([3, 2, -1]) 1.33 3
Outputs:
deque([2, 3, 3]) 2.67 3
deque([3, 3, 2])
deque([3, 2, -1]) 1.33 3
We implement the get_mode functionality separately with a sliding window approach.
import heapq
from collections import deque, defaultdict
class ModeTracker():
    """
    Reports the mode (most frequent value) of the k most recent pushes.

    A heap of (-count, value) entries is maintained lazily: entries are
    never removed when a count changes. Instead an entry is trusted only if
    its count still equals the value's live count in freq_map, and stale
    entries are discarded when they surface at the top of the heap.
    Ties are broken in favour of the smallest value (heap tuple ordering).

    k: size of the moving window, must be positive

    Raises ValueError from the constructor if k is not positive.
    """

    def __init__(self, k):
        if k <= 0:
            raise ValueError("window size k must be positive")
        self.k = k
        # values currently inside the window, oldest first
        self.window = deque()
        # value -> live count inside the window (absent means zero)
        self.freq_map = defaultdict(int)
        # (-count, value) entries; may contain stale entries
        self.max_heap = []

    def push(self, num):
        """Add num to the window, evicting the oldest value once it is full."""
        if len(self.window) == self.k:
            old = self.window.popleft()
            self._remove_element(old)
        self.window.append(num)
        self._add_element(num)
        # stale entries buried in the heap are only dropped when they reach
        # the top, so rebuild once the heap clearly outgrows the window
        if len(self.max_heap) > 4 * self.k:
            self._compact()

    def _add_element(self, num):
        """Count one more occurrence of num and publish its new count."""
        self.freq_map[num] += 1
        heapq.heappush(self.max_heap, (-self.freq_map[num], num))

    def _remove_element(self, num):
        """Count one fewer occurrence of num and publish its new count."""
        remaining = self.freq_map[num] - 1
        if remaining:
            self.freq_map[num] = remaining
            # guarantees a valid entry exists for num's lowered count
            heapq.heappush(self.max_heap, (-remaining, num))
        else:
            del self.freq_map[num]

    def _compact(self):
        """Rebuild the heap with exactly one valid entry per live value."""
        self.max_heap = [(-count, num) for num, count in self.freq_map.items()]
        heapq.heapify(self.max_heap)

    def get_mode(self):
        """Return the mode of the current window, or None if it is empty."""
        while self.max_heap:
            neg_count, num = self.max_heap[0]
            # .get avoids inserting keys for values that left the window
            if self.freq_map.get(num, 0) == -neg_count:
                return num
            # the count recorded in this entry is out of date
            heapq.heappop(self.max_heap)
        return None
nums = [1, 2, 2, 3, 3, 4, 2, 2, 1, 1, 2, 2, 3]
track = ModeTracker(k=3)
# show the window contents and its mode after each arrival
for num in nums:
    track.push(num)
    print([*track.window], track.get_mode())
Outputs:
[1] 1
[1, 2] 1
[1, 2, 2] 2
[2, 2, 3] 2
[2, 3, 3] 3
[3, 3, 4] 3
[3, 4, 2] 2
[4, 2, 2] 2
[2, 2, 1] 2
[2, 1, 1] 1
[1, 1, 2] 1
[1, 2, 2] 1
[2, 2, 3] 2
Other related questions
- $O(1)$ median (link)
- $O(1)$ median query, supposing we only care about the most recent $k$ elements
- Top-$k$ elements