/* * Copyright (c) 2019 Clementine Computing LLC. * * This file is part of PopuFare. * * PopuFare is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * PopuFare is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with PopuFare. If not, see . * */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "../common/common_defs.h" #include "../commhub/commhub.h" #include "../commhub/client_utils.h" #define CLIENT_SUPERVISOR_VERSION "2.1.1" // Number of modules we are actively tracking // int NUM_MODULES = 0; // File descriptor of our connection to the comm hub // int commhub_fd = -1; // This flag is set by the SIGCHLD handler, and cleared by the reaper function // volatile sig_atomic_t zombie_alert = 0; // This is our flag that the SIGHUP handler sets and is cleared when a HUP has been delivered to the client modules // volatile sig_atomic_t hup_alert = 0; // This is the time of the last new process creation event // int last_global_spawn = 0; // This is the time of our last system status check // int last_system_stat = 0; // This is the number of KB of free memory in the system // int system_stat_freemem = 0; // This is the number of running processes in the system // int system_stat_numproc = 0; // This is the 5 minute load average // float system_stat_loadavg = 0; // This is the error logging module's module number // (this is the module who's name matches SUPERVISOR_SPECIAL_BILLDB) // int error_logging_module = -1; #define PING_PUNISH_NONE (0) #define PING_PUNISH_WARN (1) #define PING_PUNISH_TERM (2) #define PING_PUNISH_KILL (3) struct module_record { // last time this process was started // time_t last_start; // last time this process has reaped // time_t last_exit; // If this flag is nonzero, the specified module is subject to PING monitoring // int ping_requested; // however, if this flag is zero, the specified module is immune to PING monitoring. // If this is zero, all further fields are invalid // int active; //------------------------- // If this flag is set, we expect the process to exit (i.e. it's not an error if it does...) // int exit_expected; // Process ID of this child module // pid_t pid; // File descriptor of this child's standard input (write to this to feed child) // int stdin_fd; // File descriptor of this child's standard output (read from this to get output) // int stdout_fd; // File descriptor of this child's standard error (read from this to get error output) // int stderr_fd; // timestamp in microseconds of the last PING message sent to this process // long long int last_ping; // timestamp in microseconds of the last PONG message received from this process // long long int last_pong; // Latest PING->PONG lag for this process // long long int ping_lag; // Used to keep track of PING non-response action sequence // int ping_punish; // This array contains the state for each module // } module_status[SUPERVISOR_MAX_MODULES] = {{0}}; // This array contains pointers to the names of each module // char *module_name[SUPERVISOR_MAX_MODULES] = {0}; #define PIPE_R (0) #define PIPE_W (1) #define STDIN_H (0) #define STDOUT_H (1) #define STDERR_H (2) int route_debug_message(char *fmt, ...) { va_list ap; #ifdef SUPERVISOR_LOG_DEBUG_TO_BILLDB // IF... we have a connection to the commhub, and we have an instance of billdb under our care, and it is alive... // if ( (commhub_fd >= 0) && (error_logging_module >= 0) && module_status[error_logging_module].active ) { int retval; int len = 0; struct message_record outgoing_msg; char payload[MAX_PAYLOAD_LENGTH] = {0}; payload[len++] = LOGLEVEL_DEBUG; len += make_log_prefix(payload + len, MAX_PAYLOAD_LENGTH - len); va_start(ap, fmt); /* Initialize the va_list */ len += vsnprintf(payload + len, MAX_PAYLOAD_LENGTH - len, fmt, ap); va_end(ap); prepare_message(&outgoing_msg, MAILBOX_BILLING_LOG, payload, len); retval = send_message(commhub_fd, &outgoing_msg); if (retval == 0) { return 0; } } #endif // IF either that didn't work, or the conditions were not met, send it to syslog... // // Initialize the va_list // va_start(ap, fmt); vsyslog(LOG_DEBUG, fmt, ap); va_end(ap); return 0; } int route_warning_message(char *fmt, ...) { va_list ap; #ifdef SUPERVISOR_LOG_WARNING_TO_BILLDB // IF... we have a connection to the commhub, and we have an instance of billdb under our care, and it is alive... // if ( (commhub_fd >= 0) && (error_logging_module >= 0) && module_status[error_logging_module].active ) { int retval; int len = 0; struct message_record outgoing_msg; char payload[MAX_PAYLOAD_LENGTH] = {0}; payload[len++] = LOGLEVEL_WARN; len += make_log_prefix(payload + len, MAX_PAYLOAD_LENGTH - len); va_start(ap, fmt); /* Initialize the va_list */ len += vsnprintf(payload + len, MAX_PAYLOAD_LENGTH - len, fmt, ap); va_end(ap); prepare_message(&outgoing_msg, MAILBOX_BILLING_LOG, payload, len); retval = send_message(commhub_fd, &outgoing_msg); if (retval == 0) { return 0; } } #endif // IF either that didn't work, or the conditions were not met, send it to syslog... // va_start(ap, fmt); /* Initialize the va_list */ vsyslog(LOG_WARNING, fmt, ap); va_end(ap); return 0; } int route_error_message(char *fmt, ...) { va_list ap; #ifdef SUPERVISOR_LOG_ERROR_TO_BILLDB // IF... we have a connection to the commhub, and we have an instance of billdb under our care, and it is alive... // if ( (commhub_fd >= 0) && (error_logging_module >= 0) && module_status[error_logging_module].active ) { int retval; int len = 0; struct message_record outgoing_msg; char payload[MAX_PAYLOAD_LENGTH] = {0}; payload[len++] = LOGLEVEL_ERROR; len += make_log_prefix(payload + len, MAX_PAYLOAD_LENGTH - len); // Initialize the va_list // va_start(ap, fmt); len += vsnprintf(payload + len, MAX_PAYLOAD_LENGTH - len, fmt, ap); va_end(ap); prepare_message(&outgoing_msg, MAILBOX_BILLING_LOG, payload, len); retval = send_message(commhub_fd, &outgoing_msg); if (retval == 0) { return 0; } } #endif // IF either that didn't work, or the conditions were not met, send it to syslog... // // Initialize the va_list // va_start(ap, fmt); vsyslog(LOG_ERR, fmt, ap); va_end(ap); return 0; } void warn_num_proc() { route_warning_message("System has %d processes running!\n", system_stat_numproc); } void warn_load_avg() { route_warning_message("System load average is %.1f!\n", system_stat_loadavg); } void warn_mem_free() { route_warning_message("System is down to %d KB of free RAM\n", system_stat_freemem); } void monitor_system_status() { int retval; int kb_free = 0; int n_procs = 0; float la5 = 0; char line[LINE_BUFFER_SIZE] = {0}; FILE *f; f = fopen("/proc/loadavg", "rb"); if (f) { fgets(line, sizeof(line), f); retval = sscanf(line, "%f %*f %*f %*i/%i", &la5, &n_procs); if (retval == 2) { system_stat_loadavg = la5; system_stat_numproc = n_procs; } fclose(f); } else { route_error_message("Cannot read /proc/loadavg\n"); } f = fopen("/proc/meminfo", "rb"); if (f) { while(!feof(f)) { fgets(line, sizeof(line), f); retval = sscanf(line, "MemFree: %i kB\n", &kb_free); if (retval == 1) { system_stat_freemem = kb_free; break; } } fclose(f); } else { route_error_message("Cannot read /proc/meminfo\n"); } if (system_stat_numproc > SUPERVISOR_WARN_NUM_PROC) { warn_num_proc(); } if (system_stat_loadavg > SUPERVISOR_WARN_LOAD_AVG) { warn_load_avg(); } if (system_stat_freemem < SUPERVISOR_WARN_LOW_MEM) { warn_mem_free(); } } void send_heartbeat() { route_debug_message("heartbeat\n"); } int launch_module(int n) { pid_t fork_ret; int retval; int my_stdin, my_stdout, my_stderr; int new_stdin[2], new_stdout[2], new_stderr[2]; int record_confirm_pipe[2]; char record_confirm_msg[4] = {'O','K','\n','\0'}; int record_confirm_expect; if ( (n < 0) || (n >= NUM_MODULES) ) { return -1; } RESET_WATCHDOG(); // Save our actual standard I/O fd's // my_stdin = dup(STDIN_H); my_stdout = dup(STDOUT_H); my_stderr = dup(STDERR_H); // Generate pipes to be said handles for the child process // pipe(new_stdin); pipe(new_stdout); pipe(new_stderr); // close our actual handles in preparation for forking // close(STDIN_H); close(STDOUT_H); close(STDERR_H); // Assign the "far" end of the pipes to our handles so that the child process will use them // dup2(new_stdin[PIPE_R], STDIN_H); dup2(new_stdout[PIPE_W], STDOUT_H); dup2(new_stderr[PIPE_W], STDERR_H); // Create a pipe for record-keeping confirmation purposes (this prevents a race condition with SIGCHLD // pipe(record_confirm_pipe); record_confirm_expect = strlen(record_confirm_msg); // Ask the kernel to create a new process to hold our new module // fork_ret = fork(); // If we are the child process // if (fork_ret == 0) { // We don't need this end of the confirmation pipe // close(record_confirm_pipe[PIPE_W]); // Wait for the parent process to confirm that we have been recorded before proceeding // with the exec attempt so that if it fails, the parent process will know what to do with the // SIGCHLD signal. // retval = read(record_confirm_pipe[PIPE_R], record_confirm_msg, record_confirm_expect); // If the parent process can't confirm our existence, we're in deep shit... // if (retval != record_confirm_expect) { fprintf(stderr, "Never got recordkeeping confirmation!\n"); exit(EX_UNAVAILABLE); } // We are now done with our confirmation pipe // close(record_confirm_pipe[PIPE_R]); // Close the child's copy of the parent's standard I/O fds // close(my_stdin); close(my_stdout); close(my_stderr); // Close the parent's end of the pipes connecting the two processes // close(new_stdin[PIPE_W]); close(new_stdout[PIPE_R]); close(new_stderr[PIPE_R]); // Attempt to exec the child process now... // retval = execl(module_name[n], module_name[n], NULL); if (retval) { fprintf(stderr, "Cannot execl(\"%s\", \"%s\", NULL)!\n", module_name[n], module_name[n]); exit(EX_UNAVAILABLE); } exit(0); } // If we are the parent process // else if (fork_ret != -1) { // Record our child PID // module_status[n].pid = fork_ret; // Record our end of the child's standard file descriptors // module_status[n].stdin_fd = new_stdin[PIPE_W]; module_status[n].stdout_fd = new_stdout[PIPE_R]; module_status[n].stderr_fd = new_stderr[PIPE_R]; // Reset our list ping/pong times to "never" // module_status[n].last_ping = 0; module_status[n].last_pong = 0; module_status[n].ping_lag = 0; module_status[n].ping_punish = PING_PUNISH_NONE; // Record our process start time // module_status[n].last_start = time(NULL); // This process is not currently expected to die // module_status[n].exit_expected = 0; // Flag this child module as active // module_status[n].active = 1; // Close the opposite end of all of our pipes // close(record_confirm_pipe[PIPE_R]); close(new_stdin[PIPE_R]); close(new_stdout[PIPE_W]); close(new_stderr[PIPE_W]); // Close our copy of the child's standard I/O file descriptors // close(STDIN_H); close(STDOUT_H); close(STDERR_H); // Restore the parent's original standard I/O file descriptors // dup2(my_stdin, STDIN_H); dup2(my_stdout, STDOUT_H); dup2(my_stderr, STDERR_H); // Unblock the child process by transmitting our confirmation message // retval = write(record_confirm_pipe[PIPE_W], record_confirm_msg, record_confirm_expect); if (retval != record_confirm_expect) { route_error_message("Cannot transmit recordkeeping confirmation!\n"); } // We are now done with our recordkeeping confirmation pipe // close(record_confirm_pipe[PIPE_W]); } // Otherwise we have somehow FAILED to fork() // else { // restore our stderr file descriptor // close(STDERR_H); dup2(my_stderr, STDERR_H); route_error_message("Cannot fork()!\n"); exit(EX_OSERR); } return 0; } void expected_exit(int n) { route_debug_message("Module %s [%d] has finished as expected.\n", module_name[n], (int)module_status[n].pid); // flag this module as "clean" for the purpose of respawn // (so we won't count the short runtime of a process we asked to terminate against it) // module_status[n].last_start = 0; module_status[n].last_exit = 0; } void unexpected_exit(int n) { int age; // Record our last stop time // module_status[n].last_exit = time(NULL); age = (int)module_status[n].last_exit - (int)module_status[n].last_start; route_warning_message("Module %s [%d] has stopped running unexpectedly after %d seconds.\n", module_name[n], (int)module_status[n].pid, age); } void reap_zombies() { int status = 0; pid_t retval = 0; int i; do { // Clear the zombie alert HERE so that if things terminate after the loop finishes, // we'll get flagged. This could result in an extra flag, but we'll never miss one. // and since we're using the WNOHANG option, there is no harm in going through the waitpid() // loop one time extra. zombie_alert = 0; retval = waitpid(-1, &status, WNOHANG); // Wait for ANY child process // If waitpid() actually waited for the specified process... // if (retval > 0) { for (i = 0; i < NUM_MODULES; i++) { // Go through our table of modules looking for an active module with a // // matching PID... // if ( module_status[i].active && (module_status[i].pid == retval) ) { // Give us an opportunity to do some clever cleanup if it is merited // These functions also set the last_exit time (and may zero the last_start // to flag a process as "clean" for respawn purposes). // if (module_status[i].exit_expected) { expected_exit(i); } else { unexpected_exit(i); } // Set this module inactive // module_status[i].active = 0; // Set this module's last ping/pong times to "never" // module_status[i].last_ping = 0; module_status[i].last_pong = 0; module_status[i].ping_lag = 0; module_status[i].ping_punish = PING_PUNISH_NONE; // No more process // module_status[i].pid = 0; // Close our end of this module's standard I/O file descriptors // close(module_status[i].stdin_fd); close(module_status[i].stdout_fd); close(module_status[i].stderr_fd); module_status[i].stdin_fd = -1; module_status[i].stdout_fd = -1; module_status[i].stderr_fd = -1; break; } } } } while(retval > 0); } void sigchld_handler(int signum, siginfo_t *info, void *data) { zombie_alert = 1; } void sighup_handler(int signum, siginfo_t *into, void *data) { hup_alert = 1; } void setup_child_handler() { struct sigaction sa = {{0}}; sa.sa_sigaction = sigchld_handler; sa.sa_flags = SA_NOCLDSTOP | SA_SIGINFO; sigfillset(&sa.sa_mask); sigaction(SIGCHLD, &sa, NULL); } void setup_hup_handler() { struct sigaction sa = {{0}}; sa.sa_sigaction = sighup_handler; sa.sa_flags = SA_SIGINFO; sigfillset(&sa.sa_mask); sigaction(SIGHUP, &sa, NULL); } void monitor_spawn_list() { int i; int module_runtime; time_t now; for (i = 0; i < NUM_MODULES; i++) { now = time(NULL); // If it is too soon to attempt spawning any new process, then we're done for the moment // if ( (now - last_global_spawn) < SUPERVISOR_GLOBAL_SPAWN_RATE_LIMIT ) break; // If this module is still active, we don't need to spawn it... if (module_status[i].active) continue; // This number will only be valid if last_start != 0 module_runtime = module_status[i].last_exit - module_status[i].last_start; // If this module either made an expected termination or has never been launcher // if (module_status[i].last_start == 0) { // then update our last global spawn time // last_global_spawn = now; // and launch it... // launch_module(i); route_debug_message("Spawned module %s, PID = %d\n", module_name[i], (int)module_status[i].pid); } // otherwise, if it has run before, but terminated unexpectedly in a very short time // else if ( module_runtime < SUPERVISOR_RESPAWN_DELAY_THRESHOLD ) { // and it has been long enough to try again // if ( (now - module_status[i].last_exit) >= SUPERVISOR_RESPAWN_RATE_LIMIT) { // then update our last global spawn time // last_global_spawn = now; // and launch it... // launch_module(i); route_warning_message("Respawned module %s, PID = %d [rate limited]\n", module_name[i], (int)module_status[i].pid); } } // otherwise, the module has terminated unexpectedly, but it had run for a long time, so we can respawn it immediately // else { // then update our last global spawn time // last_global_spawn = now; // and launch it... // launch_module(i); route_debug_message("Respawned module %s, PID = %d\n", module_name[i], (int)module_status[i].pid); } } } void handle_dead_fd(int fd) { int i; close(fd); if (fd == commhub_fd) { commhub_fd = -1; return; } for (i = 0; i < NUM_MODULES; i++) { if (module_status[i].stdin_fd == fd) { module_status[i].stdin_fd = -1; return; } if (module_status[i].stdout_fd == fd) { module_status[i].stdout_fd = -1; return; } if (module_status[i].stderr_fd == fd) { module_status[i].stderr_fd = -1; return; } } } message_callback_return handle_pong_message(struct message_record *msg, void *param) { int i; // printf("Got PONG from PID %d\n", (int)msg->header.sender); // Iterate through all of the modules // for (i = 0; i < NUM_MODULES; i++) { // If this module is active, and its PID matches the PID of the sender of this PONG message // if ( module_status[i].active && (msg->header.sender == module_status[i].pid) ) { module_status[i].last_pong = get_usec_time(); module_status[i].ping_lag = (module_status[i].last_pong - module_status[i].last_ping); // printf("PING->PONG latency %lld microseconds.\n", module_status[i].ping_lag); } } return MESSAGE_HANDLED_CONT; } //---- // Client_supervisor is now in charge of managing the state_info file, // so we need to have gps, stop and driver messages handled explicitely // to be able to write to the state_info file. // `client_supervisor` should be the sole process writing to the state_info // file while any other client should be able to read. // message_callback_return handle_gps_message(struct message_record *msg, void *param) { // guard against version mismatch // if (msg->header.payload_length != sizeof(gps_status)) { return MESSAGE_HANDLED_STOP; } memcpy(&gps_stat, msg->payload, sizeof(gps_status)); //otherwise, update our structure update_state_info_with_gps(&state_info, &gps_stat); memcpy(state_info.comment, "client_supervisor gps", strlen("client_supervisor gps")+1); set_state_info(&state_info); return MESSAGE_HANDLED_CONT; } static message_callback_return handle_stop_message(struct message_record *msg, void *param) { // guard against version mismatch // if (msg->header.payload_length != sizeof(stop_status)) { return MESSAGE_HANDLED_STOP; } memcpy(&stop_stat, msg->payload, sizeof(stop_status)); update_state_info_with_stop(&state_info, &stop_stat); memcpy(state_info.comment, "client_supervisor stop", strlen("client_supervisor stop")+1); set_state_info(&state_info); return MESSAGE_HANDLED_CONT; } static message_callback_return handle_driver_message(struct message_record *msg, void *param) { // guard against version mismatch // if (msg->header.payload_length != sizeof(driver_status)) { return MESSAGE_HANDLED_STOP; } memcpy(&driver_stat, msg->payload, sizeof(driver_stat)); update_state_info_with_driver(&state_info, &driver_stat); memcpy(state_info.comment, "client_supervisor driver", strlen("client_supervisor driver")+1); set_state_info(&state_info); return MESSAGE_HANDLED_CONT; } //---- void clear_ping_attempt() { int i; for (i = 0; i < NUM_MODULES; i++) { if (module_status[i].active) { module_status[i].last_ping = module_status[i].last_pong = 0; module_status[i].ping_punish = PING_PUNISH_NONE; } } } void send_ping_message() { struct message_record outgoing_msg; long long int usec_now; char target[MAILBOX_NAME_MAX + 1]; int i; if (commhub_fd < 0) { return; } for (i = 0; i < NUM_MODULES; i++) { // If this module is active AND included in PING monitoring // if ( module_status[i].active && module_status[i].ping_requested ) { usec_now = get_usec_time(); // If it is not yet time to re-ping this module // if ( (usec_now - module_status[i].last_ping) < SUPERVISOR_PING_INTERVAL) { // skip it for now // continue; } // If we have PING'd this host but are still waiting for a PONG back // if (module_status[i].last_pong < module_status[i].last_ping) { // Don't flood it with pings // continue; } // If we've never PING'd this host, set it's PONG time to the same as its PING // time so it doesn't get killed for not having responded to its first PING. // if (module_status[i].last_ping == 0) { module_status[i].last_pong = usec_now; } module_status[i].last_ping = usec_now; // Turn our PID into a magic mailbox name that addresses that PID. // sprintf(target, ">%d", (int)module_status[i].pid); // Prepare a message to that magic mailbox with a payload consisting of the mailbox // name of the PING notification // prepare_message(&outgoing_msg, target, MAILBOX_PING, strlen(MAILBOX_PING)); // And send it! // send_message(commhub_fd, &outgoing_msg); } } } void send_hup_message() { struct message_record outgoing_msg; if (commhub_fd < 0) { return; } printf("Sending HUP\n"); prepare_message(&outgoing_msg, MAILBOX_HUP, "", 0); send_message(commhub_fd, &outgoing_msg); } void ping_punish_warn(int n, long long int delta) { route_debug_message("Module %s [%d] hasn't responded to a PING in %d seconds...\n", module_name[n], (int)module_status[n].pid, (int)USEC_TO_SEC(delta)); } void ping_punish_term(int n, long long int delta) { route_warning_message("Module %s [%d] hasn't responded to a PING in %d seconds... Sending SIGTERM\n", module_name[n], (int)module_status[n].pid, (int)USEC_TO_SEC(delta)); kill(module_status[n].pid, SIGTERM); } void ping_punish_kill(int n, long long int delta) { route_error_message("Module %s [%d] hasn't responded to a PING in %d seconds... Sending SIGKILL\n", module_name[n], (int)module_status[n].pid, (int)USEC_TO_SEC(delta)); kill(module_status[n].pid, SIGKILL); } void enforce_ping_policy() { int i; long long int delta = 0; for (i = 0; i < NUM_MODULES; i++) { //If this module is both active AND included in the ping monitoring // if ( module_status[i].active && module_status[i].ping_requested ) { // if we haven't sent a PING to this one... // if (module_status[i].last_ping == 0) { //clear any punishment that would otherwise be pending and get the next module // module_status[i].ping_punish = PING_PUNISH_NONE; continue; } //compute just how late this module is... // delta = get_usec_time() - module_status[i].last_pong; switch(module_status[i].ping_punish) { case PING_PUNISH_NONE: if (delta > SUPERVISOR_PING_WARN_TIME) { ping_punish_warn(i, delta); module_status[i].ping_punish++; } break; case PING_PUNISH_WARN: if (delta > SUPERVISOR_PING_TERM_TIME) { ping_punish_term(i, delta); module_status[i].ping_punish++; } break; case PING_PUNISH_TERM: if (delta > SUPERVISOR_PING_KILL_TIME) { ping_punish_kill(i, delta); module_status[i].ping_punish++; } break; default: route_error_message("At our wits end with module %s [%d], just plain won't die!\n", module_name[i], (int)module_status[i].pid); break; } } } } void maintain_ipc_hub_connect(char *progname) { struct message_record outgoing_msg; // if we have no connection to the communication hub // if (commhub_fd < 0) { // try and get one // commhub_fd = connect_to_message_server(progname); // printf("commhub_fd = %d\n", commhub_fd); // if it worked // if (commhub_fd >= 0) { //Subscribe to the command mailboxes we act on // prepare_message(&outgoing_msg, MAILBOX_SUBSCRIBE, MAILBOX_PONG, strlen(MAILBOX_TOKEN_MAG)); send_message(commhub_fd,&outgoing_msg); //Subscribe to the relevant status management mailboxes // subscribe_to_default_messages(commhub_fd); } } } void print_version(FILE *fp) { fprintf(fp, "version %s\n", CLIENT_SUPERVISOR_VERSION); } void usage(FILE *fp, char *progname) { fprintf(fp,"%s: %d modules given to be supervised, minimum = 1, maximum = %d\n", progname, NUM_MODULES, SUPERVISOR_MAX_MODULES - 1); fprintf(fp,"%s: Usage: %s [module ...] [--[no-]ping] [module ...]\n", progname, progname); fprintf(fp,"%s: By default modules will be subjet to PING monitoring through\n", progname); fprintf(fp,"%s: the IPC hub, --no-ping will effect all modules following it until a --ping\n", progname); fprintf(fp,"\n"); exit(EX_USAGE); } void handle_command_line(int argc, char **argv) { int i; int needs_ping = 1; NUM_MODULES = 0; // Walk all of the given command line arguments and add each one as a module we have been asked to supervice // for (i = 1; i < argc; i++) { if ( !strcmp(argv[i], "--no-ping") ) { needs_ping = 0; } else if ( !strcmp(argv[i], "--ping") ) { needs_ping = 1; } else if ( !strcmp(argv[i], "-v") ) { print_version(stdout); exit(0); } else if ( !strcmp(argv[i], "-h") ) { usage(stdout, argv[0]); exit(EX_USAGE); } else { // Here we look for the last / in the module name // char *foo = rindex(argv[i], '/'); // if we found one, advance past it // if (foo) { foo++; } // otherwise // else { // use our whole argument // foo = argv[i]; } // make sure it is at the end // if ( !strcmp(foo, SUPERVISOR_SPECIAL_BILLDB) ) { //if it is, this one is our guy! // error_logging_module = NUM_MODULES; } module_status[NUM_MODULES].ping_requested = needs_ping; module_name[NUM_MODULES++] = argv[i]; } if (NUM_MODULES >= SUPERVISOR_MAX_MODULES) { usage(stderr, argv[0]); } } if (NUM_MODULES == 0) { usage(stderr, argv[0]); } } int main(int argc, char **argv) { int poll_ret; int nfds; int i; struct message_record incoming_msg; char linebuffer[LINE_BUFFER_SIZE] = {0}; int read_ret; int modnum; struct pollfd fds[1 + (2 * SUPERVISOR_MAX_MODULES)] = {{0}}; int poll_to_module[1 + (2 * SUPERVISOR_MAX_MODULES)] = {0}; init_state_info(); // Parse our command line to figure out what modules we need to supervise // handle_command_line(argc, argv); // Configure our default signal handlers // configure_signal_handlers(argv[0]); // As well as our module-specific zombie-reaping flag setter // setup_child_handler(); // This listens for SIGHUP messages and flags us when one has come it. // setup_hup_handler(); // Register our default keep-up-with-system status callbacks // register_system_status_callbacks(); // Except we want to exempt ourselves from the MAILBOX_EXIT and // MAILBOX_PING so as not to start ping storms or kill ourselves... // register_dispatch_callback(MAILBOX_EXIT, CALLBACK_PREPROCESS, ignore_message, NULL); register_dispatch_callback(MAILBOX_PING, CALLBACK_PREPROCESS, ignore_message, NULL); // Add our specific handlers // register_dispatch_callback(MAILBOX_PONG, CALLBACK_USER(1), handle_pong_message, NULL); register_dispatch_callback(MAILBOX_GPS_STATUS, CALLBACK_USER(1), handle_gps_message, NULL); register_dispatch_callback(MAILBOX_STOP_STATUS, CALLBACK_USER(1), handle_stop_message, NULL); register_dispatch_callback(MAILBOX_DRIVER_STATUS, CALLBACK_USER(1), handle_driver_message, NULL); // This is the main processing loop which monitors system status and pumps I/O from // all of the client modules (debug and error messages) and from the IPC hub when it is up // as indicated by poll(). // while( exit_request_status == EXIT_REQUEST_NONE ) { RESET_WATCHDOG(); //-------------- SYSTEM STATE MAINTENANCE CODE HERE if ( (time(NULL) - last_system_stat) >= SUPERVISOR_SYSTEM_STAT_INTERVAL) { monitor_system_status(); send_heartbeat(); last_system_stat = time(NULL); } // Do any maintenance on our spawn list as needed // monitor_spawn_list(); // Try and keep in touch with the IPC hub if possible // maintain_ipc_hub_connect(argv[0]); // If the communications hub is up, we may need to maintain our PING states // if (commhub_fd >= 0) { // Send out any PING messages that are required // send_ping_message(); // Deal with any delinquents who don't answer their damn pings // enforce_ping_policy(); if (hup_alert) { hup_alert = 0; send_hup_message(); } } // If there is no connection to the IPC hub // else { // clear any PING attempt records we may have active // clear_ping_attempt(); } // If our SIGCHLD handler has notified us that there is at least 1 zombie to reap // if (zombie_alert) { // Try and reap any zombies that may exist // reap_zombies(); } //-------------- POLL FILE DESCRIPTOR SET POPULATION HERE // nfds = 0; // If we have a valid communication hub connection, add it to the poll set // if (commhub_fd >= 0) { fds[nfds].fd = commhub_fd; fds[nfds].events = POLLIN; // but flag it as NOT belonging to a child module // poll_to_module[nfds] = -1; nfds++; } // Then iterate through all of our modules and see if they need service // for (i = 0; i < NUM_MODULES; i++) { // If this child module is flagged as active // if (module_status[i].active) { // and we have a stdout file descriptor for this child // if (module_status[i].stdout_fd >= 0) { // add it to the poll set // fds[nfds].fd = module_status[i].stdout_fd; fds[nfds].events = POLLIN; // and remember which module it goes to // poll_to_module[nfds] = i; nfds++; } // if we have a stderr file descriptor for this child // if (module_status[i].stderr_fd >= 0) { // add it to the poll set // fds[nfds].fd = module_status[i].stderr_fd; fds[nfds].events = POLLIN; // and remember which module it goes to // poll_to_module[nfds] = i; nfds++; } } } // If we have any file descriptors to poll // if (nfds > 0) { // poll them // poll_ret = poll(fds, nfds, POLL_TIMEOUT); // if poll returns error // if (poll_ret < 0) { // and it's just EINTR // if (errno == EINTR) { // try the body of the while again // continue; } // if it is some other error // else { // scream bloody murder // route_debug_message("Poll returned %d, errno = %d (%s)\n", poll_ret, errno, strerror(errno)); sleep(1); } } } // If we have zero functional file descriptors // else { // Pretend that we called poll and it timed out // sleep(1); poll_ret = 0; } //-------------- I/O PUMPING CODE BELOW // if there are no file descriptors flagged as needing any action // if (poll_ret == 0) { continue; } for (i = 0; i < nfds; i++) { modnum = poll_to_module[i]; // If we have input and it is from a module... // if ( (fds[i].revents & POLLIN) && (modnum >= 0) ) { read_ret = read(fds[i].fd, linebuffer, sizeof(linebuffer) - 1); if (read_ret == 0) { // clean up // and skip this fd for now // handle_dead_fd(fds[i].fd); continue; } else if ( (read_ret < 0) && (errno != EINTR) ) { // clean up // and skip this fd for now // handle_dead_fd(fds[i].fd); continue; } // Terminate our read... // linebuffer[read_ret] = '\0'; if (fds[i].fd == module_status[modnum].stdout_fd) { //route_debug_message("%s [%d]: %s", module_name[modnum], (int)module_status[modnum].pid, linebuffer); } else if (fds[i].fd == module_status[modnum].stderr_fd) { route_error_message("%s [%d]: %s", module_name[modnum], (int)module_status[modnum].pid, linebuffer); } else { route_debug_message("File descriptor %d does not belong to module %d (%s)!\n", fds[i].fd, modnum, module_name[modnum]); } } // If we have input and it is from the commhub... // else if ( (fds[i].revents & POLLIN) && (fds[i].fd == commhub_fd) ) { read_ret = get_message(commhub_fd, &incoming_msg); if (read_ret < 0) { // clean up // and skip this fd for now // handle_dead_fd(fds[i].fd); continue; } else { // This passes the received message through the callback list // process_message(&incoming_msg); } } // If it looks like we have a closed or invalid descriptor // if (fds[i].revents & (POLLERR | POLLHUP | POLLNVAL)) { // clean up // and skip this fd for now // handle_dead_fd(fds[i].fd); continue; } } } route_debug_message("Attempting graceful exit...\n"); for (i = 0; i < NUM_MODULES; i++) { if (module_status[i].active) { route_debug_message("Sending SIGTERM to module %s [%d]\n", module_name[i], module_status[i].pid); kill(module_status[i].pid, SIGTERM); } } sleep(1); return 0; }