diff --git a/dn5/Makefile b/dn5/Makefile index 5d37328..29a85fd 100644 --- a/dn5/Makefile +++ b/dn5/Makefile @@ -15,4 +15,4 @@ push: run_remote: ssh nsc 'cd $(DESTINATION); make clean;module load OpenMPI/4.0.5-GCC-10.2.0; make server;' - ssh nsc 'module load OpenMPI/4.0.5-GCC-10.2.0; mpirun -N 2 $(DESTINATION)/server' + ssh nsc 'module load OpenMPI/4.0.5-GCC-10.2.0; srun --reservation=fri --nodes=2 --mpi=pmix mpirun -N 2 $(DESTINATION)/server' diff --git a/dn5/mpi b/dn5/mpi new file mode 100644 index 0000000..e739af6 Binary files /dev/null and b/dn5/mpi differ diff --git a/dn5/server.c b/dn5/server.c index 74a3518..47ab9d0 100644 --- a/dn5/server.c +++ b/dn5/server.c @@ -18,8 +18,7 @@ int main(int argc, char* argv[]) buff = (int *)malloc(sizeof(int)*buffsize); for (int i=0; i +#include +#include +#include +#include +#include +#include +#include + +#define ARRAY_SIZE 50000000 +#define DEBUG 0 + +void segfault_handler(int sig) { + void *array[10]; + size_t size; + + size = backtrace(array, 10); + + fprintf(stderr, "[-] Error: signal %d:\n", sig); + backtrace_symbols_fd(array, size, STDERR_FILENO); + exit(1); +} + +double* setup_send_array(int size) { + double *arr = (double *)malloc(size * sizeof(double)); + + if(arr == NULL) { + printf("[-] Error: malloc failed!\n"); + exit(1); + } + + for (int i = 0; i < size; i++) { + arr[i] = (double)rand() / (double)RAND_MAX; + } + return arr; +} + +double* setup_recv_array(int n_elements) { + double *recv_arr = (double *)malloc(n_elements * sizeof(double)); + + if(recv_arr == NULL) { + printf("[-] Error: malloc failed!\n"); + exit(1); + } + + for (int i = 0; i < n_elements; i++) { + recv_arr[i] = 0.0; + } + return recv_arr; +} + +void setup_displacements_and_counts(int *displacements, int *counts, int ntasks) { + for(int i = 0; i < ntasks; i++) { + displacements[i] = (int) (ARRAY_SIZE / ntasks * i); // my_start + counts[i] = (int) (ARRAY_SIZE / ntasks * (i + 1)) - displacements[i]; // my_end - my_start + } +} + +int compare_doubles(const void *a, const void *b) { + const double *da = (const double *) a; + const double *db = (const double *) b; + + return (*da > *db) - (*da < *db); +} + +double *finalize_sort(double* arr, int arr_size, int *displacements, int n_tasks) { + double *temp_arr = (double *)malloc(arr_size * sizeof(double)); + int *temp_displacements = (int *)malloc(n_tasks * sizeof(int)); + + if(temp_arr == NULL || temp_displacements == NULL) { + printf("[-] Error: malloc failed!\n"); + exit(1); + } + + for(int i = 0; i < n_tasks; i++) + temp_displacements[i] = displacements[i]; + + for(int i = 0; i < arr_size; i++) { + + int min_displacement_task_ix = 0; + for(int j = 0; j < n_tasks; j++) { + if(j != n_tasks -1 && temp_displacements[j] == displacements[j+1]) + continue; + else if(j == n_tasks && temp_displacements[j] == arr_size -1) + continue; + + if(arr[temp_displacements[min_displacement_task_ix]] > arr[temp_displacements[j]]) + min_displacement_task_ix = j; + } + temp_arr[i] = arr[temp_displacements[min_displacement_task_ix]]; + temp_displacements[min_displacement_task_ix]++; + } + + return temp_arr; +} + +void throw_err(int line, int err_code) { + char err_buf[256]; + + sprintf(err_buf, "[-] Error: %s -> %d", __FILE__, line); + perror(err_buf); + exit(err_code); +} + +int main(int argc, char* argv[]) +{ + int taskid, ntasks; + double *send_arr, *recv_arr; + int *displacements, *counts; + + double start_time = MPI_Wtime(); + + MPI_Init(&argc, &argv); + signal(SIGSEGV, segfault_handler); + + MPI_Comm_rank(MPI_COMM_WORLD, &taskid); + MPI_Comm_size(MPI_COMM_WORLD, &ntasks); + + if(DEBUG) { + printf("[+] Task %d has started...\n", taskid); + fflush(stdout); + } + + if (taskid == 0) { + //Setup array that we need to sort + send_arr = setup_send_array(ARRAY_SIZE); + } + + // Allocate memory for send_displacements and send_counts + displacements = (int *)malloc(ntasks * sizeof(int)); + counts = (int *)malloc(ntasks * sizeof(int)); + + if(displacements == NULL || counts == NULL) { + printf("[-] Error: malloc failed!\n"); + exit(1); + } + + // Setup displacements and counts + setup_displacements_and_counts(displacements, counts, ntasks); + + if(DEBUG) { + printf("[+] Setup displacements and counts done!\n"); + fflush(stdout); + } + + // Setup recv array + recv_arr = setup_recv_array(counts[taskid]); + + if(DEBUG) { + printf("[+] Setup done!\n"); + fflush(stdout); + } + + if(MPI_Barrier(MPI_COMM_WORLD) != MPI_SUCCESS) + throw_err(__LINE__, errno); + + // Scatterv array to all processes + if(MPI_Scatterv(send_arr, counts, displacements , MPI_DOUBLE, recv_arr, ARRAY_SIZE, MPI_DOUBLE, 0, MPI_COMM_WORLD) != MPI_SUCCESS) + throw_err(__LINE__, errno); + + // Sort the array + qsort(recv_arr, counts[taskid], sizeof(double), compare_doubles); + + + if(DEBUG) { + for(int i =0; i < counts[taskid]; i++) { + printf("%f ", recv_arr[i]); + } + printf("---> tid: %d \n", taskid); + fflush(stdout); + } + + if(MPI_Barrier(MPI_COMM_WORLD) != MPI_SUCCESS) + throw_err(__LINE__, errno); + + // Gather the array + if(MPI_Gatherv(recv_arr, counts[taskid], MPI_DOUBLE, send_arr, counts, displacements, MPI_DOUBLE, 0, MPI_COMM_WORLD)) + throw_err(__LINE__, errno); + + + if (taskid == 0) { + send_arr = finalize_sort(send_arr, ARRAY_SIZE, displacements, ntasks); + if(DEBUG) { + for(int i = 0; i < ARRAY_SIZE; i++) { + printf("%f ", send_arr[i]); + } + } + + double total_time = MPI_Wtime() - start_time; + + printf("\nTotal time: %f\n", total_time); + } + + + MPI_Finalize(); + return 0; +}