/* Program: Parallel Sort using a hybrid mergesort Author: Chris Harper (snakebytestudios.com) Mod Date: 4/21/08 TODO: -phase 2 -get number of processors in each machine -give multiprocessor machines more tasks -optimize serial sorts for large data sets (does quicksort duplicate data for each recursive call?) -implement recursive slave execution (must be memory efficient for large data sets) */ #include #include #include #include #include #include #include #include "pvm3.h" #include "stopwatch.c" #define MSG_STATUS 1 #define MSG_DATA 2 //settings #define LOG_FILE "/home/clh324/mergesort/log2.csv" //#define LOG_FILE "/root/Desktop/log2.csv" #define DEF_TEST_SIZE 25 #define SLAVENAME "slave2" struct NodeData { int Task_ID; int part_num; char host[255]; double exec_time; long data_length; }; int short_comp (const void *, const void *); int main(int argc, char *argv[]) { printf("PARALLEL SORT\n"); printf("\tUsage: executable [# items [# tasks]]\n"); printf("\t# items - length of array to sort or -1 for default\n"); printf("\t# tasks - force this many tasks, -1 for default, or 0 for serial\n"); stopwatch timer_program, timer_comm; float total_timer_waiting, total_timer_comm, total_timer_calc; char master_host[255]; gethostname(master_host, 255); /* command args debug for (int i = 0; i < argc; i++) { printf("[%d]: %s\n", i, argv[i]); } */ //set up the random array data to use long test_size; if (argc > 1) { if (atol(argv[1]) > 0) { test_size = atol(argv[1]); } else { test_size = DEF_TEST_SIZE; } } else { test_size = DEF_TEST_SIZE; } printf("Alloc %ld items ... ", test_size); short *test_set = new short[test_size]; printf("OK\n"); printf("\tMemory Use: %.2f MB\n", (test_size * sizeof(short))/1048576.0); printf("Randomizing ... "); srand(time(NULL)); for (long i = 0; i < test_size; i++) { test_set[i] = rand() % SHRT_MAX; } printf("OK\n"); if (test_size <= 50) { for (int i = 0; i < test_size; i++) { printf("%d ", test_set[i]); } printf("\n"); } //serial sort if ((argc > 2) && (atoi(argv[2]) == 0)) { printf("Executing Serial Sort ... "); stopwatch timer_serial; timer_serial.start(); qsort(test_set, test_size, sizeof(short), short_comp); printf("OK\n"); printf("\tElapsed Serial Sort Time: %.3f secs\n", timer_serial.stop()); //write log //parts is -1 for serial run FILE *pFile; pFile = fopen(LOG_FILE,"a"); if (pFile != NULL) { fprintf(pFile, "Master,Data_Length,Data_Size,Parts,Total_Exec_Time\n"); fprintf(pFile, "%s,%ld,%.2f,-1,%.3f\n", master_host, test_size, (test_size * sizeof(short))/1048576.0, timer_serial.read()); fclose(pFile); } exit(0); } timer_program.start(); /* enroll in pvm */ int mytid, nproc, nhost, narch; pvmhostinfo *hostp; mytid = pvm_mytid(); if (mytid < 0) { printf("Fatal PVM error! Exiting.\n"); exit(1); } pvm_config( &nhost, &narch, &hostp ); printf("%d nodes available\n", nhost); if (argc > 2) { if ((atoi(argv[2]) > 0) && (atoi(argv[2]) < 32) && (atoi(argv[2]) < test_size)) { nproc = atoi(argv[2]); } else { nproc = nhost; } } else { //nproc = 4; nproc = nhost; } printf("%d tasks selected\n", nproc); //decide on how hosts will be divided among selected tasks //put the current host first pvmhostinfo temp_host; for (int i = 1; i < nhost; i++) { if (strcmp(hostp[i].hi_name, master_host) == 0) { temp_host = hostp[0]; hostp[0] = hostp[i]; hostp[i] = temp_host; break; } } int k = 0; int n = 0; char *host_parts[nproc]; int host_parts_num[nproc]; while (k < nproc) { for (int i = 0; i < nhost; i++) { host_parts_num[k] = n++; host_parts[k] = hostp[i].hi_name; printf("\tTask %d Host: %s\n", k, host_parts[k]); k++; if (k >= nproc) break; } } //pvm_catchout(stdout); /* start up slave tasks */ printf("Spawning root slave task ... "); int numt; int tid[1]; numt = pvm_spawn(SLAVENAME, (char**)0, 1, ".", 1, &tid[0]); if( numt < 1 ) { printf("\n Trouble spawning slave. Aborting. Error code: %d\n", tid[0]); pvm_exit(); exit(1); } printf("OK\n"); /* Send data to slave tasks */ printf("Sending data to slave ... "); pvm_initsend(PvmDataDefault); pvm_pkint(&mytid, 1, 1); pvm_pkint(&nproc, 1, 1); for (int i = 0; i < nproc; i++) { pvm_pkint(&host_parts_num[i], 1, 1); pvm_pkstr(host_parts[i]); } pvm_pklong(&test_size, 1, 1); pvm_pkshort(test_set, test_size, 1); pvm_send(tid[0], MSG_DATA); printf("OK\n"); delete(test_set); //message loop while (1) { while (pvm_probe(-1, -1)) { if (pvm_probe(-1, MSG_STATUS)) { pvm_recv(-1, MSG_STATUS); char buf[256]; pvm_upkstr(buf); printf("%s", buf); } else if (pvm_probe(-1, MSG_DATA)) { goto results; } } usleep(15000); } results: // receive data from root slave float temp_timer_waiting, temp_timer_comm, temp_timer_calc; pvm_recv(-1, MSG_DATA); timer_comm.start(); pvm_upklong(&test_size, 1, 1); test_set = new short[test_size]; pvm_upkshort(test_set, test_size, 1); pvm_upkfloat(&temp_timer_waiting, 1, 1); pvm_upkfloat(&temp_timer_comm, 1, 1); pvm_upkfloat(&temp_timer_calc, 1, 1); total_timer_comm = timer_comm.stop(); total_timer_waiting += temp_timer_waiting; total_timer_comm += temp_timer_comm; total_timer_calc += temp_timer_calc; if (test_size <= 50) { for (int j = 0; j < test_size; j++) { printf("%d ", test_set[j]); } printf("\n"); } /* Program finished. Exit PVM before stopping */ pvm_exit(); printf("Elapsed Comm Overhead Time: %.3f secs\n", total_timer_comm); printf("Elapsed Wait Overhead Time: %.3f secs\n", total_timer_waiting); printf("Elapsed Calculation Time: %.3f secs\n", total_timer_calc); printf("Elapsed Program Time: %.3f secs\n", timer_program.stop()); FILE *pFile; pFile = fopen(LOG_FILE,"a"); if (pFile != NULL) { fprintf(pFile, "Master,Data_Length,Data_Size,Parts,Total_Exec_Time,Total_Comm_Time,Total_Wait_Time,Total_Calc_Time\n"); fprintf(pFile, "%s,%ld,%.2f,%d,%.3f,%.3f,%.3f,%.3f\n", master_host, test_size, (test_size * sizeof(short))/1048576.0, nproc, timer_program.read(), total_timer_comm, total_timer_waiting, total_timer_calc); fclose(pFile); } } int short_comp (const void * a, const void * b) { return ( *(short*)a - *(short*)b ); }