Module nmtf.modules.nmtf_utils

Non-negative matrix and tensor factorization utility functions

Expand source code
"""Non-negative matrix and tensor factorization utility functions

"""

# Author: Paul Fogel

# License: MIT
# Jan 4, '20

from tkinter import *
from tkinter import ttk
from tqdm import tqdm
import math
from scipy.stats import hypergeom
from scipy.optimize import nnls
import numpy as np

EPSILON = np.finfo(np.float32).eps

class StatusBox:
    def __init__(self):
        self.root = Tk()
        self.root.title('irMF status - Python kernel')
        self.root.minsize(width=230, height=60)
        self.frame = Frame(self.root, borderwidth=6)
        self.frame.pack()
        self.var = StringVar()
        self.status = Label(self.frame, textvariable=self.var, width=60, height=1)
        self.status.pack(fill=NONE, padx=6, pady=6)
        self.pbar = ttk.Progressbar(self.frame, orient=HORIZONTAL, max=100, mode='determinate')
        self.pbar.pack(fill=NONE, padx=6, pady=6)
        Button(self.frame, text='Cancel', command=self.close_dialog).pack(fill=NONE, padx=6, pady=6)
        self.cancel_pressed = False
        self.n_steps = 0

    def close_dialog(self):
        self.cancel_pressed = True

    def update_bar(self, delay=1, step=1):
        self.n_steps += step
        self.pbar.step(step)
        self.pbar.after(delay, lambda: self.root.quit())
        self.root.mainloop()

    def init_bar(self, delay=1):
        self.update_bar(delay=1, step=100 - self.n_steps)
        self.n_steps = 0

    def update_status(self, delay=1, status=''):
        self.var.set(status)
        self.status.after(delay, lambda: self.root.quit())
        self.root.mainloop()

    def close(self):
        self.root.destroy()

    def myPrint(self, status=''):
        print(status)

class StatusBoxTqdm:
    def __init__(self, verbose=0):
        self.LogIter = verbose
        if self.LogIter == 0:
            self.pbar = tqdm(total=100)

        self.cancel_pressed = False

    def update_bar(self, delay=0, step=1):
        if self.LogIter == 0:
            self.pbar.update(n=step)

    def init_bar(self, delay=0):
        if self.LogIter == 0:
            self.pbar.n = 0

    def update_status(self, delay=0, status=''):
        if self.LogIter == 0:
            self.pbar.set_description(status, refresh=False)
            self.pbar.refresh()

    def close(self):
        if self.LogIter == 0:
            self.pbar.clear()
            self.pbar.close()

    def myPrint(self, status=''):
       if self.LogIter == 1:
            print(status, end='\n')

def NMFDet(Mt, Mw, NMFExactDet):
    """Volume occupied by Left and Right factoring vectors

    Input:
        Mt: Left hand matrix
        Mw: Right hand matrix
        NMFExactDet if = 0 compute an approximate determinant in reduced space n x n or p x p
        through random sampling in the largest dimension
    Output:
        detXcells: determinant

    Reference
    ---------
    
    P. Fogel et al (2016) Applications of a Novel Clustering Approach Using Non-Negative Matrix Factorization to Environmental
        Research in Public Health Int. J. Environ. Res. Public Health 2016, 13, 509; doi:10.3390/ijerph13050509

    """
     
    n, nc = Mt.shape
    p, nc = Mw.shape
    nxp = n * p
    if (NMFExactDet > 0) | (n == p):
        Xcells = np.zeros((nxp, nc))
        for k in range(0, nc):
            Xcells[:, k] = np.reshape(np.reshape(Mt[:, k], (n, 1)) @ np.reshape(Mw[:, k], (1, p)), nxp)
            norm_k = np.linalg.norm(Xcells[:, k])
            if norm_k > 0 :
                Xcells[:, k] = Xcells[:, k] / norm_k
            else:
                Xcells[:, k] = 0
    else:
        if n > p:
            Xcells = np.zeros((p ** 2, nc))
            ID = np.arange(n)
            np.random.shuffle(ID)
            ID = ID[0:p]
            for k in range(0, nc):
                Xcells[:, k] = np.reshape(np.reshape(Mt[ID, k], (p, 1)) @ np.reshape(Mw[:, k], (1, p)), p ** 2)
                norm_k = np.linalg.norm(Xcells[:, k])
                if norm_k > 0 :
                    Xcells[:, k] = Xcells[:, k] / norm_k
                else:
                    Xcells[:, k] = 0
        else:
            Xcells = np.zeros((n ** 2, nc))
            ID = np.arange(p)
            np.random.shuffle(ID)
            ID = ID[0:n]
            for k in range(0, nc):
                Xcells[:, k] = np.reshape(np.reshape(Mt[:, k], (n, 1)) @ np.reshape(Mw[ID, k], (1, n)), n ** 2)
                norm_k = np.linalg.norm(Xcells[:, k])
                if norm_k > 0 :
                    Xcells[:, k] = Xcells[:, k] / norm_k
                else:
                    Xcells[:, k] = 0

    detXcells = np.linalg.det(Xcells.T @ Xcells)
    return detXcells

def NMFGetConvexScores(Mt, Mw, Mh, flag, AddMessage):
    """Rescale scores to sum up to 1 (used with deconvolution)
    Input:
        Mt: Left factoring matrix
        Mw: Right factoring matrix
        flag:  Current value
    Output:

       Mt: Left factoring matrix
        Mw: Right factoring matrix
        flag: += 1: Negative weights found
    """
    ErrMessage = ''
    cancel_pressed = 0

    n, nc = Mt.shape
    n_Mh = Mh.shape[0]
    try:
        Malpha = np.linalg.inv(Mt.T @ Mt) @ (Mt.T @ np.ones(n))
    except:
        Malpha = np.linalg.pinv(Mt.T @ Mt) @ (Mt.T @ np.ones(n))

    if np.where(Malpha < 0)[0].size > 0:
        flag += 1
        Malpha = nnls(Mt, np.ones(n))[0]

    n_zeroed = 0
    for k in range(0, nc):
        Mt[:, k] *= Malpha[k]
        if n_Mh > 0:
            Mh[:, k] *= Malpha[k]
        if Malpha[k] > 0:
            Mw[:, k] /= Malpha[k]
        else:
            n_zeroed += 1
        
    if n_zeroed > 0:
        AddMessage.insert(len(AddMessage), 'Ncomp=' + str(nc) + ': ' + str(n_zeroed) + ' components were zeroed')

    # Goodness of fit
    R2 = 1 - np.linalg.norm(np.sum(Mt.T, axis=0).T - np.ones(n)) ** 2 / n
    AddMessage.insert(len(AddMessage), 'Ncomp=' + str(nc) + ': Goodness of mixture fit = ' + str(round(R2, 2)))
    # AddMessage.insert(len(AddMessage), 'Ncomp=' + str(nc) + ': Goodness of mixture fit before adjustement = ' + str(round(R2, 2)))

    # for i in range(0, n):
    #     Mt[i, :] /= np.sum(Mt[i, :])

    return [Mt, Mw, Mh, flag, AddMessage, ErrMessage, cancel_pressed]

def percentile_exc(a, q):
    """Percentile, exclusive

    Input:
        a: Matrix
        q: Percentile
    Output:
        Percentile
    """
    return np.percentile(np.concatenate((np.array([np.min(a)]), a.flatten(), np.array([np.max(a)]))), q)

def RobustMax(V0, AddMessage, myStatusBox):
    """Robust max of column vectors

    For each column:
         = weighted mean of column elements larger than 95% percentile
        for each row, weight = specificity of the column value wrt other columns
    Input:
        V0: column vectors
    Output: Robust max by column

    Reference
    ---------

    P. Fogel et al (2016) Applications of a Novel Clustering Approach Using Non-Negative Matrix Factorization to Environmental
        Research in Public Health Int. J. Environ. Res. Public Health 2016, 13, 509; doi:10.3390/ijerph13050509

    """
    ErrMessage = ''
    cancel_pressed = 0

    V = V0.copy()
    n, nc = V.shape
    if nc > 1:
        ncI = 1 / nc
        lnncI = 1 / math.log(nc)

    ind = max(math.ceil(n * .05) - 1, min(n - 1, 2))
    Scale = np.max(V, axis=0)
    for k in range(0, nc):
        V[:, k] = V[:, k] / Scale[k]

    RobMax = np.max(V, axis=0)
    RobMax0 = 1e+99 * np.ones(nc)
    iIter = 0
    maxIterations = 100
    pbar_step = 100 / maxIterations
    myStatusBox.init_bar(delay=1)

    while ((np.linalg.norm(RobMax - RobMax0) / np.linalg.norm(RobMax)) ** 2 > 1e-6) & (iIter < maxIterations):
        for k in range(0, nc):
            V = V[np.argsort(-V[:, k]), :]
            if nc > 1:
                den = np.repeat(np.sum(V, axis=1), nc).reshape((n, nc))
                den[den == 0] = 1.e-10
                Prob = V / den
                Prob[Prob == 0] = 1.e-10
                Specificity = (np.ones(n) + np.sum(Prob * np.log(Prob), axis=1) * lnncI)
                Specificity[Prob[:, k] < ncI] = 0
            else:
                Specificity = np.ones(n)

            Specificity[ind:n] = 0
            RobMax0[k] = RobMax[k]
            RobMax[k] = np.sum(V[:, k] * Specificity) / np.sum(Specificity)
            V[V[:, k] > RobMax[k], k] = RobMax[k]

        myStatusBox.update_bar(delay=1, step=pbar_step)
        if myStatusBox.cancel_pressed:
            cancel_pressed = 1
            return RobMax * Scale, AddMessage, ErrMessage, cancel_pressed

        iIter += 1

    if iIter == maxIterations:
        AddMessage.insert(len(AddMessage),
                          'Warning: Max iterations reached while calculating robust max (N = ' + str(n) + ').')

    return [RobMax * Scale, AddMessage, ErrMessage, cancel_pressed]

def Leverage(V, NMFUseRobustLeverage, AddMessage, myStatusBox):
    """Calculate leverages

    Input:
        V: Input column vectors
        NMFUseRobustLeverage: Estimate robust through columns of V
    Output:
        Vn: Leveraged column vectors
    
    Reference
    ---------
    
    P. Fogel et al (2016) Applications of a Novel Clustering Approach Using Non-Negative Matrix Factorization to Environmental
        Research in Public Health Int. J. Environ. Res. Public Health 2016, 13, 509; doi:10.3390/ijerph13050509

    """

    ErrMessage = ''
    cancel_pressed = 0

    n, nc = V.shape
    Vn = np.zeros((n, nc))
    Vr = np.zeros((n, nc))
    if NMFUseRobustLeverage > 0:
        MaxV, AddMessage, ErrMessage, cancel_pressed = RobustMax(V, AddMessage, myStatusBox)
        if cancel_pressed == 1:
            return Vn, AddMessage, ErrMessage, cancel_pressed
    else:
        MaxV = np.max(V, axis=0)

    pbar_step = 100 / nc
    myStatusBox.init_bar(delay=1)
    for k in range(0, nc):
        Vr[V[:, k] > 0, k] = 1
        Vn[:, k] = MaxV[k] - V[:, k]
        Vn[Vn[:, k] < 0, k] = 0
        Vn[:, k] = Vn[:, k] ** 2
        for k2 in range(0, nc):
            if k2 != k:
                Vn[:, k] = Vn[:, k] + V[:, k2] ** 2

        Status = 'Leverage: Comp ' + str(k+1)
        myStatusBox.update_status(delay=1, status=Status)
        myStatusBox.update_bar(delay=1, step=pbar_step)
        if myStatusBox.cancel_pressed:
            cancel_pressed = 1
            return Vn, AddMessage, ErrMessage, cancel_pressed

    Vn = 10 ** (-Vn / (2 * np.mean(Vn))) * Vr
    return [Vn, AddMessage, ErrMessage, cancel_pressed]

def BuildClusters(Mt, Mw, Mb, MtPct, MwPct, NBlocks, BlkSize, NMFCalculateLeverage, NMFUseRobustLeverage, NMFAlgo,
                  NMFRobustClusterByStability, CellPlotOrderedClusters, AddMessage, myStatusBox):
    """Builder clusters from leverages

    """
    NBlocks = int(NBlocks)
    myStatusBox.update_status(delay=1, status='Build clusters...')
    ErrMessage = ''
    cancel_pressed = 0
    n, nc = np.shape(Mt)
    p = np.shape(Mw)[0]
    if NMFAlgo >= 5:
        BlockClust = np.zeros(NBlocks)
    else:
        BlockClust = np.array([])
        Mbn = np.array([])

    RCt = np.zeros(n)
    RCw = np.zeros(p)
    NCt = np.zeros(nc)
    NCw = np.zeros(NBlocks * nc)
    RowClust = np.zeros(n)
    ColClust = np.zeros(p)
    ilast = 0
    jlast = 0

    if NMFCalculateLeverage == 1:
        myStatusBox.update_status(delay=1, status="Leverages - Left components...")
        Mtn, AddMessage, ErrMessage, cancel_pressed = Leverage(Mt, NMFUseRobustLeverage, AddMessage, myStatusBox)
        myStatusBox.update_status(delay=1, status="Leverages - Right components...")
        Mwn, AddMessage, ErrMessage, cancel_pressed = Leverage(Mw, NMFUseRobustLeverage, AddMessage, myStatusBox)
        if NMFAlgo >= 5:
            myStatusBox.update_status(delay=1, status="Leverages - Block components...")
            Mbn, AddMessage, ErrMessage, cancel_pressed = Leverage(Mb, NMFUseRobustLeverage, AddMessage, myStatusBox)
    else:
        Mtn = Mt
        Mwn = Mw
        if NMFAlgo >= 5:
            Mbn = Mb

    if NMFAlgo >= 5:
        for iBlock in range(0, NBlocks):
            if nc > 1:
                BlockClust[iBlock] = np.argmax(Mbn[iBlock, :]) + 1
            else:
                BlockClust[iBlock] = 1

    for i in range(0, n):
        if nc > 1:
            if (isinstance(MtPct, np.ndarray)) & (NMFRobustClusterByStability > 0):
                RowClust[i] = np.argmax(MtPct[i, :]) + 1
            else:
                RowClust[i] = np.argmax(Mtn[i, :]) + 1
        else:
            RowClust[i] = 1

    for j in range(0, p):
        if nc > 1:
            if (isinstance(MwPct, np.ndarray)) & (NMFRobustClusterByStability > 0):
                ColClust[j] = np.argmax(MwPct[j, :]) + 1
            else:
                ColClust[j] = np.argmax(Mwn[j, :]) + 1
        else:
            ColClust[j] = 1

    if (CellPlotOrderedClusters == 1) & (nc >= 3):
        MtS = np.zeros(n)
        MwS = np.zeros(p)
        for i in range(0, n):
            if RowClust[i] == 1:
                MtS[i] = sum(k * Mtn[i, k] for k in range(0, 2)) / \
                         max(sum(Mtn[i, k] for k in range(0, 2)), 1.e-10)
            elif RowClust[i] == nc:
                MtS[i] = sum(k * Mtn[i, k] for k in range(nc - 2, nc)) / \
                         max(sum(Mtn[i, k] for k in range(nc - 2, nc)), 1.e-10)
            else:
                MtS[i] = sum(k * Mtn[i, k] for k in range(int(RowClust[i] - 2), int(RowClust[i] + 1))) / \
                         max(sum(Mtn[i, k] for k in range(int(RowClust[i] - 2), int(RowClust[i] + 1))), 1.e-10)

        for j in range(0, p):
            if ColClust[j] == 1:
                MwS[j] = sum(k * Mwn[j, k] for k in range(0, 2)) / \
                         max(sum(Mwn[j, k] for k in range(0, 2)), 1.e-10)
            elif ColClust[j] == nc:
                MwS[j] = sum(k * Mwn[j, k] for k in range(nc - 2, nc)) / \
                         max(sum(Mwn[j, k] for k in range(nc - 2, nc)), 1.e-10)
            else:
                MwS[j] = sum(k * Mwn[j, k] for k in range(int(ColClust[j] - 2), int(ColClust[j] + 1))) / \
                         max(sum(Mwn[j, k] for k in range(int(ColClust[j] - 2), int(ColClust[j] + 1))), 1.e-10)

    for k in range(0, nc):
        Mindex1 = np.where(RowClust == k + 1)[0]
        if len(Mindex1) > 0:
            if len(Mindex1) == 1:
                Mindex = Mindex1,
            elif (nc == 2) & (k == 1):
                Mindex = Mindex1[np.argsort(Mtn[Mindex1, k])]
            elif (CellPlotOrderedClusters == 1) & (nc >= 3):
                Mindex = Mindex1[np.argsort(MtS[Mindex1])]
            else:
                Mindex = Mindex1[np.argsort(-Mtn[Mindex1, k])]

            RCt[ilast:len(Mindex) + ilast] = Mindex
            ilast += len(Mindex)

        NCt[k] = ilast

    for iBlock in range(0, NBlocks):
        if iBlock == 0:
            j1 = 0
            j2 = int(abs(BlkSize[iBlock]))
        else:
            j1 = j2
            j2 += int(abs(BlkSize[iBlock]))

        for k in range(0, nc):
            Mindex2 = np.where(ColClust[j1:j2] == k + 1)[0]
            if len(Mindex2) > 0:
                Mindex2 = Mindex2 + j1
                if len(Mindex2) == 1:
                    Mindex = Mindex2
                elif (nc == 2) & (k == 1):
                    Mindex = Mindex2[np.argsort(Mwn[Mindex2, k])]
                elif (CellPlotOrderedClusters == 1) & (nc >= 3):
                    Mindex = Mindex2[np.argsort(MwS[Mindex2])]
                else:
                    Mindex = Mindex2[np.argsort(-Mwn[Mindex2, k])]

                RCw[jlast:len(Mindex) + jlast] = Mindex
                jlast += len(Mindex)

            NCw[iBlock * nc + k] = jlast

    return [Mtn, Mwn, Mbn, RCt, RCw, NCt, NCw, RowClust, ColClust, BlockClust, AddMessage, ErrMessage, cancel_pressed]

def ClusterPvalues(ClusterSize, nbGroups, Mt, RCt, NCt,RowGroups, ListGroups, Ngroup):
    """Calculate Pvalue of each group versus cluster

    """
    n, nc = Mt.shape
    ClusterSize = ClusterSize.astype(np.int)
    nbGroups = int(nbGroups)
    RCt = RCt.astype(np.int)
    NCt = NCt.astype(np.int)
    ClusterSize = np.reshape(ClusterSize, nc)
    RCt = np.reshape(RCt, (n,))
    NCt = np.reshape(NCt, (nc,))
    RowGroups = np.reshape(RowGroups, (n,))

    ClusterGroup = np.zeros(nc)
    ClusterProb = np.zeros(nc)
    ClusterNgroup = np.zeros((nc,nbGroups))
    ClusterNWgroup = np.zeros((nc,nbGroups))
    prun = 0

    for k in range(0, nc):
        if ClusterSize[k] > 0:
            # Find main group (only if clustersize>2)
            kfound0 = 0
            for iGroup in range(0, nbGroups):
                if k == 0:
                    MX = np.where(RowGroups[RCt[0:NCt[0]]] == ListGroups[iGroup])[0]
                    if len(MX) >= 1:
                        ClusterNWgroup[k, iGroup] = np.sum(
                            Mt[RCt[0:NCt[0]][MX], k]
                        )
                        ClusterNgroup[k, iGroup] = len(MX)
                else:
                    MX = np.where(RowGroups[RCt[NCt[k-1]:NCt[k]]] == ListGroups[iGroup])[0]
                    if len(MX) >= 1:
                        ClusterNWgroup[k, iGroup] = np.sum(
                            Mt[RCt[NCt[k-1]:NCt[k]][MX], k]
                        )
                        ClusterNgroup[k, iGroup] = len(MX)

                if ClusterNgroup[k, iGroup] > kfound0:
                    kfound0 = ClusterNgroup[k, iGroup]
                    ClusterGroup[k] = iGroup

            SumClusterNWgroup = sum(ClusterNWgroup[k, :]);
            for iGroup in range(0, nbGroups):
                ClusterNWgroup[k, iGroup] = ClusterSize[k] * ClusterNWgroup[k, iGroup] / SumClusterNWgroup

        else:
            for iGroup in range(0, nbGroups):
                ClusterNgroup[k, iGroup] = 0
                ClusterNWgroup[k, iGroup] = 0

            ClusterGroup[k] = 1

    for k in range(0, nc):
        if ClusterSize[k] > 2:
            ClusterProb[k] = hypergeom.sf(ClusterNgroup[k, int(ClusterGroup[k])], n, Ngroup[int(ClusterGroup[k])], ClusterSize[k], loc=0) + \
                             hypergeom.pmf(ClusterNgroup[k, int(ClusterGroup[k])], n, Ngroup[int(ClusterGroup[k])], ClusterSize[k], loc=0)
        else:
            ClusterProb[k] = 1

    for k in range(0, nc):
        for iGroup in range(0, nbGroups):
            if ClusterNWgroup[k, iGroup]:
                prun += ClusterNWgroup[k, iGroup] * math.log(
                    ClusterNWgroup[k, iGroup] / (ClusterSize[k] * Ngroup[iGroup] / n))

    return [prun, ClusterGroup, ClusterProb, ClusterNgroup, ClusterNWgroup]

def GlobalSign(Nrun, nbGroups, Mt, RCt, NCt, RowGroups, ListGroups, Ngroup, myStatusBox):
    """Calculate global significance of association with a covariate
        following multiple factorization trials
    """

    n, nc = Mt.shape
    Nrun = int(Nrun)
    nbGroups = int(nbGroups)
    RCt = RCt.astype(np.int)
    NCt = NCt.astype(np.int)
    ClusterSize = np.zeros(nc)
    RCt = np.reshape(RCt, n)
    NCt = np.reshape(NCt, nc)
    cancel_pressed = 0
    for k in range(0, nc):
        if k == 0:
            ClusterSize[k] = NCt[0]
        else:
            ClusterSize[k] = NCt[k] - NCt[k - 1]

    if nbGroups > 1:
        RowGroups = np.reshape(RowGroups, (n,))
        StepIter = np.round(Nrun / 10)
        pbar_step = 10
        Pglob = 1
        for irun in range(0, Nrun):
            if irun % StepIter == 0:
                myStatusBox.update_status(delay=1,
                                          status='Calculating global significance: ' + str(irun) + ' / ' + str(Nrun))
                myStatusBox.update_bar(delay=1, step=pbar_step)
                if myStatusBox.cancel_pressed:
                    cancel_pressed = 1
                    return [ClusterSize, Pglob, prun, ClusterProb, ClusterGroup, ClusterNgroup, cancel_pressed]

            prun, ClusterGroup, ClusterProb, ClusterNgroup, ClusterNWgroup = ClusterPvalues(ClusterSize, nbGroups, Mt,
                                                                                            RCt, NCt, RowGroups,
                                                                                            ListGroups, Ngroup)
            if irun == 0:
                ClusterProb0 = np.copy(ClusterProb)
                ClusterGroup0 = np.copy(ClusterGroup)
                ClusterNgroup0 = np.copy(ClusterNgroup)
                RowGroups0 = np.copy(RowGroups)
                prun0 = prun;
            else:
                if prun >= prun0:
                    Pglob += 1

            if irun < Nrun - 1:
                # permute row groups
                Boot = np.random.permutation
                RowGroups = RowGroups0[np.random.permutation(n)]
            else:
                # Restore
                ClusterProb = ClusterProb0
                ClusterGroup = ClusterGroup0
                ClusterNgroup = ClusterNgroup0
                RowGroups = RowGroups0
                prun = prun0
                Pglob /= Nrun
    else:
        Pglob = np.NaN
        prun = np.NaN
        ClusterProb = np.array([])
        ClusterGroup = np.array([])
        ClusterNgroup = np.array([])

    return [ClusterSize, Pglob, prun, ClusterProb, ClusterGroup, ClusterNgroup, cancel_pressed]

def shift(arr, num, fill_value=EPSILON):
    """Shift a vector

    Parameters
    ----------
        arr: Input column vector
        num: number of indexes to shift ( < 0: To the left )
        
    Returns
    -------
        result: shifted column vector
    
    Examples
    --------
    >>> import pytest
    >>> import numpy as np
    >>> from nmtf.modules.nmtf_utils import shift
    >>> arr = np.array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
    >>> b = arr
    >>> num = -2
    >>> fill_value = 0
    >>> shift(b, num, fill_value)
    array([ 3,  4,  5,  6,  7,  8,  9, 10,  0,  0])
    >>> num = 2
    array([0, 0, 1, 2, 3, 4, 5, 6, 7, 8])
    """

    result = np.empty_like(arr)
    if num > 0:
        result[:num] = fill_value
        result[num:] = arr[:-num]
    elif num < 0:
        result[num:] = fill_value
        result[:num] = arr[-num:]
    else:
        result[:] = arr
    return result

def sparse_opt(b, alpha, two_sided):
    """Return the L2-closest vector with sparsity alpha 
    
    Input:
        b: original vector
    Output:
        x: sparse vector
    
    Reference
    ---------

    V. K. Potluru & all (2013) Block Coordinate Descent for Sparse NMF arXiv:1301.3527v2 [cs.LG]

    Examples
    --------
    >>> from nmtf.modules.nmtf_utils import sparse_opt
    >>> b_ = np.array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
    >>> alpha_ = 1
    >>> two_sided_ = True
    >>> sparse_opt(b_, alpha_, two_sided_)
    array([ 0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0., nan])
    >>> two_sided_ = False
    >>> sparse_opt(b_, alpha_, two_sided_)
    array([ 0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0., nan])
    """
    m = b.size
    if two_sided is False:
        m_alpha = (np.sqrt(m) - np.linalg.norm(b, ord=1)/np.linalg.norm(b, ord=2))/(np.sqrt(m)-1)
        if (alpha == 0) or (alpha <= m_alpha):
            return b
    
    b_rank = np.argsort(-b)
    ranks = np.empty_like(b_rank)
    ranks[b_rank] = np.arange(m)
    b_norm= np.linalg.norm(b)
    a = b[b_rank] / b_norm
    k = math.sqrt(m) - alpha * (math.sqrt(m)-1)
    p0 = m
    mylambda0 = np.nan
    mu0 = np.nan
    mylambda = mylambda0
    mu = mu0
    
    for p in range(int(np.ceil(k**2)), m+1):
        mylambda0 = mylambda
        mu0 = mu
        mylambda = -np.sqrt((p * np.linalg.norm(a[0:p])**2 - np.linalg.norm(a[0:p], ord=1)**2)/(p-k**2))
        mu = -(np.linalg.norm(a[0:p], ord=1) + k*mylambda) / p
        if a[p-1] < -mu:
            p0 = p-1
            mylambda = mylambda0
            mu = mu0
            break
    
    x = np.zeros(m)
    x[0:p0] = -b_norm * (a[0:p0] + mu) / mylambda
    return x[ranks]

Functions

def BuildClusters(Mt, Mw, Mb, MtPct, MwPct, NBlocks, BlkSize, NMFCalculateLeverage, NMFUseRobustLeverage, NMFAlgo, NMFRobustClusterByStability, CellPlotOrderedClusters, AddMessage, myStatusBox)

Builder clusters from leverages

Expand source code
def BuildClusters(Mt, Mw, Mb, MtPct, MwPct, NBlocks, BlkSize, NMFCalculateLeverage, NMFUseRobustLeverage, NMFAlgo,
                  NMFRobustClusterByStability, CellPlotOrderedClusters, AddMessage, myStatusBox):
    """Builder clusters from leverages

    """
    NBlocks = int(NBlocks)
    myStatusBox.update_status(delay=1, status='Build clusters...')
    ErrMessage = ''
    cancel_pressed = 0
    n, nc = np.shape(Mt)
    p = np.shape(Mw)[0]
    if NMFAlgo >= 5:
        BlockClust = np.zeros(NBlocks)
    else:
        BlockClust = np.array([])
        Mbn = np.array([])

    RCt = np.zeros(n)
    RCw = np.zeros(p)
    NCt = np.zeros(nc)
    NCw = np.zeros(NBlocks * nc)
    RowClust = np.zeros(n)
    ColClust = np.zeros(p)
    ilast = 0
    jlast = 0

    if NMFCalculateLeverage == 1:
        myStatusBox.update_status(delay=1, status="Leverages - Left components...")
        Mtn, AddMessage, ErrMessage, cancel_pressed = Leverage(Mt, NMFUseRobustLeverage, AddMessage, myStatusBox)
        myStatusBox.update_status(delay=1, status="Leverages - Right components...")
        Mwn, AddMessage, ErrMessage, cancel_pressed = Leverage(Mw, NMFUseRobustLeverage, AddMessage, myStatusBox)
        if NMFAlgo >= 5:
            myStatusBox.update_status(delay=1, status="Leverages - Block components...")
            Mbn, AddMessage, ErrMessage, cancel_pressed = Leverage(Mb, NMFUseRobustLeverage, AddMessage, myStatusBox)
    else:
        Mtn = Mt
        Mwn = Mw
        if NMFAlgo >= 5:
            Mbn = Mb

    if NMFAlgo >= 5:
        for iBlock in range(0, NBlocks):
            if nc > 1:
                BlockClust[iBlock] = np.argmax(Mbn[iBlock, :]) + 1
            else:
                BlockClust[iBlock] = 1

    for i in range(0, n):
        if nc > 1:
            if (isinstance(MtPct, np.ndarray)) & (NMFRobustClusterByStability > 0):
                RowClust[i] = np.argmax(MtPct[i, :]) + 1
            else:
                RowClust[i] = np.argmax(Mtn[i, :]) + 1
        else:
            RowClust[i] = 1

    for j in range(0, p):
        if nc > 1:
            if (isinstance(MwPct, np.ndarray)) & (NMFRobustClusterByStability > 0):
                ColClust[j] = np.argmax(MwPct[j, :]) + 1
            else:
                ColClust[j] = np.argmax(Mwn[j, :]) + 1
        else:
            ColClust[j] = 1

    if (CellPlotOrderedClusters == 1) & (nc >= 3):
        MtS = np.zeros(n)
        MwS = np.zeros(p)
        for i in range(0, n):
            if RowClust[i] == 1:
                MtS[i] = sum(k * Mtn[i, k] for k in range(0, 2)) / \
                         max(sum(Mtn[i, k] for k in range(0, 2)), 1.e-10)
            elif RowClust[i] == nc:
                MtS[i] = sum(k * Mtn[i, k] for k in range(nc - 2, nc)) / \
                         max(sum(Mtn[i, k] for k in range(nc - 2, nc)), 1.e-10)
            else:
                MtS[i] = sum(k * Mtn[i, k] for k in range(int(RowClust[i] - 2), int(RowClust[i] + 1))) / \
                         max(sum(Mtn[i, k] for k in range(int(RowClust[i] - 2), int(RowClust[i] + 1))), 1.e-10)

        for j in range(0, p):
            if ColClust[j] == 1:
                MwS[j] = sum(k * Mwn[j, k] for k in range(0, 2)) / \
                         max(sum(Mwn[j, k] for k in range(0, 2)), 1.e-10)
            elif ColClust[j] == nc:
                MwS[j] = sum(k * Mwn[j, k] for k in range(nc - 2, nc)) / \
                         max(sum(Mwn[j, k] for k in range(nc - 2, nc)), 1.e-10)
            else:
                MwS[j] = sum(k * Mwn[j, k] for k in range(int(ColClust[j] - 2), int(ColClust[j] + 1))) / \
                         max(sum(Mwn[j, k] for k in range(int(ColClust[j] - 2), int(ColClust[j] + 1))), 1.e-10)

    for k in range(0, nc):
        Mindex1 = np.where(RowClust == k + 1)[0]
        if len(Mindex1) > 0:
            if len(Mindex1) == 1:
                Mindex = Mindex1,
            elif (nc == 2) & (k == 1):
                Mindex = Mindex1[np.argsort(Mtn[Mindex1, k])]
            elif (CellPlotOrderedClusters == 1) & (nc >= 3):
                Mindex = Mindex1[np.argsort(MtS[Mindex1])]
            else:
                Mindex = Mindex1[np.argsort(-Mtn[Mindex1, k])]

            RCt[ilast:len(Mindex) + ilast] = Mindex
            ilast += len(Mindex)

        NCt[k] = ilast

    for iBlock in range(0, NBlocks):
        if iBlock == 0:
            j1 = 0
            j2 = int(abs(BlkSize[iBlock]))
        else:
            j1 = j2
            j2 += int(abs(BlkSize[iBlock]))

        for k in range(0, nc):
            Mindex2 = np.where(ColClust[j1:j2] == k + 1)[0]
            if len(Mindex2) > 0:
                Mindex2 = Mindex2 + j1
                if len(Mindex2) == 1:
                    Mindex = Mindex2
                elif (nc == 2) & (k == 1):
                    Mindex = Mindex2[np.argsort(Mwn[Mindex2, k])]
                elif (CellPlotOrderedClusters == 1) & (nc >= 3):
                    Mindex = Mindex2[np.argsort(MwS[Mindex2])]
                else:
                    Mindex = Mindex2[np.argsort(-Mwn[Mindex2, k])]

                RCw[jlast:len(Mindex) + jlast] = Mindex
                jlast += len(Mindex)

            NCw[iBlock * nc + k] = jlast

    return [Mtn, Mwn, Mbn, RCt, RCw, NCt, NCw, RowClust, ColClust, BlockClust, AddMessage, ErrMessage, cancel_pressed]
def ClusterPvalues(ClusterSize, nbGroups, Mt, RCt, NCt, RowGroups, ListGroups, Ngroup)

Calculate Pvalue of each group versus cluster

Expand source code
def ClusterPvalues(ClusterSize, nbGroups, Mt, RCt, NCt,RowGroups, ListGroups, Ngroup):
    """Calculate Pvalue of each group versus cluster

    """
    n, nc = Mt.shape
    ClusterSize = ClusterSize.astype(np.int)
    nbGroups = int(nbGroups)
    RCt = RCt.astype(np.int)
    NCt = NCt.astype(np.int)
    ClusterSize = np.reshape(ClusterSize, nc)
    RCt = np.reshape(RCt, (n,))
    NCt = np.reshape(NCt, (nc,))
    RowGroups = np.reshape(RowGroups, (n,))

    ClusterGroup = np.zeros(nc)
    ClusterProb = np.zeros(nc)
    ClusterNgroup = np.zeros((nc,nbGroups))
    ClusterNWgroup = np.zeros((nc,nbGroups))
    prun = 0

    for k in range(0, nc):
        if ClusterSize[k] > 0:
            # Find main group (only if clustersize>2)
            kfound0 = 0
            for iGroup in range(0, nbGroups):
                if k == 0:
                    MX = np.where(RowGroups[RCt[0:NCt[0]]] == ListGroups[iGroup])[0]
                    if len(MX) >= 1:
                        ClusterNWgroup[k, iGroup] = np.sum(
                            Mt[RCt[0:NCt[0]][MX], k]
                        )
                        ClusterNgroup[k, iGroup] = len(MX)
                else:
                    MX = np.where(RowGroups[RCt[NCt[k-1]:NCt[k]]] == ListGroups[iGroup])[0]
                    if len(MX) >= 1:
                        ClusterNWgroup[k, iGroup] = np.sum(
                            Mt[RCt[NCt[k-1]:NCt[k]][MX], k]
                        )
                        ClusterNgroup[k, iGroup] = len(MX)

                if ClusterNgroup[k, iGroup] > kfound0:
                    kfound0 = ClusterNgroup[k, iGroup]
                    ClusterGroup[k] = iGroup

            SumClusterNWgroup = sum(ClusterNWgroup[k, :]);
            for iGroup in range(0, nbGroups):
                ClusterNWgroup[k, iGroup] = ClusterSize[k] * ClusterNWgroup[k, iGroup] / SumClusterNWgroup

        else:
            for iGroup in range(0, nbGroups):
                ClusterNgroup[k, iGroup] = 0
                ClusterNWgroup[k, iGroup] = 0

            ClusterGroup[k] = 1

    for k in range(0, nc):
        if ClusterSize[k] > 2:
            ClusterProb[k] = hypergeom.sf(ClusterNgroup[k, int(ClusterGroup[k])], n, Ngroup[int(ClusterGroup[k])], ClusterSize[k], loc=0) + \
                             hypergeom.pmf(ClusterNgroup[k, int(ClusterGroup[k])], n, Ngroup[int(ClusterGroup[k])], ClusterSize[k], loc=0)
        else:
            ClusterProb[k] = 1

    for k in range(0, nc):
        for iGroup in range(0, nbGroups):
            if ClusterNWgroup[k, iGroup]:
                prun += ClusterNWgroup[k, iGroup] * math.log(
                    ClusterNWgroup[k, iGroup] / (ClusterSize[k] * Ngroup[iGroup] / n))

    return [prun, ClusterGroup, ClusterProb, ClusterNgroup, ClusterNWgroup]
def GlobalSign(Nrun, nbGroups, Mt, RCt, NCt, RowGroups, ListGroups, Ngroup, myStatusBox)

Calculate global significance of association with a covariate following multiple factorization trials

Expand source code
def GlobalSign(Nrun, nbGroups, Mt, RCt, NCt, RowGroups, ListGroups, Ngroup, myStatusBox):
    """Calculate global significance of association with a covariate
        following multiple factorization trials
    """

    n, nc = Mt.shape
    Nrun = int(Nrun)
    nbGroups = int(nbGroups)
    RCt = RCt.astype(np.int)
    NCt = NCt.astype(np.int)
    ClusterSize = np.zeros(nc)
    RCt = np.reshape(RCt, n)
    NCt = np.reshape(NCt, nc)
    cancel_pressed = 0
    for k in range(0, nc):
        if k == 0:
            ClusterSize[k] = NCt[0]
        else:
            ClusterSize[k] = NCt[k] - NCt[k - 1]

    if nbGroups > 1:
        RowGroups = np.reshape(RowGroups, (n,))
        StepIter = np.round(Nrun / 10)
        pbar_step = 10
        Pglob = 1
        for irun in range(0, Nrun):
            if irun % StepIter == 0:
                myStatusBox.update_status(delay=1,
                                          status='Calculating global significance: ' + str(irun) + ' / ' + str(Nrun))
                myStatusBox.update_bar(delay=1, step=pbar_step)
                if myStatusBox.cancel_pressed:
                    cancel_pressed = 1
                    return [ClusterSize, Pglob, prun, ClusterProb, ClusterGroup, ClusterNgroup, cancel_pressed]

            prun, ClusterGroup, ClusterProb, ClusterNgroup, ClusterNWgroup = ClusterPvalues(ClusterSize, nbGroups, Mt,
                                                                                            RCt, NCt, RowGroups,
                                                                                            ListGroups, Ngroup)
            if irun == 0:
                ClusterProb0 = np.copy(ClusterProb)
                ClusterGroup0 = np.copy(ClusterGroup)
                ClusterNgroup0 = np.copy(ClusterNgroup)
                RowGroups0 = np.copy(RowGroups)
                prun0 = prun;
            else:
                if prun >= prun0:
                    Pglob += 1

            if irun < Nrun - 1:
                # permute row groups
                Boot = np.random.permutation
                RowGroups = RowGroups0[np.random.permutation(n)]
            else:
                # Restore
                ClusterProb = ClusterProb0
                ClusterGroup = ClusterGroup0
                ClusterNgroup = ClusterNgroup0
                RowGroups = RowGroups0
                prun = prun0
                Pglob /= Nrun
    else:
        Pglob = np.NaN
        prun = np.NaN
        ClusterProb = np.array([])
        ClusterGroup = np.array([])
        ClusterNgroup = np.array([])

    return [ClusterSize, Pglob, prun, ClusterProb, ClusterGroup, ClusterNgroup, cancel_pressed]
def Leverage(V, NMFUseRobustLeverage, AddMessage, myStatusBox)

Calculate leverages

Input

V: Input column vectors NMFUseRobustLeverage: Estimate robust through columns of V

Output

Vn: Leveraged column vectors

Reference

P. Fogel et al (2016) Applications of a Novel Clustering Approach Using Non-Negative Matrix Factorization to Environmental Research in Public Health Int. J. Environ. Res. Public Health 2016, 13, 509; doi:10.3390/ijerph13050509

Expand source code
def Leverage(V, NMFUseRobustLeverage, AddMessage, myStatusBox):
    """Calculate leverages

    Input:
        V: Input column vectors
        NMFUseRobustLeverage: Estimate robust through columns of V
    Output:
        Vn: Leveraged column vectors
    
    Reference
    ---------
    
    P. Fogel et al (2016) Applications of a Novel Clustering Approach Using Non-Negative Matrix Factorization to Environmental
        Research in Public Health Int. J. Environ. Res. Public Health 2016, 13, 509; doi:10.3390/ijerph13050509

    """

    ErrMessage = ''
    cancel_pressed = 0

    n, nc = V.shape
    Vn = np.zeros((n, nc))
    Vr = np.zeros((n, nc))
    if NMFUseRobustLeverage > 0:
        MaxV, AddMessage, ErrMessage, cancel_pressed = RobustMax(V, AddMessage, myStatusBox)
        if cancel_pressed == 1:
            return Vn, AddMessage, ErrMessage, cancel_pressed
    else:
        MaxV = np.max(V, axis=0)

    pbar_step = 100 / nc
    myStatusBox.init_bar(delay=1)
    for k in range(0, nc):
        Vr[V[:, k] > 0, k] = 1
        Vn[:, k] = MaxV[k] - V[:, k]
        Vn[Vn[:, k] < 0, k] = 0
        Vn[:, k] = Vn[:, k] ** 2
        for k2 in range(0, nc):
            if k2 != k:
                Vn[:, k] = Vn[:, k] + V[:, k2] ** 2

        Status = 'Leverage: Comp ' + str(k+1)
        myStatusBox.update_status(delay=1, status=Status)
        myStatusBox.update_bar(delay=1, step=pbar_step)
        if myStatusBox.cancel_pressed:
            cancel_pressed = 1
            return Vn, AddMessage, ErrMessage, cancel_pressed

    Vn = 10 ** (-Vn / (2 * np.mean(Vn))) * Vr
    return [Vn, AddMessage, ErrMessage, cancel_pressed]
def NMFDet(Mt, Mw, NMFExactDet)

Volume occupied by Left and Right factoring vectors

Input

Mt: Left hand matrix Mw: Right hand matrix NMFExactDet if = 0 compute an approximate determinant in reduced space n x n or p x p through random sampling in the largest dimension

Output

detXcells: determinant

Reference

P. Fogel et al (2016) Applications of a Novel Clustering Approach Using Non-Negative Matrix Factorization to Environmental Research in Public Health Int. J. Environ. Res. Public Health 2016, 13, 509; doi:10.3390/ijerph13050509

Expand source code
def NMFDet(Mt, Mw, NMFExactDet):
    """Volume occupied by Left and Right factoring vectors

    Input:
        Mt: Left hand matrix
        Mw: Right hand matrix
        NMFExactDet if = 0 compute an approximate determinant in reduced space n x n or p x p
        through random sampling in the largest dimension
    Output:
        detXcells: determinant

    Reference
    ---------
    
    P. Fogel et al (2016) Applications of a Novel Clustering Approach Using Non-Negative Matrix Factorization to Environmental
        Research in Public Health Int. J. Environ. Res. Public Health 2016, 13, 509; doi:10.3390/ijerph13050509

    """
     
    n, nc = Mt.shape
    p, nc = Mw.shape
    nxp = n * p
    if (NMFExactDet > 0) | (n == p):
        Xcells = np.zeros((nxp, nc))
        for k in range(0, nc):
            Xcells[:, k] = np.reshape(np.reshape(Mt[:, k], (n, 1)) @ np.reshape(Mw[:, k], (1, p)), nxp)
            norm_k = np.linalg.norm(Xcells[:, k])
            if norm_k > 0 :
                Xcells[:, k] = Xcells[:, k] / norm_k
            else:
                Xcells[:, k] = 0
    else:
        if n > p:
            Xcells = np.zeros((p ** 2, nc))
            ID = np.arange(n)
            np.random.shuffle(ID)
            ID = ID[0:p]
            for k in range(0, nc):
                Xcells[:, k] = np.reshape(np.reshape(Mt[ID, k], (p, 1)) @ np.reshape(Mw[:, k], (1, p)), p ** 2)
                norm_k = np.linalg.norm(Xcells[:, k])
                if norm_k > 0 :
                    Xcells[:, k] = Xcells[:, k] / norm_k
                else:
                    Xcells[:, k] = 0
        else:
            Xcells = np.zeros((n ** 2, nc))
            ID = np.arange(p)
            np.random.shuffle(ID)
            ID = ID[0:n]
            for k in range(0, nc):
                Xcells[:, k] = np.reshape(np.reshape(Mt[:, k], (n, 1)) @ np.reshape(Mw[ID, k], (1, n)), n ** 2)
                norm_k = np.linalg.norm(Xcells[:, k])
                if norm_k > 0 :
                    Xcells[:, k] = Xcells[:, k] / norm_k
                else:
                    Xcells[:, k] = 0

    detXcells = np.linalg.det(Xcells.T @ Xcells)
    return detXcells
def NMFGetConvexScores(Mt, Mw, Mh, flag, AddMessage)

Rescale scores to sum up to 1 (used with deconvolution)

Input

Mt: Left factoring matrix Mw: Right factoring matrix flag: Current value

Output

Mt: Left factoring matrix Mw: Right factoring matrix flag: += 1: Negative weights found

Expand source code
def NMFGetConvexScores(Mt, Mw, Mh, flag, AddMessage):
    """Rescale scores to sum up to 1 (used with deconvolution)
    Input:
        Mt: Left factoring matrix
        Mw: Right factoring matrix
        flag:  Current value
    Output:

       Mt: Left factoring matrix
        Mw: Right factoring matrix
        flag: += 1: Negative weights found
    """
    ErrMessage = ''
    cancel_pressed = 0

    n, nc = Mt.shape
    n_Mh = Mh.shape[0]
    try:
        Malpha = np.linalg.inv(Mt.T @ Mt) @ (Mt.T @ np.ones(n))
    except:
        Malpha = np.linalg.pinv(Mt.T @ Mt) @ (Mt.T @ np.ones(n))

    if np.where(Malpha < 0)[0].size > 0:
        flag += 1
        Malpha = nnls(Mt, np.ones(n))[0]

    n_zeroed = 0
    for k in range(0, nc):
        Mt[:, k] *= Malpha[k]
        if n_Mh > 0:
            Mh[:, k] *= Malpha[k]
        if Malpha[k] > 0:
            Mw[:, k] /= Malpha[k]
        else:
            n_zeroed += 1
        
    if n_zeroed > 0:
        AddMessage.insert(len(AddMessage), 'Ncomp=' + str(nc) + ': ' + str(n_zeroed) + ' components were zeroed')

    # Goodness of fit
    R2 = 1 - np.linalg.norm(np.sum(Mt.T, axis=0).T - np.ones(n)) ** 2 / n
    AddMessage.insert(len(AddMessage), 'Ncomp=' + str(nc) + ': Goodness of mixture fit = ' + str(round(R2, 2)))
    # AddMessage.insert(len(AddMessage), 'Ncomp=' + str(nc) + ': Goodness of mixture fit before adjustement = ' + str(round(R2, 2)))

    # for i in range(0, n):
    #     Mt[i, :] /= np.sum(Mt[i, :])

    return [Mt, Mw, Mh, flag, AddMessage, ErrMessage, cancel_pressed]
def RobustMax(V0, AddMessage, myStatusBox)

Robust max of column vectors

For each column: = weighted mean of column elements larger than 95% percentile for each row, weight = specificity of the column value wrt other columns

Input

V0: column vectors Output: Robust max by column

Reference

P. Fogel et al (2016) Applications of a Novel Clustering Approach Using Non-Negative Matrix Factorization to Environmental Research in Public Health Int. J. Environ. Res. Public Health 2016, 13, 509; doi:10.3390/ijerph13050509

Expand source code
def RobustMax(V0, AddMessage, myStatusBox):
    """Robust max of column vectors

    For each column:
         = weighted mean of column elements larger than 95% percentile
        for each row, weight = specificity of the column value wrt other columns
    Input:
        V0: column vectors
    Output: Robust max by column

    Reference
    ---------

    P. Fogel et al (2016) Applications of a Novel Clustering Approach Using Non-Negative Matrix Factorization to Environmental
        Research in Public Health Int. J. Environ. Res. Public Health 2016, 13, 509; doi:10.3390/ijerph13050509

    """
    ErrMessage = ''
    cancel_pressed = 0

    V = V0.copy()
    n, nc = V.shape
    if nc > 1:
        ncI = 1 / nc
        lnncI = 1 / math.log(nc)

    ind = max(math.ceil(n * .05) - 1, min(n - 1, 2))
    Scale = np.max(V, axis=0)
    for k in range(0, nc):
        V[:, k] = V[:, k] / Scale[k]

    RobMax = np.max(V, axis=0)
    RobMax0 = 1e+99 * np.ones(nc)
    iIter = 0
    maxIterations = 100
    pbar_step = 100 / maxIterations
    myStatusBox.init_bar(delay=1)

    while ((np.linalg.norm(RobMax - RobMax0) / np.linalg.norm(RobMax)) ** 2 > 1e-6) & (iIter < maxIterations):
        for k in range(0, nc):
            V = V[np.argsort(-V[:, k]), :]
            if nc > 1:
                den = np.repeat(np.sum(V, axis=1), nc).reshape((n, nc))
                den[den == 0] = 1.e-10
                Prob = V / den
                Prob[Prob == 0] = 1.e-10
                Specificity = (np.ones(n) + np.sum(Prob * np.log(Prob), axis=1) * lnncI)
                Specificity[Prob[:, k] < ncI] = 0
            else:
                Specificity = np.ones(n)

            Specificity[ind:n] = 0
            RobMax0[k] = RobMax[k]
            RobMax[k] = np.sum(V[:, k] * Specificity) / np.sum(Specificity)
            V[V[:, k] > RobMax[k], k] = RobMax[k]

        myStatusBox.update_bar(delay=1, step=pbar_step)
        if myStatusBox.cancel_pressed:
            cancel_pressed = 1
            return RobMax * Scale, AddMessage, ErrMessage, cancel_pressed

        iIter += 1

    if iIter == maxIterations:
        AddMessage.insert(len(AddMessage),
                          'Warning: Max iterations reached while calculating robust max (N = ' + str(n) + ').')

    return [RobMax * Scale, AddMessage, ErrMessage, cancel_pressed]
def percentile_exc(a, q)

Percentile, exclusive

Input

a: Matrix q: Percentile

Output

Percentile

Expand source code
def percentile_exc(a, q):
    """Percentile, exclusive

    Input:
        a: Matrix
        q: Percentile
    Output:
        Percentile
    """
    return np.percentile(np.concatenate((np.array([np.min(a)]), a.flatten(), np.array([np.max(a)]))), q)
def shift(arr, num, fill_value=1.1920929e-07)

Shift a vector

Parameters

arr: Input column vector
num: number of indexes to shift ( < 0: To the left )

Returns

result: shifted column vector

Examples

>>> import pytest
>>> import numpy as np
>>> from nmtf.modules.nmtf_utils import shift
>>> arr = np.array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
>>> b = arr
>>> num = -2
>>> fill_value = 0
>>> shift(b, num, fill_value)
array([ 3,  4,  5,  6,  7,  8,  9, 10,  0,  0])
>>> num = 2
array([0, 0, 1, 2, 3, 4, 5, 6, 7, 8])
Expand source code
def shift(arr, num, fill_value=EPSILON):
    """Shift a vector

    Parameters
    ----------
        arr: Input column vector
        num: number of indexes to shift ( < 0: To the left )
        
    Returns
    -------
        result: shifted column vector
    
    Examples
    --------
    >>> import pytest
    >>> import numpy as np
    >>> from nmtf.modules.nmtf_utils import shift
    >>> arr = np.array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
    >>> b = arr
    >>> num = -2
    >>> fill_value = 0
    >>> shift(b, num, fill_value)
    array([ 3,  4,  5,  6,  7,  8,  9, 10,  0,  0])
    >>> num = 2
    array([0, 0, 1, 2, 3, 4, 5, 6, 7, 8])
    """

    result = np.empty_like(arr)
    if num > 0:
        result[:num] = fill_value
        result[num:] = arr[:-num]
    elif num < 0:
        result[num:] = fill_value
        result[:num] = arr[-num:]
    else:
        result[:] = arr
    return result
def sparse_opt(b, alpha, two_sided)

Return the L2-closest vector with sparsity alpha

Input

b: original vector

Output

x: sparse vector

Reference

V. K. Potluru & all (2013) Block Coordinate Descent for Sparse NMF arXiv:1301.3527v2 [cs.LG]

Examples

>>> from nmtf.modules.nmtf_utils import sparse_opt
>>> b_ = np.array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
>>> alpha_ = 1
>>> two_sided_ = True
>>> sparse_opt(b_, alpha_, two_sided_)
array([ 0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0., nan])
>>> two_sided_ = False
>>> sparse_opt(b_, alpha_, two_sided_)
array([ 0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0., nan])
Expand source code
def sparse_opt(b, alpha, two_sided):
    """Return the L2-closest vector with sparsity alpha 
    
    Input:
        b: original vector
    Output:
        x: sparse vector
    
    Reference
    ---------

    V. K. Potluru & all (2013) Block Coordinate Descent for Sparse NMF arXiv:1301.3527v2 [cs.LG]

    Examples
    --------
    >>> from nmtf.modules.nmtf_utils import sparse_opt
    >>> b_ = np.array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
    >>> alpha_ = 1
    >>> two_sided_ = True
    >>> sparse_opt(b_, alpha_, two_sided_)
    array([ 0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0., nan])
    >>> two_sided_ = False
    >>> sparse_opt(b_, alpha_, two_sided_)
    array([ 0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0., nan])
    """
    m = b.size
    if two_sided is False:
        m_alpha = (np.sqrt(m) - np.linalg.norm(b, ord=1)/np.linalg.norm(b, ord=2))/(np.sqrt(m)-1)
        if (alpha == 0) or (alpha <= m_alpha):
            return b
    
    b_rank = np.argsort(-b)
    ranks = np.empty_like(b_rank)
    ranks[b_rank] = np.arange(m)
    b_norm= np.linalg.norm(b)
    a = b[b_rank] / b_norm
    k = math.sqrt(m) - alpha * (math.sqrt(m)-1)
    p0 = m
    mylambda0 = np.nan
    mu0 = np.nan
    mylambda = mylambda0
    mu = mu0
    
    for p in range(int(np.ceil(k**2)), m+1):
        mylambda0 = mylambda
        mu0 = mu
        mylambda = -np.sqrt((p * np.linalg.norm(a[0:p])**2 - np.linalg.norm(a[0:p], ord=1)**2)/(p-k**2))
        mu = -(np.linalg.norm(a[0:p], ord=1) + k*mylambda) / p
        if a[p-1] < -mu:
            p0 = p-1
            mylambda = mylambda0
            mu = mu0
            break
    
    x = np.zeros(m)
    x[0:p0] = -b_norm * (a[0:p0] + mu) / mylambda
    return x[ranks]

Classes

class StatusBox
Expand source code
class StatusBox:
    def __init__(self):
        self.root = Tk()
        self.root.title('irMF status - Python kernel')
        self.root.minsize(width=230, height=60)
        self.frame = Frame(self.root, borderwidth=6)
        self.frame.pack()
        self.var = StringVar()
        self.status = Label(self.frame, textvariable=self.var, width=60, height=1)
        self.status.pack(fill=NONE, padx=6, pady=6)
        self.pbar = ttk.Progressbar(self.frame, orient=HORIZONTAL, max=100, mode='determinate')
        self.pbar.pack(fill=NONE, padx=6, pady=6)
        Button(self.frame, text='Cancel', command=self.close_dialog).pack(fill=NONE, padx=6, pady=6)
        self.cancel_pressed = False
        self.n_steps = 0

    def close_dialog(self):
        self.cancel_pressed = True

    def update_bar(self, delay=1, step=1):
        self.n_steps += step
        self.pbar.step(step)
        self.pbar.after(delay, lambda: self.root.quit())
        self.root.mainloop()

    def init_bar(self, delay=1):
        self.update_bar(delay=1, step=100 - self.n_steps)
        self.n_steps = 0

    def update_status(self, delay=1, status=''):
        self.var.set(status)
        self.status.after(delay, lambda: self.root.quit())
        self.root.mainloop()

    def close(self):
        self.root.destroy()

    def myPrint(self, status=''):
        print(status)

Methods

def close(self)
Expand source code
def close(self):
    self.root.destroy()
def close_dialog(self)
Expand source code
def close_dialog(self):
    self.cancel_pressed = True
def init_bar(self, delay=1)
Expand source code
def init_bar(self, delay=1):
    self.update_bar(delay=1, step=100 - self.n_steps)
    self.n_steps = 0
def myPrint(self, status='')
Expand source code
def myPrint(self, status=''):
    print(status)
def update_bar(self, delay=1, step=1)
Expand source code
def update_bar(self, delay=1, step=1):
    self.n_steps += step
    self.pbar.step(step)
    self.pbar.after(delay, lambda: self.root.quit())
    self.root.mainloop()
def update_status(self, delay=1, status='')
Expand source code
def update_status(self, delay=1, status=''):
    self.var.set(status)
    self.status.after(delay, lambda: self.root.quit())
    self.root.mainloop()
class StatusBoxTqdm (verbose=0)
Expand source code
class StatusBoxTqdm:
    def __init__(self, verbose=0):
        self.LogIter = verbose
        if self.LogIter == 0:
            self.pbar = tqdm(total=100)

        self.cancel_pressed = False

    def update_bar(self, delay=0, step=1):
        if self.LogIter == 0:
            self.pbar.update(n=step)

    def init_bar(self, delay=0):
        if self.LogIter == 0:
            self.pbar.n = 0

    def update_status(self, delay=0, status=''):
        if self.LogIter == 0:
            self.pbar.set_description(status, refresh=False)
            self.pbar.refresh()

    def close(self):
        if self.LogIter == 0:
            self.pbar.clear()
            self.pbar.close()

    def myPrint(self, status=''):
       if self.LogIter == 1:
            print(status, end='\n')

Methods

def close(self)
Expand source code
def close(self):
    if self.LogIter == 0:
        self.pbar.clear()
        self.pbar.close()
def init_bar(self, delay=0)
Expand source code
def init_bar(self, delay=0):
    if self.LogIter == 0:
        self.pbar.n = 0
def myPrint(self, status='')
Expand source code
def myPrint(self, status=''):
   if self.LogIter == 1:
        print(status, end='\n')
def update_bar(self, delay=0, step=1)
Expand source code
def update_bar(self, delay=0, step=1):
    if self.LogIter == 0:
        self.pbar.update(n=step)
def update_status(self, delay=0, status='')
Expand source code
def update_status(self, delay=0, status=''):
    if self.LogIter == 0:
        self.pbar.set_description(status, refresh=False)
        self.pbar.refresh()