/*
* Copyright (c) 2019 Clementine Computing LLC.
*
* This file is part of PopuFare.
*
* PopuFare is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* PopuFare is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with PopuFare. If not, see <http://www.gnu.org/licenses/>.
*
*/
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include "../common/common_defs.h"
#include "../commhub/commhub.h"
#include "../commhub/client_utils.h"
int NUM_MODULES = 0; //Number of modules we are actively tracking
int commhub_fd = -1; //File descriptor of our connection to the comm hub
//The two alert flags below are assigned from signal handlers and polled by the main loop, so they
//are volatile sig_atomic_t: the object type C guarantees a signal handler may safely assign to.
volatile sig_atomic_t zombie_alert = 0; //This flag is set by the SIGCHLD handler, and cleared by the reaper function
volatile sig_atomic_t hup_alert = 0; //This is our flag that the SIGHUP handler sets and is cleared when a HUP has been delivered to the client modules
time_t last_global_spawn = 0; //This is the time of the last new process creation event (time_t, so it cannot truncate)
time_t last_system_stat = 0; //This is the time of our last system status check
int system_stat_freemem = 0; //This is the number of KB of free memory in the system
int system_stat_numproc = 0; //This is the process count parsed from /proc/loadavg
float system_stat_loadavg = 0; //This is the 5 minute load average
int error_logging_module = -1; //This is the error logging module's module number
//(this is the module whose name matches SUPERVISOR_SPECIAL_BILLDB)
#define PING_PUNISH_NONE (0)
#define PING_PUNISH_WARN (1)
#define PING_PUNISH_TERM (2)
#define PING_PUNISH_KILL (3)
//Per-module supervision state
struct module_record
{
	time_t last_start; //last time this process was started (0 = "clean", respawn without delay)
	time_t last_exit; //last time this process was reaped
	int ping_requested; //If this flag is nonzero, the specified module is subject to PING monitoring
	//however, if this flag is zero, the specified module is immune to PING monitoring.
	int active; //If this is zero, all further fields are invalid
	//-------------------------
	int exit_expected; //If this flag is set, we expect the process to exit (i.e. it's not an error if it does...)
	pid_t pid; //Process ID of this child module
	int stdin_fd; //File descriptor of this child's standard input (write to this to feed child)
	int stdout_fd; //File descriptor of this child's standard output (read from this to get output)
	int stderr_fd; //File descriptor of this child's standard error (read from this to get error output)
	long long int last_ping; //timestamp in microseconds of the last PING message sent to this process
	long long int last_pong; //timestamp in microseconds of the last PONG message received from this process
	long long int ping_lag; //Latest PING->PONG lag for this process
	int ping_punish; //Used to keep track of PING non-response action sequence (PING_PUNISH_*)
} module_status[SUPERVISOR_MAX_MODULES] = {{0}}; //This array contains the state for each module
char *module_name[SUPERVISOR_MAX_MODULES] = {0}; //This array contains pointers to the names of each module
//Indices into a pipe() fd pair
#define PIPE_R (0)
#define PIPE_W (1)
//Standard I/O descriptor numbers
#define STDIN_H (0)
#define STDOUT_H (1)
#define STDERR_H (2)
/*
 * Route a printf-style DEBUG message.
 *
 * When built with SUPERVISOR_LOG_DEBUG_TO_BILLDB, and we are connected to the commhub while the
 * billdb logging module (error_logging_module) is active, the message is sent to
 * MAILBOX_BILLING_LOG as [LOGLEVEL_DEBUG][log prefix][formatted text]. Otherwise, or if that
 * send fails, it is written to syslog at LOG_DEBUG. Always returns 0.
 */
int route_debug_message(char *fmt, ...)
{
	va_list ap;
#ifdef SUPERVISOR_LOG_DEBUG_TO_BILLDB
	//IF... we have a connection to the commhub, and we have an instance of billdb under our care, and it is alive...
	if( (commhub_fd >= 0) && (error_logging_module >= 0) && module_status[error_logging_module].active )
	{
		int retval;
		int written;
		int len = 0;
		struct message_record outgoing_msg;
		char payload[MAX_PAYLOAD_LENGTH] = {0};
		payload[len++] = LOGLEVEL_DEBUG;
		written = make_log_prefix(payload + len, MAX_PAYLOAD_LENGTH - len);
		if(written > 0)
		{
			len += written;
		}
		//Keep len inside the buffer so the remaining-space argument below stays positive
		if(len > MAX_PAYLOAD_LENGTH - 1)
		{
			len = MAX_PAYLOAD_LENGTH - 1;
		}
		va_start(ap, fmt); /* Initialize the va_list */
		written = vsnprintf(payload + len, MAX_PAYLOAD_LENGTH - len, fmt, ap);
		va_end(ap);
		if(written > 0)
		{
			len += written;
		}
		//vsnprintf() returns the length the text WOULD have had, not what fit. On truncation, clamp
		//len to the bytes actually stored so the length handed to prepare_message() fits the buffer.
		if(len > MAX_PAYLOAD_LENGTH - 1)
		{
			len = MAX_PAYLOAD_LENGTH - 1;
		}
		prepare_message(&outgoing_msg, MAILBOX_BILLING_LOG, payload, len);
		retval = send_message(commhub_fd, &outgoing_msg);
		if(retval == 0)
		{
			return 0;
		}
	}
#endif
	//IF either that didn't work, or the conditions were not met, send it to syslog...
	va_start(ap, fmt); /* Initialize the va_list */
	vsyslog(LOG_DEBUG, fmt, ap);
	va_end(ap);
	return 0;
}
/*
 * Route a printf-style WARNING message.
 *
 * When built with SUPERVISOR_LOG_WARNING_TO_BILLDB, and we are connected to the commhub while the
 * billdb logging module (error_logging_module) is active, the message is sent to
 * MAILBOX_BILLING_LOG as [LOGLEVEL_WARN][log prefix][formatted text]. Otherwise, or if that
 * send fails, it is written to syslog at LOG_WARNING. Always returns 0.
 */
int route_warning_message(char *fmt, ...)
{
	va_list ap;
#ifdef SUPERVISOR_LOG_WARNING_TO_BILLDB
	//IF... we have a connection to the commhub, and we have an instance of billdb under our care, and it is alive...
	if( (commhub_fd >= 0) && (error_logging_module >= 0) && module_status[error_logging_module].active )
	{
		int retval;
		int written;
		int len = 0;
		struct message_record outgoing_msg;
		char payload[MAX_PAYLOAD_LENGTH] = {0};
		payload[len++] = LOGLEVEL_WARN;
		written = make_log_prefix(payload + len, MAX_PAYLOAD_LENGTH - len);
		if(written > 0)
		{
			len += written;
		}
		//Keep len inside the buffer so the remaining-space argument below stays positive
		if(len > MAX_PAYLOAD_LENGTH - 1)
		{
			len = MAX_PAYLOAD_LENGTH - 1;
		}
		va_start(ap, fmt); /* Initialize the va_list */
		written = vsnprintf(payload + len, MAX_PAYLOAD_LENGTH - len, fmt, ap);
		va_end(ap);
		if(written > 0)
		{
			len += written;
		}
		//vsnprintf() returns the length the text WOULD have had, not what fit. On truncation, clamp
		//len to the bytes actually stored so the length handed to prepare_message() fits the buffer.
		if(len > MAX_PAYLOAD_LENGTH - 1)
		{
			len = MAX_PAYLOAD_LENGTH - 1;
		}
		prepare_message(&outgoing_msg, MAILBOX_BILLING_LOG, payload, len);
		retval = send_message(commhub_fd, &outgoing_msg);
		if(retval == 0)
		{
			return 0;
		}
	}
#endif
	//IF either that didn't work, or the conditions were not met, send it to syslog...
	va_start(ap, fmt); /* Initialize the va_list */
	vsyslog(LOG_WARNING, fmt, ap);
	va_end(ap);
	return 0;
}
/*
 * Route a printf-style ERROR message.
 *
 * When built with SUPERVISOR_LOG_ERROR_TO_BILLDB, and we are connected to the commhub while the
 * billdb logging module (error_logging_module) is active, the message is sent to
 * MAILBOX_BILLING_LOG as [LOGLEVEL_ERROR][log prefix][formatted text]. Otherwise, or if that
 * send fails, it is written to syslog at LOG_ERR. Always returns 0.
 */
int route_error_message(char *fmt, ...)
{
	va_list ap;
#ifdef SUPERVISOR_LOG_ERROR_TO_BILLDB
	//IF... we have a connection to the commhub, and we have an instance of billdb under our care, and it is alive...
	if( (commhub_fd >= 0) && (error_logging_module >= 0) && module_status[error_logging_module].active )
	{
		int retval;
		int written;
		int len = 0;
		struct message_record outgoing_msg;
		char payload[MAX_PAYLOAD_LENGTH] = {0};
		payload[len++] = LOGLEVEL_ERROR;
		written = make_log_prefix(payload + len, MAX_PAYLOAD_LENGTH - len);
		if(written > 0)
		{
			len += written;
		}
		//Keep len inside the buffer so the remaining-space argument below stays positive
		if(len > MAX_PAYLOAD_LENGTH - 1)
		{
			len = MAX_PAYLOAD_LENGTH - 1;
		}
		va_start(ap, fmt); /* Initialize the va_list */
		written = vsnprintf(payload + len, MAX_PAYLOAD_LENGTH - len, fmt, ap);
		va_end(ap);
		if(written > 0)
		{
			len += written;
		}
		//vsnprintf() returns the length the text WOULD have had, not what fit. On truncation, clamp
		//len to the bytes actually stored so the length handed to prepare_message() fits the buffer.
		if(len > MAX_PAYLOAD_LENGTH - 1)
		{
			len = MAX_PAYLOAD_LENGTH - 1;
		}
		prepare_message(&outgoing_msg, MAILBOX_BILLING_LOG, payload, len);
		retval = send_message(commhub_fd, &outgoing_msg);
		if(retval == 0)
		{
			return 0;
		}
	}
#endif
	//IF either that didn't work, or the conditions were not met, send it to syslog...
	va_start(ap, fmt); /* Initialize the va_list */
	vsyslog(LOG_ERR, fmt, ap);
	va_end(ap);
	return 0;
}
//Emit a warning reporting the most recently sampled process count.
void warn_num_proc()
{
	const int proc_count = system_stat_numproc;
	route_warning_message("System has %d processes running!\n", proc_count);
}
//Emit a warning reporting the most recently sampled load average.
void warn_load_avg()
{
	const double load = system_stat_loadavg; //same value the float would be promoted to anyway
	route_warning_message("System load average is %.1f!\n", load);
}
//Emit a warning reporting the most recently sampled amount of free RAM (in KB).
void warn_mem_free()
{
	const int free_kb = system_stat_freemem;
	route_warning_message("System is down to %d KB of free RAM\n", free_kb);
}
/*
 * Sample system health from /proc, then warn about any threshold that is exceeded.
 *
 * Updates:
 *   system_stat_loadavg - 5-minute load average (2nd field of /proc/loadavg)
 *   system_stat_numproc - the number after '/' in the 4th field of /proc/loadavg
 *   system_stat_freemem - the "MemFree:" value of /proc/meminfo, in kB
 * If a file cannot be opened or parsed, the previous value of that statistic is kept
 * (and, for an unopenable file, an error is logged).
 */
void monitor_system_status()
{
	int kb_free = 0;
	int n_procs = 0;
	float la5 = 0;
	char line[LINE_BUFFER_SIZE] = {0};
	FILE *f;
	f = fopen("/proc/loadavg", "rb");
	if(f)
	{
		//Format: "<1min> <5min> <15min> <runnable>/<total> <last pid>".
		//Skip the 1-minute figure: la5 / system_stat_loadavg are documented as the 5-minute average.
		if( fgets(line, sizeof(line), f) && (sscanf(line, "%*f %f %*f %*d/%d", &la5, &n_procs) == 2) )
		{
			system_stat_loadavg = la5;
			system_stat_numproc = n_procs;
		}
		fclose(f);
	}
	else
	{
		route_error_message("Cannot read /proc/loadavg\n");
	}
	f = fopen("/proc/meminfo", "rb");
	if(f)
	{
		//Loop on fgets() itself: the old while(!feof()) form re-parsed the stale last line after
		//the final, failing fgets() call.
		while( fgets(line, sizeof(line), f) )
		{
			if( sscanf(line, "MemFree: %d kB", &kb_free) == 1 )
			{
				system_stat_freemem = kb_free;
				break;
			}
		}
		fclose(f);
	}
	else
	{
		route_error_message("Cannot read /proc/meminfo\n");
	}
	if(system_stat_numproc > SUPERVISOR_WARN_NUM_PROC)
	{
		warn_num_proc();
	}
	if(system_stat_loadavg > SUPERVISOR_WARN_LOAD_AVG)
	{
		warn_load_avg();
	}
	if(system_stat_freemem < SUPERVISOR_WARN_LOW_MEM)
	{
		warn_mem_free();
	}
}
/* Close both ends of a pipe() pair, skipping ends that were never opened (-1), and mark them closed. */
static void close_pipe_pair(int p[2])
{
	if(p[PIPE_R] >= 0)
	{
		close(p[PIPE_R]);
	}
	if(p[PIPE_W] >= 0)
	{
		close(p[PIPE_W]);
	}
	p[PIPE_R] = -1;
	p[PIPE_W] = -1;
}
/* Put a saved copy of one of our standard descriptors back on target_fd and release the copy.
 * If the original could not be saved (saved_fd < 0), just close target_fd so no child pipe end lingers there. */
static void restore_std_fd(int saved_fd, int target_fd)
{
	if(saved_fd >= 0)
	{
		dup2(saved_fd, target_fd);
		close(saved_fd);
	}
	else
	{
		close(target_fd);
	}
}
/*
 * Fork and exec module n (module_name[n]) with its stdin/stdout/stderr attached to new pipes.
 * Our ends of those pipes are recorded in module_status[n] and the record is flagged active.
 *
 * The child blocks on record_confirm_pipe until the parent has recorded its PID. That way a
 * SIGCHLD from an immediate exec failure always finds a matching record in reap_zombies().
 *
 * Returns 0 on success. Returns -1 if n is out of range or the pipes cannot be created, in which
 * case nothing is changed. A failed fork() is treated as fatal and exits with EX_OSERR.
 */
int launch_module(int n)
{
	pid_t fork_ret;
	int retval;
	int my_stdin, my_stdout, my_stderr;
	int new_stdin[2] = {-1, -1};
	int new_stdout[2] = {-1, -1};
	int new_stderr[2] = {-1, -1};
	int record_confirm_pipe[2] = {-1, -1};
	char record_confirm_msg[4] = {'O','K','\n','\0'};
	int record_confirm_expect = (int)strlen(record_confirm_msg);
	if( (n < 0) || (n >= NUM_MODULES) )
	{
		return -1;
	}
	RESET_WATCHDOG();
	//Create every pipe BEFORE touching our own standard I/O, so a failure here leaves the
	//supervisor's stdin/stdout/stderr exactly as they were.
	if( (pipe(new_stdin) < 0) || (pipe(new_stdout) < 0) || (pipe(new_stderr) < 0) || (pipe(record_confirm_pipe) < 0) )
	{
		route_error_message("Cannot create pipes for module %s: %s\n", module_name[n], strerror(errno));
		close_pipe_pair(new_stdin);
		close_pipe_pair(new_stdout);
		close_pipe_pair(new_stderr);
		close_pipe_pair(record_confirm_pipe);
		return -1;
	}
	//Flush stdio while fds 0-2 are still ours. Otherwise pending output could be written into the
	//child's pipes, or be flushed a second time by the child.
	fflush(NULL);
	//Save our actual standard I/O fd's
	my_stdin = dup(STDIN_H);
	my_stdout = dup(STDOUT_H);
	my_stderr = dup(STDERR_H);
	//Put the "far" end of each pipe on our standard handles so the forked child inherits them
	//(dup2() closes whatever previously occupied each slot)
	dup2(new_stdin[PIPE_R], STDIN_H);
	dup2(new_stdout[PIPE_W], STDOUT_H);
	dup2(new_stderr[PIPE_W], STDERR_H);
	//Ask the kernel to create a new process to hold our new module
	fork_ret = fork();
	if(fork_ret == 0) //If we are the child process
	{
		//We don't need this end of the confirmation pipe
		close(record_confirm_pipe[PIPE_W]);
		//Wait for the parent to confirm that we have been recorded before attempting exec, so that
		//if exec fails the parent knows what to do with the SIGCHLD. Retry if a signal interrupts us.
		do
		{
			retval = read(record_confirm_pipe[PIPE_R], record_confirm_msg, record_confirm_expect);
		} while( (retval < 0) && (errno == EINTR) );
		if(retval != record_confirm_expect)
		{
			fprintf(stderr, "Never got recordkeeping confirmation!\n");
			_exit(EX_UNAVAILABLE);
		}
		//We are now done with our confirmation pipe
		close(record_confirm_pipe[PIPE_R]);
		//Close the child's copy of the parent's standard I/O fds
		close(my_stdin);
		close(my_stdout);
		close(my_stderr);
		//Close the parent's end of the pipes connecting the two processes
		close(new_stdin[PIPE_W]);
		close(new_stdout[PIPE_R]);
		close(new_stderr[PIPE_R]);
		//Our ends already live on fds 0-2. Drop the original descriptors so the module does not
		//inherit redundant copies (guarded in case pipe() itself handed out 0-2).
		if(new_stdin[PIPE_R] > STDERR_H)
		{
			close(new_stdin[PIPE_R]);
		}
		if(new_stdout[PIPE_W] > STDERR_H)
		{
			close(new_stdout[PIPE_W]);
		}
		if(new_stderr[PIPE_W] > STDERR_H)
		{
			close(new_stderr[PIPE_W]);
		}
		//Attempt to exec the child process now...
		execl(module_name[n], module_name[n], (char *)NULL);
		//execl() only returns on failure. _exit() (not exit()) so the child doesn't flush stdio
		//buffers or run atexit handlers inherited from the supervisor.
		fprintf(stderr, "Cannot execl(\"%s\", \"%s\", NULL)!\n", module_name[n], module_name[n]);
		_exit(EX_UNAVAILABLE);
	}
	else if(fork_ret != -1) //If we are the parent process
	{
		//Record our child PID
		module_status[n].pid = fork_ret;
		//Record our end of the child's standard file descriptors
		module_status[n].stdin_fd = new_stdin[PIPE_W];
		module_status[n].stdout_fd = new_stdout[PIPE_R];
		module_status[n].stderr_fd = new_stderr[PIPE_R];
		//Reset our list ping/pong times to "never"
		module_status[n].last_ping = 0;
		module_status[n].last_pong = 0;
		module_status[n].ping_lag = 0;
		module_status[n].ping_punish = PING_PUNISH_NONE;
		//Record our process start time
		module_status[n].last_start = time(NULL);
		//This process is not currently expected to die
		module_status[n].exit_expected = 0;
		//Flag this child module as active
		module_status[n].active = 1;
		//Close the opposite end of all of our pipes
		close(record_confirm_pipe[PIPE_R]);
		close(new_stdin[PIPE_R]);
		close(new_stdout[PIPE_W]);
		close(new_stderr[PIPE_W]);
		//Restore the parent's original standard I/O and RELEASE the saved copies
		//(previously they were never closed, leaking three descriptors per launch)
		restore_std_fd(my_stdin, STDIN_H);
		restore_std_fd(my_stdout, STDOUT_H);
		restore_std_fd(my_stderr, STDERR_H);
		//Unblock the child process by transmitting our confirmation message
		retval = write(record_confirm_pipe[PIPE_W], record_confirm_msg, record_confirm_expect);
		if(retval != record_confirm_expect)
		{
			route_error_message("Cannot transmit recordkeeping confirmation!\n");
		}
		//We are now done with our recordkeeping confirmation pipe
		close(record_confirm_pipe[PIPE_W]);
	}
	else //Otherwise we have somehow FAILED to fork()
	{
		//Restore all of our standard I/O so the error is reported through our real descriptors
		restore_std_fd(my_stdin, STDIN_H);
		restore_std_fd(my_stdout, STDOUT_H);
		restore_std_fd(my_stderr, STDERR_H);
		close_pipe_pair(new_stdin);
		close_pipe_pair(new_stdout);
		close_pipe_pair(new_stderr);
		close_pipe_pair(record_confirm_pipe);
		route_error_message("Cannot fork()!\n");
		exit(EX_OSERR);
	}
	return 0;
}
void expected_exit(int n)
{
route_debug_message("Module %s [%d] has finished as expected.\n", module_name[n], (int)module_status[n].pid);
//flag this module as "clean" for the purpose of respawn
//(so we won't count the short runtime of a process we asked to terminate against it)
module_status[n].last_start = 0;
module_status[n].last_exit = 0;
}
void unexpected_exit(int n)
{
int age;
//Record our last stop time
module_status[n].last_exit = time(NULL);
age = (int)module_status[n].last_exit - (int)module_status[n].last_start;
route_warning_message("Module %s [%d] has stopped running unexpectedly after %d seconds.\n", module_name[n], (int)module_status[n].pid, age);
}
void reap_zombies()
{
int status = 0;
pid_t retval = 0;
int i;
do
{
// Clear the zombie alert HERE so that if things terminate after the loop finishes,
//we'll get flagged. This could result in an extra flag, but we'll never miss one.
//and since we're using the WNOHANG option, there is no harm in going through the waitpid()
//loop one time extra.
zombie_alert = 0;
retval = waitpid(-1, &status, WNOHANG); //Wait for ANY child process
if(retval > 0) //If waitpid() actually waited for the specified process...
{
for(i = 0; i < NUM_MODULES; i++)
{
//Go through our table of modules looking for an active module with a
//matching PID...
if( module_status[i].active && (module_status[i].pid == retval) )
{
//Give us an opportunity to do some clever cleanup if it is merited
//These functions also set the last_exit time (and may zero the last_start
//to flag a process as "clean" for respawn purposes).
if(module_status[i].exit_expected)
{
expected_exit(i);
}
else
{
unexpected_exit(i);
}
//Set this module inactive
module_status[i].active = 0;
//Set this module's last ping/pong times to "never"
module_status[i].last_ping = 0;
module_status[i].last_pong = 0;
module_status[i].ping_lag = 0;
module_status[i].ping_punish = PING_PUNISH_NONE;
//No more process
module_status[i].pid = 0;
//Close our end of this module's standard I/O file descriptors
close(module_status[i].stdin_fd);
close(module_status[i].stdout_fd);
close(module_status[i].stderr_fd);
module_status[i].stdin_fd = -1;
module_status[i].stdout_fd = -1;
module_status[i].stderr_fd = -1;
break;
}
}
}
} while(retval > 0);
}
//SIGCHLD handler: only raises a flag; the actual reaping happens in reap_zombies() from the main loop.
void sigchld_handler(int signum, siginfo_t *info, void *data)
{
	(void)signum;
	(void)info;
	(void)data;
	zombie_alert = 1;
}
//SIGHUP handler: only raises a flag; the main loop relays the HUP to client modules via the commhub.
void sighup_handler(int signum, siginfo_t *info, void *data)
{
	(void)signum;
	(void)info;
	(void)data;
	hup_alert = 1;
}
void setup_child_handler()
{
struct sigaction sa = {{0}};
sa.sa_sigaction = sigchld_handler;
sa.sa_flags = SA_NOCLDSTOP | SA_SIGINFO;
sigfillset(&sa.sa_mask);
sigaction(SIGCHLD, &sa, NULL);
}
void setup_hup_handler()
{
struct sigaction sa = {{0}};
sa.sa_sigaction = sighup_handler;
sa.sa_flags = SA_SIGINFO;
sigfillset(&sa.sa_mask);
sigaction(SIGHUP, &sa, NULL);
}
void monitor_spawn_list()
{
int i;
int module_runtime;
time_t now;
for(i = 0; i < NUM_MODULES; i++)
{
now = time(NULL);
//If it is too soon to attempt spawning any new process, then we're done for the moment
if( (now - last_global_spawn) < SUPERVISOR_GLOBAL_SPAWN_RATE_LIMIT ) break;
//If this module is still active, we don't need to spawn it...
if(module_status[i].active) continue;
//This number will only be valid if last_start != 0
module_runtime = module_status[i].last_exit - module_status[i].last_start;
if(module_status[i].last_start == 0) //If this module either made an expected termination or has never been launcher
{
last_global_spawn = now; //then update our last global spawn time
launch_module(i); //and launch it...
route_debug_message("Spawned module %s, PID = %d\n", module_name[i], (int)module_status[i].pid);
}
else if( module_runtime < SUPERVISOR_RESPAWN_DELAY_THRESHOLD ) //otherwise, if it has run before, but terminated unexpectedly in a very short time
{
if( (now - module_status[i].last_exit) >= SUPERVISOR_RESPAWN_RATE_LIMIT) //and it has been long enough to try again
{
last_global_spawn = now; //then update our last global spawn time
launch_module(i); //and launch it...
route_warning_message("Respawned module %s, PID = %d [rate limited]\n", module_name[i], (int)module_status[i].pid);
}
}
else //otherwise, the module has terminated unexpectedly, but it had run for a long time, so we can respawn it immediately
{
last_global_spawn = now; //then update our last global spawn time
launch_module(i); //and launch it...
route_debug_message("Respawned module %s, PID = %d\n", module_name[i], (int)module_status[i].pid);
}
}
}
void handle_dead_fd(int fd)
{
int i;
close(fd);
if(fd == commhub_fd)
{
commhub_fd = -1;
return;
}
for(i = 0; i < NUM_MODULES; i++)
{
if(module_status[i].stdin_fd == fd)
{
module_status[i].stdin_fd = -1;
return;
}
if(module_status[i].stdout_fd == fd)
{
module_status[i].stdout_fd = -1;
return;
}
if(module_status[i].stderr_fd == fd)
{
module_status[i].stderr_fd = -1;
return;
}
}
}
message_callback_return handle_pong_message(struct message_record *msg, void *param)
{
int i;
// printf("Got PONG from PID %d\n", (int)msg->header.sender);
//Iterate through all of the modules
for(i = 0; i < NUM_MODULES; i++)
{
//If this module is active, and its PID matches the PID of the sender of this PONG message
if( module_status[i].active && (msg->header.sender == module_status[i].pid) )
{
module_status[i].last_pong = get_usec_time();
module_status[i].ping_lag = (module_status[i].last_pong - module_status[i].last_ping);
// printf("PING->PONG latency %lld microseconds.\n", module_status[i].ping_lag);
}
}
return MESSAGE_HANDLED_CONT;
}
void clear_ping_attempt()
{
int i;
for(i = 0; i < NUM_MODULES; i++)
{
if(module_status[i].active)
{
module_status[i].last_ping = module_status[i].last_pong = 0;
module_status[i].ping_punish = PING_PUNISH_NONE;
}
}
}
/*
 * Send a PING to each active, PING-monitored module that is due for one.
 * A module is skipped if fewer than SUPERVISOR_PING_INTERVAL microseconds have passed since its
 * last PING, or if its previous PING is still unanswered. The PING goes to the per-PID magic
 * mailbox ">pid" with MAILBOX_PING as the payload. Does nothing without a commhub connection.
 */
void send_ping_message()
{
	struct message_record outgoing_msg;
	long long int usec_now;
	char target[MAILBOX_NAME_MAX + 1];
	int i;
	if(commhub_fd < 0)
	{
		return;
	}
	for(i = 0; i < NUM_MODULES; i++)
	{
		//If this module is active AND included in PING monitoring
		if( module_status[i].active && module_status[i].ping_requested )
		{
			usec_now = get_usec_time();
			//If it is not yet time to re-ping this module
			if( (usec_now - module_status[i].last_ping) < SUPERVISOR_PING_INTERVAL)
			{
				continue; //skip it for now
			}
			//If we have PING'd this host but are still waiting for a PONG back
			if(module_status[i].last_pong < module_status[i].last_ping)
			{
				continue; //Don't flood it with pings
			}
			//If we've never PING'd this host, set its PONG time to the same as its PING
			//time so it doesn't get killed for not having responded to its first PING.
			if(module_status[i].last_ping == 0)
			{
				module_status[i].last_pong = usec_now;
			}
			module_status[i].last_ping = usec_now;
			//Turn our PID into a magic mailbox name that addresses that PID (bounded to the buffer)
			snprintf(target, sizeof(target), ">%d", (int)module_status[i].pid);
			//Prepare a message to that magic mailbox with a payload consisting of the mailbox
			//name of the PING notification
			prepare_message(&outgoing_msg, target, MAILBOX_PING, strlen(MAILBOX_PING));
			//And send it!
			send_message(commhub_fd, &outgoing_msg);
		}
	}
}
/*
 * Broadcast an (empty) MAILBOX_HUP message through the commhub so client modules can reload.
 * Does nothing without a commhub connection. The notice is logged through route_debug_message()
 * like every other supervisor message, instead of going to the supervisor's raw stdout.
 */
void send_hup_message()
{
	struct message_record outgoing_msg;
	if(commhub_fd < 0)
	{
		return;
	}
	route_debug_message("Sending HUP\n");
	prepare_message(&outgoing_msg, MAILBOX_HUP, "", 0);
	send_message(commhub_fd, &outgoing_msg);
}
//First PING penalty stage: just log (debug level) how long module n has been silent.
void ping_punish_warn(int n, long long int delta)
{
	int silent_secs = (int)USEC_TO_SEC(delta);
	route_debug_message("Module %s [%d] hasn't responded to a PING in %d seconds...\n", module_name[n], (int)module_status[n].pid, silent_secs);
}
//Second PING penalty stage: log a warning and ask module n to terminate with SIGTERM.
void ping_punish_term(int n, long long int delta)
{
	pid_t victim = module_status[n].pid;
	int silent_secs = (int)USEC_TO_SEC(delta);
	route_warning_message("Module %s [%d] hasn't responded to a PING in %d seconds... Sending SIGTERM\n", module_name[n], (int)victim, silent_secs);
	kill(victim, SIGTERM);
}
//Final PING penalty stage: log an error and forcibly kill module n with SIGKILL.
void ping_punish_kill(int n, long long int delta)
{
	pid_t victim = module_status[n].pid;
	int silent_secs = (int)USEC_TO_SEC(delta);
	route_error_message("Module %s [%d] hasn't responded to a PING in %d seconds... Sending SIGKILL\n", module_name[n], (int)victim, silent_secs);
	kill(victim, SIGKILL);
}
void enforce_ping_policy()
{
int i;
long long int delta = 0;
for(i = 0; i < NUM_MODULES; i++)
{
//If this module is both active AND included in the ping monitoring
if( module_status[i].active && module_status[i].ping_requested )
{
if(module_status[i].last_ping == 0) //if we haven't sent a PING to this one...
{
//clear any punishment that would otherwise be pending and get the next module
module_status[i].ping_punish = PING_PUNISH_NONE;
continue;
}
//compute just how late this module is...
delta = get_usec_time() - module_status[i].last_pong;
switch(module_status[i].ping_punish)
{
case PING_PUNISH_NONE:
if(delta > SUPERVISOR_PING_WARN_TIME)
{
ping_punish_warn(i, delta);
module_status[i].ping_punish++;
}
break;
case PING_PUNISH_WARN:
if(delta > SUPERVISOR_PING_TERM_TIME)
{
ping_punish_term(i, delta);
module_status[i].ping_punish++;
}
break;
case PING_PUNISH_TERM:
if(delta > SUPERVISOR_PING_KILL_TIME)
{
ping_punish_kill(i, delta);
module_status[i].ping_punish++;
}
break;
default:
route_error_message("At our wits end with module %s [%d], just plain won't die!\n", module_name[i], (int)module_status[i].pid);
break;
}
}
}
}
/*
 * If we are not connected to the commhub, try to connect as progname. On success, subscribe to
 * MAILBOX_PONG (for PING monitoring) and to the default status-management mailboxes.
 */
void maintain_ipc_hub_connect(char *progname)
{
	struct message_record outgoing_msg;
	if(commhub_fd < 0) //if we have no connection to the communication hub
	{
		commhub_fd = connect_to_message_server(progname); //try and get one
		if(commhub_fd >= 0) //if it worked
		{
			//Subscribe to the command mailboxes we act on.
			//The payload length must match the payload itself (it previously used strlen(MAILBOX_TOKEN_MAG)).
			prepare_message(&outgoing_msg, MAILBOX_SUBSCRIBE, MAILBOX_PONG, strlen(MAILBOX_PONG));
			send_message(commhub_fd, &outgoing_msg);
			//Subscribe to the relevant status management mailboxes
			subscribe_to_default_messages(commhub_fd);
		}
	}
}
/*
 * Print command-line usage (including the module count we saw) to stderr and exit with EX_USAGE.
 * Does not return.
 */
void usage(char *progname)
{
	fprintf(stderr,"%s: %d modules given to be supervised, minimum = 1, maximum = %d\n", progname, NUM_MODULES, SUPERVISOR_MAX_MODULES - 1);
	fprintf(stderr,"%s: Usage: %s [module ...] [--[no-]ping] [module ...]\n", progname, progname);
	fprintf(stderr,"%s: By default modules will be subject to PING monitoring through\n", progname);
	fprintf(stderr,"%s: the IPC hub; --no-ping affects all modules following it until a --ping\n", progname);
	fprintf(stderr,"\n");
	exit(EX_USAGE);
}
/*
 * Build the module table from argv.
 * Every argument other than "--ping" / "--no-ping" is a module path. Each module gets the PING
 * setting most recently selected (default: monitored). A module whose basename equals
 * SUPERVISOR_SPECIAL_BILLDB becomes error_logging_module. Calls usage() (which exits) if no
 * module is given or if SUPERVISOR_MAX_MODULES - 1 would be exceeded.
 */
void handle_command_line(int argc, char **argv)
{
	int i;
	int needs_ping = 1;
	NUM_MODULES = 0;
	//Walk all of the given command line arguments and add each one as a module we have been asked to supervise
	for(i = 1; i < argc; i++)
	{
		if( !strcmp(argv[i], "--no-ping") )
		{
			needs_ping = 0;
		}
		else if ( !strcmp(argv[i], "--ping") )
		{
			needs_ping = 1;
		}
		else
		{
			//Compare only the basename (text after the last '/'); strrchr() is the standard
			//replacement for the legacy rindex()
			const char *base = strrchr(argv[i], '/');
			base = base ? (base + 1) : argv[i];
			if( !strcmp(base, SUPERVISOR_SPECIAL_BILLDB) )
			{
				//if it is, this one is our guy!
				error_logging_module = NUM_MODULES;
			}
			module_status[NUM_MODULES].ping_requested = needs_ping;
			module_name[NUM_MODULES++] = argv[i];
		}
		//Refuse more modules than the tables (and the poll set sized from them) allow
		if(NUM_MODULES >= SUPERVISOR_MAX_MODULES)
		{
			usage(argv[0]);
		}
	}
	if(NUM_MODULES == 0)
	{
		usage(argv[0]);
	}
}
/*
 * Supervisor entry point.
 * Parses the module list, installs signal handlers and commhub callbacks, then loops until an exit
 * is requested. Each pass it samples system health, (re)spawns modules, maintains the commhub
 * connection and PING policy, relays SIGHUP, reaps children, and pumps module stdout (debug) /
 * stderr (error) output and commhub messages. On exit it sends SIGTERM to every active module.
 */
int main(int argc, char **argv)
{
	int poll_ret;
	int nfds;
	int i;
	struct message_record incoming_msg;
	char linebuffer[LINE_BUFFER_SIZE] = {0};
	int read_ret;
	int modnum;
	struct pollfd fds[1 + (2 * SUPERVISOR_MAX_MODULES)] = {{0}};
	int poll_to_module[1 + (2 * SUPERVISOR_MAX_MODULES)] = {0};
	//Parse our command line to figure out what modules we need to supervise
	handle_command_line(argc, argv);
	//Configure our default signal handlers
	configure_signal_handlers(argv[0]);
	//As well as our module-specific zombie-reaping flag setter
	setup_child_handler();
	//This listens for SIGHUP signals and flags us when one has come in.
	setup_hup_handler();
	//Register our default keep-up-with-system status callbacks
	register_system_status_callbacks();
	//Except we want to exempt ourselves from the MAILBOX_EXIT and
	//MAILBOX_PING so as not to start ping storms or kill ourselves...
	register_dispatch_callback(MAILBOX_EXIT, CALLBACK_PREPROCESS, ignore_message, NULL);
	register_dispatch_callback(MAILBOX_PING, CALLBACK_PREPROCESS, ignore_message, NULL);
	//Add our specific handlers
	register_dispatch_callback(MAILBOX_PONG, CALLBACK_USER(1), handle_pong_message, NULL);
	// This is the main processing loop which monitors system status and pumps I/O from
	//all of the client modules (debug and error messages) and from the IPC hub when it is up
	//as indicated by poll().
	while( exit_request_status == EXIT_REQUEST_NONE )
	{
		RESET_WATCHDOG();
		//-------------- SYSTEM STATE MAINTENANCE CODE HERE
		if( (time(NULL) - last_system_stat) >= SUPERVISOR_SYSTEM_STAT_INTERVAL)
		{
			monitor_system_status();
			last_system_stat = time(NULL);
		}
		//Do any maintenance on our spawn list as needed
		monitor_spawn_list();
		//Try and keep in touch with the IPC hub if possible
		maintain_ipc_hub_connect(argv[0]);
		//If the communications hub is up, we may need to maintain our PING states
		if(commhub_fd >= 0)
		{
			//Send out any PING messages that are required
			send_ping_message();
			//Deal with any delinquents who don't answer their pings
			enforce_ping_policy();
			if(hup_alert)
			{
				hup_alert = 0;
				send_hup_message();
			}
		}
		else //If there is no connection to the IPC hub
		{
			clear_ping_attempt(); //clear any PING attempt records we may have active
		}
		//If our SIGCHLD handler has notified us that there is at least 1 zombie to reap
		if(zombie_alert)
		{
			//Try and reap any zombies that may exist
			reap_zombies();
		}
		//-------------- POLL FILE DESCRIPTOR SET POPULATION HERE
		nfds = 0;
		//If we have a valid communication hub connection, add it to the poll set
		if(commhub_fd >= 0)
		{
			fds[nfds].fd = commhub_fd;
			fds[nfds].events = POLLIN;
			poll_to_module[nfds] = -1; //but flag it as NOT belonging to a child module
			nfds++;
		}
		//Then iterate through all of our modules and see if they need service
		for(i = 0; i < NUM_MODULES; i++)
		{
			if(module_status[i].active) //If this child module is flagged as active
			{
				if(module_status[i].stdout_fd >= 0) //and we have a stdout file descriptor for this child
				{
					fds[nfds].fd = module_status[i].stdout_fd; //add it to the poll set
					fds[nfds].events = POLLIN;
					poll_to_module[nfds] = i; //and remember which module it goes to
					nfds++;
				}
				if(module_status[i].stderr_fd >= 0) //if we have a stderr file descriptor for this child
				{
					fds[nfds].fd = module_status[i].stderr_fd; //add it to the poll set
					fds[nfds].events = POLLIN;
					poll_to_module[nfds] = i; //and remember which module it goes to
					nfds++;
				}
			}
		}
		if(nfds > 0) //If we have any file descriptors to poll
		{
			poll_ret = poll(fds, nfds, POLL_TIMEOUT); //poll them
			if(poll_ret < 0) //if poll returns error
			{
				if(errno == EINTR) //and it's just EINTR
				{
					continue; //try the body of the while again
				}
				//if it is some other error, scream bloody murder
				route_debug_message("Poll returned %d, errno = %d (%s)\n", poll_ret, errno, strerror(errno));
				sleep(1);
				//revents are not meaningful after a failed poll(); servicing them could read
				//(and block on) or close the wrong descriptors, so start the next pass instead.
				continue;
			}
		}
		else //If we have zero functional file descriptors
		{
			sleep(1); //Pretend that we called poll and it timed out
			poll_ret = 0;
		}
		//-------------- I/O PUMPING CODE BELOW
		if(poll_ret == 0) //if there are no file descriptors flagged as needing any action
		{
			continue;
		}
		for(i = 0; i < nfds; i++)
		{
			modnum = poll_to_module[i];
			if( (fds[i].revents & POLLIN) && (modnum >= 0) ) //If we have input and it is from a module...
			{
				read_ret = read(fds[i].fd, linebuffer, sizeof(linebuffer) - 1);
				if(read_ret < 0)
				{
					if(errno == EINTR)
					{
						//Interrupted by a signal: nothing was read (and read_ret must not be used
						//as an index). poll() will flag this fd again on the next pass.
						continue;
					}
					handle_dead_fd(fds[i].fd); //clean up
					continue; //and skip this fd for now
				}
				if(read_ret == 0) //EOF: the child closed its end
				{
					handle_dead_fd(fds[i].fd); //clean up
					continue; //and skip this fd for now
				}
				//Terminate our read...
				linebuffer[read_ret] = '\0';
				if(fds[i].fd == module_status[modnum].stdout_fd)
				{
					route_debug_message("%s [%d]: %s", module_name[modnum], (int)module_status[modnum].pid, linebuffer);
				}
				else if(fds[i].fd == module_status[modnum].stderr_fd)
				{
					route_error_message("%s [%d]: %s", module_name[modnum], (int)module_status[modnum].pid, linebuffer);
				}
				else
				{
					route_debug_message("File descriptor %d does not belong to module %d (%s)!\n", fds[i].fd, modnum, module_name[modnum]);
				}
			}
			else if( (fds[i].revents & POLLIN) && (fds[i].fd == commhub_fd) ) //If we have input and it is from the commhub...
			{
				read_ret = get_message(commhub_fd, &incoming_msg);
				if(read_ret < 0)
				{
					handle_dead_fd(fds[i].fd); //clean up
					continue; //and skip this fd for now
				}
				else
				{
					process_message(&incoming_msg); //This passes the received message through the callback list
				}
			}
			//If it looks like we have an errored or invalid descriptor
			if(fds[i].revents & (POLLERR | POLLNVAL))
			{
				handle_dead_fd(fds[i].fd); //clean up
				continue; //and skip this fd for now
			}
			if(fds[i].revents & POLLHUP)
			{
				//On a module pipe, POLLHUP together with POLLIN means the child closed its end but
				//output is still buffered. Keep the fd so the rest is drained on later passes;
				//read() returns 0 once it is empty, which closes it above.
				if( (modnum >= 0) && (fds[i].revents & POLLIN) )
				{
					continue;
				}
				handle_dead_fd(fds[i].fd); //clean up
				continue; //and skip this fd for now
			}
		}
	}
	route_debug_message("Attempting graceful exit...\n");
	for(i = 0; i < NUM_MODULES; i++)
	{
		if(module_status[i].active)
		{
			route_debug_message("Sending SIGTERM to module %s [%d]\n", module_name[i], (int)module_status[i].pid);
			kill(module_status[i].pid, SIGTERM);
		}
	}
	sleep(1);
	return 0;
}