About Streaming Algorithms: The Flajolet-Martin Algorithm

Published on 2024-11-10


I love streaming algorithms - they make you think! Because they can process data in (near) real time, they are ideal for applications where immediate feedback is needed, such as fraud detection, recommendation systems, monitoring systems, and real-time analytics. Most of today's recommendation and machine learning systems, however, are built on batch processing.

A bit about Batch Processing

Most recommendation models are trained or updated offline, using historical data. That typically looks like this:

1. Data Collection. In an e-commerce system, for example, the very first step is collecting consumer spending behavior (the classic three metrics are RFM: Recency, Frequency, and Monetary value - you get the gist!). I am grouping this as one step, but it covers not only data collection but also data cleansing, aggregation, transformation, and so on.
2. Model Training. Once the data is ready, a machine learning algorithm (for example: collaborative filtering, matrix factorization, content-based filtering) is trained on the historical data from step 1 to produce a model.
3. Offline Prediction. After the model is trained, predictions or recommendations are generated for users or items: recommendations based on similar items, similar categories, items bought by users with similar profiles, and more, to persuade users to buy more.
4. Re-training. The model is re-trained periodically (for example: weekly or monthly) with new data to keep the recommendations relevant. In some cases, historical data may be purged or archived as well.

In contrast to batch processing, a machine learning model built on streaming algorithms (online prediction) serves approximate solutions (or recommendations) to users immediately.

One of the trade-offs of streaming algorithms is - yes, you are right - approximation, or estimation.

Onto the Flajolet-Martin Algorithm

Previously, we talked about Bloom filters for checking whether a username already exists in a system. A Bloom filter is also a streaming algorithm. Today, let's tackle the Flajolet-Martin (FM) algorithm.

First, why is the Flajolet-Martin algorithm useful? It is a probabilistic algorithm that estimates the number of distinct elements in a large dataset. It is particularly useful in scenarios where we cannot store or keep track of every individual element due to memory or storage limitations, such as in streaming data or big data applications.

Pseudocode of the FM algorithm:

1. Select a hash function h that maps each element in the set to a string of at least log n bits.
2. For each element x, let r(x) = the number of trailing zeros in h(x).
3. R = max(r(x)) over all elements seen so far.
4. Estimated number of distinct elements = 2^R
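To make the pseudocode concrete, here is a minimal single-hash sketch in Python; the helper names (trailingZeros, fmEstimate) and the generic hash parameter h are my own illustrative assumptions, not part of the original algorithm statement:

def trailingZeros(value):
    # count the zeros at the tail of the binary representation
    binary = bin(value)[2:]
    return len(binary) - len(binary.rstrip('0'))

def fmEstimate(stream, h):
    # h is assumed to be any hash function mapping an element to a non-negative integer
    R = 0
    for x in stream:
        R = max(R, trailingZeros(h(x)))
    return 2 ** R # estimated number of distinct elements

For example, fmEstimate(range(1000), hash) returns a power-of-two estimate; a single hash function is coarse, which is exactly why the averaging tricks below matter.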

The hash function follows the pattern below:

h(x) = ax + b mod c
where a is an odd number and c caps the hash range (c = 2^k). If a is even and b is odd, the hash function always returns odd numbers, which forces the trailing-zero count to be 0 all the time. If a is even and b is even, the hash function can return the same value for two different inputs x. Hence, we should use an odd number for a. c is a constant, and its value should be chosen wisely depending on the dataset.
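As an illustration, one way to generate such a family of hash functions is sketched below; makeHashFamily is a hypothetical helper name, and drawing a random odd a and a random b with c = 2^64 are assumptions for the sketch:

import random

def makeHashFamily(count, k=64):
    # build 'count' functions of the form h(x) = (a*x + b) mod 2**k with odd a
    funcs = []
    for _ in range(count):
        a = random.randrange(1, 2 ** k, 2) # step 2 starting at 1 guarantees an odd a
        b = random.randrange(0, 2 ** k)
        funcs.append(lambda x, a=a, b=b: (a * x + b) % (2 ** k))
    return funcs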

Now, the FM algorithm is sensitive to the parameters of the hash function, so its results can vary significantly from one hash function to another. Depending on the memory available (log n bits per hash), we should decide how many hash functions to use to optimize the accuracy of the FM algorithm. The idea is that the more hash functions we use, the better the estimate. We could then take the mean of the results from all hash functions as the approximate count of distinct values. However, averaging is susceptible to outliers. Therefore, a median-of-means approach (means by bucketing) to combine the estimates from all hash functions is the better choice. Got it?

Summary

In summary, we can do the following to improve the accuracy of the algorithm. For instance, if we have 100 memory slots, we can:

1. Use 100 hash functions and keep the highest trailing-zero count from each function.
2. Apply means by bucketing: split the 100 highest trailing-zero counts in memory into 10 buckets of 10 elements each, then compute the mean of each bucket, resulting in 10 means.
3. Take the median of means: sort the 10 means in ascending order and pick the median as R, so the approximate count of distinct elements is 2^R (a sketch of this combine step follows).
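Here is a minimal sketch of that combine step, assuming rValues holds the 100 highest trailing-zero counts collected in memory (combineEstimates is a hypothetical name):

import statistics

def combineEstimates(rValues, bucketSize=10):
    # means by bucketing: average each bucket of trailing-zero counts
    means = []
    for i in range(0, len(rValues), bucketSize):
        bucket = rValues[i:i + bucketSize]
        means.append(sum(bucket) / len(bucket))
    # median of means, then 2^R as the distinct-count estimate
    return 2 ** statistics.median(means)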

This post wouldn't be complete without the code, so here we go:

from collections import deque

# to illustrate the memory constraint, memory is a deque of 100 slots,
# one per hash function, each holding the highest trailing-zero count seen so far
memory = deque([None] * 100)

def computeDistinctValues(element, returnResult = True):
    for i in range(len(memory)): # loop through 100 hash functions
        temp = memory.popleft() # previous highest trailing-zero count for hash function i
        hashValue = ((((2 * i) + 1) * element) + i) % 2 ** 64 # h(x)=ax+b mod c <== a = 2i+1 (odd) and b = i are generated from i; c (2^64) is the length of the hash range
        binary = bin(hashValue)[2:]
        numTrailingZero = len(binary) - len(binary.rstrip('0')) # count only the zeros at the tail
        if temp is not None:
            if temp < numTrailingZero: # see if the current value is greater than the previous value (with that, we keep the highest trailing-zero count)
                memory.append(numTrailingZero) # if so, append the new value to the deque
            else:
                memory.append(temp) # else, re-append the previous value to the deque
        else:
            memory.append(numTrailingZero) # initialization

    if returnResult: # when the stream is requesting the current result
        result = 0
        means = [] # means of 10 buckets of numTrailingZero from memory - 1 bucket holds 10 numTrailingZeros
        for i in range(len(memory)): # memory stores the r value from 100 hash functions where r is the max number of zeros at the tail
            result += memory[i]
            if (i % 10 == 9): # for every 10 hash functions
                means.append(result / 10) # get the mean
                result = 0 # reset the running sum to zero
        means.sort() # sort the 10 means in ascending order
        result = (means[4] + means[5]) / 2 # median of means
        result = 2 ** result # 2^R
        return result
    else: # no need to return a result
        return None
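For a quick sanity check, a hypothetical usage could look like the following; the stream is made-up random data, and since FM is probabilistic the estimate will only be in the ballpark of the exact count:

import random

stream = [random.randrange(5000) for _ in range(10000)] # made-up data
for element in stream[:-1]:
    computeDistinctValues(element, returnResult = False)
print(computeDistinctValues(stream[-1])) # FM estimate
print(len(set(stream))) # exact distinct count, for comparison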

This is all fine and rosy, but what if we want to approximate or estimate the mode from streaming data rather than the count of distinct elements? For example, if we were to find the most frequently watched movie so far from the data stream, how would we do it?