Skip to content

Commit

Permalink
Use simulated annealing to optimize cluster assignments.
Browse files Browse the repository at this point in the history
Since `ortools` is not installable on osx-arm64 in Bioconda, I implemented alternative smethods to calculate min_cost_flow.
  • Loading branch information
akikuno committed Jul 22, 2024
1 parent 32467b5 commit b07b626
Showing 1 changed file with 102 additions and 43 deletions.
145 changes: 102 additions & 43 deletions src/DAJIN2/core/clustering/constrained_kmeans.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
"""Constrained k-means clustering
Bradley, Paul S., Kristin P. Bennett, and Ayhan Demiriz. "Constrained k-means clustering." Microsoft Research, Redmond 20.0 (2000): 0.
The implementation is referenced from @kuga-qiita. (https://qiita.com/kuga-qiita/items/5588d5469f3268b7fd39#constrained-k-means-1)
The implementation is modified from @kuga-qiita. (https://qiita.com/kuga-qiita/items/5588d5469f3268b7fd39#constrained-k-means-1)
"""

from __future__ import annotations

from functools import partial

import numpy as np
from ortools.graph.python import min_cost_flow
from scipy.spatial.distance import cdist


Expand All @@ -22,14 +21,37 @@ def random(
return centers, indices


def kmeans_plusplus(X, n_clusters, random_state):
n_samples, _ = X.shape
centers = np.empty((n_clusters, X.shape[1]), dtype=X.dtype)
indices = np.empty(n_clusters, dtype=int)

center_id = random_state.choice(n_samples)
centers[0] = X[center_id]
indices[0] = center_id

closest_dist_sq = np.full(n_samples, np.inf)
for i in range(1, n_clusters):
dist_sq = np.sum((X - centers[i - 1]) ** 2, axis=1)
closest_dist_sq = np.minimum(closest_dist_sq, dist_sq)
probs = closest_dist_sq / np.sum(closest_dist_sq)
cumulative_probs = np.cumsum(probs)
r = random_state.rand()
center_id = np.searchsorted(cumulative_probs, r)
centers[i] = X[center_id]
indices[i] = center_id

return centers, indices


class ConstrainedKMeans:
def __init__(
self,
n_clusters: int,
min_cluster_size: int,
max_cluster_size: int | None = None,
random_state: int = 0,
init: str = "random",
init: str = "k-means++",
n_init: int = 10,
max_iter: int = 300,
tol: float = 1e-4,
Expand Down Expand Up @@ -57,6 +79,13 @@ def __init__(
random_state=self.random_state,
**kwargs,
)
elif init == "k-means++":
self.init = partial(
kmeans_plusplus,
n_clusters=self.n_clusters,
random_state=self.random_state,
**kwargs,
)

self.n_init = n_init
self.max_iter = max_iter
Expand All @@ -65,7 +94,7 @@ def __init__(
self.centers = None
self.labels = None

def get_smcf_params(self, n_samples: int) -> tuple[np.ndarray, np.ndarray, np.ndarray, list[int]]:
def get_lp_params(self, n_samples: int) -> tuple[np.ndarray, np.ndarray, np.ndarray, list[float]]:
X_nodes = np.arange(n_samples)
cluster_nodes = np.arange(n_samples, n_samples + self.n_clusters)
artificial_demand_node = np.array([n_samples + self.n_clusters])
Expand Down Expand Up @@ -96,59 +125,92 @@ def get_smcf_params(self, n_samples: int) -> tuple[np.ndarray, np.ndarray, np.nd
).tolist()
return start_nodes, end_nodes, capacities, supplies

def calc_unit_costs(self, X: np.ndarray, centers: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
def simulated_annealing(self, X: np.ndarray, centers: np.ndarray) -> np.ndarray:
"""
Perform simulated annealing to optimize cluster assignments.
This method uses simulated annealing to assign data points to clusters,
allowing for probabilistic acceptance of worse assignments to escape local optima.
Parameters:
X (np.ndarray): The input data array of shape (n_samples, n_features).
centers (np.ndarray): The initial cluster centers of shape (n_clusters, n_features).
Returns:
np.ndarray: A mask array of shape (n_samples, n_clusters) indicating cluster assignments.
"""
n_samples, n_clusters = X.shape[0], self.n_clusters
dist_sq = cdist(X, centers, "sqeuclidean")
unit_costs = np.concatenate([dist_sq.flatten(), np.zeros(self.n_clusters)])
return unit_costs, dist_sq
labels = np.argmin(dist_sq, axis=1)

def clustering(
self,
start_nodes: np.ndarray,
end_nodes: np.ndarray,
capacities: np.ndarray,
supplies: list[int],
unit_costs: np.ndarray,
) -> np.ndarray:
smcf = min_cost_flow.SimpleMinCostFlow()
all_arcs = smcf.add_arcs_with_capacity_and_unit_cost(start_nodes, end_nodes, capacities, unit_costs)
smcf.set_nodes_supplies(np.arange(len(supplies)), supplies)
smcf.solve()
solution_flows = smcf.flows(all_arcs)
mask = solution_flows[: -self.n_clusters].reshape((-1, self.n_clusters))
T = 1.0
T_min = 0.0001
alpha = 0.9

while T > T_min:
# Vectorize sample processing and compute in batches
idx = np.random.randint(0, n_samples, size=1000)
current_labels = labels[idx]
new_labels = np.random.randint(0, n_clusters, size=1000)

# Select only the samples where the new label differs from the current label
mask = current_labels != new_labels
idx = idx[mask]
current_labels = current_labels[mask]
new_labels = new_labels[mask]

if len(idx) == 0:
T *= alpha
continue

current_dists = dist_sq[idx, current_labels]
new_dists = dist_sq[idx, new_labels]

delta_E = new_dists - current_dists
prob = np.exp(-delta_E / T)
accept = (delta_E < 0) | (prob > np.random.rand(len(prob)))

labels[idx[accept]] = new_labels[accept]

T *= alpha

mask = np.zeros((n_samples, n_clusters))
mask[np.arange(n_samples), labels] = 1
return mask

def fit_once(self, X: np.ndarray, **kwargs) -> tuple[np.ndarray, float]:
centers_, _ = self.init(X, **kwargs)
dist_sq = cdist(X, centers_, "sqeuclidean")
mask = self.simulated_annealing(X, centers_)
inertia = np.sum(mask * dist_sq)
return centers_, inertia

def fit(self, X: np.ndarray) -> ConstrainedKMeans:
self.valid_tr(X)
if X.ndim == 1:
X = X[:, np.newaxis]
n_samples, _ = X.shape
params = self.get_smcf_params(n_samples)

best_inertia, centers, self.init_indices = None, None, None
best_inertia, best_centers = None, None
for _ in range(self.n_init):
centers_, init_indices_ = self.init(X, **self.kwargs)
unit_costs, dist_sq = self.calc_unit_costs(X, centers_)
mask = self.clustering(*params, unit_costs)
inertia = np.sum(mask * dist_sq)
if best_inertia is None or best_inertia > inertia:
centers_, inertia = self.fit_once(X, **self.kwargs)
if best_inertia is None or inertia < best_inertia:
best_inertia = inertia
centers = centers_
self.init_indices = init_indices_
best_centers = centers_
self.centers = best_centers

unit_costs, dist_sq = self.calc_unit_costs(X, centers)
dist_sq = cdist(X, self.centers, "sqeuclidean")

for i in range(self.max_iter):
mask = self.clustering(*params, unit_costs)
mask = self.simulated_annealing(X, self.centers)
centers_ = np.dot(mask.T, X) / np.sum(mask, axis=0)[:, np.newaxis]
unit_costs, dist_sq = self.calc_unit_costs(X, centers_)
centers_squared_diff = np.sum((centers_ - centers) ** 2)
centers = centers_
dist_sq = cdist(X, centers_, "sqeuclidean")
centers_squared_diff = np.sum((centers_ - self.centers) ** 2)
self.centers = centers_
iter_ = i
if centers_squared_diff <= self.tol:
break

self.inertia = np.sum(mask * dist_sq)
self.centers = centers
self.labels = np.argmax(mask, axis=-1)
self.iter_ = iter_
return self
Expand All @@ -160,10 +222,7 @@ def predict(self, X: np.ndarray) -> np.ndarray:
self.valid_pr(X)
if X.ndim == 1:
X = X[:, np.newaxis]
n_samples, _ = X.shape
params = self.get_smcf_params(n_samples)
unit_costs = self.calc_unit_costs(X, self.centers)
mask = self.clustering(*params, unit_costs)
mask = self.simulated_annealing(X, self.centers)
labels = np.argmax(mask, axis=1)
return labels

Expand All @@ -186,8 +245,8 @@ def valid(
raise TypeError("max_cluster_size must be an integer or None")
if not isinstance(random_state, int) and random_state is not None:
raise TypeError("random_state must be an integer")
if init not in ["random"]:
raise TypeError("init must be 'random'")
if init not in ["random", "k-means++"]:
raise TypeError("init must be 'random' or 'k-means++'")
if not isinstance(n_init, int):
raise TypeError("n_init must be an integer")
if not isinstance(max_iter, int):
Expand Down

0 comments on commit b07b626

Please sign in to comment.