/*
* Copyright (c) 2019 Clementine Computing LLC.
*
* This file is part of PopuFare.
*
* PopuFare is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* PopuFare is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with PopuFare. If not, see <https://www.gnu.org/licenses/>.
*
*/
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include "../common/common_defs.h"
#include "../commhub/commhub.h"
#include "../commhub/client_utils.h"
// Version string printed by print_version()
//
#define CLIENT_SUPERVISOR_VERSION "2.1.1"
// Number of modules we are actively tracking
// (counted by handle_command_line() as it walks the command line)
//
int NUM_MODULES = 0;
// File descriptor of our connection to the comm hub (negative while we are not connected)
//
int commhub_fd = -1;
// This flag is set by the SIGCHLD handler, and cleared by the reaper function
//
volatile sig_atomic_t zombie_alert = 0;
// This flag is set by the SIGHUP handler, and cleared once a HUP has been delivered to the client modules
//
volatile sig_atomic_t hup_alert = 0;
// This is the time (as returned by time()) of the last new process creation event
//
int last_global_spawn = 0;
// This is the time (as returned by time()) of our last system status check
//
int last_system_stat = 0;
// This is the number of KB of free memory in the system (the MemFree line of /proc/meminfo)
//
int system_stat_freemem = 0;
// This is the number of processes in the system (the figure after the '/' in /proc/loadavg)
//
int system_stat_numproc = 0;
// This is the system load average.
// NOTE(review): this was originally described as the 5 minute load average, but
// monitor_system_status() stores the FIRST field of /proc/loadavg here, which is
// the 1 minute figure -- confirm which one is intended.
//
float system_stat_loadavg = 0;
// This is the error logging module's module number
// (this is the module whose name matches SUPERVISOR_SPECIAL_BILLDB),
// or -1 if no such module was given on the command line
//
int error_logging_module = -1;
// Escalation levels used by enforce_ping_policy() for a module that is not
// answering its PINGs.  The level is advanced one step at a time:
// none -> warned -> SIGTERM sent -> SIGKILL sent
//
#define PING_PUNISH_NONE (0)
#define PING_PUNISH_WARN (1)
#define PING_PUNISH_TERM (2)
#define PING_PUNISH_KILL (3)
// Per-module bookkeeping record; one entry of module_status[] per supervised module
//
struct module_record {
// last time this process was started
// (zero means "never started, or last exit was an expected one", see expected_exit())
//
time_t last_start;
// last time this process was reaped after an unexpected exit
//
time_t last_exit;
// If this flag is nonzero, the specified module is subject to PING monitoring;
// if it is zero, the specified module is immune to PING monitoring.
//
int ping_requested;
// Nonzero while this module has a running child process (set by launch_module(),
// cleared by reap_zombies()).
// If this is zero, all further fields are invalid
//
int active;
//-------------------------
// If this flag is set, we expect the process to exit (i.e. it's not an error if it does...)
//
int exit_expected;
// Process ID of this child module
//
pid_t pid;
// File descriptor of this child's standard input (write to this to feed child)
//
int stdin_fd;
// File descriptor of this child's standard output (read from this to get output)
//
int stdout_fd;
// File descriptor of this child's standard error (read from this to get error output)
//
int stderr_fd;
// timestamp in microseconds of the last PING message sent to this process (0 = never)
//
long long int last_ping;
// timestamp in microseconds of the last PONG message received from this process (0 = never)
//
long long int last_pong;
// Latest PING->PONG lag for this process
//
long long int ping_lag;
// Used to keep track of PING non-response action sequence (one of the PING_PUNISH_* values)
//
int ping_punish;
// The module_status array below contains the state for each module
//
} module_status[SUPERVISOR_MAX_MODULES] = {{0}};
// This array contains pointers to the names of each module
// (these point straight at the argv[] strings, see handle_command_line())
//
char *module_name[SUPERVISOR_MAX_MODULES] = {0};
// Indices of the read and write ends within the int[2] array filled in by pipe()
//
#define PIPE_R (0)
#define PIPE_W (1)
// File descriptor numbers of the standard I/O streams
//
#define STDIN_H (0)
#define STDOUT_H (1)
#define STDERR_H (2)
// Log a printf-style message at DEBUG level.
//
// When SUPERVISOR_LOG_DEBUG_TO_BILLDB is defined, and we have a comm hub
// connection and an active error logging module, the message is sent to
// MAILBOX_BILLING_LOG (one byte of log level, the log prefix, then the text).
// If that is not possible, or the send fails, the message goes to syslog.
//
// fmt, ... : printf-style format string and its arguments
// returns  : always 0
//
int route_debug_message(char *fmt, ...) {
    va_list ap;
#ifdef SUPERVISOR_LOG_DEBUG_TO_BILLDB
    // IF... we have a connection to the commhub, and we have an instance of billdb under our care, and it is alive...
    //
    if ( (commhub_fd >= 0) && (error_logging_module >= 0) && module_status[error_logging_module].active ) {
        int retval;
        int len = 0;
        int room;
        int written;
        struct message_record outgoing_msg;
        char payload[MAX_PAYLOAD_LENGTH] = {0};

        payload[len++] = LOGLEVEL_DEBUG;
        len += make_log_prefix(payload + len, MAX_PAYLOAD_LENGTH - len);

        room = MAX_PAYLOAD_LENGTH - len;
        if (room > 0) {
            va_start(ap, fmt); /* Initialize the va_list */
            written = vsnprintf(payload + len, room, fmt, ap);
            va_end(ap);
            // vsnprintf() returns the length the text WOULD have had, so on
            // truncation we must only count what actually fit in the buffer.
            //
            if (written >= room) {
                written = room - 1;
            }
            if (written > 0) {
                len += written;
            }
        }
        // Never describe more payload than the buffer actually holds
        //
        if (len > MAX_PAYLOAD_LENGTH) {
            len = MAX_PAYLOAD_LENGTH;
        }

        prepare_message(&outgoing_msg, MAILBOX_BILLING_LOG, payload, len);
        retval = send_message(commhub_fd, &outgoing_msg);
        if (retval == 0) {
            return 0;
        }
    }
#endif
    // IF either that didn't work, or the conditions were not met, send it to syslog...
    //
    va_start(ap, fmt);
    vsyslog(LOG_DEBUG, fmt, ap);
    va_end(ap);
    return 0;
}
// Log a printf-style message at WARNING level.
//
// When SUPERVISOR_LOG_WARNING_TO_BILLDB is defined, and we have a comm hub
// connection and an active error logging module, the message is sent to
// MAILBOX_BILLING_LOG (one byte of log level, the log prefix, then the text).
// If that is not possible, or the send fails, the message goes to syslog.
//
// fmt, ... : printf-style format string and its arguments
// returns  : always 0
//
int route_warning_message(char *fmt, ...) {
    va_list ap;
#ifdef SUPERVISOR_LOG_WARNING_TO_BILLDB
    // IF... we have a connection to the commhub, and we have an instance of billdb under our care, and it is alive...
    //
    if ( (commhub_fd >= 0) && (error_logging_module >= 0) && module_status[error_logging_module].active ) {
        int retval;
        int len = 0;
        int room;
        int written;
        struct message_record outgoing_msg;
        char payload[MAX_PAYLOAD_LENGTH] = {0};

        payload[len++] = LOGLEVEL_WARN;
        len += make_log_prefix(payload + len, MAX_PAYLOAD_LENGTH - len);

        room = MAX_PAYLOAD_LENGTH - len;
        if (room > 0) {
            va_start(ap, fmt); /* Initialize the va_list */
            written = vsnprintf(payload + len, room, fmt, ap);
            va_end(ap);
            // vsnprintf() returns the length the text WOULD have had, so on
            // truncation we must only count what actually fit in the buffer.
            //
            if (written >= room) {
                written = room - 1;
            }
            if (written > 0) {
                len += written;
            }
        }
        // Never describe more payload than the buffer actually holds
        //
        if (len > MAX_PAYLOAD_LENGTH) {
            len = MAX_PAYLOAD_LENGTH;
        }

        prepare_message(&outgoing_msg, MAILBOX_BILLING_LOG, payload, len);
        retval = send_message(commhub_fd, &outgoing_msg);
        if (retval == 0) {
            return 0;
        }
    }
#endif
    // IF either that didn't work, or the conditions were not met, send it to syslog...
    //
    va_start(ap, fmt); /* Initialize the va_list */
    vsyslog(LOG_WARNING, fmt, ap);
    va_end(ap);
    return 0;
}
// Log a printf-style message at ERROR level.
//
// When SUPERVISOR_LOG_ERROR_TO_BILLDB is defined, and we have a comm hub
// connection and an active error logging module, the message is sent to
// MAILBOX_BILLING_LOG (one byte of log level, the log prefix, then the text).
// If that is not possible, or the send fails, the message goes to syslog.
//
// fmt, ... : printf-style format string and its arguments
// returns  : always 0
//
int route_error_message(char *fmt, ...) {
    va_list ap;
#ifdef SUPERVISOR_LOG_ERROR_TO_BILLDB
    // IF... we have a connection to the commhub, and we have an instance of billdb under our care, and it is alive...
    //
    if ( (commhub_fd >= 0) && (error_logging_module >= 0) && module_status[error_logging_module].active ) {
        int retval;
        int len = 0;
        int room;
        int written;
        struct message_record outgoing_msg;
        char payload[MAX_PAYLOAD_LENGTH] = {0};

        payload[len++] = LOGLEVEL_ERROR;
        len += make_log_prefix(payload + len, MAX_PAYLOAD_LENGTH - len);

        room = MAX_PAYLOAD_LENGTH - len;
        if (room > 0) {
            // Initialize the va_list
            //
            va_start(ap, fmt);
            written = vsnprintf(payload + len, room, fmt, ap);
            va_end(ap);
            // vsnprintf() returns the length the text WOULD have had, so on
            // truncation we must only count what actually fit in the buffer.
            //
            if (written >= room) {
                written = room - 1;
            }
            if (written > 0) {
                len += written;
            }
        }
        // Never describe more payload than the buffer actually holds
        //
        if (len > MAX_PAYLOAD_LENGTH) {
            len = MAX_PAYLOAD_LENGTH;
        }

        prepare_message(&outgoing_msg, MAILBOX_BILLING_LOG, payload, len);
        retval = send_message(commhub_fd, &outgoing_msg);
        if (retval == 0) {
            return 0;
        }
    }
#endif
    // IF either that didn't work, or the conditions were not met, send it to syslog...
    //
    va_start(ap, fmt);
    vsyslog(LOG_ERR, fmt, ap);
    va_end(ap);
    return 0;
}
// Emit a warning carrying the most recently sampled system process count.
//
void warn_num_proc() {
    const int running = system_stat_numproc;

    route_warning_message("System has %d processes running!\n", running);
}
// Emit a warning carrying the most recently sampled system load average.
//
void warn_load_avg() {
    const float load = system_stat_loadavg;

    route_warning_message("System load average is %.1f!\n", load);
}
// Emit a warning carrying the most recently sampled amount of free memory (KB).
//
void warn_mem_free() {
    const int free_kb = system_stat_freemem;

    route_warning_message("System is down to %d KB of free RAM\n", free_kb);
}
// Sample /proc/loadavg and /proc/meminfo, update the system_stat_* globals
// from whatever could be parsed, and issue a warning for every figure that is
// past its SUPERVISOR_WARN_* threshold.
//
// A figure that cannot be read or parsed keeps its previous value.
//
void monitor_system_status() {
    int kb_free = 0;
    int n_procs = 0;
    float load_avg = 0;
    char line[LINE_BUFFER_SIZE] = {0};
    FILE *f;

    f = fopen("/proc/loadavg", "rb");
    if (f) {
        // We keep the first load average figure and the number after the '/'.
        // Only parse the line if we actually managed to read one.
        //
        if ( fgets(line, sizeof(line), f) &&
             (sscanf(line, "%f %*f %*f %*i/%i", &load_avg, &n_procs) == 2) ) {
            system_stat_loadavg = load_avg;
            system_stat_numproc = n_procs;
        }
        fclose(f);
    }
    else {
        route_error_message("Cannot read /proc/loadavg\n");
    }

    f = fopen("/proc/meminfo", "rb");
    if (f) {
        // Loop on the result of fgets() rather than on feof(): feof() never
        // becomes true after a read ERROR, which would spin here forever, and
        // it also lets the final iteration re-parse a stale line.
        //
        while (fgets(line, sizeof(line), f)) {
            if (sscanf(line, "MemFree: %i kB\n", &kb_free) == 1) {
                system_stat_freemem = kb_free;
                break;
            }
        }
        fclose(f);
    }
    else {
        route_error_message("Cannot read /proc/meminfo\n");
    }

    if (system_stat_numproc > SUPERVISOR_WARN_NUM_PROC) {
        warn_num_proc();
    }
    if (system_stat_loadavg > SUPERVISOR_WARN_LOAD_AVG) {
        warn_load_avg();
    }
    if (system_stat_freemem < SUPERVISOR_WARN_LOW_MEM) {
        warn_mem_free();
    }
}
// Log a "heartbeat" debug message; the result of the logging call is not used.
//
void send_heartbeat() {
    (void)route_debug_message("heartbeat\n");
}
// Start module number n as a child process whose standard input, output and
// error are pipes held by the supervisor, and record it in module_status[n].
//
// The child blocks on a confirmation pipe until the parent has finished
// recording it, so that a SIGCHLD from an early failure can always be matched
// to a module_status entry.
//
// n       : index into module_name[] / module_status[]
// returns : 0 on success, -1 if n is out of range.
//           If fork() fails the supervisor itself exits with EX_OSERR.
//
int launch_module(int n) {
    pid_t fork_ret;
    int retval;
    int my_stdin, my_stdout, my_stderr;
    int new_stdin[2], new_stdout[2], new_stderr[2];
    int record_confirm_pipe[2];
    char record_confirm_msg[4] = {'O','K','\n','\0'};
    int record_confirm_expect;

    if ( (n < 0) || (n >= NUM_MODULES) ) {
        return -1;
    }
    RESET_WATCHDOG();

    // Save our actual standard I/O fd's
    //
    my_stdin = dup(STDIN_H);
    my_stdout = dup(STDOUT_H);
    my_stderr = dup(STDERR_H);

    // Generate pipes to be said handles for the child process
    //
    pipe(new_stdin);
    pipe(new_stdout);
    pipe(new_stderr);

    // close our actual handles in preparation for forking
    //
    close(STDIN_H);
    close(STDOUT_H);
    close(STDERR_H);

    // Assign the "far" end of the pipes to our handles so that the child process will use them
    //
    dup2(new_stdin[PIPE_R], STDIN_H);
    dup2(new_stdout[PIPE_W], STDOUT_H);
    dup2(new_stderr[PIPE_W], STDERR_H);

    // Create a pipe for record-keeping confirmation purposes (this prevents a race condition with SIGCHLD)
    //
    pipe(record_confirm_pipe);
    record_confirm_expect = strlen(record_confirm_msg);

    // Ask the kernel to create a new process to hold our new module
    //
    fork_ret = fork();

    // If we are the child process
    //
    if (fork_ret == 0) {
        // We don't need this end of the confirmation pipe
        //
        close(record_confirm_pipe[PIPE_W]);

        // Wait for the parent process to confirm that we have been recorded before proceeding
        // with the exec attempt so that if it fails, the parent process will know what to do with the
        // SIGCHLD signal.
        //
        retval = read(record_confirm_pipe[PIPE_R], record_confirm_msg, record_confirm_expect);

        // If the parent process can't confirm our existence, we cannot safely continue.
        // _exit() is used in the forked child so that it neither runs the parent's
        // atexit handlers nor flushes stdio buffers inherited from the parent.
        //
        if (retval != record_confirm_expect) {
            fprintf(stderr, "Never got recordkeeping confirmation!\n");
            _exit(EX_UNAVAILABLE);
        }

        // We are now done with our confirmation pipe
        //
        close(record_confirm_pipe[PIPE_R]);

        // Close the child's copy of the parent's standard I/O fds
        //
        close(my_stdin);
        close(my_stdout);
        close(my_stderr);

        // Close the parent's end of the pipes connecting the two processes
        //
        close(new_stdin[PIPE_W]);
        close(new_stdout[PIPE_R]);
        close(new_stderr[PIPE_R]);

        // Attempt to exec the child process now...
        // (the terminating null pointer of a variadic call must be passed as a char pointer)
        //
        retval = execl(module_name[n], module_name[n], (char *)NULL);
        if (retval)
        {
            fprintf(stderr, "Cannot execl(\"%s\", \"%s\", NULL)!\n", module_name[n], module_name[n]);
            _exit(EX_UNAVAILABLE);
        }
        _exit(0);
    }
    // If we are the parent process
    //
    else if (fork_ret != -1) {
        // Record our child PID
        //
        module_status[n].pid = fork_ret;

        // Record our end of the child's standard file descriptors
        //
        module_status[n].stdin_fd = new_stdin[PIPE_W];
        module_status[n].stdout_fd = new_stdout[PIPE_R];
        module_status[n].stderr_fd = new_stderr[PIPE_R];

        // Reset our last ping/pong times to "never"
        //
        module_status[n].last_ping = 0;
        module_status[n].last_pong = 0;
        module_status[n].ping_lag = 0;
        module_status[n].ping_punish = PING_PUNISH_NONE;

        // Record our process start time
        //
        module_status[n].last_start = time(NULL);

        // This process is not currently expected to die
        //
        module_status[n].exit_expected = 0;

        // Flag this child module as active
        //
        module_status[n].active = 1;

        // Close the opposite end of all of our pipes
        //
        close(record_confirm_pipe[PIPE_R]);
        close(new_stdin[PIPE_R]);
        close(new_stdout[PIPE_W]);
        close(new_stderr[PIPE_W]);

        // Close our copy of the child's standard I/O file descriptors
        //
        close(STDIN_H);
        close(STDOUT_H);
        close(STDERR_H);

        // Restore the parent's original standard I/O file descriptors
        //
        dup2(my_stdin, STDIN_H);
        dup2(my_stdout, STDOUT_H);
        dup2(my_stderr, STDERR_H);

        // The saved duplicates have served their purpose; without closing them
        // every spawn would leak three file descriptors in the supervisor.
        //
        close(my_stdin);
        close(my_stdout);
        close(my_stderr);

        // Unblock the child process by transmitting our confirmation message
        //
        retval = write(record_confirm_pipe[PIPE_W], record_confirm_msg, record_confirm_expect);
        if (retval != record_confirm_expect) {
            route_error_message("Cannot transmit recordkeeping confirmation!\n");
        }

        // We are now done with our recordkeeping confirmation pipe
        //
        close(record_confirm_pipe[PIPE_W]);
    }
    // Otherwise we have somehow FAILED to fork()
    //
    else {
        // restore our stderr file descriptor
        //
        close(STDERR_H);
        dup2(my_stderr, STDERR_H);
        route_error_message("Cannot fork()!\n");
        exit(EX_OSERR);
    }
    return 0;
}
void expected_exit(int n) {
route_debug_message("Module %s [%d] has finished as expected.\n", module_name[n], (int)module_status[n].pid);
// flag this module as "clean" for the purpose of respawn
// (so we won't count the short runtime of a process we asked to terminate against it)
//
module_status[n].last_start = 0;
module_status[n].last_exit = 0;
}
void unexpected_exit(int n) {
int age;
// Record our last stop time
//
module_status[n].last_exit = time(NULL);
age = (int)module_status[n].last_exit - (int)module_status[n].last_start;
route_warning_message("Module %s [%d] has stopped running unexpectedly after %d seconds.\n", module_name[n], (int)module_status[n].pid, age);
}
void reap_zombies() {
int status = 0;
pid_t retval = 0;
int i;
do {
// Clear the zombie alert HERE so that if things terminate after the loop finishes,
// we'll get flagged. This could result in an extra flag, but we'll never miss one.
// and since we're using the WNOHANG option, there is no harm in going through the waitpid()
// loop one time extra.
zombie_alert = 0;
retval = waitpid(-1, &status, WNOHANG); // Wait for ANY child process
// If waitpid() actually waited for the specified process...
//
if (retval > 0) {
for (i = 0; i < NUM_MODULES; i++) {
// Go through our table of modules looking for an active module with a
//
// matching PID...
//
if ( module_status[i].active && (module_status[i].pid == retval) ) {
// Give us an opportunity to do some clever cleanup if it is merited
// These functions also set the last_exit time (and may zero the last_start
// to flag a process as "clean" for respawn purposes).
//
if (module_status[i].exit_expected) {
expected_exit(i);
}
else {
unexpected_exit(i);
}
// Set this module inactive
//
module_status[i].active = 0;
// Set this module's last ping/pong times to "never"
//
module_status[i].last_ping = 0;
module_status[i].last_pong = 0;
module_status[i].ping_lag = 0;
module_status[i].ping_punish = PING_PUNISH_NONE;
// No more process
//
module_status[i].pid = 0;
// Close our end of this module's standard I/O file descriptors
//
close(module_status[i].stdin_fd);
close(module_status[i].stdout_fd);
close(module_status[i].stderr_fd);
module_status[i].stdin_fd = -1;
module_status[i].stdout_fd = -1;
module_status[i].stderr_fd = -1;
break;
}
}
}
} while(retval > 0);
}
// SIGCHLD handler: only raises zombie_alert; none of the arguments are used.
//
void sigchld_handler(int signum, siginfo_t *info, void *data) {
    (void)signum;
    (void)info;
    (void)data;
    zombie_alert = 1;
}
// SIGHUP handler: only raises hup_alert; none of the arguments are used.
//
void sighup_handler(int signum, siginfo_t *into, void *data) {
    (void)signum;
    (void)into;
    (void)data;
    hup_alert = 1;
}
void setup_child_handler() {
struct sigaction sa = {{0}};
sa.sa_sigaction = sigchld_handler;
sa.sa_flags = SA_NOCLDSTOP | SA_SIGINFO;
sigfillset(&sa.sa_mask);
sigaction(SIGCHLD, &sa, NULL);
}
void setup_hup_handler() {
struct sigaction sa = {{0}};
sa.sa_sigaction = sighup_handler;
sa.sa_flags = SA_SIGINFO;
sigfillset(&sa.sa_mask);
sigaction(SIGHUP, &sa, NULL);
}
void monitor_spawn_list() {
int i;
int module_runtime;
time_t now;
for (i = 0; i < NUM_MODULES; i++) {
now = time(NULL);
// If it is too soon to attempt spawning any new process, then we're done for the moment
//
if ( (now - last_global_spawn) < SUPERVISOR_GLOBAL_SPAWN_RATE_LIMIT ) break;
// If this module is still active, we don't need to spawn it...
if (module_status[i].active) continue;
// This number will only be valid if last_start != 0
module_runtime = module_status[i].last_exit - module_status[i].last_start;
// If this module either made an expected termination or has never been launcher
//
if (module_status[i].last_start == 0) {
// then update our last global spawn time
//
last_global_spawn = now;
// and launch it...
//
launch_module(i);
route_debug_message("Spawned module %s, PID = %d\n", module_name[i], (int)module_status[i].pid);
}
// otherwise, if it has run before, but terminated unexpectedly in a very short time
//
else if ( module_runtime < SUPERVISOR_RESPAWN_DELAY_THRESHOLD ) {
// and it has been long enough to try again
//
if ( (now - module_status[i].last_exit) >= SUPERVISOR_RESPAWN_RATE_LIMIT) {
// then update our last global spawn time
//
last_global_spawn = now;
// and launch it...
//
launch_module(i);
route_warning_message("Respawned module %s, PID = %d [rate limited]\n", module_name[i], (int)module_status[i].pid);
}
}
// otherwise, the module has terminated unexpectedly, but it had run for a long time, so we can respawn it immediately
//
else {
// then update our last global spawn time
//
last_global_spawn = now;
// and launch it...
//
launch_module(i);
route_debug_message("Respawned module %s, PID = %d\n", module_name[i], (int)module_status[i].pid);
}
}
}
// Close a descriptor that has stopped working and forget the first place that
// referred to it: either the comm hub connection or one of a module's
// standard I/O pipe ends.
//
// fd : the descriptor to close and retire
//
void handle_dead_fd(int fd) {
    int idx;
    int which;

    close(fd);

    if (fd == commhub_fd) {
        commhub_fd = -1;
        return;
    }

    for (idx = 0; idx < NUM_MODULES; idx++) {
        // Checked in the order stdin, stdout, stderr
        //
        int *owned[3];

        owned[0] = &module_status[idx].stdin_fd;
        owned[1] = &module_status[idx].stdout_fd;
        owned[2] = &module_status[idx].stderr_fd;

        for (which = 0; which < 3; which++) {
            if (*owned[which] == fd) {
                *owned[which] = -1;
                return;
            }
        }
    }
}
message_callback_return handle_pong_message(struct message_record *msg, void *param) {
int i;
// printf("Got PONG from PID %d\n", (int)msg->header.sender);
// Iterate through all of the modules
//
for (i = 0; i < NUM_MODULES; i++) {
// If this module is active, and its PID matches the PID of the sender of this PONG message
//
if ( module_status[i].active && (msg->header.sender == module_status[i].pid) ) {
module_status[i].last_pong = get_usec_time();
module_status[i].ping_lag = (module_status[i].last_pong - module_status[i].last_ping);
// printf("PING->PONG latency %lld microseconds.\n", module_status[i].ping_lag);
}
}
return MESSAGE_HANDLED_CONT;
}
//----
// client_supervisor is now in charge of managing the state_info file,
// so we need to handle the GPS, stop and driver messages explicitly
// in order to be able to write to the state_info file.
// `client_supervisor` should be the sole process writing to the state_info
// file, while any other client should be able to read it.
//
// Dispatch callback for GPS status messages: copies the payload into gps_stat,
// folds it into state_info, tags state_info with a comment and stores it.
//
// msg     : the received message
// param   : unused
// returns : MESSAGE_HANDLED_STOP if the payload is not exactly one gps_status
//           in size, MESSAGE_HANDLED_CONT otherwise
//
message_callback_return handle_gps_message(struct message_record *msg, void *param) {
    // sizeof() of this array counts the terminating NUL as well
    //
    static const char tag[] = "client_supervisor gps";

    // guard against version mismatch
    //
    if (msg->header.payload_length != sizeof(gps_status)) {
        return MESSAGE_HANDLED_STOP;
    }

    memcpy(&gps_stat, msg->payload, sizeof(gps_status));
    update_state_info_with_gps(&state_info, &gps_stat);

    memcpy(state_info.comment, tag, sizeof(tag));
    set_state_info(&state_info);

    return MESSAGE_HANDLED_CONT;
}
// Dispatch callback for stop status messages: copies the payload into
// stop_stat, folds it into state_info, tags state_info with a comment and
// stores it.
//
// msg     : the received message
// param   : unused
// returns : MESSAGE_HANDLED_STOP if the payload is not exactly one stop_status
//           in size, MESSAGE_HANDLED_CONT otherwise
//
static message_callback_return handle_stop_message(struct message_record *msg, void *param) {
    // sizeof() of this array counts the terminating NUL as well
    //
    static const char tag[] = "client_supervisor stop";

    // guard against version mismatch
    //
    if (msg->header.payload_length != sizeof(stop_status)) {
        return MESSAGE_HANDLED_STOP;
    }

    memcpy(&stop_stat, msg->payload, sizeof(stop_status));
    update_state_info_with_stop(&state_info, &stop_stat);

    memcpy(state_info.comment, tag, sizeof(tag));
    set_state_info(&state_info);

    return MESSAGE_HANDLED_CONT;
}
// Dispatch callback for driver status messages: copies the payload into
// driver_stat, folds it into state_info, tags state_info with a comment and
// stores it.
//
// msg     : the received message
// param   : unused
// returns : MESSAGE_HANDLED_STOP if the payload is not exactly one
//           driver_status in size, MESSAGE_HANDLED_CONT otherwise
//
static message_callback_return handle_driver_message(struct message_record *msg, void *param) {
    // sizeof() of this array counts the terminating NUL as well
    //
    static const char tag[] = "client_supervisor driver";

    // guard against version mismatch
    //
    if (msg->header.payload_length != sizeof(driver_status)) {
        return MESSAGE_HANDLED_STOP;
    }

    memcpy(&driver_stat, msg->payload, sizeof(driver_stat));
    update_state_info_with_driver(&state_info, &driver_stat);

    memcpy(state_info.comment, tag, sizeof(tag));
    set_state_info(&state_info);

    return MESSAGE_HANDLED_CONT;
}
//----
void clear_ping_attempt() {
int i;
for (i = 0; i < NUM_MODULES; i++) {
if (module_status[i].active) {
module_status[i].last_ping = module_status[i].last_pong = 0;
module_status[i].ping_punish = PING_PUNISH_NONE;
}
}
}
// Send a PING to every active, ping-monitored module that is due for one.
//
// A module is skipped while its ping interval has not yet elapsed, or while a
// previous PING is still waiting for its PONG.  Does nothing when there is no
// comm hub connection.
//
void send_ping_message()
{
    struct message_record outgoing_msg;
    long long int usec_now;
    char target[MAILBOX_NAME_MAX + 1];
    int i;

    if (commhub_fd < 0) { return; }

    for (i = 0; i < NUM_MODULES; i++) {
        // Only modules that are active AND included in PING monitoring
        //
        if ( !(module_status[i].active && module_status[i].ping_requested) ) {
            continue;
        }

        usec_now = get_usec_time();

        // If it is not yet time to re-ping this module, skip it for now
        //
        if ( (usec_now - module_status[i].last_ping) < SUPERVISOR_PING_INTERVAL) {
            continue;
        }

        // If we have PING'd this module but are still waiting for a PONG back,
        // don't flood it with pings
        //
        if (module_status[i].last_pong < module_status[i].last_ping) {
            continue;
        }

        // If we've never PING'd this module, set its PONG time to the same as its PING
        // time so it doesn't get killed for not having responded to its first PING.
        //
        if (module_status[i].last_ping == 0) {
            module_status[i].last_pong = usec_now;
        }
        module_status[i].last_ping = usec_now;

        // Turn our PID into a magic mailbox name that addresses that PID.
        // snprintf() keeps the write inside target[] whatever MAILBOX_NAME_MAX is.
        //
        snprintf(target, sizeof(target), ">%d", (int)module_status[i].pid);

        // Prepare a message to that magic mailbox with a payload consisting of the mailbox
        // name of the PING notification
        //
        prepare_message(&outgoing_msg, target, MAILBOX_PING, strlen(MAILBOX_PING));

        // And send it!
        //
        send_message(commhub_fd, &outgoing_msg);
    }
}
void send_hup_message() {
struct message_record outgoing_msg;
if (commhub_fd < 0) {
return;
}
printf("Sending HUP\n");
prepare_message(&outgoing_msg, MAILBOX_HUP, "", 0);
send_message(commhub_fd, &outgoing_msg);
}
// First escalation step for an unresponsive module: just log it.
//
// n     : index of the module
// delta : time since its last PONG, in microseconds
//
void ping_punish_warn(int n, long long int delta) {
    const int late_secs = (int)USEC_TO_SEC(delta);
    const int child = (int)module_status[n].pid;

    route_debug_message("Module %s [%d] hasn't responded to a PING in %d seconds...\n", module_name[n], child, late_secs);
}
// Second escalation step for an unresponsive module: log a warning, then send
// it SIGTERM.
//
// n     : index of the module
// delta : time since its last PONG, in microseconds
//
void ping_punish_term(int n, long long int delta) {
    const int late_secs = (int)USEC_TO_SEC(delta);
    const pid_t child = module_status[n].pid;

    route_warning_message("Module %s [%d] hasn't responded to a PING in %d seconds... Sending SIGTERM\n", module_name[n], (int)child, late_secs);
    kill(child, SIGTERM);
}
// Third escalation step for an unresponsive module: log an error, then send
// it SIGKILL.
//
// n     : index of the module
// delta : time since its last PONG, in microseconds
//
void ping_punish_kill(int n, long long int delta) {
    const int late_secs = (int)USEC_TO_SEC(delta);
    const pid_t child = module_status[n].pid;

    route_error_message("Module %s [%d] hasn't responded to a PING in %d seconds... Sending SIGKILL\n", module_name[n], (int)child, late_secs);
    kill(child, SIGKILL);
}
void enforce_ping_policy()
{
int i;
long long int delta = 0;
for (i = 0; i < NUM_MODULES; i++) {
//If this module is both active AND included in the ping monitoring
//
if ( module_status[i].active && module_status[i].ping_requested ) {
// if we haven't sent a PING to this one...
//
if (module_status[i].last_ping == 0) {
//clear any punishment that would otherwise be pending and get the next module
//
module_status[i].ping_punish = PING_PUNISH_NONE;
continue;
}
//compute just how late this module is...
//
delta = get_usec_time() - module_status[i].last_pong;
switch(module_status[i].ping_punish) {
case PING_PUNISH_NONE:
if (delta > SUPERVISOR_PING_WARN_TIME) {
ping_punish_warn(i, delta);
module_status[i].ping_punish++;
}
break;
case PING_PUNISH_WARN:
if (delta > SUPERVISOR_PING_TERM_TIME) {
ping_punish_term(i, delta);
module_status[i].ping_punish++;
}
break;
case PING_PUNISH_TERM:
if (delta > SUPERVISOR_PING_KILL_TIME) {
ping_punish_kill(i, delta);
module_status[i].ping_punish++;
}
break;
default:
route_error_message("At our wits end with module %s [%d], just plain won't die!\n", module_name[i], (int)module_status[i].pid);
break;
}
}
}
}
// Make sure we have a connection to the communication hub.  If we have none,
// try to establish one and, on success, subscribe to the PONG mailbox and to
// the default status mailboxes.
//
// progname : name passed to connect_to_message_server()
//
void maintain_ipc_hub_connect(char *progname) {
    struct message_record outgoing_msg;

    // Nothing to do while the connection is up
    //
    if (commhub_fd >= 0) {
        return;
    }

    // try and get one
    //
    commhub_fd = connect_to_message_server(progname);

    // if it worked
    //
    if (commhub_fd >= 0) {
        // Subscribe to the command mailboxes we act on.
        // The payload is the name of the PONG mailbox, so its length must be
        // taken from that same string.
        //
        prepare_message(&outgoing_msg, MAILBOX_SUBSCRIBE, MAILBOX_PONG, strlen(MAILBOX_PONG));
        send_message(commhub_fd, &outgoing_msg);

        // Subscribe to the relevant status management mailboxes
        //
        subscribe_to_default_messages(commhub_fd);
    }
}
// Write "version <CLIENT_SUPERVISOR_VERSION>" followed by a newline to fp.
//
void print_version(FILE *fp) {
    fputs("version ", fp);
    fputs(CLIENT_SUPERVISOR_VERSION, fp);
    fputc('\n', fp);
}
// Print the usage summary to fp and terminate the program with EX_USAGE.
// This function does not return.
//
// fp       : stream to print to
// progname : program name used as the prefix of every line
//
void usage(FILE *fp, char *progname) {
    const int most_allowed = SUPERVISOR_MAX_MODULES - 1;

    fprintf(fp, "%s: %d modules given to be supervised, minimum = 1, maximum = %d\n", progname, NUM_MODULES, most_allowed);
    fprintf(fp, "%s: Usage: %s [module ...] [--[no-]ping] [module ...]\n", progname, progname);
    fprintf(fp, "%s: By default modules will be subjet to PING monitoring through\n", progname);
    fprintf(fp, "%s: the IPC hub, --no-ping will effect all modules following it until a --ping\n", progname);
    fputc('\n', fp);

    exit(EX_USAGE);
}
// Parse the command line into the module table.
//
// Every argument that is not an option becomes a supervised module, in order.
//   --no-ping : modules that follow are exempt from PING monitoring
//   --ping    : modules that follow are subject to PING monitoring (the default)
//   -v        : print the version and exit(0)
//   -h        : print the usage text (which exits with EX_USAGE)
//
// The module whose file name (the part after the last '/') equals
// SUPERVISOR_SPECIAL_BILLDB is remembered as the error logging module.
// Giving no modules, or SUPERVISOR_MAX_MODULES or more, ends in usage().
//
void handle_command_line(int argc, char **argv) {
    int i;
    int needs_ping = 1;

    NUM_MODULES = 0;

    // Walk all of the given command line arguments and add each one as a module we have been asked to supervise
    //
    for (i = 1; i < argc; i++) {
        if ( !strcmp(argv[i], "--no-ping") ) {
            needs_ping = 0;
        }
        else if ( !strcmp(argv[i], "--ping") ) {
            needs_ping = 1;
        }
        else if ( !strcmp(argv[i], "-v") ) {
            print_version(stdout);
            exit(0);
        }
        else if ( !strcmp(argv[i], "-h") ) {
            // usage() terminates the program with EX_USAGE itself
            //
            usage(stdout, argv[0]);
        }
        else {
            // Here we look for the last / in the module name
            // (strrchr() is the standard spelling of the legacy rindex())
            //
            char *base = strrchr(argv[i], '/');

            // if we found one, advance past it; otherwise use our whole argument
            //
            if (base) {
                base++;
            }
            else {
                base = argv[i];
            }

            // If the file name is that of the special billdb module, this one is our guy!
            //
            if ( !strcmp(base, SUPERVISOR_SPECIAL_BILLDB) ) {
                error_logging_module = NUM_MODULES;
            }

            module_status[NUM_MODULES].ping_requested = needs_ping;
            module_name[NUM_MODULES++] = argv[i];
        }

        // Checked after every argument, so the table index above can never
        // reach SUPERVISOR_MAX_MODULES
        //
        if (NUM_MODULES >= SUPERVISOR_MAX_MODULES) {
            usage(stderr, argv[0]);
        }
    }

    if (NUM_MODULES == 0) {
        usage(stderr, argv[0]);
    }
}
// Supervisor entry point.
//
// Parses the command line into the module table, installs the signal handlers
// and message callbacks, then loops until an exit is requested.  Each pass of
// the loop samples system status, (re)spawns modules, maintains the comm hub
// connection and the PING policy, reaps dead children, and pumps whatever the
// modules wrote to their stdout/stderr pipes and whatever arrived from the
// comm hub.  On the way out every still-active module is sent SIGTERM.
//
// returns : 0
//
int main(int argc, char **argv) {
    int poll_ret;
    int nfds;
    int i;
    struct message_record incoming_msg;
    char linebuffer[LINE_BUFFER_SIZE] = {0};
    int read_ret;
    int modnum;
    // One slot for the comm hub plus stdout and stderr of every module
    //
    struct pollfd fds[1 + (2 * SUPERVISOR_MAX_MODULES)] = {{0}};
    // For each poll slot, the module it belongs to (-1 for the comm hub)
    //
    int poll_to_module[1 + (2 * SUPERVISOR_MAX_MODULES)] = {0};

    init_state_info();

    // Parse our command line to figure out what modules we need to supervise
    //
    handle_command_line(argc, argv);

    // Configure our default signal handlers
    //
    configure_signal_handlers(argv[0]);

    // As well as our module-specific zombie-reaping flag setter
    //
    setup_child_handler();

    // This listens for SIGHUP messages and flags us when one has come in.
    //
    setup_hup_handler();

    // Register our default keep-up-with-system status callbacks
    //
    register_system_status_callbacks();

    // Except we want to exempt ourselves from the MAILBOX_EXIT and
    // MAILBOX_PING so as not to start ping storms or kill ourselves...
    //
    register_dispatch_callback(MAILBOX_EXIT, CALLBACK_PREPROCESS, ignore_message, NULL);
    register_dispatch_callback(MAILBOX_PING, CALLBACK_PREPROCESS, ignore_message, NULL);

    // Add our specific handlers
    //
    register_dispatch_callback(MAILBOX_PONG, CALLBACK_USER(1), handle_pong_message, NULL);
    register_dispatch_callback(MAILBOX_GPS_STATUS, CALLBACK_USER(1), handle_gps_message, NULL);
    register_dispatch_callback(MAILBOX_STOP_STATUS, CALLBACK_USER(1), handle_stop_message, NULL);
    register_dispatch_callback(MAILBOX_DRIVER_STATUS, CALLBACK_USER(1), handle_driver_message, NULL);

    // This is the main processing loop which monitors system status and pumps I/O from
    // all of the client modules (debug and error messages) and from the IPC hub when it is up
    // as indicated by poll().
    //
    while ( exit_request_status == EXIT_REQUEST_NONE ) {
        RESET_WATCHDOG();

        //-------------- SYSTEM STATE MAINTENANCE CODE HERE
        if ( (time(NULL) - last_system_stat) >= SUPERVISOR_SYSTEM_STAT_INTERVAL) {
            monitor_system_status();
            send_heartbeat();
            last_system_stat = time(NULL);
        }

        // Do any maintenance on our spawn list as needed
        //
        monitor_spawn_list();

        // Try and keep in touch with the IPC hub if possible
        //
        maintain_ipc_hub_connect(argv[0]);

        // If the communications hub is up, we may need to maintain our PING states
        //
        if (commhub_fd >= 0) {
            // Send out any PING messages that are required
            //
            send_ping_message();

            // Deal with any delinquents who don't answer their pings
            //
            enforce_ping_policy();

            if (hup_alert) {
                hup_alert = 0;
                send_hup_message();
            }
        }
        // If there is no connection to the IPC hub
        //
        else {
            // clear any PING attempt records we may have active
            //
            clear_ping_attempt();
        }

        // If our SIGCHLD handler has notified us that there is at least 1 zombie to reap
        //
        if (zombie_alert) {
            // Try and reap any zombies that may exist
            //
            reap_zombies();
        }

        //-------------- POLL FILE DESCRIPTOR SET POPULATION HERE
        //
        nfds = 0;

        // If we have a valid communication hub connection, add it to the poll set
        //
        if (commhub_fd >= 0) {
            fds[nfds].fd = commhub_fd;
            fds[nfds].events = POLLIN;
            // but flag it as NOT belonging to a child module
            //
            poll_to_module[nfds] = -1;
            nfds++;
        }

        // Then iterate through all of our modules and see if they need service
        //
        for (i = 0; i < NUM_MODULES; i++) {
            // If this child module is flagged as active
            //
            if (module_status[i].active) {
                // and we have a stdout file descriptor for this child
                //
                if (module_status[i].stdout_fd >= 0) {
                    // add it to the poll set
                    //
                    fds[nfds].fd = module_status[i].stdout_fd;
                    fds[nfds].events = POLLIN;
                    // and remember which module it goes to
                    //
                    poll_to_module[nfds] = i;
                    nfds++;
                }
                // if we have a stderr file descriptor for this child
                //
                if (module_status[i].stderr_fd >= 0) {
                    // add it to the poll set
                    //
                    fds[nfds].fd = module_status[i].stderr_fd;
                    fds[nfds].events = POLLIN;
                    // and remember which module it goes to
                    //
                    poll_to_module[nfds] = i;
                    nfds++;
                }
            }
        }

        // If we have any file descriptors to poll
        //
        if (nfds > 0) {
            // poll them
            //
            poll_ret = poll(fds, nfds, POLL_TIMEOUT);

            // if poll returns error
            //
            if (poll_ret < 0) {
                // and it's just EINTR
                //
                if (errno == EINTR) {
                    // try the body of the while again
                    //
                    continue;
                }
                // if it is some other error
                //
                else {
                    // report it, back off for a moment and start over: the
                    // revents fields are not meaningful after a failed poll(),
                    // so we must not walk them below
                    //
                    route_debug_message("Poll returned %d, errno = %d (%s)\n", poll_ret, errno, strerror(errno));
                    sleep(1);
                    continue;
                }
            }
        }
        // If we have zero functional file descriptors
        //
        else {
            // Pretend that we called poll and it timed out
            //
            sleep(1);
            poll_ret = 0;
        }

        //-------------- I/O PUMPING CODE BELOW
        // if there are no file descriptors flagged as needing any action
        //
        if (poll_ret == 0) { continue; }

        for (i = 0; i < nfds; i++) {
            modnum = poll_to_module[i];

            // If we have input and it is from a module...
            //
            if ( (fds[i].revents & POLLIN) && (modnum >= 0) ) {
                read_ret = read(fds[i].fd, linebuffer, sizeof(linebuffer) - 1);

                if (read_ret == 0) {
                    // end of file: clean up and skip this fd for now
                    //
                    handle_dead_fd(fds[i].fd);
                    continue;
                }
                if (read_ret < 0) {
                    // A real error retires the descriptor; EINTR just means
                    // "try again on the next pass".  Either way there is no
                    // data, and read_ret must not be used as a buffer index.
                    //
                    if (errno != EINTR) {
                        handle_dead_fd(fds[i].fd);
                    }
                    continue;
                }

                // Terminate our read...
                //
                linebuffer[read_ret] = '\0';

                if (fds[i].fd == module_status[modnum].stdout_fd) {
                    // Module stdout is read (to keep the pipe drained) but not logged
                    //
                    //route_debug_message("%s [%d]: %s", module_name[modnum], (int)module_status[modnum].pid, linebuffer);
                }
                else if (fds[i].fd == module_status[modnum].stderr_fd) {
                    route_error_message("%s [%d]: %s", module_name[modnum], (int)module_status[modnum].pid, linebuffer);
                }
                else {
                    route_debug_message("File descriptor %d does not belong to module %d (%s)!\n", fds[i].fd, modnum, module_name[modnum]);
                }
            }
            // If we have input and it is from the commhub...
            //
            else if ( (fds[i].revents & POLLIN) && (fds[i].fd == commhub_fd) ) {
                read_ret = get_message(commhub_fd, &incoming_msg);
                if (read_ret < 0) {
                    // clean up
                    // and skip this fd for now
                    //
                    handle_dead_fd(fds[i].fd);
                    continue;
                }
                else {
                    // This passes the received message through the callback list
                    //
                    process_message(&incoming_msg);
                }
            }

            // If it looks like we have a closed or invalid descriptor
            //
            if (fds[i].revents & (POLLERR | POLLHUP | POLLNVAL)) {
                // clean up
                // and skip this fd for now
                //
                handle_dead_fd(fds[i].fd);
                continue;
            }
        }
    }

    route_debug_message("Attempting graceful exit...\n");
    for (i = 0; i < NUM_MODULES; i++) {
        if (module_status[i].active) {
            route_debug_message("Sending SIGTERM to module %s [%d]\n", module_name[i], (int)module_status[i].pid);
            kill(module_status[i].pid, SIGTERM);
        }
    }
    sleep(1);
    return 0;
}