client_supervisor.c 37 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153
  1. /*
  2. * Copyright (c) 2019 Clementine Computing LLC.
  3. *
  4. * This file is part of PopuFare.
  5. *
  6. * PopuFare is free software: you can redistribute it and/or modify
  7. * it under the terms of the GNU Affero General Public License as published by
  8. * the Free Software Foundation, either version 3 of the License, or
  9. * (at your option) any later version.
  10. *
  11. * PopuFare is distributed in the hope that it will be useful,
  12. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. * GNU Affero General Public License for more details.
  15. *
  16. * You should have received a copy of the GNU Affero General Public License
  17. * along with PopuFare. If not, see <https://www.gnu.org/licenses/>.
  18. *
  19. */
  20. #include <linux/limits.h>
  21. #include <sys/types.h>
  22. #include <sys/wait.h>
  23. #include <sysexits.h>
  24. #include <stdio.h>
  25. #include <stdlib.h>
  26. #include <unistd.h>
  27. #include <signal.h>
  28. #include <string.h>
  29. #include <strings.h>
  30. #include <errno.h>
  31. #include <time.h>
  32. #include <poll.h>
  33. #include <syslog.h>
  34. #include <stdarg.h>
  35. #include "../common/common_defs.h"
  36. #include "../commhub/commhub.h"
  37. #include "../commhub/client_utils.h"
  //---- Supervisor-wide state ----
  int NUM_MODULES = 0;                    //Number of modules we are actively tracking
  int commhub_fd = -1;                    //File descriptor of our connection to the comm hub (-1 while disconnected)
  //The next two flags are stored to from signal handlers and polled by the
  //main code, so they are volatile sig_atomic_t: that is the only object type
  //the C standard lets a handler write and the interrupted code read safely.
  volatile sig_atomic_t zombie_alert = 0; //Set by the SIGCHLD handler, cleared by reap_zombies()
  volatile sig_atomic_t hup_alert = 0;    //Set by the SIGHUP handler, cleared once a HUP has been forwarded to the client modules
  int last_global_spawn = 0;              //Time of the last new process creation event
  int last_system_stat = 0;               //Time of our last system status check
  int system_stat_freemem = 0;            //KB of free memory (the MemFree line of /proc/meminfo)
  int system_stat_numproc = 0;            //Process count: the number after the '/' in the 4th field of /proc/loadavg
  float system_stat_loadavg = 0;          //First field of /proc/loadavg (NOTE: that is the 1 minute load average)
  int error_logging_module = -1;          //Module number of the error logging module, -1 if none was given
  //(this is the module whose name matches SUPERVISOR_SPECIAL_BILLDB)
  //Escalation stages for a module that has stopped answering PINGs.
  //The values are consecutive so that a stage can be advanced by adding one.
  enum
  {
    PING_PUNISH_NONE = 0, //no action taken yet
    PING_PUNISH_WARN = 1, //a warning has been logged
    PING_PUNISH_TERM = 2, //SIGTERM has been sent
    PING_PUNISH_KILL = 3  //SIGKILL has been sent
  };
  //Bookkeeping for one supervised child module.
  struct module_record
  {
    time_t last_start;      //when this module's process was last started (0 = never, or last exit was expected)
    time_t last_exit;       //when this module's process was last reaped after an unexpected exit
    int ping_requested;     //nonzero: module is subject to PING monitoring; zero: module is exempt

    int active;             //zero means every field below this line is invalid

    //---- only meaningful while active != 0 ----
    int exit_expected;      //nonzero when we expect the process to exit (so its exit is not an error)
    pid_t pid;              //process ID of the child
    int stdin_fd;           //our end of the child's standard input (write here to feed the child)
    int stdout_fd;          //our end of the child's standard output (read here to get its output)
    int stderr_fd;          //our end of the child's standard error (read here to get its error output)
    long long last_ping;    //microsecond timestamp of the last PING sent to this process
    long long last_pong;    //microsecond timestamp of the last PONG received from this process
    long long ping_lag;     //most recent PING->PONG latency for this process
    int ping_punish;        //current PING_PUNISH_* stage for a non-responding process
  };

  //Per-module state, indexed by module number
  struct module_record module_status[SUPERVISOR_MAX_MODULES] = {{0}};

  //Per-module name pointers, indexed by module number (these point at the argv strings)
  char *module_name[SUPERVISOR_MAX_MODULES] = {0};
  //Index of each end in the two-element array filled in by pipe()
  enum
  {
    PIPE_R = 0, //read end
    PIPE_W = 1  //write end
  };

  //Descriptor numbers of the three standard streams
  enum
  {
    STDIN_H  = 0,
    STDOUT_H = 1,
    STDERR_H = 2
  };
  /*
   * Route a printf-style DEBUG message.
   *
   * When SUPERVISOR_LOG_DEBUG_TO_BILLDB is defined and the hub connection and
   * the supervised error logging module are both up, the message is sent to
   * MAILBOX_BILLING_LOG with a LOGLEVEL_DEBUG byte and a log prefix in front.
   * If that route is unavailable or send_message() reports failure (nonzero),
   * the message goes to syslog at LOG_DEBUG instead.
   *
   * fmt, ... - printf-style format string and its arguments.
   * Always returns 0.
   */
  int route_debug_message(char *fmt, ...)
  {
    va_list ap;

  #ifdef SUPERVISOR_LOG_DEBUG_TO_BILLDB
    //IF... we have a connection to the commhub, and we have an instance of billdb under our care, and it is alive...
    if( (commhub_fd >= 0) && (error_logging_module >= 0) && module_status[error_logging_module].active )
    {
      struct message_record outgoing_msg;
      char payload[MAX_PAYLOAD_LENGTH] = {0};
      const int cap = (int)sizeof(payload);
      int len = 0;
      int room;
      int written;

      payload[len++] = LOGLEVEL_DEBUG;
      len += make_log_prefix(payload + len, cap - len);

      //NOTE(review): make_log_prefix() is assumed to return the number of bytes
      //it wrote; whatever it returns, keep len inside the buffer.
      if(len < 1)
      {
        len = 1;
      }
      if(len > cap - 1)
      {
        len = cap - 1;
      }

      room = cap - len;
      va_start(ap, fmt);
      written = vsnprintf(payload + len, room, fmt, ap);
      va_end(ap);

      //vsnprintf() returns the length the complete text WOULD have had (or a
      //negative value on error), so only count what actually fit in the buffer.
      if(written >= room)
      {
        written = room - 1;
      }
      if(written > 0)
      {
        len += written;
      }

      prepare_message(&outgoing_msg, MAILBOX_BILLING_LOG, payload, len);
      if(send_message(commhub_fd, &outgoing_msg) == 0)
      {
        return 0;
      }
    }
  #endif

    //IF either that didn't work, or the conditions were not met, send it to syslog...
    va_start(ap, fmt);
    vsyslog(LOG_DEBUG, fmt, ap);
    va_end(ap);
    return 0;
  }
  /*
   * Route a printf-style WARNING message.
   *
   * When SUPERVISOR_LOG_WARNING_TO_BILLDB is defined and the hub connection and
   * the supervised error logging module are both up, the message is sent to
   * MAILBOX_BILLING_LOG with a LOGLEVEL_WARN byte and a log prefix in front.
   * If that route is unavailable or send_message() reports failure (nonzero),
   * the message goes to syslog at LOG_WARNING instead.
   *
   * fmt, ... - printf-style format string and its arguments.
   * Always returns 0.
   */
  int route_warning_message(char *fmt, ...)
  {
    va_list ap;

  #ifdef SUPERVISOR_LOG_WARNING_TO_BILLDB
    //IF... we have a connection to the commhub, and we have an instance of billdb under our care, and it is alive...
    if( (commhub_fd >= 0) && (error_logging_module >= 0) && module_status[error_logging_module].active )
    {
      struct message_record outgoing_msg;
      char payload[MAX_PAYLOAD_LENGTH] = {0};
      const int cap = (int)sizeof(payload);
      int len = 0;
      int room;
      int written;

      payload[len++] = LOGLEVEL_WARN;
      len += make_log_prefix(payload + len, cap - len);

      //NOTE(review): make_log_prefix() is assumed to return the number of bytes
      //it wrote; whatever it returns, keep len inside the buffer.
      if(len < 1)
      {
        len = 1;
      }
      if(len > cap - 1)
      {
        len = cap - 1;
      }

      room = cap - len;
      va_start(ap, fmt);
      written = vsnprintf(payload + len, room, fmt, ap);
      va_end(ap);

      //vsnprintf() returns the length the complete text WOULD have had (or a
      //negative value on error), so only count what actually fit in the buffer.
      if(written >= room)
      {
        written = room - 1;
      }
      if(written > 0)
      {
        len += written;
      }

      prepare_message(&outgoing_msg, MAILBOX_BILLING_LOG, payload, len);
      if(send_message(commhub_fd, &outgoing_msg) == 0)
      {
        return 0;
      }
    }
  #endif

    //IF either that didn't work, or the conditions were not met, send it to syslog...
    va_start(ap, fmt);
    vsyslog(LOG_WARNING, fmt, ap);
    va_end(ap);
    return 0;
  }
  /*
   * Route a printf-style ERROR message.
   *
   * When SUPERVISOR_LOG_ERROR_TO_BILLDB is defined and the hub connection and
   * the supervised error logging module are both up, the message is sent to
   * MAILBOX_BILLING_LOG with a LOGLEVEL_ERROR byte and a log prefix in front.
   * If that route is unavailable or send_message() reports failure (nonzero),
   * the message goes to syslog at LOG_ERR instead.
   *
   * fmt, ... - printf-style format string and its arguments.
   * Always returns 0.
   */
  int route_error_message(char *fmt, ...)
  {
    va_list ap;

  #ifdef SUPERVISOR_LOG_ERROR_TO_BILLDB
    //IF... we have a connection to the commhub, and we have an instance of billdb under our care, and it is alive...
    if( (commhub_fd >= 0) && (error_logging_module >= 0) && module_status[error_logging_module].active )
    {
      struct message_record outgoing_msg;
      char payload[MAX_PAYLOAD_LENGTH] = {0};
      const int cap = (int)sizeof(payload);
      int len = 0;
      int room;
      int written;

      payload[len++] = LOGLEVEL_ERROR;
      len += make_log_prefix(payload + len, cap - len);

      //NOTE(review): make_log_prefix() is assumed to return the number of bytes
      //it wrote; whatever it returns, keep len inside the buffer.
      if(len < 1)
      {
        len = 1;
      }
      if(len > cap - 1)
      {
        len = cap - 1;
      }

      room = cap - len;
      va_start(ap, fmt);
      written = vsnprintf(payload + len, room, fmt, ap);
      va_end(ap);

      //vsnprintf() returns the length the complete text WOULD have had (or a
      //negative value on error), so only count what actually fit in the buffer.
      if(written >= room)
      {
        written = room - 1;
      }
      if(written > 0)
      {
        len += written;
      }

      prepare_message(&outgoing_msg, MAILBOX_BILLING_LOG, payload, len);
      if(send_message(commhub_fd, &outgoing_msg) == 0)
      {
        return 0;
      }
    }
  #endif

    //IF either that didn't work, or the conditions were not met, send it to syslog...
    va_start(ap, fmt);
    vsyslog(LOG_ERR, fmt, ap);
    va_end(ap);
    return 0;
  }
  167. void warn_num_proc()
  168. {
  169. route_warning_message("System has %d processes running!\n", system_stat_numproc);
  170. }
  171. void warn_load_avg()
  172. {
  173. route_warning_message("System load average is %.1f!\n", system_stat_loadavg);
  174. }
  175. void warn_mem_free()
  176. {
  177. route_warning_message("System is down to %d KB of free RAM\n", system_stat_freemem);
  178. }
  179. void monitor_system_status()
  180. {
  181. int retval;
  182. int kb_free = 0;
  183. int n_procs = 0;
  184. float la5 = 0;
  185. char line[LINE_BUFFER_SIZE] = {0};
  186. FILE *f;
  187. f = fopen("/proc/loadavg", "rb");
  188. if(f)
  189. {
  190. fgets(line, sizeof(line), f);
  191. retval = sscanf(line, "%f %*f %*f %*i/%i", &la5, &n_procs);
  192. if(retval == 2)
  193. {
  194. system_stat_loadavg = la5;
  195. system_stat_numproc = n_procs;
  196. }
  197. fclose(f);
  198. }
  199. else
  200. {
  201. route_error_message("Cannot read /proc/loadavg\n");
  202. }
  203. f = fopen("/proc/meminfo", "rb");
  204. if(f)
  205. {
  206. while(!feof(f))
  207. {
  208. fgets(line, sizeof(line), f);
  209. retval = sscanf(line, "MemFree: %i kB\n", &kb_free);
  210. if(retval == 1)
  211. {
  212. system_stat_freemem = kb_free;
  213. break;
  214. }
  215. }
  216. fclose(f);
  217. }
  218. else
  219. {
  220. route_error_message("Cannot read /proc/meminfo\n");
  221. }
  222. if(system_stat_numproc > SUPERVISOR_WARN_NUM_PROC)
  223. {
  224. warn_num_proc();
  225. }
  226. if(system_stat_loadavg > SUPERVISOR_WARN_LOAD_AVG)
  227. {
  228. warn_load_avg();
  229. }
  230. if(system_stat_freemem < SUPERVISOR_WARN_LOW_MEM)
  231. {
  232. warn_mem_free();
  233. }
  234. }
  235. int launch_module(int n)
  236. {
  237. pid_t fork_ret;
  238. int retval;
  239. int my_stdin, my_stdout, my_stderr;
  240. int new_stdin[2], new_stdout[2], new_stderr[2];
  241. int record_confirm_pipe[2];
  242. char record_confirm_msg[4] = {'O','K','\n','\0'};
  243. int record_confirm_expect;
  244. if( (n < 0) || (n >= NUM_MODULES) )
  245. {
  246. return -1;
  247. }
  248. RESET_WATCHDOG();
  249. //Save our actual standard I/O fd's
  250. my_stdin = dup(STDIN_H);
  251. my_stdout = dup(STDOUT_H);
  252. my_stderr = dup(STDERR_H);
  253. //Generate pipes to be said handles for the child process
  254. pipe(new_stdin);
  255. pipe(new_stdout);
  256. pipe(new_stderr);
  257. //close our actual handles in preparation for forking
  258. close(STDIN_H);
  259. close(STDOUT_H);
  260. close(STDERR_H);
  261. //Assign the "far" end of the pipes to our handles so that the child process will use them
  262. dup2(new_stdin[PIPE_R], STDIN_H);
  263. dup2(new_stdout[PIPE_W], STDOUT_H);
  264. dup2(new_stderr[PIPE_W], STDERR_H);
  265. //Create a pipe for record-keeping confirmation purposes (this prevents a race condition with SIGCHLD
  266. pipe(record_confirm_pipe);
  267. record_confirm_expect = strlen(record_confirm_msg);
  268. //Ask the kernel to create a new process to hold our new module
  269. fork_ret = fork();
  270. if(fork_ret == 0) //If we are the child process
  271. {
  272. //We don't need this end of the confirmation pipe
  273. close(record_confirm_pipe[PIPE_W]);
  274. // Wait for the parent process to confirm that we have been recorded before proceeding
  275. //with the exec attempt so that if it fails, the parent process will know what to do with the
  276. //SIGCHLD signal.
  277. retval = read(record_confirm_pipe[PIPE_R], record_confirm_msg, record_confirm_expect);
  278. //If the parent process can't confirm our existence, we're in deep shit...
  279. if(retval != record_confirm_expect)
  280. {
  281. fprintf(stderr, "Never got recordkeeping confirmation!\n");
  282. exit(EX_UNAVAILABLE);
  283. }
  284. //We are now done with our confirmation pipe
  285. close(record_confirm_pipe[PIPE_R]);
  286. //Close the child's copy of the parent's standard I/O fds
  287. close(my_stdin);
  288. close(my_stdout);
  289. close(my_stderr);
  290. //Close the parent's end of the pipes connecting the two processes
  291. close(new_stdin[PIPE_W]);
  292. close(new_stdout[PIPE_R]);
  293. close(new_stderr[PIPE_R]);
  294. //Attempt to exec the child process now...
  295. retval = execl(module_name[n], module_name[n], NULL);
  296. if(retval)
  297. {
  298. fprintf(stderr, "Cannot execl(\"%s\", \"%s\", NULL)!\n", module_name[n], module_name[n]);
  299. exit(EX_UNAVAILABLE);
  300. }
  301. exit(0);
  302. }
  303. else if(fork_ret != -1) //If we are the parent process
  304. {
  305. //Record our child PID
  306. module_status[n].pid = fork_ret;
  307. //Record our end of the child's standard file descriptors
  308. module_status[n].stdin_fd = new_stdin[PIPE_W];
  309. module_status[n].stdout_fd = new_stdout[PIPE_R];
  310. module_status[n].stderr_fd = new_stderr[PIPE_R];
  311. //Reset our list ping/pong times to "never"
  312. module_status[n].last_ping = 0;
  313. module_status[n].last_pong = 0;
  314. module_status[n].ping_lag = 0;
  315. module_status[n].ping_punish = PING_PUNISH_NONE;
  316. //Record our process start time
  317. module_status[n].last_start = time(NULL);
  318. //This process is not currently expected to die
  319. module_status[n].exit_expected = 0;
  320. //Flag this child module as active
  321. module_status[n].active = 1;
  322. //Close the opposite end of all of our pipes
  323. close(record_confirm_pipe[PIPE_R]);
  324. close(new_stdin[PIPE_R]);
  325. close(new_stdout[PIPE_W]);
  326. close(new_stderr[PIPE_W]);
  327. //Close our copy of the child's standard I/O file descriptors
  328. close(STDIN_H);
  329. close(STDOUT_H);
  330. close(STDERR_H);
  331. //Restore the parent's original standard I/O file descriptors
  332. dup2(my_stdin, STDIN_H);
  333. dup2(my_stdout, STDOUT_H);
  334. dup2(my_stderr, STDERR_H);
  335. //Unblock the child process by transmitting our confirmation message
  336. retval = write(record_confirm_pipe[PIPE_W], record_confirm_msg, record_confirm_expect);
  337. if(retval != record_confirm_expect)
  338. {
  339. route_error_message("Cannot transmit recordkeeping confirmation!\n");
  340. }
  341. //We are now done with our recordkeeping confirmation pipe
  342. close(record_confirm_pipe[PIPE_W]);
  343. }
  344. else //Otherwise we have somehow FAILED to fork()
  345. {
  346. //restore our stderr file descriptor
  347. close(STDERR_H);
  348. dup2(my_stderr, STDERR_H);
  349. route_error_message("Cannot fork()!\n");
  350. exit(EX_OSERR);
  351. }
  352. return 0;
  353. }
  354. void expected_exit(int n)
  355. {
  356. route_debug_message("Module %s [%d] has finished as expected.\n", module_name[n], (int)module_status[n].pid);
  357. //flag this module as "clean" for the purpose of respawn
  358. //(so we won't count the short runtime of a process we asked to terminate against it)
  359. module_status[n].last_start = 0;
  360. module_status[n].last_exit = 0;
  361. }
  362. void unexpected_exit(int n)
  363. {
  364. int age;
  365. //Record our last stop time
  366. module_status[n].last_exit = time(NULL);
  367. age = (int)module_status[n].last_exit - (int)module_status[n].last_start;
  368. route_warning_message("Module %s [%d] has stopped running unexpectedly after %d seconds.\n", module_name[n], (int)module_status[n].pid, age);
  369. }
  370. void reap_zombies()
  371. {
  372. int status = 0;
  373. pid_t retval = 0;
  374. int i;
  375. do
  376. {
  377. // Clear the zombie alert HERE so that if things terminate after the loop finishes,
  378. //we'll get flagged. This could result in an extra flag, but we'll never miss one.
  379. //and since we're using the WNOHANG option, there is no harm in going through the waitpid()
  380. //loop one time extra.
  381. zombie_alert = 0;
  382. retval = waitpid(-1, &status, WNOHANG); //Wait for ANY child process
  383. if(retval > 0) //If waitpid() actually waited for the specified process...
  384. {
  385. for(i = 0; i < NUM_MODULES; i++)
  386. {
  387. //Go through our table of modules looking for an active module with a
  388. //matching PID...
  389. if( module_status[i].active && (module_status[i].pid == retval) )
  390. {
  391. //Give us an opportunity to do some clever cleanup if it is merited
  392. //These functions also set the last_exit time (and may zero the last_start
  393. //to flag a process as "clean" for respawn purposes).
  394. if(module_status[i].exit_expected)
  395. {
  396. expected_exit(i);
  397. }
  398. else
  399. {
  400. unexpected_exit(i);
  401. }
  402. //Set this module inactive
  403. module_status[i].active = 0;
  404. //Set this module's last ping/pong times to "never"
  405. module_status[i].last_ping = 0;
  406. module_status[i].last_pong = 0;
  407. module_status[i].ping_lag = 0;
  408. module_status[i].ping_punish = PING_PUNISH_NONE;
  409. //No more process
  410. module_status[i].pid = 0;
  411. //Close our end of this module's standard I/O file descriptors
  412. close(module_status[i].stdin_fd);
  413. close(module_status[i].stdout_fd);
  414. close(module_status[i].stderr_fd);
  415. module_status[i].stdin_fd = -1;
  416. module_status[i].stdout_fd = -1;
  417. module_status[i].stderr_fd = -1;
  418. break;
  419. }
  420. }
  421. }
  422. } while(retval > 0);
  423. }
  424. void sigchld_handler(int signum, siginfo_t *info, void *data)
  425. {
  426. zombie_alert = 1;
  427. }
  428. void sighup_handler(int signum, siginfo_t *into, void *data)
  429. {
  430. hup_alert = 1;
  431. }
  432. void setup_child_handler()
  433. {
  434. struct sigaction sa = {{0}};
  435. sa.sa_sigaction = sigchld_handler;
  436. sa.sa_flags = SA_NOCLDSTOP | SA_SIGINFO;
  437. sigfillset(&sa.sa_mask);
  438. sigaction(SIGCHLD, &sa, NULL);
  439. }
  440. void setup_hup_handler()
  441. {
  442. struct sigaction sa = {{0}};
  443. sa.sa_sigaction = sighup_handler;
  444. sa.sa_flags = SA_SIGINFO;
  445. sigfillset(&sa.sa_mask);
  446. sigaction(SIGHUP, &sa, NULL);
  447. }
  448. void monitor_spawn_list()
  449. {
  450. int i;
  451. int module_runtime;
  452. time_t now;
  453. for(i = 0; i < NUM_MODULES; i++)
  454. {
  455. now = time(NULL);
  456. //If it is too soon to attempt spawning any new process, then we're done for the moment
  457. if( (now - last_global_spawn) < SUPERVISOR_GLOBAL_SPAWN_RATE_LIMIT ) break;
  458. //If this module is still active, we don't need to spawn it...
  459. if(module_status[i].active) continue;
  460. //This number will only be valid if last_start != 0
  461. module_runtime = module_status[i].last_exit - module_status[i].last_start;
  462. if(module_status[i].last_start == 0) //If this module either made an expected termination or has never been launcher
  463. {
  464. last_global_spawn = now; //then update our last global spawn time
  465. launch_module(i); //and launch it...
  466. route_debug_message("Spawned module %s, PID = %d\n", module_name[i], (int)module_status[i].pid);
  467. }
  468. else if( module_runtime < SUPERVISOR_RESPAWN_DELAY_THRESHOLD ) //otherwise, if it has run before, but terminated unexpectedly in a very short time
  469. {
  470. if( (now - module_status[i].last_exit) >= SUPERVISOR_RESPAWN_RATE_LIMIT) //and it has been long enough to try again
  471. {
  472. last_global_spawn = now; //then update our last global spawn time
  473. launch_module(i); //and launch it...
  474. route_warning_message("Respawned module %s, PID = %d [rate limited]\n", module_name[i], (int)module_status[i].pid);
  475. }
  476. }
  477. else //otherwise, the module has terminated unexpectedly, but it had run for a long time, so we can respawn it immediately
  478. {
  479. last_global_spawn = now; //then update our last global spawn time
  480. launch_module(i); //and launch it...
  481. route_debug_message("Respawned module %s, PID = %d\n", module_name[i], (int)module_status[i].pid);
  482. }
  483. }
  484. }
  485. void handle_dead_fd(int fd)
  486. {
  487. int i;
  488. close(fd);
  489. if(fd == commhub_fd)
  490. {
  491. commhub_fd = -1;
  492. return;
  493. }
  494. for(i = 0; i < NUM_MODULES; i++)
  495. {
  496. if(module_status[i].stdin_fd == fd)
  497. {
  498. module_status[i].stdin_fd = -1;
  499. return;
  500. }
  501. if(module_status[i].stdout_fd == fd)
  502. {
  503. module_status[i].stdout_fd = -1;
  504. return;
  505. }
  506. if(module_status[i].stderr_fd == fd)
  507. {
  508. module_status[i].stderr_fd = -1;
  509. return;
  510. }
  511. }
  512. }
  513. message_callback_return handle_pong_message(struct message_record *msg, void *param)
  514. {
  515. int i;
  516. // printf("Got PONG from PID %d\n", (int)msg->header.sender);
  517. //Iterate through all of the modules
  518. for(i = 0; i < NUM_MODULES; i++)
  519. {
  520. //If this module is active, and its PID matches the PID of the sender of this PONG message
  521. if( module_status[i].active && (msg->header.sender == module_status[i].pid) )
  522. {
  523. module_status[i].last_pong = get_usec_time();
  524. module_status[i].ping_lag = (module_status[i].last_pong - module_status[i].last_ping);
  525. // printf("PING->PONG latency %lld microseconds.\n", module_status[i].ping_lag);
  526. }
  527. }
  528. return MESSAGE_HANDLED_CONT;
  529. }
  530. void clear_ping_attempt()
  531. {
  532. int i;
  533. for(i = 0; i < NUM_MODULES; i++)
  534. {
  535. if(module_status[i].active)
  536. {
  537. module_status[i].last_ping = module_status[i].last_pong = 0;
  538. module_status[i].ping_punish = PING_PUNISH_NONE;
  539. }
  540. }
  541. }
  542. void send_ping_message()
  543. {
  544. struct message_record outgoing_msg;
  545. long long int usec_now;
  546. char target[MAILBOX_NAME_MAX + 1];
  547. int i;
  548. if(commhub_fd < 0)
  549. {
  550. return;
  551. }
  552. for(i = 0; i < NUM_MODULES; i++)
  553. {
  554. //If this module is active AND included in PING monitoring
  555. if( module_status[i].active && module_status[i].ping_requested )
  556. {
  557. usec_now = get_usec_time();
  558. //If it is not yet time to re-ping this module
  559. if( (usec_now - module_status[i].last_ping) < SUPERVISOR_PING_INTERVAL)
  560. {
  561. continue; //skip it for now
  562. }
  563. //If we have PING'd this host but are still waiting for a PONG back
  564. if(module_status[i].last_pong < module_status[i].last_ping)
  565. {
  566. continue; //Don't flood it with pings
  567. }
  568. //If we've never PING'd this host, set it's PONG time to the same as its PING
  569. //time so it doesn't get killed for not having responded to its first PING.
  570. if(module_status[i].last_ping == 0)
  571. {
  572. module_status[i].last_pong = usec_now;
  573. }
  574. module_status[i].last_ping = usec_now;
  575. //Turn our PID into a magic mailbox name that addresses that PID.
  576. sprintf(target, ">%d", (int)module_status[i].pid);
  577. //Prepare a message to that magic mailbox with a payload consisting of the mailbox
  578. //name of the PING notification
  579. prepare_message(&outgoing_msg, target, MAILBOX_PING, strlen(MAILBOX_PING));
  580. //And send it!
  581. send_message(commhub_fd, &outgoing_msg);
  582. }
  583. }
  584. }
  585. void send_hup_message()
  586. {
  587. struct message_record outgoing_msg;
  588. if(commhub_fd < 0)
  589. {
  590. return;
  591. }
  592. printf("Sending HUP\n");
  593. prepare_message(&outgoing_msg, MAILBOX_HUP, "", 0);
  594. send_message(commhub_fd, &outgoing_msg);
  595. }
  596. void ping_punish_warn(int n, long long int delta)
  597. {
  598. route_debug_message("Module %s [%d] hasn't responded to a PING in %d seconds...\n", module_name[n], (int)module_status[n].pid, (int)USEC_TO_SEC(delta));
  599. }
  600. void ping_punish_term(int n, long long int delta)
  601. {
  602. route_warning_message("Module %s [%d] hasn't responded to a PING in %d seconds... Sending SIGTERM\n", module_name[n], (int)module_status[n].pid, (int)USEC_TO_SEC(delta));
  603. kill(module_status[n].pid, SIGTERM);
  604. }
  605. void ping_punish_kill(int n, long long int delta)
  606. {
  607. route_error_message("Module %s [%d] hasn't responded to a PING in %d seconds... Sending SIGKILL\n", module_name[n], (int)module_status[n].pid, (int)USEC_TO_SEC(delta));
  608. kill(module_status[n].pid, SIGKILL);
  609. }
  610. void enforce_ping_policy()
  611. {
  612. int i;
  613. long long int delta = 0;
  614. for(i = 0; i < NUM_MODULES; i++)
  615. {
  616. //If this module is both active AND included in the ping monitoring
  617. if( module_status[i].active && module_status[i].ping_requested )
  618. {
  619. if(module_status[i].last_ping == 0) //if we haven't sent a PING to this one...
  620. {
  621. //clear any punishment that would otherwise be pending and get the next module
  622. module_status[i].ping_punish = PING_PUNISH_NONE;
  623. continue;
  624. }
  625. //compute just how late this module is...
  626. delta = get_usec_time() - module_status[i].last_pong;
  627. switch(module_status[i].ping_punish)
  628. {
  629. case PING_PUNISH_NONE:
  630. if(delta > SUPERVISOR_PING_WARN_TIME)
  631. {
  632. ping_punish_warn(i, delta);
  633. module_status[i].ping_punish++;
  634. }
  635. break;
  636. case PING_PUNISH_WARN:
  637. if(delta > SUPERVISOR_PING_TERM_TIME)
  638. {
  639. ping_punish_term(i, delta);
  640. module_status[i].ping_punish++;
  641. }
  642. break;
  643. case PING_PUNISH_TERM:
  644. if(delta > SUPERVISOR_PING_KILL_TIME)
  645. {
  646. ping_punish_kill(i, delta);
  647. module_status[i].ping_punish++;
  648. }
  649. break;
  650. default:
  651. route_error_message("At our wits end with module %s [%d], just plain won't die!\n", module_name[i], (int)module_status[i].pid);
  652. break;
  653. }
  654. }
  655. }
  656. }
  657. void maintain_ipc_hub_connect(char *progname)
  658. {
  659. struct message_record outgoing_msg;
  660. if(commhub_fd < 0) //if we have no connection to the communication hub
  661. {
  662. commhub_fd = connect_to_message_server(progname); //try and get one
  663. // printf("commhub_fd = %d\n", commhub_fd);
  664. if(commhub_fd >= 0) //if it worked
  665. {
  666. //Subscribe to the command mailboxes we act on
  667. prepare_message(&outgoing_msg, MAILBOX_SUBSCRIBE, MAILBOX_PONG, strlen(MAILBOX_TOKEN_MAG));
  668. send_message(commhub_fd,&outgoing_msg);
  669. //Subscribe to the relevant status management mailboxes
  670. subscribe_to_default_messages(commhub_fd);
  671. }
  672. }
  673. }
  674. void usage(char *progname)
  675. {
  676. fprintf(stderr,"%s: %d modules given to be supervised, minimum = 1, maximum = %d\n", progname, NUM_MODULES, SUPERVISOR_MAX_MODULES - 1);
  677. fprintf(stderr,"%s: Usage: %s [module ...] [--[no-]ping] [module ...]\n", progname, progname);
  678. fprintf(stderr,"%s: By default modules will be subjet to PING monitoring through\n", progname);
  679. fprintf(stderr,"%s: the IPC hub, --no-ping will effect all modules following it until a --ping\n", progname);
  680. fprintf(stderr,"\n");
  681. exit(EX_USAGE);
  682. }
  683. void handle_command_line(int argc, char **argv)
  684. {
  685. int i;
  686. int needs_ping = 1;
  687. NUM_MODULES = 0;
  688. //Walk all of the given command line arguments and add each one as a module we have been asked to supervice
  689. for(i = 1; i < argc; i++)
  690. {
  691. if( !strcmp(argv[i], "--no-ping") )
  692. {
  693. needs_ping = 0;
  694. }
  695. else if ( !strcmp(argv[i], "--ping") )
  696. {
  697. needs_ping = 1;
  698. }
  699. else
  700. {
  701. //Here we look for the last / in the module name
  702. char *foo = rindex(argv[i], '/');
  703. if(foo) //if we found one, advance past it
  704. {
  705. foo++;
  706. }
  707. else //otherwise
  708. {
  709. foo = argv[i]; //use our whole argument
  710. }
  711. if( !strcmp(foo, SUPERVISOR_SPECIAL_BILLDB) ) //make sure it is at the end
  712. {
  713. //if it is, this one is our guy!
  714. error_logging_module = NUM_MODULES;
  715. }
  716. module_status[NUM_MODULES].ping_requested = needs_ping;
  717. module_name[NUM_MODULES++] = argv[i];
  718. }
  719. if(NUM_MODULES >= SUPERVISOR_MAX_MODULES)
  720. {
  721. usage(argv[0]);
  722. }
  723. }
  724. if(NUM_MODULES == 0)
  725. {
  726. usage(argv[0]);
  727. }
  728. }
  729. int main(int argc, char **argv)
  730. {
  731. int poll_ret;
  732. int nfds;
  733. int i;
  734. struct message_record incoming_msg;
  735. char linebuffer[LINE_BUFFER_SIZE] = {0};
  736. int read_ret;
  737. int modnum;
  738. struct pollfd fds[1 + (2 * SUPERVISOR_MAX_MODULES)] = {{0}};
  739. int poll_to_module[1 + (2 * SUPERVISOR_MAX_MODULES)] = {0};
  740. //Parse our command line to figure out what modules we need to supervise
  741. handle_command_line(argc, argv);
  742. //Configure our default signal handlers
  743. configure_signal_handlers(argv[0]);
  744. //As well as our module-specific zombie-reaping flag setter
  745. setup_child_handler();
  746. //This listens for SIGHUP messages and flags us when one has come it.
  747. setup_hup_handler();
  748. //Register our default keep-up-with-system status callbacks
  749. register_system_status_callbacks();
  750. //Except we want to exempt ourselves from the MAILBOX_EXIT and
  751. //MAILBOX_PING so as not to start ping storms or kill ourselves...
  752. register_dispatch_callback(MAILBOX_EXIT, CALLBACK_PREPROCESS, ignore_message, NULL);
  753. register_dispatch_callback(MAILBOX_PING, CALLBACK_PREPROCESS, ignore_message, NULL);
  754. //Add our specific handlers
  755. register_dispatch_callback(MAILBOX_PONG, CALLBACK_USER(1), handle_pong_message, NULL);
  756. // This is the main processing loop which monitors system status and pumps I/O from
  757. //all of the client modules (debug and error messages) and from the IPC hub when it is up
  758. //as indicated by poll().
  759. while( exit_request_status == EXIT_REQUEST_NONE )
  760. {
  761. RESET_WATCHDOG();
  762. //-------------- SYSTEM STATE MAINTENANCE CODE HERE
  763. if( (time(NULL) - last_system_stat) >= SUPERVISOR_SYSTEM_STAT_INTERVAL)
  764. {
  765. monitor_system_status();
  766. last_system_stat = time(NULL);
  767. }
  768. //Do any maintenance on our spawn list as needed
  769. monitor_spawn_list();
  770. //Try and keep in touch with the IPC hub if possible
  771. maintain_ipc_hub_connect(argv[0]);
  772. //If the communications hub is up, we may need to maintain our PING states
  773. if(commhub_fd >= 0)
  774. {
  775. //Send out any PING messages that are required
  776. send_ping_message();
  777. //Deal with any delinquents who don't answer their damn pings
  778. enforce_ping_policy();
  779. if(hup_alert)
  780. {
  781. hup_alert = 0;
  782. send_hup_message();
  783. }
  784. }
  785. else //If there is no connection to the IPC hub
  786. {
  787. clear_ping_attempt(); //clear any PING attempt records we may have active
  788. }
  789. //If our SIGCHLD handler has notified us that there is at least 1 zombie to reap
  790. if(zombie_alert)
  791. {
  792. //Try and reap any zombies that may exist
  793. reap_zombies();
  794. }
  795. //-------------- POLL FILE DESCRIPTOR SET POPULATION HERE
  796. nfds = 0;
  797. //If we have a valid communication hub connection, add it to the poll set
  798. if(commhub_fd >= 0)
  799. {
  800. fds[nfds].fd = commhub_fd;
  801. fds[nfds].events = POLLIN;
  802. poll_to_module[nfds] = -1; //but flag it as NOT belonging to a child module
  803. nfds++;
  804. }
  805. //Then iterate through all of our modules and see if they need service
  806. for(i = 0; i < NUM_MODULES; i++)
  807. {
  808. if(module_status[i].active) //If this child module is flagged as active
  809. {
  810. if(module_status[i].stdout_fd >= 0) //and we have a stdout file descriptor for this child
  811. {
  812. fds[nfds].fd = module_status[i].stdout_fd; //add it to the poll set
  813. fds[nfds].events = POLLIN;
  814. poll_to_module[nfds] = i; //and remember which module it goes to
  815. nfds++;
  816. }
  817. if(module_status[i].stderr_fd >= 0) //if we have a stderr file descriptor for this child
  818. {
  819. fds[nfds].fd = module_status[i].stderr_fd; //add it to the poll set
  820. fds[nfds].events = POLLIN;
  821. poll_to_module[nfds] = i; //and remember which module it goes to
  822. nfds++;
  823. }
  824. }
  825. }
  826. if(nfds > 0) //If we have any file descriptors to poll
  827. {
  828. poll_ret = poll(fds, nfds, POLL_TIMEOUT); //poll them
  829. if(poll_ret < 0) //if poll returns error
  830. {
  831. if(errno == EINTR) //and it's just EINTR
  832. {
  833. continue; //try the body of the while again
  834. }
  835. else //if it is some other error
  836. {
  837. //scream bloody murder
  838. route_debug_message("Poll returned %d, errno = %d (%s)\n", poll_ret, errno, strerror(errno));
  839. sleep(1);
  840. }
  841. }
  842. }
  843. else //If we have zero functional file descriptors
  844. {
  845. sleep(1); //Pretend that we called poll and it timed out
  846. poll_ret = 0;
  847. }
  848. //-------------- I/O PUMPING CODE BELOW
  849. if(poll_ret == 0) //if there are no file descriptors flagged as needing any action
  850. {
  851. continue;
  852. }
  853. for(i = 0; i < nfds; i++)
  854. {
  855. modnum = poll_to_module[i];
  856. if( (fds[i].revents & POLLIN) && (modnum >= 0) ) //If we have input and it is from a module...
  857. {
  858. read_ret = read(fds[i].fd, linebuffer, sizeof(linebuffer) - 1);
  859. if(read_ret == 0)
  860. {
  861. handle_dead_fd(fds[i].fd); //clean up
  862. continue; //and skip this fd for now
  863. }
  864. else if( (read_ret < 0) && (errno != EINTR) )
  865. {
  866. handle_dead_fd(fds[i].fd); //clean up
  867. continue; //and skip this fd for now
  868. }
  869. //Terminate our read...
  870. linebuffer[read_ret] = '\0';
  871. if(fds[i].fd == module_status[modnum].stdout_fd)
  872. {
  873. route_debug_message("%s [%d]: %s", module_name[modnum], (int)module_status[modnum].pid, linebuffer);
  874. }
  875. else if(fds[i].fd == module_status[modnum].stderr_fd)
  876. {
  877. route_error_message("%s [%d]: %s", module_name[modnum], (int)module_status[modnum].pid, linebuffer);
  878. }
  879. else
  880. {
  881. route_debug_message("File descriptor %d does not belong to module %d (%s)!\n", fds[i].fd, modnum, module_name[modnum]);
  882. }
  883. }
  884. else if( (fds[i].revents & POLLIN) && (fds[i].fd == commhub_fd) ) //If we have input and it is from the commhub...
  885. {
  886. read_ret = get_message(commhub_fd, &incoming_msg);
  887. if(read_ret < 0)
  888. {
  889. handle_dead_fd(fds[i].fd); //clean up
  890. continue; //and skip this fd for now
  891. }
  892. else
  893. {
  894. process_message(&incoming_msg); //This passes the received message through the callback list
  895. }
  896. }
  897. //If it looks like we have a closed or invalid descriptor
  898. if(fds[i].revents & (POLLERR | POLLHUP | POLLNVAL))
  899. {
  900. handle_dead_fd(fds[i].fd); //clean up
  901. continue; //and skip this fd for now
  902. }
  903. }
  904. }
  905. route_debug_message("Attempting graceful exit...\n");
  906. for(i = 0; i < NUM_MODULES; i++)
  907. {
  908. if(module_status[i].active)
  909. {
  910. route_debug_message("Sending SIGTERM to module %s [%d]\n", module_name[i], module_status[i].pid);
  911. kill(module_status[i].pid, SIGTERM);
  912. }
  913. }
  914. sleep(1);
  915. return 0;
  916. }