PROWAREtech
C/C++: Multi-threaded Merge Sort Algorithm
See related: Quick Sort Parallel Processing Example
Merge sort is a good design for multi-threaded sorting because it allocates sub-arrays during the merge procedure, thereby avoiding data collisions. This implementation breaks the array up into separate ranges and then runs its algorithm on each of them, but the individually sorted ranges must still be merged at the end by the main thread. The more threads there are, the more separately sorted ranges remain in the array before that last step, which causes the final merge to take longer. The second example below is the version taught by the Massachusetts Institute of Technology, and it performs even worse.
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#if defined (WIN32) || (_WIN64)
#include <windows.h>
#define pthread_t DWORD
#define pthread_create(THREAD_ID_PTR, ATTR, ROUTINE, PARAMS) CreateThread(NULL,0,(LPTHREAD_START_ROUTINE)ROUTINE,(void*)PARAMS,0,THREAD_ID_PTR)
#define sleep(ms) Sleep(ms)
#else // Linux
#include <pthread.h>
#include <unistd.h>
#endif
// thread parameters
// Describes the work handed to one sorting thread: the inclusive index range
// [low, high] of the array `a` that the thread has to sort.
struct TASK
{
// first index of the range (inclusive)
int low;
// last index of the range (inclusive)
int high;
// set to 1 by main before the thread is created; the thread function clears
// it to 0 when it has finished, and main polls it to detect completion
int busy;
// the array being sorted; main points every task at the same array
int* a;
};
// merge function for merging two parts
// Merges the adjacent sorted runs a[low..mid] and a[mid+1..high] (all bounds
// inclusive) into a single ascending run a[low..high]. On equal values the
// element from the left run is placed first.
//
// Only the left run is copied to scratch memory: the write position never
// overtakes the read position in the right run, so the right run can be
// consumed in place. If either run is empty the array is left untouched.
// If the scratch buffer cannot be allocated the process is aborted with a
// message on stderr instead of dereferencing a null pointer.
void merge(int* a, int low, int mid, int high)
{
	// an empty left or right run means there is nothing to merge
	if (mid < low || high <= mid)
		return;

	// n1 is the size of the left run
	const int n1 = mid - low + 1;
	int* left = (int*)malloc((size_t)n1 * sizeof(int));
	if (left == NULL)
	{
		fprintf(stderr, "merge: out of memory (%lu bytes)\n",
			(unsigned long)((size_t)n1 * sizeof(int)));
		abort();
	}

	// save the left run; its slots in `a` are about to be overwritten
	for (int c = 0; c < n1; c++)
		left[c] = a[low + c];

	int i = 0;       // next unread element of the saved left run
	int j = mid + 1; // next unread element of the right run (still in `a`)
	int k = low;     // next slot to fill; k == j - (n1 - i), so k < j while i < n1

	// merge left and right in ascending order
	while (i < n1 && j <= high)
	{
		if (left[i] <= a[j])
			a[k++] = left[i++];
		else
			a[k++] = a[j++];
	}

	// insert remaining values from left; leftover right values are already in place
	while (i < n1)
		a[k++] = left[i++];

	free(left);
}
// merge sort function
// Sorts a[low..high] (inclusive bounds) into ascending order by sorting each
// half recursively and then merging the two halves.
void merge_sort(int* a, int low, int high)
{
	// a range with fewer than two elements needs no work
	if (low >= high)
		return;

	// split point, written so that low + high is never formed
	const int middle = low + (high - low) / 2;

	merge_sort(a, low, middle);      // 1st half
	merge_sort(a, middle + 1, high); // 2nd half
	merge(a, low, middle, high);     // combine the halves
}
// thread function
void* merge_sort_thread(void* arg)
{
TASK* task = (TASK*)arg;
int low;
int high;
// calculating low and high
low = task->low;
high = task->high;
// evaluating mid point
int mid = low + (high - low) / 2;
if (low < high)
{
merge_sort(task->a, low, mid);
merge_sort(task->a, mid + 1, high);
merge(task->a, low, mid, high);
}
task->busy = 0;
return 0;
}
// driver
// Usage: program [-A<count>] [-T<threads>]
//   -A<count>    number of random ints to sort (default 2000, must be >= 1)
//   -T<threads>  number of sorting threads (default 1; clamped to 1..count)
// Fills an array with random values, lets each thread sort one contiguous
// slice, merges the sorted slices on the main thread, then verifies the
// result and prints it when the array is small.
// Returns 1 for an invalid array size or when memory cannot be allocated,
// otherwise 0.
int main(int argc, char** argv)
{
	int MAX_ARRAY_ELEMENTS = 2000;
	int MAX_THREADS = 1;
	// parse command line arguments
	for (--argc, ++argv; argc > 0; --argc, ++argv)
	{
		const char* sz = *argv;
		if (*sz != '-')
			break;
		switch (sz[1])
		{
		case 'A': // array max
			MAX_ARRAY_ELEMENTS = atoi(sz + 2);
			break;
		case 'T': // thread count
			MAX_THREADS = atoi(sz + 2);
			break;
		default: // unknown switches are ignored
			break;
		}
	}
	if (MAX_ARRAY_ELEMENTS < 1)
	{
		fprintf(stderr, "Array size (-A) must be at least 1\n");
		return 1;
	}
	// a thread count below 1 would divide by zero when the slice length is
	// computed; more threads than elements would only create empty slices
	if (MAX_THREADS < 1)
		MAX_THREADS = 1;
	if (MAX_THREADS > MAX_ARRAY_ELEMENTS)
		MAX_THREADS = MAX_ARRAY_ELEMENTS;
	printf("\n\nArray[%d]\nThreads[%d]", MAX_ARRAY_ELEMENTS, MAX_THREADS);

	// allocate the array and the per-thread bookkeeping
	int* array = (int*)malloc(sizeof(int) * (size_t)MAX_ARRAY_ELEMENTS);
	pthread_t* threads = (pthread_t*)malloc(sizeof(pthread_t) * (size_t)MAX_THREADS);
	TASK* tasklist = (TASK*)malloc(sizeof(TASK) * (size_t)MAX_THREADS);
	if (array == NULL || threads == NULL || tasklist == NULL)
	{
		fprintf(stderr, "Out of memory\n");
		free(tasklist);
		free(threads);
		free(array);
		return 1;
	}

	// generating random values in array; seeded from the wall clock because
	// clock() is nearly the same small value at every program start
	srand((unsigned)time(NULL));
	for (int i = 0; i < MAX_ARRAY_ELEMENTS; i++)
		array[i] = rand();
	printf("\n\nArray Randomized");

	const clock_t start = clock();

	// give every thread one contiguous slice; the last slice also takes the
	// elements left over by the integer division
	const int len = MAX_ARRAY_ELEMENTS / MAX_THREADS;
	for (int i = 0, low = 0; i < MAX_THREADS; i++, low += len)
	{
		TASK* task = &tasklist[i];
		task->a = array;
		task->busy = 1;
		task->low = low;
		task->high = (i == MAX_THREADS - 1) ? MAX_ARRAY_ELEMENTS - 1 : low + len - 1;
	}

	// create the threads; stop at the first failure
	// (same platform test as the block that defines the pthread_create macro)
	int started = 0;
	for (; started < MAX_THREADS; started++)
	{
#if defined (WIN32) || (_WIN64)
		// here pthread_create expands to CreateThread, which returns NULL on failure
		HANDLE handle = pthread_create(&threads[started], 0, merge_sort_thread, &tasklist[started]);
		if (handle == NULL)
			break;
		// completion is detected through the busy flag, so the handle is not
		// needed; closing it does not stop the thread
		CloseHandle(handle);
#else
		if (pthread_create(&threads[started], 0, merge_sort_thread, &tasklist[started]) != 0)
			break;
#endif
	}
	// slices whose thread could not be created are sorted right here, so the
	// wait below can never hang on a task nobody is working on
	for (int i = started; i < MAX_THREADS; i++)
		merge_sort_thread(&tasklist[i]);

	// wait for all threads
	for (int i = 0; i < started; i++)
	{
#if defined (WIN32) || (_WIN64)
		while (tasklist[i].busy)
			sleep(50); // Sleep(): milliseconds
#else
		// unistd sleep() counts seconds, so polling with sleep(50) could stall
		// for 50 seconds; joining returns as soon as the thread is done
		pthread_join(threads[i], NULL);
#endif
	}

	// fold the sorted slices together left to right: after step i the range
	// [0, tasklist[i].high] is sorted
	for (int i = 1; i < MAX_THREADS; i++)
		merge(array, 0, tasklist[i].low - 1, tasklist[i].high);

	// clock() counts in units of 1/CLOCKS_PER_SEC and reports processor time;
	// NOTE(review): on platforms where that time is summed over all threads
	// this is not elapsed wall-clock time
	printf("\n\nSorted in %f Seconds", (double)(clock() - start) / CLOCKS_PER_SEC);

	// verify that no element is smaller than its predecessor
	int sorted = 1;
	for (int i = 1; i < MAX_ARRAY_ELEMENTS; i++)
	{
		if (array[i] < array[i - 1])
		{
			sorted = 0;
			break;
		}
	}
	if (!sorted)
		printf("\n\nArray Not Sorted");
	else
	{
		printf("\n\nArray Sorted");
		if (MAX_ARRAY_ELEMENTS < 50)
			for (int i = 0; i < MAX_ARRAY_ELEMENTS; i++)
				printf(" %d", array[i]);
		printf("\n");
	}

	free(tasklist);
	free(threads);
	free(array);
	return 0;
}
Here is another implementation that runs even slower on multi-threaded CPUs. The source of this code is Introduction to Algorithms, 3rd Edition, by Thomas H. Cormen, The MIT Press, Massachusetts Institute of Technology (http://mitpress.mit.edu). While the algorithm does sort properly, it does so very slowly when several threads are specified. It is fastest when a single thread is used.
// ALL CAPS VARIABLES ARE DATA ARRAYS
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <atomic>
#include <thread>
#if defined (WIN32) || (_WIN64)
#include <windows.h>
#define pthread_t DWORD
#define pthread_create(THREAD_ID_PTR, ATTR, ROUTINE, PARAMS) CreateThread(NULL,0,(LPTHREAD_START_ROUTINE)ROUTINE,(void*)PARAMS,0,THREAD_ID_PTR)
#define sleep(ms) Sleep(ms)
#else // Linux
#include <pthread.h>
#include <unistd.h>
#endif
// Upper limit for thread_count; main overrides it with the -T switch.
// hardware_concurrency() may report 0 when the value cannot be determined,
// in which case no additional threads are spawned.
int MAX_THREADS = (int)std::thread::hardware_concurrency();
// Number of threads currently accounted for, starting at 1 for the main
// thread. It is incremented by the spawning thread and decremented by the
// spawned one, so it is atomic to keep those concurrent updates well defined.
std::atomic<int> thread_count(1);
// a basic binary search
// Returns the smallest index q in [p, r + 1] for which val <= array[q],
// where array[p..r] (inclusive bounds) is expected to be in ascending order.
// When every element of the range is smaller than val, or the range is empty
// (p > r), the result is max(p, r + 1).
int binary_search(double val, double* array, int p, int r)
{
	int low = p;
	int high = (p > r + 1) ? p : r + 1;
	while (low < high)
	{
		// same midpoint as (low + high) / 2, but the sum is never formed, so it
		// cannot overflow for large indexes
		const int mid = low + (high - low) / 2;
		if (val <= array[mid])
			high = mid;
		else
			low = mid + 1;
	}
	return high;
}
// Exchanges the values held by a and b.
void swap(int& a, int& b)
{
	const int old_a = a;
	a = b;
	b = old_a;
}
// Argument bundle for p_merge, passed by pointer so that the same function
// can be called directly or used as a thread entry point.
class p_merge_args
{
public:
	double* TMP; // source array holding the two ranges to merge
	double* A;   // destination array
	int p1;      // first index of range 1 in TMP
	int r1;      // last index of range 1 in TMP
	int p2;      // first index of range 2 in TMP
	int r2;      // last index of range 2 in TMP
	int p3;      // index in A where the merged output starts
	int busy;    // non-zero while a spawned p_merge call is still working on this object

	// members are initialized in the order in which they are declared
	p_merge_args(double* TMP, int p1, int r1, int p2, int r2, double* A, int p3, int thread)
		: TMP(TMP),
		  A(A),
		  p1(p1),
		  r1(r1),
		  p2(p2),
		  r2(r2),
		  p3(p3),
		  busy(thread)
	{
	}

	// the arrays are not owned by this object; only the pointer is cleared
	~p_merge_args()
	{
		TMP = nullptr;
	}
};
// parallel merge function
// Merges the two sorted ranges TMP[p1..r1] and TMP[p2..r2] (inclusive bounds)
// into A, starting at index p3. The middle element of the larger range is
// written to its final slot; the elements below it and the elements above it
// are then merged as two independent sub-problems, the first of them on a new
// thread while thread_count is below MAX_THREADS.
//
// args->p1/r1/p2/r2 may be exchanged in place so that range 1 is the larger
// one. When args->busy is non-zero the call is treated as running on a
// spawned thread: on exit busy is cleared and thread_count is decremented.
void p_merge(p_merge_args *args)
{
	int n1 = args->r1 - args->p1 + 1;
	int n2 = args->r2 - args->p2 + 1;
	if (n1 < n2)
	{
		// make range 1 the larger range
		swap(args->p1, args->p2);
		swap(args->r1, args->r2);
		swap(n1, n2);
	}
	if (n1 != 0)
	{
		const int q1 = (args->p1 + args->r1) / 2; // middle of the larger range
		// first position in range 2 whose value is not below TMP[q1]
		const int q2 = binary_search(args->TMP[q1], args->TMP, args->p2, args->r2);
		// (q1 - p1) + (q2 - p2) elements are placed before TMP[q1] in the output
		const int q3 = args->p3 + (q1 - args->p1) + (q2 - args->p2);
		args->A[q3] = args->TMP[q1];

		// lower parts of both ranges; the object can live on the stack because
		// this function does not return before the work on it has finished
		p_merge_args args1(args->TMP, args->p1, q1 - 1, args->p2, q2 - 1, args->A, args->p3, false);
		std::thread worker;
		if (thread_count < MAX_THREADS)
		{
			args1.busy = ++thread_count;
			try
			{
				worker = std::thread(p_merge, &args1); // spawn
			}
			catch (...)
			{
				// no thread could be started: args1.busy stays set, so the
				// direct call below releases the reserved slot when it ends
			}
		}
		if (!worker.joinable())
			p_merge(&args1);

		// upper parts of both ranges, always on the current thread
		p_merge_args args2(args->TMP, q1 + 1, args->r1, q2, args->r2, args->A, q3 + 1, false);
		p_merge(&args2);

		if (worker.joinable())
			worker.join(); // sync
	}
	if (args->busy)
	{
		args->busy = 0;
		thread_count--;
	}
}
// Argument bundle for p_merge_sort, passed by pointer so that the same
// function can be called directly or used as a thread entry point.
class p_merge_sort_args
{
public:
	double* A; // source array
	double* B; // destination array
	int p;     // first index of the range in A
	int r;     // last index of the range in A
	int s;     // index in B where the sorted output starts
	int busy;  // non-zero while a spawned p_merge_sort call is still working on this object

	// members are initialized in the order in which they are declared
	p_merge_sort_args(double* A, int p, int r, double* B, int s, int thread)
		: A(A),
		  B(B),
		  p(p),
		  r(r),
		  s(s),
		  busy(thread)
	{
	}

	// the arrays are not owned by this object; only the pointers are cleared
	~p_merge_sort_args()
	{
		A = B = nullptr;
	}
};
// parallel merge sort function
// Sorts A[p..r] (inclusive bounds) in ascending order into B, starting at
// index s; A itself is not modified. Both halves are sorted into a temporary
// array (the first half on a new thread while thread_count is below
// MAX_THREADS) and then merged into B with p_merge. An empty range (r < p)
// writes nothing.
//
// When args->busy is non-zero the call is treated as running on a spawned
// thread: on exit busy is cleared and thread_count is decremented.
void p_merge_sort(p_merge_sort_args *args)
{
	const int n = args->r - args->p + 1;
	if (n == 1)
		args->B[args->s] = args->A[args->p];
	else if (n > 1) // n < 1: nothing to sort, and splitting further would never terminate
	{
		// scratch array addressed from 1 to n, hence one extra slot
		double* TMP = new double[n + 1];
		const int q1 = args->p + (args->r - args->p) / 2; // last index of the first half
		const int q2 = q1 - args->p + 1;                  // number of elements in the first half

		// first half -> TMP[1..q2]; the object can live on the stack because
		// this function does not return before the work on it has finished
		p_merge_sort_args args1(args->A, args->p, q1, TMP, 1, false);
		std::thread worker;
		if (thread_count < MAX_THREADS)
		{
			args1.busy = ++thread_count;
			try
			{
				worker = std::thread(p_merge_sort, &args1); // spawn
			}
			catch (...)
			{
				// no thread could be started: args1.busy stays set, so the
				// direct call below releases the reserved slot when it ends
			}
		}
		if (!worker.joinable())
			p_merge_sort(&args1);

		// second half -> TMP[q2+1..n], always on the current thread
		p_merge_sort_args args2(args->A, q1 + 1, args->r, TMP, q2 + 1, false);
		p_merge_sort(&args2);

		if (worker.joinable())
			worker.join(); // sync

		// merge the two sorted halves of TMP into B
		p_merge_args merge_args(TMP, 1, q2, q2 + 1, n, args->B, args->s, false);
		p_merge(&merge_args);
		delete[] TMP;
	}
	if (args->busy)
	{
		args->busy = 0;
		thread_count--;
	}
}
// driver
// Usage: program [-A<count>] [-T<threads>]
//   -A<count>    number of random values to sort (default 1000, must be >= 1)
//   -T<threads>  value for MAX_THREADS, the limit used when spawning threads
// Fills A with random values, sorts them into B with p_merge_sort, then
// verifies the result and prints it when the array is small.
// Returns 1 for an invalid array size, otherwise 0.
int main(int argc, char** argv)
{
	int MAX_ARRAY_ELEMENTS = 1000;
	// parse command line arguments
	for (--argc, ++argv; argc > 0; --argc, ++argv)
	{
		const char* sz = *argv;
		if (*sz != '-')
			break;
		switch (sz[1])
		{
		case 'A': // array max size
			MAX_ARRAY_ELEMENTS = atoi(sz + 2);
			break;
		case 'T': // maximum thread count
			MAX_THREADS = atoi(sz + 2);
			break;
		default: // unknown switches are ignored
			break;
		}
	}
	// a negative size cannot be allocated and an empty range is not a valid
	// input for the sort
	if (MAX_ARRAY_ELEMENTS < 1)
	{
		fprintf(stderr, "Array size (-A) must be at least 1\n");
		return 1;
	}
	printf("\n\nArray[%d]", MAX_ARRAY_ELEMENTS);

	// allocate the arrays
	double* A = new double[MAX_ARRAY_ELEMENTS];
	double* B = new double[MAX_ARRAY_ELEMENTS];

	// generating random values in array to be sorted; seeded from the wall
	// clock because clock() is nearly the same small value at every start
	srand((unsigned)time(NULL));
	for (int i = 0; i < MAX_ARRAY_ELEMENTS; i++)
		A[i] = (double)rand();
	printf("\n\nArray Randomized");

	const clock_t start = clock();
	// A (source array), start index, end index, B (destination array), start index, thread ID (or false)
	p_merge_sort_args args(A, 0, MAX_ARRAY_ELEMENTS - 1, B, 0, false);
	p_merge_sort(&args);
	// clock() counts in units of 1/CLOCKS_PER_SEC and reports processor time;
	// NOTE(review): on platforms where that time is summed over all threads
	// this is not elapsed wall-clock time
	printf("\n\nSorted in %f Seconds", (double)(clock() - start) / CLOCKS_PER_SEC);

	// verify that no element is smaller than its predecessor
	bool sorted = true;
	for (int i = 1; i < MAX_ARRAY_ELEMENTS; i++)
	{
		if (B[i] < B[i - 1])
		{
			sorted = false;
			break;
		}
	}
	if (!sorted)
		printf("\n\nArray Not Sorted");
	else
	{
		printf("\n\nArray Sorted");
		if (MAX_ARRAY_ELEMENTS < 50)
			for (int i = 0; i < MAX_ARRAY_ELEMENTS; i++)
				printf(" %f", B[i]);
		printf("\n");
	}

	delete[] B;
	delete[] A;
	return 0;
}