Median of integers stream


We have already solved two problems involving streams: finding the first non-repeated character in a stream, and the LRU cache. Let’s discuss another one: finding the median of a stream of integers. The problem statement is: given a continuous stream of integers, find the median of the integers received up to a given point in time. The median can be asked for multiple times.

To understand the problem better, ask yourself: what is a median?

The median is the value separating the higher half from the lower half of a data sample. For a data set, it may be thought of as the “middle” value.

Wikipedia

For example, in the data set {1, 3, 3, 6, 7, 8, 9}, the median is 6, the fourth largest, and also the fourth smallest, number in the sample.

The median of a sorted array of integers is the element at the middle index if the size of the array is odd, and the average of the two middle elements if the size of the array is even.
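The definition above can be sketched in a few lines of Java. This is only an illustration, assuming the array is already sorted and non-empty; the class name `SortedArrayMedian` and the method `median` are made up for this example.

```java
final class SortedArrayMedian {
    // Median of an already sorted, non-empty array (assumption: caller sorted it).
    static double median(int[] sorted) {
        int n = sorted.length;
        if (n == 0) {
            throw new IllegalArgumentException("median of an empty array is undefined");
        }
        if (n % 2 == 1) {
            return sorted[n / 2]; // odd size: the single middle element
        }
        // Even size: average of the two middle elements (0-based n/2 - 1 and n/2).
        // The sum is computed as a long to avoid int overflow.
        return ((long) sorted[n / 2 - 1] + sorted[n / 2]) / 2.0;
    }

    public static void main(String[] args) {
        System.out.println(median(new int[]{1, 3, 3, 6, 7, 8, 9})); // 6.0
        System.out.println(median(new int[]{7, 8, 11, 12}));        // 9.5
    }
}
```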

Now that we understand the definition of the median, let’s go back to our problem and take an example to understand it further. We get integers from a stream one by one, and at any given point in time we have to return the median of the integers received so far.
First, the stream emits 12, then 7 and then 8. What will the median be now? It will be 8, because if we arrange 12, 7, 8 in sorted order, 8 is the element in the middle. What if we get 11 next? Now the sorted order is 7, 8, 11, 12. As the size of the set is even, we take the average of the two middle elements, 8 and 11, which is 9.5.

Median of integers stream : thoughts

What would the brute-force solution be? As integers arrive from the stream, store them in an array. If we store them in arrival order, we have to sort the array every time the median is asked. Each median query then costs O(n log n), dominated by the sorting algorithm.
How about inserting each element into the array in sorted order? Finding the median becomes O(1), but processing one integer from the stream becomes O(n), as we may have to move n elements to the right in the worst case. Processing n integers this way costs O(n²) overall.
Another underlying problem with an array is that we do not know how many integers will come out of the stream, so it is difficult to pre-allocate memory for it. A linked list solves that problem; however, it does not reduce the complexity of processing, and at the same time it increases the complexity of finding the median from O(1) to O(n).
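For comparison with what follows, here is a rough sketch of the sorted-insert approach. It assumes a growable `ArrayList` instead of a fixed array, which sidesteps pre-allocation but still shifts elements on every insert; the class name `SortedListMedian` and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class SortedListMedian {
    private final List<Integer> sorted = new ArrayList<>();

    // Finding the position is O(log n), but the insert shifts up to n elements: O(n).
    void add(int value) {
        int pos = Collections.binarySearch(sorted, value);
        if (pos < 0) {
            pos = -(pos + 1); // binarySearch returns -(insertion point) - 1 when not found
        }
        sorted.add(pos, value);
    }

    // O(1): the list is kept sorted, so the median is read from the middle.
    double median() {
        int n = sorted.size();
        if (n == 0) {
            throw new IllegalStateException("no integers received yet");
        }
        if (n % 2 == 1) {
            return sorted.get(n / 2);
        }
        int lo = sorted.get(n / 2 - 1);
        int hi = sorted.get(n / 2);
        return ((long) lo + hi) / 2.0; // long sum avoids int overflow
    }
}
```

Feeding 12, 7, 8 and then 11 into this sketch gives medians of 8 and 9.5, matching the example above, but each `add` remains linear in the number of integers seen.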

Think about this: do we need a completely sorted set of integers before we can calculate the median? Actually, we need only the middle of the set: for n integers, the ((n+1)/2)th smallest element if n is odd, and the average of the (n/2)th and (n/2+1)th smallest elements if n is even.

However, we do not have a pre-processed array with us. What is the property of the median? The median is greater than or equal to all elements on its left and less than or equal to all elements on its right, where the number of elements in the two groups is equal or differs by 1.

Median of integers stream : Heaps

How about we split the incoming integers into two halves? Whenever the median is asked, if the sizes of the two halves differ by 1, we return the boundary element of the larger half (its maximum if it is the lower half, its minimum if it is the upper half). If the sizes of the two halves are equal, we return the average of the maximum of the lower half and the minimum of the upper half.

What data structure is best for finding the minimum or the maximum in constant time? Heap it is. In this case, we need two heaps: one max heap and one min heap. The max heap stores all the elements on the left side of the median, and the min heap stores all the elements on the right side of the median.
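In Java, both kinds of heap can be built from `java.util.PriorityQueue`: the default ordering gives a min heap, and passing `Collections.reverseOrder()` gives a max heap. The sketch below only illustrates reading the two roots for a split that is already given; the class `HeapRootsDemo` and the method `roots` are invented for this example, and both halves are assumed to be non-empty.

```java
import java.util.Collections;
import java.util.PriorityQueue;

final class HeapRootsDemo {
    // Returns {max of the lower half, min of the upper half}.
    // Assumption: both arrays are non-empty.
    static int[] roots(int[] lower, int[] upper) {
        PriorityQueue<Integer> maxHeap = new PriorityQueue<>(Collections.reverseOrder());
        PriorityQueue<Integer> minHeap = new PriorityQueue<>();
        for (int v : lower) {
            maxHeap.add(v);
        }
        for (int v : upper) {
            minHeap.add(v);
        }
        // peek() reads the root without removing it, in constant time.
        return new int[]{maxHeap.peek(), minHeap.peek()};
    }

    public static void main(String[] args) {
        int[] r = roots(new int[]{7, 8}, new int[]{11, 12});
        // Equal-sized halves: the median is the average of the two roots.
        System.out.println((r[0] + r[1]) / 2.0); // 9.5
    }
}
```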

How do we balance the size difference between the two heaps? Insert each newly processed integer into the max heap. If the max heap then has 2 more elements than the min heap, extract the maximum element from the max heap and put it in the min heap.

Also, maintain the property that every element in the max heap is less than or equal to every element in the min heap. So, whenever the root of the max heap is greater than the root of the min heap, it should be removed from the max heap and added to the min heap.

Let’s take an example to understand the method first, and then make a concrete algorithm out of it. The first number from the stream is 12; what should we do? We decided to put every number into the max heap to start with.

Figure: The integer 12 is added to the max heap, as both heaps are empty at this point.

Now comes the integer 7. First of all, we add the new integer to the max heap. This makes the size difference between the max heap and the min heap more than one. In that case, we take the maximum out of the max heap and put it into the min heap.

Figure: 7 is added to the max heap, which makes the size difference more than 1, so the root of the max heap (12) is moved to the min heap.

The next integer is 18. What happens now? We add it to the max heap. The difference between the sizes is not more than 1; however, the root of the max heap (18) is greater than the root of the min heap (12). In this case, too, we take the root of the max heap and move it to the min heap. At this point, if the median of the stream is asked, we return the root of the min heap, which is 12.

Figure: 18 is added to the max heap; its root is now greater than the root of the min heap, so 18 is removed from the max heap and added to the min heap.

Next comes the integer 10. It goes into the max heap, does not create any size difference, and the root of the max heap is less than the root of the min heap. At this point, the median of the stream of integers so far is 11 ((10+12)/2).

Figure: 10 is added to the max heap.

The new integer from the stream is 11. As usual, add the new integer to the max heap; the size difference remains less than 2, and 11 is less than the root of the min heap (12).
What should the median be now? At this point, the max heap has more elements than the min heap, hence we return the root of the max heap (11).

Figure: 11 is added to the max heap.
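The walkthrough above can be replayed with a short trace. The sketch below is not the article's implementation; it is a compact, hypothetical helper (`RunningMedianTrace.runningMedians`) that applies the same two rules to each integer and records the median after every insertion, so the numbers in the example can be checked.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.PriorityQueue;

final class RunningMedianTrace {
    // Returns the median observed after each integer of the stream is processed.
    static double[] runningMedians(int[] stream) {
        PriorityQueue<Integer> maxHeap = new PriorityQueue<>(Collections.reverseOrder()); // lower half
        PriorityQueue<Integer> minHeap = new PriorityQueue<>();                           // upper half
        double[] medians = new double[stream.length];

        for (int i = 0; i < stream.length; i++) {
            maxHeap.add(stream[i]);
            // Move the max-heap root if the max heap is too large or its root is out of order.
            if (maxHeap.size() - minHeap.size() > 1
                    || (!minHeap.isEmpty() && maxHeap.peek() > minHeap.peek())) {
                minHeap.add(maxHeap.poll());
            }
            // Rebalance the other way if the min heap grew too large.
            if (minHeap.size() - maxHeap.size() > 1) {
                maxHeap.add(minHeap.poll());
            }

            if (maxHeap.size() == minHeap.size()) {
                int lo = maxHeap.peek();
                int hi = minHeap.peek();
                medians[i] = ((long) lo + hi) / 2.0;
            } else if (maxHeap.size() > minHeap.size()) {
                medians[i] = maxHeap.peek();
            } else {
                medians[i] = minHeap.peek();
            }
        }
        return medians;
    }

    public static void main(String[] args) {
        // Same stream as the walkthrough: 12, 7, 18, 10, 11.
        System.out.println(Arrays.toString(runningMedians(new int[]{12, 7, 18, 10, 11})));
        // [12.0, 9.5, 12.0, 11.0, 11.0]
    }
}
```

The last three values (12, 11 and 11) are the medians stated in the walkthrough after 18, 10 and 11 arrive.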

Median of a stream of integers: Algorithm

  1. Process integer from the stream and add it to the max heap.
  2. If the min heap is not empty and the root of the max heap is greater than the root of the min heap:
    1. Delete the root from the max heap
    2. Add removed integer from the max heap to the min heap
  3. If the size difference between the two heaps is more than 1:
    1. Remove the root of the heap which has more elements.
    2. Add removed node to another heap.
  4. To calculate the median:
    1. If the size of both heaps equal, return average of their roots.
    2. Else, return the root of the heap with more elements.

Median of integers stream : Implementation

The implementation uses the priority queue in Java; refer to the Stack Overflow question on how to use a priority queue as a max heap.

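package com.company;

import java.util.Collections;
import java.util.PriorityQueue;

/**
 * Created by sangar on 18.10.18.
 */
public class MedianOfIntegerStream {
    // Max heap holds the lower half of the integers seen so far.
    private final PriorityQueue<Integer> maxHeap;
    // Min heap holds the upper half of the integers seen so far.
    private final PriorityQueue<Integer> minHeap;

    public MedianOfIntegerStream(){
        maxHeap = new PriorityQueue<>(Collections.reverseOrder());
        minHeap = new PriorityQueue<>();
    }

    public double getMedian(){
        // Guard against unboxing a null root when nothing has been processed.
        if(maxHeap.isEmpty() && minHeap.isEmpty())
            throw new IllegalStateException("No integers processed yet");

        // Equal sizes: average of the two roots (long sum avoids int overflow).
        if(maxHeap.size() == minHeap.size())
            return (maxHeap.peek().longValue() + minHeap.peek()) / 2.0;

        // Otherwise the median is the root of the larger heap.
        if(maxHeap.size() > minHeap.size())
            return maxHeap.peek();

        return minHeap.peek();
    }

    public void processInteger(int data){
        // Every new integer goes into the max heap first.
        maxHeap.add(data);

        // Move the root of the max heap if the max heap is too large
        // or its root is greater than the root of the min heap.
        if(maxHeap.size() - minHeap.size() > 1
                || (!minHeap.isEmpty()
                && maxHeap.peek() > minHeap.peek())){
            minHeap.add(maxHeap.poll());
        }

        // Rebalance in the other direction if the min heap is too large.
        if(minHeap.size() - maxHeap.size() > 1){
            maxHeap.add(minHeap.poll());
        }
    }
}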

Test cases for median in integers stream

package test;

import com.company.MedianOfIntegerStream;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;

/**
 * Created by sangar on 23.9.18.
 */
public class MedianOfIntegerStreamTest {

    MedianOfIntegerStream tester = new MedianOfIntegerStream();

    @Test
    public void baseTest() {

        tester.processInteger(12);
        tester.processInteger(7);

        assertEquals(9.5, tester.getMedian() );
    }

    @Test
    public void maxHeapWithMoreElementsTest() {

        tester.processInteger(12);
        tester.processInteger(7);
        tester.processInteger(9);

        assertEquals(9, tester.getMedian() );
    }

    @Test
    public void minHeapWithMoreElementsTest() {

        tester.processInteger(12);
        tester.processInteger(7);
        tester.processInteger(9);
        tester.processInteger(13);
        tester.processInteger(15);

        assertEquals(12, tester.getMedian() );
    }

    @Test
    public void minHeapSizeMoreThanTwoDifferenceTest() {

        tester.processInteger(12);
        tester.processInteger(7);
        tester.processInteger(9);
        tester.processInteger(13);
        tester.processInteger(15);
        tester.processInteger(17);
        tester.processInteger(19);

        assertEquals(13, tester.getMedian() );
    }

    @Test
    public void maxHeapGetsTheElementTest() {

        tester.processInteger(12);
        tester.processInteger(7);
        tester.processInteger(9);
        tester.processInteger(13);
        tester.processInteger(15);
        tester.processInteger(17);
        tester.processInteger(5);
        assertEquals(12, tester.getMedian() );
    }
}

Processing each integer takes O(log n), as it involves a constant number of insertions into and removals from the heaps. Fetching the median of the stream at any given time is O(1), since it only reads the roots of the heaps.

Please share if there is something wrong or missing.