k-means Clustering

We study a fundamental clustering problem in unsupervised learning, k-means clustering. We will begin by discussing the problem formulation and then learn how to write a parallel k-means algorithm.

Problem Formulation

k-means clustering uses centroids, k different randomly-initialized points in the data, and assigns every data point to its nearest centroid. After every point has been assigned, each centroid is moved to the average of all the points assigned to it. We describe the k-means algorithm in the following steps:

  • Step 1: initialize k random centroids
  • Step 2: for every data point, find the nearest centroid (L2 distance or other measurements) and assign the point to it
  • Step 3: for every centroid, move the centroid to the average of the points assigned to that centroid
  • Step 4: go to Step 2 until converged (no more changes in the last few iterations) or maximum iterations reached

The algorithm is illustrated as follows:

[Figure: kmeans_1.png, an illustration of the k-means algorithm]

A sequential implementation of k-means is described as follows:

// sequential implementation of k-means on a CPU
// N: number of points, K: number of clusters, M: number of iterations, px/py: 2D point vectors
void kmeans_seq(
  int N, int K, int M, const std::vector<float>& px, const std::vector<float>& py
) {

  std::vector<int> c(K);                          // number of points in each cluster
  std::vector<float> sx(K), sy(K), mx(K), my(K);  // per-cluster sums and centroids

  // use the first K points as the initial centroids
  std::copy_n(px.begin(), K, mx.begin());
  std::copy_n(py.begin(), K, my.begin());

  // k-means iteration
  for(int m=0; m<M; m++) {

    // clear the storage
    std::fill_n(sx.begin(), K, 0.0f);
    std::fill_n(sy.begin(), K, 0.0f);
    std::fill_n(c.begin(), K, 0);

    // find the best k (cluster id) for each point
    for(int i=0; i<N; ++i) {
      float x = px[i];
      float y = py[i];
      float best_d = std::numeric_limits<float>::max();
      int best_k = 0;
      for (int k = 0; k < K; ++k) {
        const float d = L2(x, y, mx[k], my[k]);
        if (d < best_d) {
          best_d = d;
          best_k = k;
        }
      }
      sx[best_k] += x;
      sy[best_k] += y;
      c [best_k] += 1;
    }

    // update each centroid to the average of its assigned points
    for(int k=0; k<K; k++) {
      const int count = std::max(1, c[k]);  // turn 0/0 into 0/1
      mx[k] = sx[k] / count;
      my[k] = sy[k] / count;
    }
  }

  // print the k centroids found
  for(int k=0; k<K; ++k) {
    std::cout << "centroid " << k << ": " << std::setw(10) << mx[k] << ' '
                                          << std::setw(10) << my[k] << '\n';
  }
}
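
The snippets on this page call a distance helper, L2, whose definition is not shown. A minimal sketch (an assumption for illustration, not necessarily the original's exact definition) uses the squared Euclidean distance, which is sufficient because we only compare distances:

// squared Euclidean (L2) distance between (x1, y1) and (x2, y2);
// the square root is omitted because we only compare distances;
// mark it __host__ __device__ if you also call it from the CUDA kernels below
inline float L2(float x1, float y1, float x2, float y2) {
  return (x1-x2)*(x1-x2) + (y1-y2)*(y1-y2);
}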

Parallel k-means using CPUs

The second step of the k-means algorithm, assigning every point to its nearest centroid, is highly parallelizable across individual points. We can create a parallel-for graph to run this step in parallel. Assuming we request P tasks, each task works on a chunk of at most (N + P - 1) / P points.

std::vector<int> best_ks(N);  // nearest centroid of each point
unsigned P = 12;              // 12 partitioned tasks

// update cluster: assign each point to its nearest centroid
taskflow.parallel_for(0, N, 1, [&](int i){
  float x = px[i];
  float y = py[i];
  float best_d = std::numeric_limits<float>::max();
  int best_k = 0;
  for (int k = 0; k < K; ++k) {
    const float d = L2(x, y, mx[k], my[k]);
    if (d < best_d) {
      best_d = d;
      best_k = k;
    }
  }
  best_ks[i] = best_k;
}, (N + P - 1) / P);

The third step, moving every centroid to the average of its assigned points, is also parallelizable across individual centroids. However, since K is typically not large, a single task for this update is sufficient.

taskflow.emplace([&](){
  // sum of the points assigned to each cluster
  for(int i=0; i<N; i++) {
    sx[best_ks[i]] += px[i];
    sy[best_ks[i]] += py[i];
    c [best_ks[i]] += 1;
  }
  // average of the points assigned to each cluster
  for(int k=0; k<K; ++k) {
    auto count = std::max(1, c[k]);  // turn 0/0 into 0/1
    mx[k] = sx[k] / count;
    my[k] = sy[k] / count;
  }
});

To describe M iterations, we create a condition task that loops the second step of the algorithm M times. A return value of zero directs the execution path to the first successor, which we will later connect to the task of the second step; otherwise, k-means completes.

taskflow.emplace([m=0, M]() mutable {
  return (m++ < M) ? 0 : 1;
});

The entire code of the CPU-parallel k-means is shown below. Here we use additional storage, best_ks, to record the nearest centroid of each point at each iteration.

// N: number of points, K: number of clusters, M: number of iterations, px/py: 2D point vectors
void kmeans_par(
  int N, int K, int M, const std::vector<float>& px, const std::vector<float>& py
) {

  unsigned P = 12;  // 12 partitions of the parallel-for graph

  tf::Executor executor;
  tf::Taskflow taskflow("K-Means");

  std::vector<int> c(K), best_ks(N);
  std::vector<float> sx(K), sy(K), mx(K), my(K);

  // initial centroids
  tf::Task init = taskflow.emplace([&](){
    for(int i=0; i<K; ++i) {
      mx[i] = px[i];
      my[i] = py[i];
    }
  }).name("init");

  // clear the storage
  tf::Task clean_up = taskflow.emplace([&](){
    for(int k=0; k<K; ++k) {
      sx[k] = 0.0f;
      sy[k] = 0.0f;
      c [k] = 0;
    }
  }).name("clean_up");

  tf::Task S, T;

  // update cluster: assign each point to its nearest centroid
  std::tie(S, T) = taskflow.parallel_for(0, N, 1, [&](int i){
    float x = px[i];
    float y = py[i];
    float best_d = std::numeric_limits<float>::max();
    int best_k = 0;
    for (int k = 0; k < K; ++k) {
      const float d = L2(x, y, mx[k], my[k]);
      if (d < best_d) {
        best_d = d;
        best_k = k;
      }
    }
    best_ks[i] = best_k;
  }, (N + P - 1) / P);

  S.name("beg_assign_cluster");
  S.for_each_successor([i=0](tf::Task t) mutable {
    t.name(std::string("partition") + std::to_string(i++));
  });
  T.name("end_assign_cluster");

  tf::Task update_cluster = taskflow.emplace([&](){
    for(int i=0; i<N; i++) {
      sx[best_ks[i]] += px[i];
      sy[best_ks[i]] += py[i];
      c [best_ks[i]] += 1;
    }
    for(int k=0; k<K; ++k) {
      auto count = std::max(1, c[k]);  // turn 0/0 into 0/1
      mx[k] = sx[k] / count;
      my[k] = sy[k] / count;
    }
  }).name("update_cluster");

  // convergence check
  tf::Task condition = taskflow.emplace([m=0, M]() mutable {
    return (m++ < M) ? 0 : 1;
  }).name("converged?");

  // build up the dependency graph
  init.precede(clean_up);
  clean_up.precede(S);
  T.precede(update_cluster);
  condition.precede(clean_up)
           .succeed(update_cluster);

  executor.run(taskflow).wait();
}

[Figure: kmeans_2.svg, the taskflow graph of the CPU-parallel k-means]

The second step of the algorithm consists of two parts, a clean_up task and a parallel-for graph. The former clears the storage sx, sy, and c that is used to average the points for the new centroids, and the latter parallelizes the search for the nearest centroid across individual points using 12 tasks. If the iteration count is smaller than M, the condition task returns 0 to direct the execution path back to clean_up. Otherwise, it returns 1 to stop, since there is no successor indexable by 1.
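
For completeness, a driver might look like the sketch below. The point-generation code and constants are assumptions for illustration and are not part of the original example.

// hypothetical driver (not part of the original example): generate N random
// 2D points and run the sequential and the Taskflow CPU-parallel versions
#include <random>
#include <vector>

int main() {
  const int N = 10000;   // number of points
  const int K = 10;      // number of clusters
  const int M = 1000;    // number of iterations

  std::mt19937 gen(0);
  std::uniform_real_distribution<float> dist(0.0f, 1000.0f);

  std::vector<float> px(N), py(N);
  for(int i=0; i<N; ++i) {
    px[i] = dist(gen);
    py[i] = dist(gen);
  }

  kmeans_seq(N, K, M, px, py);  // sequential baseline
  kmeans_par(N, K, M, px, py);  // Taskflow CPU-parallel version

  return 0;
}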

Parallel k-means using GPUs

We observe that Step 2 and Step 3 of the algorithm are parallelizable across individual points and centroids, which lets us harness the power of a GPU:

  1. for every data point, find the nearest centroid (L2 distance or other measurements) and assign the point to it
  2. for every centroid, move the centroid to the average of the points assigned to that centroid.

At a fine-grained level, we request one GPU thread to work on one point for Step 2 and one GPU thread to work on one centroid for Step 3.

// px/py: 2D points, N: number of points, mx/my: centroids, K: number of clusters
// sx/sy/c: storage to compute the average
__global__ void assign_clusters(
  float* px, float* py, int N, float* mx, float* my, float* sx, float* sy, int K, int* c
) {
  const int index = blockIdx.x * blockDim.x + threadIdx.x;

  if (index >= N) {
    return;
  }

  // make global loads once
  float x = px[index];
  float y = py[index];

  float best_d = FLT_MAX;
  int best_k = 0;
  for (int k = 0; k < K; ++k) {
    float d = L2(x, y, mx[k], my[k]);
    if (d < best_d) {
      best_d = d;
      best_k = k;
    }
  }

  atomicAdd(&sx[best_k], x);
  atomicAdd(&sy[best_k], y);
  atomicAdd(&c [best_k], 1);
}

// mx/my: centroids, sx/sy/c: storage to compute the average
__global__ void compute_new_means(
  float* mx, float* my, float* sx, float* sy, int* c
) {
  const int k = threadIdx.x;
  const int count = max(1, c[k]);  // turn 0/0 into 0/1
  mx[k] = sx[k] / count;
  my[k] = sy[k] / count;
}

When accumulating the per-cluster sums used to recompute the centroids, multiple GPU threads may write to the same entries of the sum arrays, sx and sy, and the count array, c. To avoid data races, we use a simple atomicAdd. Based on the two kernels, the entire code of CPU-GPU collaborative tasking is described as follows:

// N: number of points, K: number of clusters, M: number of iterations, h_px/h_py: 2D point vectors
void kmeans_gpu(
  int N, int K, int M, const std::vector<float>& h_px, const std::vector<float>& h_py
) {

  std::vector<float> h_mx, h_my;
  float *d_px, *d_py, *d_mx, *d_my, *d_sx, *d_sy;
  int *d_c;

  // use the first K points as the initial centroids
  for(int i=0; i<K; ++i) {
    h_mx.push_back(h_px[i]);
    h_my.push_back(h_py[i]);
  }

  // create a taskflow graph
  tf::Executor executor;
  tf::Taskflow taskflow("K-Means");

  // allocate GPU memory
  tf::Task allocate_px = taskflow.emplace([&](){ cudaMalloc(&d_px, N*sizeof(float)); })
                                 .name("allocate_px");
  tf::Task allocate_py = taskflow.emplace([&](){ cudaMalloc(&d_py, N*sizeof(float)); })
                                 .name("allocate_py");
  tf::Task allocate_mx = taskflow.emplace([&](){ cudaMalloc(&d_mx, K*sizeof(float)); })
                                 .name("allocate_mx");
  tf::Task allocate_my = taskflow.emplace([&](){ cudaMalloc(&d_my, K*sizeof(float)); })
                                 .name("allocate_my");
  tf::Task allocate_sx = taskflow.emplace([&](){ cudaMalloc(&d_sx, K*sizeof(float)); })
                                 .name("allocate_sx");
  tf::Task allocate_sy = taskflow.emplace([&](){ cudaMalloc(&d_sy, K*sizeof(float)); })
                                 .name("allocate_sy");
  tf::Task allocate_c  = taskflow.emplace([&](){ cudaMalloc(&d_c, K*sizeof(int)); })
                                 .name("allocate_c");

  // transfer data from the host to the GPU
  tf::Task h2d = taskflow.emplace([&](tf::cudaFlow& cf){
    cf.copy(d_px, h_px.data(), N).name("h2d_px");
    cf.copy(d_py, h_py.data(), N).name("h2d_py");
    cf.copy(d_mx, h_mx.data(), K).name("h2d_mx");
    cf.copy(d_my, h_my.data(), K).name("h2d_my");
  }).name("h2d");

  // GPU task graph of the main k-means body
  tf::Task kmeans = taskflow.emplace([&](tf::cudaFlow& cf){

    tf::cudaTask zero_c  = cf.zero(d_c,  K).name("zero_c");
    tf::cudaTask zero_sx = cf.zero(d_sx, K).name("zero_sx");
    tf::cudaTask zero_sy = cf.zero(d_sy, K).name("zero_sy");

    tf::cudaTask cluster = cf.kernel(
      (N+1024-1) / 1024, 1024, 0,
      assign_clusters, d_px, d_py, N, d_mx, d_my, d_sx, d_sy, K, d_c
    ).name("cluster");

    tf::cudaTask new_centroid = cf.kernel(
      1, K, 0,
      compute_new_means, d_mx, d_my, d_sx, d_sy, d_c
    ).name("new_centroid");

    cluster.precede(new_centroid)
           .succeed(zero_c, zero_sx, zero_sy);
  }).name("update_means");

  // condition task to check convergence
  tf::Task condition = taskflow.emplace([i=0, M] () mutable {
    return i++ < M ? 0 : 1;
  }).name("converged?");

  // transfer the resulting centroids from the GPU to the host
  tf::Task stop = taskflow.emplace([&](tf::cudaFlow& cf){
    cf.copy(h_mx.data(), d_mx, K).name("d2h_mx");
    cf.copy(h_my.data(), d_my, K).name("d2h_my");
  }).name("d2h");

  // deallocate the GPU memory
  tf::Task free = taskflow.emplace([&](){
    cudaFree(d_px);
    cudaFree(d_py);
    cudaFree(d_mx);
    cudaFree(d_my);
    cudaFree(d_sx);
    cudaFree(d_sy);
    cudaFree(d_c);
  }).name("free");

  // build up the dependency
  h2d.succeed(allocate_px, allocate_py, allocate_mx, allocate_my);

  kmeans.succeed(allocate_sx, allocate_sy, allocate_c, h2d)
        .precede(condition);

  condition.precede(kmeans, stop);

  stop.precede(free);

  // dump the taskflow without expanding GPU task graphs
  taskflow.dump(std::cout);

  // run the taskflow
  executor.run(taskflow).wait();

  // dump the entire taskflow with all cudaFlows expanded
  taskflow.dump(std::cout);
}

The first dump before executing the taskflow produces the following diagram. The condition task introduces a cycle between itself and update_means. Each time the execution path goes back to update_means, the cudaFlow is reconstructed with the parameters captured in the closure and offloaded to the GPU.

[Figure: kmeans_3.svg, the taskflow before execution, with cudaFlows unexpanded]

The second dump after executing the taskflow produces the following diagram, with all cudaFlows expanded:

[Figure: kmeans_4.svg, the taskflow after execution, with all cudaFlows expanded]

The main cudaFlow task, update_means, must not run before all required data has settled down. It precedes a condition task that circles back to it until we reach M iterations. When the iterations complete, the condition task directs the execution path to the stop task (the d2h cudaFlow), which copies the resulting centroids to h_mx and h_my, and we then deallocate all GPU memory.

Built-in Predicate

We observe that the GPU task graph parameters remain unchanged across all k-means iterations. In this case, we can leverage tf::cudaFlow::repeat or tf::cudaFlow::predicate to create the cudaFlow only once and launch it repeatedly as rapidly as possible, without participating in conditional tasking.

tf::Task kmeans = taskflow.emplace([&](tf::cudaFlow& cf){

  tf::cudaTask zero_c  = cf.zero(d_c,  K).name("zero_c");
  tf::cudaTask zero_sx = cf.zero(d_sx, K).name("zero_sx");
  tf::cudaTask zero_sy = cf.zero(d_sy, K).name("zero_sy");

  tf::cudaTask cluster = cf.kernel(
    (N+1024-1) / 1024, 1024, 0,
    assign_clusters, d_px, d_py, N, d_mx, d_my, d_sx, d_sy, K, d_c
  ).name("cluster");

  tf::cudaTask new_centroid = cf.kernel(
    1, K, 0,
    compute_new_means, d_mx, d_my, d_sx, d_sy, d_c
  ).name("new_centroid");

  cluster.precede(new_centroid)
         .succeed(zero_c, zero_sx, zero_sy);

  // we ask the executor to launch the cudaFlow M times;
  // equivalent to: cf.predicate([m=M]() mutable { return m-- == 0; });
  cf.repeat(M);
}).name("update_means");

// ...

// build up the dependency
h2d.succeed(allocate_px, allocate_py, allocate_mx, allocate_my);

kmeans.succeed(allocate_sx, allocate_sy, allocate_c, h2d)
      .precede(stop);

stop.precede(free);

At the last line of the cudaFlow closure, we call cf.repeat(M) to ask the executor to run the cudaFlow M times. Compared with the version using conditional tasking, the cudaFlow here is created only once, which reduces the overhead.

[Figure: kmeans_5.svg, the taskflow using cf.repeat, without the condition task]

We can see from the above taskflow that the condition task is removed. Not all problems can benefit from tf::cudaFlow::repeat, however. If the graph parameters change at each iteration, you may use conditional tasking or resort to other client-side decisions.
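
For reference, the predicate form mentioned in the code comment above expresses the same loop as a callable that returns true when the cudaFlow should stop repeating; a minimal sketch:

// inside the cudaFlow closure: offload the same GPU task graph until the
// predicate returns true; this count-down is equivalent to cf.repeat(M)
cf.predicate([m=M]() mutable { return m-- == 0; });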

Benchmarking

We run three versions of k-means (sequential CPU, parallel CPUs, and GPU, the latter with both conditional tasking and the built-in predicate) on a machine with 6 Intel i7-8700 CPU cores at 3.20 GHz and an Nvidia RTX 2080 GPU, using various numbers of 2D points and iteration counts.

N       K   M       CPU Sequential   CPU Parallel   GPU (conditional tasking)   GPU (with predicate)
10      5   10      0.14 ms          77 ms          1 ms                        1 ms
100     10  100     0.56 ms          86 ms          7 ms                        1 ms
1000    10  1000    10 ms            98 ms          55 ms                       13 ms
10000   10  10000   1006 ms          713 ms         458 ms                      183 ms
100000  10  100000  102483 ms        49966 ms       7952 ms                     4725 ms

When the number of points reaches 10K or more, both the parallel CPU and GPU implementations start to outperform the sequential version. We can also see that using the built-in predicate of cudaFlow is about two times faster than conditional tasking.