Source code for Pipeline.DPA

# coding=utf-8
# Non-parametric Density Peak clustering: 
# Automatic topography of high-dimensional data sets 
#
# Author: Maria d'Errico <mariaderr@gmail.com>
#
# Licence: BSD 3 clause

import numpy as np
import matplotlib.pyplot as plt
from matplotlib import cm
from matplotlib.colors import ListedColormap, LinearSegmentedColormap


from sklearn.base import BaseEstimator, ClusterMixin, DensityMixin, ClassifierMixin, TransformerMixin
from sklearn.utils.validation import check_X_y, check_array, check_is_fitted
from sklearn.utils.multiclass import unique_labels
from sklearn.metrics import euclidean_distances
from sklearn.neighbors import NearestNeighbors
from sklearn.neighbors import kneighbors_graph
from math import log, sqrt, exp, lgamma, pi, pow
from Pipeline.twoNN import twoNearestNeighbors
from Pipeline.PAk import PointAdaptive_kNN


VALID_METRIC = ['precomputed', 'euclidean','cosine']
VALID_DIM = ['auto', 'twoNN']
VALID_DENSITY = ['PAk', 'kNN']

[docs]def _DensityPeakAdvanced(densities, err_densities, k_hat, distances, indices, Z): """Main function implementing the Density Peak Advanced clustering algorithm: * Automatic detection of cluster centers * Point assignament to clusters in order of decreasing `g` * Topography reconstruction: search of saddle points and cluster merging Parameters ---------- densities : array [n_samples] The logarithm of the density at each point. err_densities : array [n_samples] The uncertainty in the density estimation, obtained by computing the inverse of the Fisher information matrix. k_hat : array [n_samples] The optimal number of neighbors for which the condition of constant density holds. distances: array [n_samples, k_max+1] Distances to the k_max neighbors of each points. The point itself is included in the array. indices : array [n_samples, k_max+1] Indices of the k_max neighbors of each points. The point itself is included in the array. Z : float, default = 1 The number of standard deviations, which fixes the level of statistical confidence at which one decides to consider a cluster meaningful. Attributes ---------- labels : array [Nclus] The clustering labels assigned to each point in the data set. halos : array [Nclus] The clustering labels assigned to each point in the data set. Points identified as halos have clustering lable equal to ``-1``. topography : array [Nclus, Nclus] Let be Nclus the number of clusters, the topography consists in a Nclus × Nclus symmetric matrix, in which the diagonal entries are the heights of the peaks and the off-diagonal entries are the heights of the saddle points. centers : array [Nclus] The list of points identified as the centers of the Nclus statistically significant clusters. """ from Pipeline import _DPA # We define as cluster centers the local maxima of g, where g is defined as density-err_density. g = [densities[i]-err_densities[i] for i in range(0,len(densities))] # Automatic detection of cluster centers #--------------------------------------- N = len(densities) centers = _DPA.get_centers(N, indices, k_hat, g) Nclus = len(centers) # Assign points to clusters #-------------------------- # Assign all the points that are not centers to the same cluster as the nearest point with higher g. # This assignation is performed in order of decreasing g clu_labels = _DPA.initial_assignment(g, N, indices, centers) # Topography reconstruction #-------------------------- # Finding saddle points between pair of clusters c and c'. # Halo points are also dentified as the points whose density is lower than # the density of the lowest saddle point, manely the set of points # whose assignation is not reliable. The clustering labels for halo point is set to -1. Rho_bord, Rho_bord_err, clu_labels, clu_halos, Nclus, centers_m = _DPA.get_borders(N, k_hat, indices, clu_labels, Nclus, g, densities, err_densities, Z, centers) topography = [] for i in range(0, Nclus-1): for j in range(i+1, Nclus): topography.append([i,j, Rho_bord[i][j], Rho_bord_err[i][j]]) labels = clu_labels halos = clu_halos return labels, halos, topography, g, centers_m
[docs]class DensityPeakAdvanced(ClusterMixin, BaseEstimator): """Class definition for the non-parametric Density Peak clustering. The default pipeline makes use of the `PAk` density estimator and of the `TWO-NN` intristic dimension estimator. The densities and the corresponding errors can also be provided as precomputed arrays. Parameters ---------- Z : float, default = 1 The number of standard deviations, which fixes the level of statistical confidence at which one decides to consider a cluster meaningful. metric : string, or callable The distance metric to use. If metric is a string, it must be one of the options allowed by scipy.spatial.distance.pdist for its metric parameter, or a metric listed in :obj:`VALID_METRIC = [precomputed, euclidean,cosine]`. If metric is ``precomputed``, X is assumed to be a distance matrix. Alternatively, if metric is a callable function, it is called on each pair of instances (rows) and the resulting value recorded. The callable should take two arrays from X as input and return a value indicating the distance between them. Default is ``euclidean``. densities : array [n_samples], default = None The logarithm of the density at each point. If provided, the following parameters are ignored: ``density_algo``, ``k_max``, ``D_thr``. err_densities : array [n_samples], default = None The uncertainty in the density estimation, obtained by computing the inverse of the Fisher information matrix. k_hat : array [n_samples], default = None The optimal number of neighbors for which the condition of constant density holds. nn_distances : array [n_samples, k_max+1] Distances to the k_max neighbors of each points. nn_indices : array [n_samples, k_max+1] Indices of the k_max neighbors of each points. affinity : string or callable, default 'precomputed' How to construct the affinity matrix. - ``nearest_neighbors`` : construct the affinity matrix by computing a graph of nearest neighbors. - ``rbf`` : construct the affinity matrix using a radial basis function (RBF) kernel. - ``precomputed`` : interpret ``X`` as a precomputed affinity matrix. - ``precomputed_nearest_neighbors`` : interpret ``X`` as a sparse graph of precomputed nearest neighbors, and constructs the affinity matrix by selecting the ``n_neighbors`` nearest neighbors. - one of the kernels supported by :func:`~sklearn.metrics.pairwise_kernels`. density_algo : string, default = "PAk" Define the algorithm to use as density estimator. It mast be one of the options allowed by :obj:`VALID_DENSITY = [PAk, kNN]`. k_max : int, default=1000 This parameter is considered if density_algo is ``PAk`` or ``kNN``, it is ignored otherwise. k_max set the maximum number of nearest-neighbors considered by the density estimator. If ``density_algo=PAk``, k_max is used by the algorithm in the search for the largest number of neighbors ``k_hat`` for which the condition of constant density holds, within a given level of confidence. If ``density_algo=kNN``, k_max set the number of neighbors to be used by the standard k-Nearest Neighbor algorithm. If the number of points in the sample N is less than the default value, k_max will be set automatically to the value ``N/2``. D_thr : float, default=23.92812698 This parameter is considered if density_algo is ``PAk``, it is ignored otherwise. Set the level of confidence in the PAk density estimator. The default value corresponds to a p-value of :math:`10^{-6}` for a :math:`\chiˆ2` distribution with one degree of freedom. dim : int, default = None Intrinsic dimensionality of the sample. If dim is provided, the following parameters are ignored: ``dim_algo``, ``blockAn``, ``block_ratio``, ``frac``. dim_algo : string, or callable, default="twoNN" Method for intrinsic dimensionality calculation. If dim_algo is ``auto``, dim is assumed to be equal to n_samples. If dim_algo is a string, it must be one of the options allowed by :obj:`VALID_DIM = [auto, twoNN]`. blockAn : bool, default=True This parameter is considered if dim_algo is ``twoNN``, it is ignored otherwise. If blockAn is True the algorithm perform a block analysis that allows discriminating the relevant dimensions as a function of the block size. This allows to study the stability of the estimation with respect to changes in the neighborhood size, which is crucial for ID estimations when the data lie on a manifold perturbed by a high-dimensional noise. block_ratio : int, default=5 This parameter is considered if dim_algo is ``twoNN``, it is ignored otherwise. Set the minimum size of the blocks as `n_samples/block_ratio`. If ``blockAn=False``, ``block_ratio`` is ignored. frac : float, default=1 This parameter is considered if dim_algo is ``twoNN``, it is ignored otherwise. Define the fraction of points in the data set used for ID calculation. By default the full data set is used. n_jobs : int or None, optional (default=None) The number of jobs to use for the computation. This works by computing each of the n_init runs in parallel. ``None`` means 1 unless in a :obj:`joblib.parallel_backend` context. ``-1`` means using all processors. See :term:`Glossary <n_jobs>` for more details. Attributes ---------- labels_ : array [Nclus] The clustering labels assigned to each point in the data set. halos_ : array [Nclus] The clustering labels assigned to each point in the data set. Points identified as halos have label equal to zero. topography_ : array [Nclus, Nclus] Let be Nclus the number of clusters, the topography consists in a Nclus × Nclus symmetric matrix, in which the diagonal entries are the heights of the peaks and the off-diagonal entries are the heights of the saddle points. nn_distances_ : array [n_samples, k_max+1] Distances to the k_max neighbors of each points. The point itself is included in the array. nn_indices_ : array [n_samples, k_max+1] Indices of the k_max neighbors of each points. The point itself is included in the array. k_hat_ : array [n_samples] The optimal number of neighbors for which the condition of constant density holds. centers_ :array [Nclus] The clustering labels assigned to each point in the data set. dim_ : int, Intrinsic dimensionality of the sample. If ``dim`` is not provided, ``dim_`` is set to the number of features in the input file. k_max_ : int The maximum number of nearest-neighbors considered by the procedure that returns the largest number of neighbors ``k_hat`` for which the condition of constant density holds, within a given level of confidence. If the number of points in the sample `N` is less than the default value, k_max_ will be set automatically to the value ``N/2``. densities_ : array [n_samples] If not provided by the parameter ``densities``, it is computed by using the `PAk` density estimator. err_densities_ : array [n_samples] The uncertainty in the density estimation. If not provided by the parameter ``densities``, it is computed by using the `PAk` density estimator. Example ------- References ---------- M. d'Errico, E. Facco, A. Laio, A. Rodriguez, Information Sciences, Volume 560, June 2021, 476-492, https://www.sciencedirect.com/science/article/pii/S0020025521000116?dgcid=author (also available on arXiv: https://arxiv.org/abs/1802.10549) """ def __init__(self, Z=1, metric="euclidean", densities=None, err_densities=None, k_hat=None, nn_distances=None, nn_indices=None, affinity='precomputed', density_algo="PAk", k_max=1000, D_thr=23.92812698, dim=None, dim_algo="twoNN", blockAn=True, block_ratio=5, frac=1, n_jobs=None): self.Z = Z self.metric = metric self.densities = densities self.err_densities = err_densities self.k_hat = k_hat self.nn_distances = nn_distances self.nn_indices = nn_indices self.affinity = affinity self.density_algo = density_algo self.k_max = k_max self.D_thr = D_thr self.dim = dim self.dim_algo = dim_algo self.blockAn = blockAn self.block_ratio = block_ratio self.frac = frac self.n_jobs = n_jobs if metric not in VALID_METRIC and not callable(metric): raise ValueError("invalid metric: '{0}'".format(metric)) if dim_algo not in VALID_DIM: raise ValueError("invalid dim_algo: '{0}'".format(dim_algo)) if density_algo not in VALID_DENSITY: raise ValueError("invalid dim_algo: '{0}'".format(density_algo)) #if not (self.densities and self.err_densities and self.k_hat): # # TODO: decide whether to raise a worning instead and automatically run PAk. # raise ValueError("DPA requires the error estimation and optimal neighborhood along \ # with the densities. If not available, use the default PAk estimator") if self.dim_algo == "twoNN" and self.frac > 1: raise ValueError("frac should be between 0 and 1.") if self.nn_distances is not None and self.nn_indices is not None: if self.nn_distances.shape[1] != self.nn_indices.shape[1]: raise ValueError("check nn_distances and nn_indices. Mismatch in array dimension.")
[docs] def fit(self, X, y=None): """Fit the DPA clustering on the data. Parameters ---------- X : array [n_samples, n_samples] if metric == “precomputed”, or, [n_samples, n_features] otherwise The input samples. Similarities / affinities between instances if ``affinity='precomputed'``. y : Ignored Not used, present here for API consistency by convention. Returns ------- self : object Returns self. """ # Input validation X = check_array(X, accept_sparse=['csr', 'csc', 'coo'], dtype=np.float64, ensure_min_samples=2) allow_squared = self.affinity in ["precomputed", "precomputed_nearest_neighbors"] if X.shape[0] == X.shape[1] and not allow_squared: warnings.warn("The DPA clustering API has changed. ``fit``" "now constructs an affinity matrix from data. To use" " a custom affinity matrix, " "set ``affinity=precomputed``.") self.k_max_ = self.k_max self.dim_ = self.dim if not self.dim: if self.dim_algo == "auto": self.dim_ = X.shape[1] elif self.dim_algo == "twoNN": if self.block_ratio >= X.shape[0]: raise ValueError("block_ratio is larger than the sample size, the minimum size for \ block analysis would be zero. Please set a value lower than "+str(X.shape[0])) self.dim_ = twoNearestNeighbors(blockAn=self.blockAn, block_ratio=self.block_ratio, metric=self.metric, frac=self.frac, n_jobs=self.n_jobs).fit(X).dim_ else: pass # If densities, uncertainties and k_hat are provided as input, compute only the # matrix of nearest neighbor: self.densities_ = self.densities self.err_densities_ = self.err_densities self.k_hat_ = self.k_hat if self.densities_ is not None and self.err_densities_ is not None and self.k_hat_ is not None: # If the nearest neighbors matrix is precomputed: if self.nn_distances is not None and self.nn_indices is not None: self.k_max_ = max(self.k_hat_) self.nn_distances_ = self.nn_distances self.nn_indices_ = self.nn_indices else: self.k_max_ = max(self.k_hat_) if self.metric == "precomputed": nbrs = NearestNeighbors(n_neighbors=self.k_max_+1, # The point i is counted in its neighborhood algorithm="brute", metric=self.metric, n_jobs=self.n_jobs).fit(X) else: nbrs = NearestNeighbors(n_neighbors=self.k_max_+1, # The point i is counted in its neighborhood algorithm="auto", metric=self.metric, n_jobs=self.n_jobs).fit(X) self.nn_distances_, self.nn_indices_ = nbrs.kneighbors(X) elif self.density_algo == "PAk": # If the nearest neighbors matrix is precomputed: if self.nn_distances is not None and self.nn_indices is not None: self.k_max_ = self.nn_distances.shape[1]-1 PAk = PointAdaptive_kNN(k_max=self.k_max_, D_thr=self.D_thr, metric=self.metric, nn_distances=self.nn_distances, nn_indices=self.nn_indices, dim_algo=self.dim_algo, blockAn=self.blockAn, block_ratio=self.block_ratio, frac=self.frac, dim=self.dim_, n_jobs=self.n_jobs).fit(X) else: PAk = PointAdaptive_kNN(k_max=self.k_max_, D_thr=self.D_thr, metric=self.metric, dim_algo=self.dim_algo, blockAn=self.blockAn, block_ratio=self.block_ratio, frac=self.frac, dim=self.dim_, n_jobs=self.n_jobs).fit(X) self.nn_distances_ = PAk.distances_ self.nn_indices_ = PAk.indices_ self.densities_ = PAk.densities_ self.err_densities_ = PAk.err_densities_ self.k_hat_ = PAk.k_hat_ self.k_max_ = max(self.k_hat_) else: # TODO: implement option for kNN pass self.labels_, self.halos_, self.topography_, self.g_, self.centers_ = _DensityPeakAdvanced(self.densities_, self.err_densities_, self.k_hat_, self.nn_distances_, self.nn_indices_, self.Z) self.is_fitted_ = True return self
[docs] def fit_predict(self, X, y=None): """Perform DPA clustering from features or distance matrix, and return cluster labels. Parameters ---------- X : array-like or sparse matrix, shape (n_samples, n_features), or \ (n_samples, n_samples) Training instances to cluster, or distances between instances if ``metric='precomputed'``. If a sparse matrix is provided, it will be converted into a sparse ``csr_matrix``. y : Ignored Not used, present here for API consistency by convention. Returns ------- labels : ndarray, shape (n_samples,) Cluster labels. Noisy samples are given the label -1. """ self.fit(X) return self.labels_
[docs] def get_computed_params(self): params = self.get_params() for x in params: if x+"_" in self.__dict__: params[x] = self.__dict__[x+"_"] return params
[docs] def get_dendrogram(self): # Generation of SL dendrogram # Prepare some auxiliary lists e1=[] e2=[] d12=[] L=[] Li1=[] Li2=[] Ldis=[] Fmax=max(self.densities_) Nclus=np.max(self.labels_)+1 # Obtain distances in list format from topography for row in self.topography_: dis12=Fmax-row[2] e1.append(row[0]) e2.append(row[1]) d12.append(dis12) # Obtain the dendrogram in form of links nlinks=0 clnew=Nclus for j in range(Nclus-1): aa=np.argmin(d12) nlinks=nlinks+1 L.append(clnew+nlinks) Li1.append(e1[aa]) Li2.append(e2[aa]) Ldis.append(d12[aa]) #update distance matrix t=0 fe=Li1[nlinks-1] fs=Li2[nlinks-1] newname=L[nlinks-1] # list of untouched clusters unt=[] for r in d12: if ((e1[t]!=fe)&(e1[t]!=fs)): unt.append(e1[t]) if ((e2[t]!=fe)&(e2[t]!=fs)): unt.append(e2[t]) t=t+1 myset = set(unt) unt=list(myset) # Build a new distance matrix e1new=[] e2new=[] d12new=[] for j in unt: t=0 dmin=9.9E99 for r in d12: if ((e1[t]==j)|(e2[t]==j)): if ((e1[t]==fe)|(e2[t]==fe)|(e1[t]==fs)|(e2[t]==fs)): if (d12[t]<dmin): dmin=d12[t] t=t+1 e1new.append(j) e2new.append(newname) d12new.append(dmin) t=0 for r in d12: if ((unt.count(e1[t]))&(unt.count(e2[t]))): e1new.append(e1[t]) e2new.append(e2[t]) d12new.append(d12[t]) t=t+1 e1=e1new e2=e2new d12=d12new # Get the order in which the elements should be displayed sorted_elements=[] sorted_elements.append(L[nlinks-1]) for jj in range(len(L)): j=len(L)-jj-1 for i in range(len(sorted_elements)): if (sorted_elements[i]==L[j]): sorted_elements[i]=Li2[j] sorted_elements.insert(i,Li1[j]) # Get coordinates for the plot pop=np.zeros((Nclus),dtype=int) for i in range (len(self.labels_)): pop[self.labels_[i]]=pop[self.labels_[i]]+1 #print (pop) add=0. x=[] y=[] label=[] for i in range(len(sorted_elements)): label.append(sorted_elements[i]) j=self.centers_[label[i]] y.append(self.densities_[j]) x.append(add+0.5*np.log(pop[i])) add=add+np.log(pop[i]) xs=x.copy() ys=y.copy() labels=label.copy() zorder=0 for jj in range(len(L)): c1=label.index(Li1[jj]) c2=label.index(Li2[jj]) label.append(L[jj]) x.append((x[c1]+x[c2])/2.) ynew=Fmax-Ldis[jj] y.append(ynew) x1=x[c1] y1=y[c1] x2=x[c2] y2=y[c2] zorder=zorder+1 plt.plot([x1, x1], [y1, ynew], color='k', linestyle='-', linewidth=2,zorder=zorder) zorder=zorder+1 plt.plot([x2, x2], [y2, ynew], color='k', linestyle='-', linewidth=2,zorder=zorder) zorder=zorder+1 plt.plot([x1, x2], [ynew, ynew], color='k', linestyle='-', linewidth=2,zorder=zorder) zorder=zorder+1 viridis = cm.get_cmap('viridis', Nclus) #print(viridis.colors) plt.scatter (xs,ys,c=labels,s=100,zorder=zorder,cmap="viridis") for i in range (Nclus): zorder=zorder+1 cc='k' r=viridis.colors[labels[i]][0] g=viridis.colors[labels[i]][1] b=viridis.colors[labels[i]][2] luma = 0.2126 * r + 0.7152 * g + 0.0722 * b luma=luma*255 if (luma<156): cc='w' plt.annotate(labels[i],(xs[i],ys[i]),horizontalalignment='center',verticalalignment='center',zorder=zorder,c=cc,weight='bold') plt.show()