ps_homeworks/dn6/server.c

222 lines
5.1 KiB
C

#include <stdlib.h>
#include <stdio.h>
#include <mpi.h>
#include <execinfo.h>
#include <signal.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#define ARRAY_SIZE 1000000
#define DEBUG 0
/*
 * Signal handler (installed for SIGSEGV by main): reports the signal number
 * and a backtrace of at most 10 frames on stderr, then terminates the process
 * with status 1.
 *
 * The message is formatted by hand and emitted with write(2), and the process
 * is ended with _exit(), because stdio functions and exit() are not
 * async-signal-safe: calling them from a handler that interrupted arbitrary
 * code (for example in the middle of malloc or printf) can deadlock or
 * corrupt state. _exit() deliberately skips atexit handlers and stdio
 * flushing, which cannot be trusted after a crash anyway.
 */
void segfault_handler(int sig) {
    static const char prefix[] = "[-] Error: signal ";
    static const char suffix[] = ":\n";
    void *frames[10];
    char digits[16];
    char message[64];
    size_t digit_pos = sizeof digits;
    size_t length = 0;
    unsigned int magnitude = sig < 0 ? 0u - (unsigned int)sig : (unsigned int)sig;

    /* backtrace() returns an int frame count; keep that type for the fd call. */
    int depth = backtrace(frames, 10);

    /* Render the signal number in decimal, filling the buffer from the end. */
    do {
        digits[--digit_pos] = (char)('0' + magnitude % 10u);
        magnitude /= 10u;
    } while (magnitude != 0u);
    if (sig < 0) {
        digits[--digit_pos] = '-';
    }

    /* Assemble the whole line first so it reaches stderr in a single write. */
    for (size_t i = 0; i + 1 < sizeof prefix; i++) {
        message[length++] = prefix[i];
    }
    for (size_t i = digit_pos; i < sizeof digits; i++) {
        message[length++] = digits[i];
    }
    for (size_t i = 0; i + 1 < sizeof suffix; i++) {
        message[length++] = suffix[i];
    }

    /* Best effort: if stderr is unusable there is nothing left to report to. */
    if (write(STDERR_FILENO, message, length) >= 0) {
        backtrace_symbols_fd(frames, depth, STDERR_FILENO);
    }
    _exit(1);
}
/*
 * Allocates a buffer of `size` doubles and fills every slot with
 * rand() / RAND_MAX, i.e. a pseudo-random value in [0, 1].
 *
 * If the allocation fails, a message is printed and the process exits with
 * status 1. The caller owns the returned buffer and must free() it.
 */
double* setup_send_array(int size) {
    double *values = malloc(size * sizeof *values);

    if (!values) {
        printf("[-] Error: malloc failed!\n");
        exit(1);
    }

    int filled = 0;
    while (filled < size) {
        values[filled] = (double)rand() / (double)RAND_MAX;
        filled++;
    }

    return values;
}
/*
 * Allocates a zero-filled buffer of `n_elements` doubles.
 *
 * calloc() replaces the former malloc-plus-loop: it zeroes the memory and
 * rejects element counts whose byte size would overflow (all-bits-zero is
 * 0.0 for IEEE-754 doubles). At least one slot is always requested so that
 * an empty share (n_elements == 0) never yields a NULL pointer that would be
 * mistaken for an allocation failure. A negative count converts to a huge
 * size_t, makes calloc() fail, and is reported like any other failure.
 *
 * On failure a message is printed and the process exits with status 1.
 * The caller owns the returned buffer and must free() it.
 */
double* setup_recv_array(int n_elements) {
    size_t slots = (n_elements == 0) ? 1 : (size_t)n_elements;
    double *recv_arr = calloc(slots, sizeof *recv_arr);

    if (recv_arr == NULL) {
        printf("[-] Error: malloc failed!\n");
        exit(1);
    }
    return recv_arr;
}
/*
 * Splits the ARRAY_SIZE elements into `ntasks` contiguous shares.
 *
 * For task i, displacements[i] is the index of its first element and
 * counts[i] the number of elements it owns. Boundaries are computed as
 * ARRAY_SIZE * i / ntasks (multiply first, then divide), so the shares
 * differ in size by at most one element and together cover the whole array.
 * Dividing first, as the previous version did, silently dropped the last
 * ARRAY_SIZE % ntasks elements whenever ntasks did not divide ARRAY_SIZE.
 *
 * The products are evaluated in 64-bit arithmetic because ARRAY_SIZE * ntasks
 * can exceed INT_MAX for large arrays. Both output arrays must have room for
 * `ntasks` ints.
 */
void setup_displacements_and_counts(int *displacements, int *counts, int ntasks) {
    for (int i = 0; i < ntasks; i++) {
        long long my_start = (long long)(ARRAY_SIZE) * i / ntasks;
        long long my_end = (long long)(ARRAY_SIZE) * (i + 1) / ntasks;

        displacements[i] = (int)my_start;
        counts[i] = (int)(my_end - my_start);
    }
}
/*
 * qsort()-style comparator for doubles in ascending order.
 *
 * Returns -1 when *a < *b, 1 when *a > *b and 0 otherwise (which includes
 * the case where either operand is NaN, since both comparisons are false).
 */
int compare_doubles(const void *a, const void *b) {
    const double lhs = *(const double *)a;
    const double rhs = *(const double *)b;

    if (lhs < rhs) {
        return -1;
    }
    if (lhs > rhs) {
        return 1;
    }
    return 0;
}
/*
 * Merges `n_tasks` individually sorted, contiguous chunks of `arr` into one
 * newly allocated, fully sorted array of `arr_size` doubles.
 *
 * Chunk j occupies arr[displacements[j] .. displacements[j + 1]) and the last
 * chunk runs up to arr_size. The displacements are expected to be ascending
 * and to start at 0 so that the chunks cover the whole array; empty chunks
 * are allowed. If the chunks cover fewer than arr_size elements, the
 * remaining output slots are left unset.
 *
 * Each output element is chosen by scanning the current head of every
 * non-exhausted chunk, so the cost is O(arr_size * n_tasks). Ties go to the
 * chunk with the lowest index.
 *
 * `arr` is not modified or freed. The caller owns the returned buffer and
 * must free() it. On allocation failure a message is printed and the process
 * exits with status 1.
 */
double *finalize_sort(double* arr, int arr_size, int *displacements, int n_tasks) {
    /* Always request at least one slot so a zero-sized request cannot return
       NULL and be mistaken for an allocation failure. */
    size_t out_slots = arr_size > 0 ? (size_t)arr_size : 1;
    size_t cursor_slots = n_tasks > 0 ? (size_t)n_tasks : 1;
    double *merged = malloc(out_slots * sizeof *merged);
    int *cursor = malloc(cursor_slots * sizeof *cursor);

    if (merged == NULL || cursor == NULL) {
        printf("[-] Error: malloc failed!\n");
        exit(1);
    }

    /* cursor[j] is the index of the next unconsumed element of chunk j. */
    for (int j = 0; j < n_tasks; j++) {
        cursor[j] = displacements[j];
    }

    for (int i = 0; i < arr_size; i++) {
        int best = -1;

        for (int j = 0; j < n_tasks; j++) {
            int chunk_end = (j + 1 < n_tasks) ? displacements[j + 1] : arr_size;

            if (cursor[j] >= chunk_end) {
                continue; /* chunk j has no elements left */
            }
            if (best < 0 || arr[cursor[j]] < arr[cursor[best]]) {
                best = j;
            }
        }

        if (best < 0) {
            break; /* every chunk is exhausted */
        }
        merged[i] = arr[cursor[best]];
        cursor[best]++;
    }

    free(cursor);
    return merged;
}
/*
 * Reports a fatal error and terminates the process.
 *
 * Prints "[-] Error: <file> -> <line>" followed by the system message for
 * `err_code` (via perror) on stderr, then exits.
 *
 * line:     source line to report, normally __LINE__ at the call site.
 * err_code: errno-style code describing the failure; also used as the exit
 *           status. A value of 0 is replaced by EXIT_FAILURE so that an
 *           error path can never exit with a success status.
 */
void throw_err(int line, int err_code) {
    char err_buf[256];

    /* Bounded formatting: __FILE__ may be an arbitrarily long path. */
    snprintf(err_buf, sizeof err_buf, "[-] Error: %s -> %d", __FILE__, line);

    /* perror() reads the global errno, which the formatting call above may
       have changed; restore the code the caller actually passed in. */
    errno = err_code;
    perror(err_buf);

    exit(err_code != 0 ? err_code : EXIT_FAILURE);
}
/*
 * Program entry point: sorts ARRAY_SIZE doubles across all MPI tasks.
 *
 *  1. Task 0 creates the input array with setup_send_array().
 *  2. Every task computes the same displacements/counts table and allocates
 *     a receive buffer for its own share.
 *  3. The array is scattered with MPI_Scatterv, each task sorts its share
 *     with qsort(), and the sorted shares are gathered back on task 0 with
 *     MPI_Gatherv.
 *  4. Task 0 merges the sorted shares with finalize_sort() and prints the
 *     elapsed wall-clock time.
 *
 * Returns 0 on success; failures terminate the process from within the
 * error paths.
 */
int main(int argc, char* argv[])
{
    int taskid, ntasks;
    /* Only task 0 owns a send buffer; other tasks pass NULL to the
       collectives instead of an uninitialized pointer. */
    double *send_arr = NULL;
    double *recv_arr;
    int *displacements, *counts;

    MPI_Init(&argc, &argv);
    /* The timer is started only after MPI_Init: MPI_Wtime is an MPI call and
       must not be used before the library is initialized. */
    double start_time = MPI_Wtime();
    signal(SIGSEGV, segfault_handler);
    MPI_Comm_rank(MPI_COMM_WORLD, &taskid);
    MPI_Comm_size(MPI_COMM_WORLD, &ntasks);
    if (DEBUG) {
        printf("[+] Task %d has started...\n", taskid);
        fflush(stdout);
    }

    if (taskid == 0) {
        // Setup array that we need to sort
        send_arr = setup_send_array(ARRAY_SIZE);
    }

    // Allocate memory for displacements and counts (one entry per task)
    displacements = malloc((size_t)ntasks * sizeof *displacements);
    counts = malloc((size_t)ntasks * sizeof *counts);
    if (displacements == NULL || counts == NULL) {
        printf("[-] Error: malloc failed!\n");
        /* Bring down the whole job so the other tasks do not hang in a
           collective waiting for this one. */
        MPI_Abort(MPI_COMM_WORLD, 1);
        exit(1);
    }

    // Setup displacements and counts
    setup_displacements_and_counts(displacements, counts, ntasks);
    if (DEBUG) {
        printf("[+] Setup displacements and counts done!\n");
        fflush(stdout);
    }

    // Setup recv array sized for this task's share
    recv_arr = setup_recv_array(counts[taskid]);
    if (DEBUG) {
        printf("[+] Setup done!\n");
        fflush(stdout);
    }

    if (MPI_Barrier(MPI_COMM_WORLD) != MPI_SUCCESS)
        throw_err(__LINE__, errno);

    // Scatterv array to all processes. The receive count must match the
    // size of recv_arr (this task's share), not the size of the full array.
    if (MPI_Scatterv(send_arr, counts, displacements, MPI_DOUBLE,
                     recv_arr, counts[taskid], MPI_DOUBLE,
                     0, MPI_COMM_WORLD) != MPI_SUCCESS)
        throw_err(__LINE__, errno);

    // Sort the local share
    qsort(recv_arr, counts[taskid], sizeof(double), compare_doubles);
    if (DEBUG) {
        for (int i = 0; i < counts[taskid]; i++) {
            printf("%f ", recv_arr[i]);
        }
        printf("---> tid: %d \n", taskid);
        fflush(stdout);
    }

    if (MPI_Barrier(MPI_COMM_WORLD) != MPI_SUCCESS)
        throw_err(__LINE__, errno);

    // Gather the sorted shares back into send_arr on task 0
    if (MPI_Gatherv(recv_arr, counts[taskid], MPI_DOUBLE,
                    send_arr, counts, displacements, MPI_DOUBLE,
                    0, MPI_COMM_WORLD) != MPI_SUCCESS)
        throw_err(__LINE__, errno);

    if (taskid == 0) {
        /* finalize_sort returns a new buffer; release the gathered one so it
           is not leaked when send_arr is re-pointed. */
        double *sorted = finalize_sort(send_arr, ARRAY_SIZE, displacements, ntasks);
        free(send_arr);
        send_arr = sorted;

        if (DEBUG) {
            for (int i = 0; i < ARRAY_SIZE; i++) {
                printf("%f ", send_arr[i]);
            }
        }
        double total_time = MPI_Wtime() - start_time;
        printf("\nTotal time: %f\n", total_time);
    }

    free(recv_arr);
    free(counts);
    free(displacements);
    free(send_arr); /* NULL on every task except 0 */

    MPI_Finalize();
    return 0;

    // Recorded timings. Each group appears to be headed by the array size,
    // and each row by the number of tasks -- TODO confirm. They were
    // presumably measured with the timer started before MPI_Init.
    // 500 000 000
    // 1: Total time: 28.816904
    // 2: Total time: 17.765614
    // 4: Total time: 13.663701
    // 8: Total time: 17.332404
    // 16: Total time: 19.667273
    // 32: Total time: 33.719451
    // 100 000 000
    // 1: Total time: 9.182889
    // 2: Total time: 4.406844
    // 4: Total time: 2.873339
    // 8: Total time: 3.302755
    // 16: Total time: 4.776216
    // 32: Total time: 7.725292
    // 1 000 000
    // 1: Total time: 1.516520
    // 2: Total time: 1.035112
    // 4: Total time: 1.114661
    // 8: Total time: 1.402073
    // 16: Total time: 2.096291
    // 32: Total time: 3.703796
}