/* * Copyright (c) 2019 Clementine Computing LLC. * * This file is part of PopuFare. * * PopuFare is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * PopuFare is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with PopuFare. If not, see . * */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "../common/common_defs.h" #include "../commhub/commhub.h" #include "../commhub/client_utils.h" int NUM_MODULES = 0; //Number of modules we are actively tracking int commhub_fd = -1; //File descriptor of our connection to the comm hub int zombie_alert = 0; //This flag is set by the SIGCHLD handler, and cleared by the reaper function int hup_alert = 0; //This is our flag that the SIGHUP handler sets and is cleared when a HUP has been delivered to the client modules int last_global_spawn = 0; //This is the time of the last new process creation event int last_system_stat = 0; //This is the time of our last system status check int system_stat_freemem = 0; //This is the number of KB of free memory in the system int system_stat_numproc = 0; //This is the number of running processes in the system float system_stat_loadavg = 0; //This is the 5 minute load average int error_logging_module = -1; //This is the error logging module's module number //(this is the module who's name matches SUPERVISOR_SPECIAL_BILLDB) #define PING_PUNISH_NONE (0) #define PING_PUNISH_WARN (1) #define PING_PUNISH_TERM (2) #define PING_PUNISH_KILL (3) struct module_record { time_t last_start; //last time this process was started time_t last_exit; //last time this process has reaped int ping_requested; //If this flag is nonzero, the specified module is subject to PING monitoring //however, if this flag is zero, the specified module is immune to PING monitoring. int active; //If this is zero, all further fields are invalid //------------------------- int exit_expected; //If this flag is set, we expect the process to exit (i.e. it's not an error if it does...) pid_t pid; //Process ID of this child module int stdin_fd; //File descriptor of this child's standard input (write to this to feed child) int stdout_fd; //File descriptor of this child's standard output (read from this to get output) int stderr_fd; //File descriptor of this child's standard error (read from this to get error output) long long int last_ping; //timestamp in microseconds of the last PING message sent to this process long long int last_pong; //timestamp in microseconds of the last PONG message received from this process long long int ping_lag; //Latest PING->PONG lag for this process int ping_punish; //Used to keep track of PING non-response action sequence } module_status[SUPERVISOR_MAX_MODULES] = {{0}}; //This array contains the state for each module char *module_name[SUPERVISOR_MAX_MODULES] = {0}; //This array contains pointers to the names of each module #define PIPE_R (0) #define PIPE_W (1) #define STDIN_H (0) #define STDOUT_H (1) #define STDERR_H (2) int route_debug_message(char *fmt, ...) { va_list ap; #ifdef SUPERVISOR_LOG_DEBUG_TO_BILLDB //IF... we have a connection to the commhub, and we have an instance of billdb under our care, and it is alive... if( (commhub_fd >= 0) && (error_logging_module >= 0) && module_status[error_logging_module].active ) { int retval; int len = 0; struct message_record outgoing_msg; char payload[MAX_PAYLOAD_LENGTH] = {0}; payload[len++] = LOGLEVEL_DEBUG; len += make_log_prefix(payload + len, MAX_PAYLOAD_LENGTH - len); va_start(ap, fmt); /* Initialize the va_list */ len += vsnprintf(payload + len, MAX_PAYLOAD_LENGTH - len, fmt, ap); va_end(ap); prepare_message(&outgoing_msg, MAILBOX_BILLING_LOG, payload, len); retval = send_message(commhub_fd, &outgoing_msg); if(retval == 0) { return 0; } } #endif //IF either that didn't work, or the conditions were not met, send it to syslog... va_start(ap, fmt); /* Initialize the va_list */ vsyslog(LOG_DEBUG, fmt, ap); va_end(ap); return 0; } int route_warning_message(char *fmt, ...) { va_list ap; #ifdef SUPERVISOR_LOG_WARNING_TO_BILLDB //IF... we have a connection to the commhub, and we have an instance of billdb under our care, and it is alive... if( (commhub_fd >= 0) && (error_logging_module >= 0) && module_status[error_logging_module].active ) { int retval; int len = 0; struct message_record outgoing_msg; char payload[MAX_PAYLOAD_LENGTH] = {0}; payload[len++] = LOGLEVEL_WARN; len += make_log_prefix(payload + len, MAX_PAYLOAD_LENGTH - len); va_start(ap, fmt); /* Initialize the va_list */ len += vsnprintf(payload + len, MAX_PAYLOAD_LENGTH - len, fmt, ap); va_end(ap); prepare_message(&outgoing_msg, MAILBOX_BILLING_LOG, payload, len); retval = send_message(commhub_fd, &outgoing_msg); if(retval == 0) { return 0; } } #endif //IF either that didn't work, or the conditions were not met, send it to syslog... va_start(ap, fmt); /* Initialize the va_list */ vsyslog(LOG_WARNING, fmt, ap); va_end(ap); return 0; } int route_error_message(char *fmt, ...) { va_list ap; #ifdef SUPERVISOR_LOG_ERROR_TO_BILLDB //IF... we have a connection to the commhub, and we have an instance of billdb under our care, and it is alive... if( (commhub_fd >= 0) && (error_logging_module >= 0) && module_status[error_logging_module].active ) { int retval; int len = 0; struct message_record outgoing_msg; char payload[MAX_PAYLOAD_LENGTH] = {0}; payload[len++] = LOGLEVEL_ERROR; len += make_log_prefix(payload + len, MAX_PAYLOAD_LENGTH - len); va_start(ap, fmt); /* Initialize the va_list */ len += vsnprintf(payload + len, MAX_PAYLOAD_LENGTH - len, fmt, ap); va_end(ap); prepare_message(&outgoing_msg, MAILBOX_BILLING_LOG, payload, len); retval = send_message(commhub_fd, &outgoing_msg); if(retval == 0) { return 0; } } #endif //IF either that didn't work, or the conditions were not met, send it to syslog... va_start(ap, fmt); /* Initialize the va_list */ vsyslog(LOG_ERR, fmt, ap); va_end(ap); return 0; } void warn_num_proc() { route_warning_message("System has %d processes running!\n", system_stat_numproc); } void warn_load_avg() { route_warning_message("System load average is %.1f!\n", system_stat_loadavg); } void warn_mem_free() { route_warning_message("System is down to %d KB of free RAM\n", system_stat_freemem); } void monitor_system_status() { int retval; int kb_free = 0; int n_procs = 0; float la5 = 0; char line[LINE_BUFFER_SIZE] = {0}; FILE *f; f = fopen("/proc/loadavg", "rb"); if(f) { fgets(line, sizeof(line), f); retval = sscanf(line, "%f %*f %*f %*i/%i", &la5, &n_procs); if(retval == 2) { system_stat_loadavg = la5; system_stat_numproc = n_procs; } fclose(f); } else { route_error_message("Cannot read /proc/loadavg\n"); } f = fopen("/proc/meminfo", "rb"); if(f) { while(!feof(f)) { fgets(line, sizeof(line), f); retval = sscanf(line, "MemFree: %i kB\n", &kb_free); if(retval == 1) { system_stat_freemem = kb_free; break; } } fclose(f); } else { route_error_message("Cannot read /proc/meminfo\n"); } if(system_stat_numproc > SUPERVISOR_WARN_NUM_PROC) { warn_num_proc(); } if(system_stat_loadavg > SUPERVISOR_WARN_LOAD_AVG) { warn_load_avg(); } if(system_stat_freemem < SUPERVISOR_WARN_LOW_MEM) { warn_mem_free(); } } void send_heartbeat() { route_debug_message("heartbeat\n"); } int launch_module(int n) { pid_t fork_ret; int retval; int my_stdin, my_stdout, my_stderr; int new_stdin[2], new_stdout[2], new_stderr[2]; int record_confirm_pipe[2]; char record_confirm_msg[4] = {'O','K','\n','\0'}; int record_confirm_expect; if( (n < 0) || (n >= NUM_MODULES) ) { return -1; } RESET_WATCHDOG(); //Save our actual standard I/O fd's my_stdin = dup(STDIN_H); my_stdout = dup(STDOUT_H); my_stderr = dup(STDERR_H); //Generate pipes to be said handles for the child process pipe(new_stdin); pipe(new_stdout); pipe(new_stderr); //close our actual handles in preparation for forking close(STDIN_H); close(STDOUT_H); close(STDERR_H); //Assign the "far" end of the pipes to our handles so that the child process will use them dup2(new_stdin[PIPE_R], STDIN_H); dup2(new_stdout[PIPE_W], STDOUT_H); dup2(new_stderr[PIPE_W], STDERR_H); //Create a pipe for record-keeping confirmation purposes (this prevents a race condition with SIGCHLD pipe(record_confirm_pipe); record_confirm_expect = strlen(record_confirm_msg); //Ask the kernel to create a new process to hold our new module fork_ret = fork(); if(fork_ret == 0) //If we are the child process { //We don't need this end of the confirmation pipe close(record_confirm_pipe[PIPE_W]); // Wait for the parent process to confirm that we have been recorded before proceeding //with the exec attempt so that if it fails, the parent process will know what to do with the //SIGCHLD signal. retval = read(record_confirm_pipe[PIPE_R], record_confirm_msg, record_confirm_expect); //If the parent process can't confirm our existence, we're in deep shit... if(retval != record_confirm_expect) { fprintf(stderr, "Never got recordkeeping confirmation!\n"); exit(EX_UNAVAILABLE); } //We are now done with our confirmation pipe close(record_confirm_pipe[PIPE_R]); //Close the child's copy of the parent's standard I/O fds close(my_stdin); close(my_stdout); close(my_stderr); //Close the parent's end of the pipes connecting the two processes close(new_stdin[PIPE_W]); close(new_stdout[PIPE_R]); close(new_stderr[PIPE_R]); //Attempt to exec the child process now... retval = execl(module_name[n], module_name[n], NULL); if(retval) { fprintf(stderr, "Cannot execl(\"%s\", \"%s\", NULL)!\n", module_name[n], module_name[n]); exit(EX_UNAVAILABLE); } exit(0); } else if(fork_ret != -1) //If we are the parent process { //Record our child PID module_status[n].pid = fork_ret; //Record our end of the child's standard file descriptors module_status[n].stdin_fd = new_stdin[PIPE_W]; module_status[n].stdout_fd = new_stdout[PIPE_R]; module_status[n].stderr_fd = new_stderr[PIPE_R]; //Reset our list ping/pong times to "never" module_status[n].last_ping = 0; module_status[n].last_pong = 0; module_status[n].ping_lag = 0; module_status[n].ping_punish = PING_PUNISH_NONE; //Record our process start time module_status[n].last_start = time(NULL); //This process is not currently expected to die module_status[n].exit_expected = 0; //Flag this child module as active module_status[n].active = 1; //Close the opposite end of all of our pipes close(record_confirm_pipe[PIPE_R]); close(new_stdin[PIPE_R]); close(new_stdout[PIPE_W]); close(new_stderr[PIPE_W]); //Close our copy of the child's standard I/O file descriptors close(STDIN_H); close(STDOUT_H); close(STDERR_H); //Restore the parent's original standard I/O file descriptors dup2(my_stdin, STDIN_H); dup2(my_stdout, STDOUT_H); dup2(my_stderr, STDERR_H); //Unblock the child process by transmitting our confirmation message retval = write(record_confirm_pipe[PIPE_W], record_confirm_msg, record_confirm_expect); if(retval != record_confirm_expect) { route_error_message("Cannot transmit recordkeeping confirmation!\n"); } //We are now done with our recordkeeping confirmation pipe close(record_confirm_pipe[PIPE_W]); } else //Otherwise we have somehow FAILED to fork() { //restore our stderr file descriptor close(STDERR_H); dup2(my_stderr, STDERR_H); route_error_message("Cannot fork()!\n"); exit(EX_OSERR); } return 0; } void expected_exit(int n) { route_debug_message("Module %s [%d] has finished as expected.\n", module_name[n], (int)module_status[n].pid); //flag this module as "clean" for the purpose of respawn //(so we won't count the short runtime of a process we asked to terminate against it) module_status[n].last_start = 0; module_status[n].last_exit = 0; } void unexpected_exit(int n) { int age; //Record our last stop time module_status[n].last_exit = time(NULL); age = (int)module_status[n].last_exit - (int)module_status[n].last_start; route_warning_message("Module %s [%d] has stopped running unexpectedly after %d seconds.\n", module_name[n], (int)module_status[n].pid, age); } void reap_zombies() { int status = 0; pid_t retval = 0; int i; do { // Clear the zombie alert HERE so that if things terminate after the loop finishes, //we'll get flagged. This could result in an extra flag, but we'll never miss one. //and since we're using the WNOHANG option, there is no harm in going through the waitpid() //loop one time extra. zombie_alert = 0; retval = waitpid(-1, &status, WNOHANG); //Wait for ANY child process if(retval > 0) //If waitpid() actually waited for the specified process... { for(i = 0; i < NUM_MODULES; i++) { //Go through our table of modules looking for an active module with a //matching PID... if( module_status[i].active && (module_status[i].pid == retval) ) { //Give us an opportunity to do some clever cleanup if it is merited //These functions also set the last_exit time (and may zero the last_start //to flag a process as "clean" for respawn purposes). if(module_status[i].exit_expected) { expected_exit(i); } else { unexpected_exit(i); } //Set this module inactive module_status[i].active = 0; //Set this module's last ping/pong times to "never" module_status[i].last_ping = 0; module_status[i].last_pong = 0; module_status[i].ping_lag = 0; module_status[i].ping_punish = PING_PUNISH_NONE; //No more process module_status[i].pid = 0; //Close our end of this module's standard I/O file descriptors close(module_status[i].stdin_fd); close(module_status[i].stdout_fd); close(module_status[i].stderr_fd); module_status[i].stdin_fd = -1; module_status[i].stdout_fd = -1; module_status[i].stderr_fd = -1; break; } } } } while(retval > 0); } void sigchld_handler(int signum, siginfo_t *info, void *data) { zombie_alert = 1; } void sighup_handler(int signum, siginfo_t *into, void *data) { hup_alert = 1; } void setup_child_handler() { struct sigaction sa = {{0}}; sa.sa_sigaction = sigchld_handler; sa.sa_flags = SA_NOCLDSTOP | SA_SIGINFO; sigfillset(&sa.sa_mask); sigaction(SIGCHLD, &sa, NULL); } void setup_hup_handler() { struct sigaction sa = {{0}}; sa.sa_sigaction = sighup_handler; sa.sa_flags = SA_SIGINFO; sigfillset(&sa.sa_mask); sigaction(SIGHUP, &sa, NULL); } void monitor_spawn_list() { int i; int module_runtime; time_t now; for(i = 0; i < NUM_MODULES; i++) { now = time(NULL); //If it is too soon to attempt spawning any new process, then we're done for the moment if( (now - last_global_spawn) < SUPERVISOR_GLOBAL_SPAWN_RATE_LIMIT ) break; //If this module is still active, we don't need to spawn it... if(module_status[i].active) continue; //This number will only be valid if last_start != 0 module_runtime = module_status[i].last_exit - module_status[i].last_start; if(module_status[i].last_start == 0) //If this module either made an expected termination or has never been launcher { last_global_spawn = now; //then update our last global spawn time launch_module(i); //and launch it... route_debug_message("Spawned module %s, PID = %d\n", module_name[i], (int)module_status[i].pid); } else if( module_runtime < SUPERVISOR_RESPAWN_DELAY_THRESHOLD ) //otherwise, if it has run before, but terminated unexpectedly in a very short time { if( (now - module_status[i].last_exit) >= SUPERVISOR_RESPAWN_RATE_LIMIT) //and it has been long enough to try again { last_global_spawn = now; //then update our last global spawn time launch_module(i); //and launch it... route_warning_message("Respawned module %s, PID = %d [rate limited]\n", module_name[i], (int)module_status[i].pid); } } else //otherwise, the module has terminated unexpectedly, but it had run for a long time, so we can respawn it immediately { last_global_spawn = now; //then update our last global spawn time launch_module(i); //and launch it... route_debug_message("Respawned module %s, PID = %d\n", module_name[i], (int)module_status[i].pid); } } } void handle_dead_fd(int fd) { int i; close(fd); if(fd == commhub_fd) { commhub_fd = -1; return; } for(i = 0; i < NUM_MODULES; i++) { if(module_status[i].stdin_fd == fd) { module_status[i].stdin_fd = -1; return; } if(module_status[i].stdout_fd == fd) { module_status[i].stdout_fd = -1; return; } if(module_status[i].stderr_fd == fd) { module_status[i].stderr_fd = -1; return; } } } message_callback_return handle_pong_message(struct message_record *msg, void *param) { int i; // printf("Got PONG from PID %d\n", (int)msg->header.sender); //Iterate through all of the modules for(i = 0; i < NUM_MODULES; i++) { //If this module is active, and its PID matches the PID of the sender of this PONG message if( module_status[i].active && (msg->header.sender == module_status[i].pid) ) { module_status[i].last_pong = get_usec_time(); module_status[i].ping_lag = (module_status[i].last_pong - module_status[i].last_ping); // printf("PING->PONG latency %lld microseconds.\n", module_status[i].ping_lag); } } return MESSAGE_HANDLED_CONT; } void clear_ping_attempt() { int i; for(i = 0; i < NUM_MODULES; i++) { if(module_status[i].active) { module_status[i].last_ping = module_status[i].last_pong = 0; module_status[i].ping_punish = PING_PUNISH_NONE; } } } void send_ping_message() { struct message_record outgoing_msg; long long int usec_now; char target[MAILBOX_NAME_MAX + 1]; int i; if(commhub_fd < 0) { return; } for(i = 0; i < NUM_MODULES; i++) { //If this module is active AND included in PING monitoring if( module_status[i].active && module_status[i].ping_requested ) { usec_now = get_usec_time(); //If it is not yet time to re-ping this module if( (usec_now - module_status[i].last_ping) < SUPERVISOR_PING_INTERVAL) { continue; //skip it for now } //If we have PING'd this host but are still waiting for a PONG back if(module_status[i].last_pong < module_status[i].last_ping) { continue; //Don't flood it with pings } //If we've never PING'd this host, set it's PONG time to the same as its PING //time so it doesn't get killed for not having responded to its first PING. if(module_status[i].last_ping == 0) { module_status[i].last_pong = usec_now; } module_status[i].last_ping = usec_now; //Turn our PID into a magic mailbox name that addresses that PID. sprintf(target, ">%d", (int)module_status[i].pid); //Prepare a message to that magic mailbox with a payload consisting of the mailbox //name of the PING notification prepare_message(&outgoing_msg, target, MAILBOX_PING, strlen(MAILBOX_PING)); //And send it! send_message(commhub_fd, &outgoing_msg); } } } void send_hup_message() { struct message_record outgoing_msg; if(commhub_fd < 0) { return; } printf("Sending HUP\n"); prepare_message(&outgoing_msg, MAILBOX_HUP, "", 0); send_message(commhub_fd, &outgoing_msg); } void ping_punish_warn(int n, long long int delta) { route_debug_message("Module %s [%d] hasn't responded to a PING in %d seconds...\n", module_name[n], (int)module_status[n].pid, (int)USEC_TO_SEC(delta)); } void ping_punish_term(int n, long long int delta) { route_warning_message("Module %s [%d] hasn't responded to a PING in %d seconds... Sending SIGTERM\n", module_name[n], (int)module_status[n].pid, (int)USEC_TO_SEC(delta)); kill(module_status[n].pid, SIGTERM); } void ping_punish_kill(int n, long long int delta) { route_error_message("Module %s [%d] hasn't responded to a PING in %d seconds... Sending SIGKILL\n", module_name[n], (int)module_status[n].pid, (int)USEC_TO_SEC(delta)); kill(module_status[n].pid, SIGKILL); } void enforce_ping_policy() { int i; long long int delta = 0; for(i = 0; i < NUM_MODULES; i++) { //If this module is both active AND included in the ping monitoring if( module_status[i].active && module_status[i].ping_requested ) { if(module_status[i].last_ping == 0) //if we haven't sent a PING to this one... { //clear any punishment that would otherwise be pending and get the next module module_status[i].ping_punish = PING_PUNISH_NONE; continue; } //compute just how late this module is... delta = get_usec_time() - module_status[i].last_pong; switch(module_status[i].ping_punish) { case PING_PUNISH_NONE: if(delta > SUPERVISOR_PING_WARN_TIME) { ping_punish_warn(i, delta); module_status[i].ping_punish++; } break; case PING_PUNISH_WARN: if(delta > SUPERVISOR_PING_TERM_TIME) { ping_punish_term(i, delta); module_status[i].ping_punish++; } break; case PING_PUNISH_TERM: if(delta > SUPERVISOR_PING_KILL_TIME) { ping_punish_kill(i, delta); module_status[i].ping_punish++; } break; default: route_error_message("At our wits end with module %s [%d], just plain won't die!\n", module_name[i], (int)module_status[i].pid); break; } } } } void maintain_ipc_hub_connect(char *progname) { struct message_record outgoing_msg; if(commhub_fd < 0) //if we have no connection to the communication hub { commhub_fd = connect_to_message_server(progname); //try and get one // printf("commhub_fd = %d\n", commhub_fd); if(commhub_fd >= 0) //if it worked { //Subscribe to the command mailboxes we act on prepare_message(&outgoing_msg, MAILBOX_SUBSCRIBE, MAILBOX_PONG, strlen(MAILBOX_TOKEN_MAG)); send_message(commhub_fd,&outgoing_msg); //Subscribe to the relevant status management mailboxes subscribe_to_default_messages(commhub_fd); } } } void usage(char *progname) { fprintf(stderr,"%s: %d modules given to be supervised, minimum = 1, maximum = %d\n", progname, NUM_MODULES, SUPERVISOR_MAX_MODULES - 1); fprintf(stderr,"%s: Usage: %s [module ...] [--[no-]ping] [module ...]\n", progname, progname); fprintf(stderr,"%s: By default modules will be subjet to PING monitoring through\n", progname); fprintf(stderr,"%s: the IPC hub, --no-ping will effect all modules following it until a --ping\n", progname); fprintf(stderr,"\n"); exit(EX_USAGE); } void handle_command_line(int argc, char **argv) { int i; int needs_ping = 1; NUM_MODULES = 0; //Walk all of the given command line arguments and add each one as a module we have been asked to supervice for(i = 1; i < argc; i++) { if( !strcmp(argv[i], "--no-ping") ) { needs_ping = 0; } else if ( !strcmp(argv[i], "--ping") ) { needs_ping = 1; } else { //Here we look for the last / in the module name char *foo = rindex(argv[i], '/'); if(foo) //if we found one, advance past it { foo++; } else //otherwise { foo = argv[i]; //use our whole argument } if( !strcmp(foo, SUPERVISOR_SPECIAL_BILLDB) ) //make sure it is at the end { //if it is, this one is our guy! error_logging_module = NUM_MODULES; } module_status[NUM_MODULES].ping_requested = needs_ping; module_name[NUM_MODULES++] = argv[i]; } if(NUM_MODULES >= SUPERVISOR_MAX_MODULES) { usage(argv[0]); } } if(NUM_MODULES == 0) { usage(argv[0]); } } int main(int argc, char **argv) { int poll_ret; int nfds; int i; struct message_record incoming_msg; char linebuffer[LINE_BUFFER_SIZE] = {0}; int read_ret; int modnum; struct pollfd fds[1 + (2 * SUPERVISOR_MAX_MODULES)] = {{0}}; int poll_to_module[1 + (2 * SUPERVISOR_MAX_MODULES)] = {0}; //Parse our command line to figure out what modules we need to supervise handle_command_line(argc, argv); //Configure our default signal handlers configure_signal_handlers(argv[0]); //As well as our module-specific zombie-reaping flag setter setup_child_handler(); //This listens for SIGHUP messages and flags us when one has come it. setup_hup_handler(); //Register our default keep-up-with-system status callbacks register_system_status_callbacks(); //Except we want to exempt ourselves from the MAILBOX_EXIT and //MAILBOX_PING so as not to start ping storms or kill ourselves... register_dispatch_callback(MAILBOX_EXIT, CALLBACK_PREPROCESS, ignore_message, NULL); register_dispatch_callback(MAILBOX_PING, CALLBACK_PREPROCESS, ignore_message, NULL); //Add our specific handlers register_dispatch_callback(MAILBOX_PONG, CALLBACK_USER(1), handle_pong_message, NULL); // This is the main processing loop which monitors system status and pumps I/O from //all of the client modules (debug and error messages) and from the IPC hub when it is up //as indicated by poll(). while( exit_request_status == EXIT_REQUEST_NONE ) { RESET_WATCHDOG(); //-------------- SYSTEM STATE MAINTENANCE CODE HERE if( (time(NULL) - last_system_stat) >= SUPERVISOR_SYSTEM_STAT_INTERVAL) { monitor_system_status(); send_heartbeat(); last_system_stat = time(NULL); } //Do any maintenance on our spawn list as needed monitor_spawn_list(); //Try and keep in touch with the IPC hub if possible maintain_ipc_hub_connect(argv[0]); //If the communications hub is up, we may need to maintain our PING states if(commhub_fd >= 0) { //Send out any PING messages that are required send_ping_message(); //Deal with any delinquents who don't answer their damn pings enforce_ping_policy(); if(hup_alert) { hup_alert = 0; send_hup_message(); } } else //If there is no connection to the IPC hub { clear_ping_attempt(); //clear any PING attempt records we may have active } //If our SIGCHLD handler has notified us that there is at least 1 zombie to reap if(zombie_alert) { //Try and reap any zombies that may exist reap_zombies(); } //-------------- POLL FILE DESCRIPTOR SET POPULATION HERE nfds = 0; //If we have a valid communication hub connection, add it to the poll set if(commhub_fd >= 0) { fds[nfds].fd = commhub_fd; fds[nfds].events = POLLIN; poll_to_module[nfds] = -1; //but flag it as NOT belonging to a child module nfds++; } //Then iterate through all of our modules and see if they need service for(i = 0; i < NUM_MODULES; i++) { if(module_status[i].active) //If this child module is flagged as active { if(module_status[i].stdout_fd >= 0) //and we have a stdout file descriptor for this child { fds[nfds].fd = module_status[i].stdout_fd; //add it to the poll set fds[nfds].events = POLLIN; poll_to_module[nfds] = i; //and remember which module it goes to nfds++; } if(module_status[i].stderr_fd >= 0) //if we have a stderr file descriptor for this child { fds[nfds].fd = module_status[i].stderr_fd; //add it to the poll set fds[nfds].events = POLLIN; poll_to_module[nfds] = i; //and remember which module it goes to nfds++; } } } if(nfds > 0) //If we have any file descriptors to poll { poll_ret = poll(fds, nfds, POLL_TIMEOUT); //poll them if(poll_ret < 0) //if poll returns error { if(errno == EINTR) //and it's just EINTR { continue; //try the body of the while again } else //if it is some other error { //scream bloody murder route_debug_message("Poll returned %d, errno = %d (%s)\n", poll_ret, errno, strerror(errno)); sleep(1); } } } else //If we have zero functional file descriptors { sleep(1); //Pretend that we called poll and it timed out poll_ret = 0; } //-------------- I/O PUMPING CODE BELOW if(poll_ret == 0) //if there are no file descriptors flagged as needing any action { continue; } for(i = 0; i < nfds; i++) { modnum = poll_to_module[i]; if( (fds[i].revents & POLLIN) && (modnum >= 0) ) //If we have input and it is from a module... { read_ret = read(fds[i].fd, linebuffer, sizeof(linebuffer) - 1); if(read_ret == 0) { handle_dead_fd(fds[i].fd); //clean up continue; //and skip this fd for now } else if( (read_ret < 0) && (errno != EINTR) ) { handle_dead_fd(fds[i].fd); //clean up continue; //and skip this fd for now } //Terminate our read... linebuffer[read_ret] = '\0'; if(fds[i].fd == module_status[modnum].stdout_fd) { //route_debug_message("%s [%d]: %s", module_name[modnum], (int)module_status[modnum].pid, linebuffer); } else if(fds[i].fd == module_status[modnum].stderr_fd) { route_error_message("%s [%d]: %s", module_name[modnum], (int)module_status[modnum].pid, linebuffer); } else { route_debug_message("File descriptor %d does not belong to module %d (%s)!\n", fds[i].fd, modnum, module_name[modnum]); } } else if( (fds[i].revents & POLLIN) && (fds[i].fd == commhub_fd) ) //If we have input and it is from the commhub... { read_ret = get_message(commhub_fd, &incoming_msg); if(read_ret < 0) { handle_dead_fd(fds[i].fd); //clean up continue; //and skip this fd for now } else { process_message(&incoming_msg); //This passes the received message through the callback list } } //If it looks like we have a closed or invalid descriptor if(fds[i].revents & (POLLERR | POLLHUP | POLLNVAL)) { handle_dead_fd(fds[i].fd); //clean up continue; //and skip this fd for now } } } route_debug_message("Attempting graceful exit...\n"); for(i = 0; i < NUM_MODULES; i++) { if(module_status[i].active) { route_debug_message("Sending SIGTERM to module %s [%d]\n", module_name[i], module_status[i].pid); kill(module_status[i].pid, SIGTERM); } } sleep(1); return 0; }