#include #include #include #include #include #include #include "pvm3.h" #include "stopwatch.c" #define MSG_STATUS 1 #define MSG_DATA 2 #define SLAVENAME "slave2" int short_comp (const void *, const void *); void SendMasterMsg(char * format, ...); short *merge(short *left, long left_len, short *right, long right_len); int master; main() { char host[255]; gethostname(host, 255); stopwatch timer_waiting, timer_comm, timer_calc, timer_slave; float total_timer_waiting, total_timer_comm, total_timer_calc, total_timer_slave; total_timer_waiting = 0.0; total_timer_comm = 0.0; total_timer_calc = 0.0; total_timer_slave = 0.0; //SendMasterMsg("[%s]: Started\n", host); timer_slave.start(); /* enroll in pvm */ int mytid; mytid = pvm_mytid(); /* Receive data from master */ //short *data_set; //long set_size; //int nproc; pvm_recv( -1, MSG_DATA ); timer_comm.start(); short *data_set; long set_size; int nproc; pvm_upkint(&master, 1, 1); pvm_upkint(&nproc, 1, 1); char *host_parts[nproc]; int host_parts_num[nproc]; for (int i = 0; i < nproc; i++) { pvm_upkint(&host_parts_num[i], 1, 1); host_parts[i] = new char[255]; pvm_upkstr(host_parts[i]); //SendMasterMsg("[%s]: Got Host %i: %s\n", host, host_parts_num[i], host_parts[i]); } pvm_upklong(&set_size, 1, 1); data_set = new short[set_size]; pvm_upkshort(data_set, set_size, 1); total_timer_comm += timer_comm.stop(); int rcv_tids[32]; int count_rcv_tids = 0; while (1) { if (nproc > 1) { /* split array in half */ long temp1 = (int)floor(set_size / 1.3); long data_set2_size = set_size - temp1; set_size = temp1; short *data_set2 = &data_set[temp1]; //split available nodes in half int temp2 = (int)floor(nproc / 2); int host_parts2_size = nproc - temp2; nproc = temp2; char *host_parts2[host_parts2_size]; int *host_parts_num2 = &host_parts_num[temp2]; for (int i = 0; i < host_parts2_size; i++) { host_parts2[i] = host_parts[i+temp2]; } int numt; int tid[1]; SendMasterMsg("\t[%s (%d)]: Spawning Task: %d (%s)\n", host, host_parts_num[0], host_parts_num2[0], host_parts2[0]); numt = pvm_spawn(SLAVENAME, (char**)0, 1, host_parts2[0], 1, &tid[0]); if( numt < 1 ) { SendMasterMsg("Trouble spawning slave. Error code: %d\n", tid[0]); } else { rcv_tids[count_rcv_tids++] = *tid; } //SendMasterMsg("[%s (%d)]: Spawning Task: %d %ld\n", host, host_parts_num[0], host_parts2_size, data_set2_size); pvm_initsend(PvmDataDefault); pvm_pkint(&master, 1, 1); pvm_pkint(&host_parts2_size, 1, 1); for (int i = 0; i < host_parts2_size; i++) { pvm_pkint(&host_parts_num2[i], 1, 1); pvm_pkstr(host_parts2[i]); } pvm_pklong(&data_set2_size, 1, 1); pvm_pkshort(data_set2, data_set2_size, 1); pvm_send(tid[0], MSG_DATA); //delete(data_set2); } else { /* Do calculations with data */ timer_calc.start(); qsort(data_set, set_size, sizeof(short), short_comp); total_timer_calc += timer_calc.stop(); break; } } float temp_timer_waiting, temp_timer_comm, temp_timer_calc; temp_timer_waiting = 0.0; temp_timer_comm = 0.0; temp_timer_calc = 0.0; int current_rcv_tid = 0; while (current_rcv_tid < count_rcv_tids) { long set_size_right; short *data_set_right; timer_waiting.start(); pvm_recv(rcv_tids[current_rcv_tid++], MSG_DATA); total_timer_waiting += timer_waiting.stop(); timer_comm.start(); pvm_upklong(&set_size_right, 1, 1); data_set_right = new short[set_size_right]; pvm_upkshort(data_set_right, set_size_right, 1); pvm_upkfloat(&temp_timer_waiting, 1, 1); pvm_upkfloat(&temp_timer_comm, 1, 1); pvm_upkfloat(&temp_timer_calc, 1, 1); total_timer_comm += timer_comm.stop(); timer_calc.start(); short *data_temp = merge(data_set, set_size, data_set_right, set_size_right); delete(data_set); delete(data_set_right); data_set = data_temp; total_timer_calc += timer_calc.stop(); set_size += set_size_right; } total_timer_slave = timer_slave.stop(); SendMasterMsg("\t[%s (%d)]: Done. Returning. Times - Wait: %.3f Comm: %.3f Calc: %.3f Slave: %.3f\n", host, host_parts_num[0], total_timer_waiting, total_timer_comm, total_timer_calc, total_timer_slave); total_timer_waiting += temp_timer_waiting; total_timer_comm += temp_timer_comm; total_timer_calc += temp_timer_calc; /* Send result to master */ int parent = pvm_parent(); pvm_initsend( PvmDataDefault ); pvm_pklong(&set_size, 1, 1); pvm_pkshort(data_set, set_size, 1); pvm_pkfloat(&total_timer_waiting, 1, 1); pvm_pkfloat(&total_timer_comm, 1, 1); pvm_pkfloat(&total_timer_calc, 1, 1); pvm_send( parent, MSG_DATA ); /* Program finished. Exit PVM before stopping */ pvm_exit(); } int short_comp(const void * a, const void * b) { return ( *(short*)a - *(short*)b ); } void SendMasterMsg(char * format, ...) { char buffer[256]; va_list args; va_start(args, format); vsprintf(buffer, format, args); pvm_initsend(PvmDataDefault); pvm_pkstr(buffer); pvm_send(master, MSG_STATUS); //pvm_psend(master, MSG_STATUS, (void*)buffer, strlen(buffer), PVM_STR); /*FILE *pFile; pFile = fopen("slave_out","a"); if (pFile != NULL) { fputs(buffer, pFile); fclose(pFile); }*/ va_end(args); } short *merge(short *left, long left_len, short *right, long right_len) { short *result = new short[left_len + right_len]; long i, j, k; i = 0; j = 0; k = 0; while ((i < left_len) && (j < right_len)) { if (left[i] <= right[j]) { result[k++] = left[i++]; } else { result[k++] = right[j++]; } } while (i < left_len) { result[k++] = left[i++]; } while (j < right_len) { result[k++] = right[j++]; } return result; }