Shooting the Gap Statistic with Map Reduce

Finding the right k in k-means, the right number of topics in LSA or LSI, or the right setting for whichever grouping algorithm you use is a fuzzy problem, especially with text. However, Tibshirani, Walther, and Hastie developed a superb approach, the gap statistic, that works much better than eyeballing an inflection point in your data because it compares your clustering against a null reference distribution and copes with abnormalities.
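
In short, for each candidate k the statistic compares the log of the pooled within-cluster dispersion Wk on your data against its average over B reference (null) data sets drawn uniformly over the feature ranges:

Gap(k) = (1/B) * sum over b of log(Wkb*) - log(Wk)
sk = sd(log(Wkb*)) * sqrt(1 + 1/B)

The chosen k is the smallest one satisfying Gap(k) >= Gap(k+1) - s(k+1).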

A more digestible explanation of the algorithm can be found in a blog post from the Data Science Lab.

This algorithm is computationally intensive but, luckily, map-reduce exists. Splitting the procedure across many cores or machines helps enormously. Python and Scala both offer map-based and concurrent frameworks that can handle fairly significant amounts of data, either directly or via a distributed task queue such as Celery.
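
If you do push beyond one machine, the reference runs parallelize cleanly because each one is independent. Purely as a hedged sketch, a Celery version might look like the following; the broker URL, the task name, and the run_reference helper are illustrative assumptions, not code from this post:

from celery import Celery, group

app = Celery('gapstat', broker='redis://localhost:6379/0')  #assumed broker URL

@app.task
def reference_wk(k, b):
    #assumption: build reference data set b, run KMeans with k clusters, return log(Wk*)
    return run_reference(k, b)  #hypothetical helper

#fan the B reference runs for one candidate k out across the workers
k = 10  #example candidate cluster count
logs = group(reference_wk.s(k, b) for b in range(20))().get()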

Scala’s callback mechanism is a personal favorite of mine, although its tools for experimentation are not as readily available as Python’s. Libraries such as gensim and scikit-learn make Python the easier place to experiment and test.

Using map-reduce for this algorithm is almost a necessity for anything more than a small data set, especially text. Unlike the Pham et al. approach, Tibshirani's statistic does not depend on the result for the previous k at all, so the work can be spread across a large number of machines and still produce a comparable result (use Pham for smaller data sets, say fewer than 30,000 text documents, that fit on a single machine).

The following is an example of the fastest way I ran this algorithm in Python. Whereas a single-threaded version ran 20 tests in 10 minutes (2 tests per minute), the version below ran 10.4 tests per minute on a 3.5 GHz Core i7 with 15 GB of RAM.

The proper tradeoffs must be made between speed and memory.

First, Python offers a multiprocessing version of map. The map function is applied through a multiprocessing Pool, which is easy to set up.


from multiprocessing import Pool
import psutil

pool=Pool(psutil.cpu_count(logical=True))

You will need to fiddle with the number of worker processes passed to Pool; the logical core count is only a starting point and may not be the best choice for your data set or memory budget.
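
One hedged starting point, an assumption to tune rather than a rule, is one worker per physical core while leaving a core free for the parent process:

#assumption: one worker per physical core, minus one, but never fewer than one
pool = Pool(max(1, psutil.cpu_count(logical=False) - 1))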

This pool can be used (please read the Data Science Lab article first) to find Wk, run multiple KMeans fits, and split the remaining work among processes.

Since I am text mining, my Wk and distance functions become the following. I will not be releasing my text-based KMeans here, sorry, but note that it relies on a special function that assigns clusters based on a dissimilarity rating.

  
def Wk(X):
    '''Within-cluster dispersion for a single cluster matrix X of sparse rows.'''
    if X.shape[0]>1:
        try:
            #1/(2*n) times 2*n cancels to 1; written out to mirror the D_r/(2*n_r) weighting in the gap statistic
            mul=float(1/float(2*X.shape[0]))*float(2*X.shape[0])
            #centroid of the cluster as a dense row vector
            mvect=X.mean(axis=0)
            mp=map(lambda x: float(mul*angulareDistance(x.todense(),mvect)),X)
            res=sum(mp)
        except Exception:
            res=0
    elif X.shape[0]==1:
        res=1
    else:
        res=0
    return res

def calculateQueuedCosines(x):
    '''Return the index of the centroid in x[1] with the highest cosine score against x[0].'''
    d=[]
    for j in xrange(len(x[1])):
        d.append(cosine(x[0].todense(),x[1][j]))
    return d.index(max(d))

def performWK(X):
    '''
    This function performs Wk for a map call and requires an input matrix, predictions from KMeans, and a k value.
    0=>ref
    1=>preds
    2=>K
    '''
    preds=X[1]
    results=[]
    for currk in range(X[2]):
        #gather the rows assigned to cluster currk
        cluster=None
        for i in range(X[0].shape[0]):
            if preds[i] == currk:
                if cluster is None:
                    cluster=scipy.sparse.csr_matrix(X[0][i])
                else:
                    cluster=scipy.sparse.vstack((cluster,X[0][i]),format='csr')
        res=0
        try:
            if cluster is not None:
                if cluster.shape[0]>1:
                    try:
                        #centroid of the cluster
                        mvect=cluster.sum(axis=0)/cluster.shape[0]
                        mp=map(lambda x: float(angulareDistance(x.todense(),mvect)),cluster)
                        res=sum(mp)
                    except Exception:
                        res=0
                elif cluster.shape[0]==1:
                    res=1
        except Exception as e:
            print str(e)
        results.append(res)
        gc.collect()
        del gc.garbage[:]
    return results

def performTest(X):
    '''
    Runs a single KMeans fit in a separate process.

    Requires a tuple with
    0=>k (number of clusters)
    1=>matrix to cluster
    '''
    kmn=KMeans(n_clusters=X[0])
    kmn.fit(X[1])
    preds=kmn.predict(X[1])
    return preds
	
def mapWKResults(X):
    return numpy.log(sum(numpy.nan_to_num(X)))

I then use these functions to split the intense work among multiple cores. Keep in mind that this is built for text data: the distance formula becomes the cosine-based angular formula, 1 - 2*acos(cos(veca, vecb))/pi. A sketch of such a function appears below, followed by the full driver.
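
My angulareDistance implementation is not published here, but a minimal sketch of that formula, assuming dense row-vector inputs (the function name matches the calls above; the rest is illustrative), could look like this:

import math
import numpy

def angulareDistance(veca, vecb):
    #flatten numpy matrix or array inputs to 1-D vectors
    va = numpy.asarray(veca).ravel()
    vb = numpy.asarray(vecb).ravel()
    den = float(numpy.linalg.norm(va) * numpy.linalg.norm(vb))
    if den == 0:
        return 0.0
    #cosine of the angle, clipped to the domain of acos to guard against rounding error
    cos = max(-1.0, min(1.0, float(numpy.dot(va, vb)) / den))
    #the cosine-based angular formula from above: 1 - 2*acos(cos)/pi
    return 1.0 - 2.0 * math.acos(cos) / math.pi

The full driver then ties the pool, the Wk helpers, and this distance function together.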

  
def discoverCategories(self,file,refs=20,maxTime=600,ks=range(1,500),mx=20):
        '''
        A 1-to-n estimator of the best number of categories using cosines, to be used as a backup.
        This is a scikit-learn/Python/SciPy learning experience as well.
        
        Reference: https://datasciencelab.wordpress.com/2013/12/27/finding-the-k-in-k-means-clustering/
        '''
        pool=Pool(psutil.cpu_count(logical=False))
        vectorizer=CountVectorizer(analyzer='word', binary=False, decode_error='strict',
                                   dtype=numpy.int64, encoding='utf-8', input='content',
                                   lowercase=True, max_df=1.0, max_features=None, min_df=1,
                                   ngram_range=(1, 1), preprocessor=None, stop_words=None,
                                   strip_accents=None, token_pattern='(?u)\\b\\w\\w+\\b',
                                   tokenizer=None, vocabulary=None)
        logging.info("Starting KMN "+datetime.datetime.fromtimestamp(time.time()).strftime("%m/%d/%Y"))
        kmn=KMeans()
        lines=[]
        with open(file,'rb') as fp:
            lines=fp.read().split("\n")
            
        if len(lines)>0:
            mtx=vectorizer.fit_transform(lines)
            tfidf=TfidfTransformer(norm='l2',smooth_idf=True,sublinear_tf=False,use_idf=True)
            mtx=tfidf.fit_transform(mtx)
            del tfidf
            del vectorizer
            lines=[]
            del lines
            gc.collect()
            del gc.garbage[:]
            
            (ymin, xmin), (ymax, xmax)=self.bounding_box(mtx)
            Wks=[]
            Wkbs=[]
            sk=[]
            
            for k in ks:
                #get the overall WK
                print "Testing at Cluster "+str(k)
                tempk=[]
                kmn.n_clusters=k
                kmn.fit(mtx)
                preds=kmn.predict(mtx)
                sets=[]
                for currk in range(k):
                    #gather the rows assigned to cluster currk
                    cluster=None
                    for i in range(mtx.shape[0]):
                        if preds[i]==currk:
                            if cluster is None:
                                cluster=scipy.sparse.csr_matrix(mtx[i])
                            else:
                                cluster=scipy.sparse.vstack((cluster,mtx[i]),format='csr')
                    if cluster is not None and cluster.shape[0]>0:
                        sets.append(cluster)
                res=pool.map(Wk,sets)
                Wks.append(numpy.log(sum(res)))

                del cluster
                gc.collect()
                del gc.garbage[:]
                
                BWkbs=[]
                #generate individual sets and calculate Gap(k)
                mres=[]
                for i in range(refs):
                    print "Setting Up Test "+str(i)
                    ref=scipy.sparse.rand(xmax,ymax,format='csr')             
                    mres.append(ref)
                    if len(mres) == mx or (i+1 == refs and len(mres)>0):
                        print "Performing Async Tests" #if we were to create our own distributed framework with pyHFS,asyncore, and a db
                        preds=pool.map(performTest,[[k,x] for x in mres])
                        res=pool.map(performWK,[[mres[j],preds[j],k] for j in range(len(mres))])
                        BWkbs.extend(pool.map(mapWKResults,res))
                        mres=[]
                        gc.collect()
                        del gc.garbage[:]
                    del ref
                    gc.collect()
                    del gc.garbage[:]
                s=sum(BWkbs)/refs
                Wkbs.append(s)
                sk.append(numpy.sqrt(sum((numpy.asarray(BWkbs)-s)**2)/refs))
        
        sk=numpy.asarray(sk)*numpy.sqrt(1+1.0/refs)
        return(ks,Wks,Wkbs,sk)

The results take a while to compute, but for text data there is a basic rule of thumb based on the proportion of non-zero values in the matrix. It can be used to determine the maximum number of clusters worth testing, leaving some buffer room.

  
  #for a scipy sparse matrix, nnz gives the count of non-zero entries
  (mtx.shape[0]*mtx.shape[1])/mtx.nnz
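
For example, that ratio can cap the range of k values handed to discoverCategories; the buffer of five extra clusters below is an arbitrary assumption:

#cap the candidate cluster counts with the density ratio, plus a small buffer
max_k = int((mtx.shape[0] * mtx.shape[1]) / mtx.nnz) + 5
ks = range(1, max_k)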

The code splits the most intensive functions into map and reduce tasks. Map-reduce could be used at each stage (notice the sum calls that reduce each mapped result).

  
preds=pool.map(performTest,[[k,x] for x in mres])
res=pool.map(performWK,[[mres[j],preds[j],k] for j in range(len(mres))])
BWkbs.extend(pool.map(mapWKResults,res))

These calls map the KMeans fits, then the per-cluster dispersion across all vectors, and finally a reduction over each result to obtain the Wk values for the overall equation.

Ideally, this process can be used to find the best number of categories or groups for a variety of algorithms, such as fuzzy clustering with membership matrices or LSA.
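
Once discoverCategories returns (ks, Wks, Wkbs, sk), Tibshirani's selection rule picks the smallest k whose gap is within one adjusted standard error of the next; a small sketch of that final step:

import numpy

def choose_k(ks, Wks, Wkbs, sk):
    #Gap(k) is the mean reference log dispersion minus the observed log dispersion
    gaps = numpy.asarray(Wkbs) - numpy.asarray(Wks)
    for i in range(len(ks) - 1):
        #smallest k with Gap(k) >= Gap(k+1) - s(k+1)
        if gaps[i] >= gaps[i + 1] - sk[i + 1]:
            return ks[i]
    return ks[-1]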