#define _SVID_SOURCE 1 //#define _DEFAULT_SOURCE 1 #define _XOPEN_SOURCE 500 #define _POSIX_C_SOURCE 2 #include #include #include #include #include #include #include #include #include #include #include #include "error.h" #include "shr_mem.h" #include #include #include #include #define SLAVE_NUM 5 #define MAX_SIZE 300 #define MAX_PATH_SIZE 100 #define MAX_OUTPUT_SIZE (200 + MAX_PATH_SIZE) void createPipes(int pipesOutput[SLAVE_NUM][2], int pipesInput[SLAVE_NUM][2]); int writeOutput(char * output, FILE * outputFile, void * shmP, char viewLinked); void closePipes(int pipesInput[SLAVE_NUM][2], int pipesOutput[SLAVE_NUM][2], int slave); char sendTask(struct stat statBuf, char * fileName, int pipesInput[SLAVE_NUM][2], int slave); void changeViewFlag(); static char viewLinked = 0; int main(int argc, char *argv[]) { if (setvbuf(stdout, NULL, _IONBF, 0) != 0) printSystemError("setvbuf() -- ERROR"); if (argc == 1) printError("Error: no arguments provided"); //sem_unlink("/EMPTY"); FILE * outputFile; if ((outputFile = fopen("outputFile.txt", "w")) == NULL) printSystemError("fopen() -- ERROR"); char * shmPointer; int shmFd; shmFd = createShm((argc - 1) * MAX_OUTPUT_SIZE, &shmPointer); void * shmStart; printf("%d\n", argc - 1); printf("%d\n", getpid()); struct sigaction sigAct; sigAct.sa_handler = changeViewFlag; sigAct.sa_flags = 0; if (sigaction(SIGUSR1, &sigAct, NULL) == -1) printSystemError("sigaction() -- ERROR"); sleep(5); /* char * shmPointer; int shmFd; void * shmStart; */ if (viewLinked) { shmPointer = mmap(0, (argc - 1) * MAX_OUTPUT_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, shmFd, 0); //shmFd = createShm((argc - 1) * MAX_OUTPUT_SIZE, &shmPointer); shmStart = shmPointer; //close(shmFd); } int pipesOutput[SLAVE_NUM][2], pipesInput[SLAVE_NUM][2]; createPipes(pipesOutput, pipesInput); struct stat statBuf; int tasks = 1; for (int j = 0; tasks <= SLAVE_NUM && tasks < argc; tasks++) { if (sendTask(statBuf, argv[tasks], pipesInput, j++) == 0) continue; } char buf[MAX_SIZE]; fd_set pipesOutputSet; FD_ZERO(&pipesOutputSet); int r, readBytes, flag, count; if (tasks != SLAVE_NUM) { for (int k = tasks - 1; k < SLAVE_NUM; k++) { closePipes(pipesInput, pipesOutput, k); } } do { flag = 0; FD_ZERO(&pipesOutputSet); for (int n = 0; n < SLAVE_NUM; n++) { if (pipesOutput[n][0] != -1) { FD_SET(pipesOutput[n][0], &pipesOutputSet); flag = 1; } } if (flag) { if ((count = select(pipesOutput[SLAVE_NUM - 1][1] + 1, &pipesOutputSet, NULL, NULL, NULL)) <= 0) printSystemError("select() -- ERROR"); for (r = 0; r < SLAVE_NUM && count > 0; r++) { if (FD_ISSET(pipesOutput[r][0], &pipesOutputSet)) { readBytes = 0; do { if ((readBytes += read(pipesOutput[r][0], buf + readBytes, MAX_SIZE)) < 0) printSystemError("read() -- ERROR"); } while (buf[readBytes - 1] != '\n'); buf[readBytes] = 0; // printf("\n%s\n", buf); shmPointer += writeOutput(buf, outputFile, shmPointer, viewLinked); count--; if (tasks < argc) { if (sendTask(statBuf, argv[tasks++], pipesInput, r) == 0) continue; } else { closePipes(pipesInput, pipesOutput, r); } } } } } while (flag); //sleep(5); fclose(outputFile); for (int i = 0; i < SLAVE_NUM; i++){ if (wait(NULL) == -1) printSystemError("wait() -- ERROR"); } sigset_t set; sigaddset(&set, SIGUSR2); if (viewLinked) { int temp = 0; sigwait(&set, &temp); terminateShm("/BottlerSHM", shmFd, shmStart, (argc - 1) * MAX_OUTPUT_SIZE); closeSem(); } //close(shmFd); return EXIT_SUCCESS; } void changeViewFlag() { viewLinked = 1; } char sendTask(struct stat statBuf, char * fileName, int pipesInput[SLAVE_NUM][2], int slave) { if (stat(fileName, &statBuf) == -1) printSystemError("Error, one of the specified paths is incorrect"); if (!S_ISREG(statBuf.st_mode)) return EXIT_FAILURE; if ((write(pipesInput[slave][1], fileName, strlen(fileName) + 1)) < 0) printSystemError("write() -- ERROR"); return EXIT_SUCCESS; } void closePipes(int pipesInput[SLAVE_NUM][2], int pipesOutput[SLAVE_NUM][2], int slave) { if (close(pipesOutput[slave][0]) < 0) printSystemError("close() -- ERROR"); if (close(pipesInput[slave][1]) < 0) printSystemError("close() -- ERROR"); pipesOutput[slave][0] = -1; pipesInput[slave][0] = -1; } int writeOutput(char * output, FILE * outputFile, void * shmPointer, char viewLinked) { int length = strlen(output); if (fwrite(output, sizeof(char), length, outputFile) != length) printSystemError("fwrite() -- ERROR"); if (viewLinked) writeShm(shmPointer, output, length); return length; } void createPipes(int pipesOutput[SLAVE_NUM][2], int pipesInput[SLAVE_NUM][2]) { pid_t pid[SLAVE_NUM]; for (int i = 0; i < SLAVE_NUM; i++) { if (pipe(pipesOutput[i]) < 0) printSystemError("pipe() -- ERROR"); if (pipe(pipesInput[i]) < 0) printSystemError("pipe() -- ERROR"); if ((pid[i] = fork()) == 0) { if (close(pipesOutput[i][0]) < 0) printSystemError("close() -- ERROR"); if (dup2(pipesOutput[i][1], STDOUT_FILENO) < 0) printSystemError("dup2() -- ERROR 1"); if (close(pipesOutput[i][1]) < 0) printSystemError("close() -- ERROR"); if (close(pipesInput[i][1]) < 0) printSystemError("close() -- ERROR"); if (dup2(pipesInput[i][0], STDIN_FILENO) < 0) printSystemError("dup2() -- ERROR 2"); if (close(pipesInput[i][0]) < 0) printSystemError("close() -- ERROR"); char * params[] = {"slave.o", NULL}; if (execv("slave.o", params) < 0) printSystemError("execv() -- ERROR"); } else if (pid[i] < 0) printSystemError("fork() -- ERROR"); if (close(pipesInput[i][0]) < 0) printSystemError("close() -- ERROR"); if (close(pipesOutput[i][1]) < 0) printSystemError("close() -- ERROR"); } }