client_supervisor.c 37 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159
  1. /*
  2. * Copyright (c) 2019 Clementine Computing LLC.
  3. *
  4. * This file is part of PopuFare.
  5. *
  6. * PopuFare is free software: you can redistribute it and/or modify
  7. * it under the terms of the GNU Affero General Public License as published by
  8. * the Free Software Foundation, either version 3 of the License, or
  9. * (at your option) any later version.
  10. *
  11. * PopuFare is distributed in the hope that it will be useful,
  12. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. * GNU Affero General Public License for more details.
  15. *
  16. * You should have received a copy of the GNU Affero General Public License
  17. * along with PopuFare. If not, see <https://www.gnu.org/licenses/>.
  18. *
  19. */
  20. #include <linux/limits.h>
  21. #include <sys/types.h>
  22. #include <sys/wait.h>
  23. #include <sysexits.h>
  24. #include <stdio.h>
  25. #include <stdlib.h>
  26. #include <unistd.h>
  27. #include <signal.h>
  28. #include <string.h>
  29. #include <strings.h>
  30. #include <errno.h>
  31. #include <time.h>
  32. #include <poll.h>
  33. #include <syslog.h>
  34. #include <stdarg.h>
  35. #include "../common/common_defs.h"
  36. #include "../commhub/commhub.h"
  37. #include "../commhub/client_utils.h"
  //---------------------------------------------------------------------------
  // Supervisor-wide state shared by the functions in this file.
  //---------------------------------------------------------------------------
  int NUM_MODULES = 0;            //Number of modules we are actively tracking
  int commhub_fd = -1;            //File descriptor of our connection to the comm hub (negative when not connected)
  // The next two flags are written from asynchronous signal handlers and read from
  //normal program flow. volatile sig_atomic_t is the only object type the C standard
  //guarantees can be safely stored to from a handler, and volatile keeps the compiler
  //from caching the value across loop iterations.
  volatile sig_atomic_t zombie_alert = 0; //This flag is set by the SIGCHLD handler, and cleared by the reaper function
  volatile sig_atomic_t hup_alert = 0;    //This flag is set by the SIGHUP handler, and cleared when a HUP has been delivered to the client modules
  int last_global_spawn = 0;      //This is the time of the last new process creation event
  int last_system_stat = 0;       //This is the time of our last system status check
  int system_stat_freemem = 0;    //This is the number of KB of free memory in the system
  int system_stat_numproc = 0;    //This is the process count last read from /proc/loadavg
  float system_stat_loadavg = 0;  //This is the load average last read from /proc/loadavg
  int error_logging_module = -1;  //This is the error logging module's module number, or -1 when there is none
                                  //(this is the module whose name matches SUPERVISOR_SPECIAL_BILLDB)
  // Escalation steps recorded in module_record.ping_punish for a module that is not
  //answering its PING messages. The values are consecutive so the state can be advanced
  //with a simple increment.
  #define PING_PUNISH_NONE (0)
  #define PING_PUNISH_WARN (1)
  #define PING_PUNISH_TERM (2)
  #define PING_PUNISH_KILL (3)
  // Per-module bookkeeping. One record exists for every module named on the command
  //line; the record at index i describes the module whose path is module_name[i].
  struct module_record
  {
      //--- Fields that stay meaningful whether or not the module is running ---
      time_t last_start;        //time() of the most recent launch; zeroed after an expected exit
      time_t last_exit;         //time() of the most recent unexpected exit; zeroed after an expected exit
      int ping_requested;       //nonzero: module takes part in PING monitoring, zero: module is exempt
      int active;               //nonzero while a child process is running for this module

      //--- Fields below are only valid while 'active' is nonzero ---
      int exit_expected;        //nonzero when the child's termination should not be treated as an error
      pid_t pid;                //process ID of the child
      int stdin_fd;             //our (write) end of the child's standard input
      int stdout_fd;            //our (read) end of the child's standard output
      int stderr_fd;            //our (read) end of the child's standard error
      long long int last_ping;  //microsecond timestamp of the last PING sent (0 = never)
      long long int last_pong;  //microsecond timestamp of the last PONG received (0 = never)
      long long int ping_lag;   //most recent last_pong - last_ping difference
      int ping_punish;          //current PING_PUNISH_* escalation step
  };

  //State for every supervised module, indexed by module number
  struct module_record module_status[SUPERVISOR_MAX_MODULES] = {{0}};

  //Name (as given on the command line) of every supervised module, same indexing
  char *module_name[SUPERVISOR_MAX_MODULES] = {0};
  //Array index of each end of a descriptor pair filled in by pipe()
  enum
  {
      PIPE_R = 0, //read end
      PIPE_W = 1  //write end
  };

  //Descriptor numbers of the three standard streams
  enum
  {
      STDIN_H  = 0,
      STDOUT_H = 1,
      STDERR_H = 2
  };
  /*
   * Route a printf-style DEBUG message to the best available sink.
   *
   * When SUPERVISOR_LOG_DEBUG_TO_BILLDB is defined, the comm hub is connected and
   * the error logging module is alive, the formatted text is sent to
   * MAILBOX_BILLING_LOG, prefixed with LOGLEVEL_DEBUG and the output of
   * make_log_prefix(). If that path is unavailable or the send fails, the message
   * goes to syslog at LOG_DEBUG instead.
   *
   * fmt - printf-style format string, followed by its arguments.
   * Returns 0 in every case.
   */
  int route_debug_message(char *fmt, ...)
  {
      va_list ap;
  #ifdef SUPERVISOR_LOG_DEBUG_TO_BILLDB
      //IF... we have a connection to the commhub, and we have an instance of billdb under our care, and it is alive...
      if( (commhub_fd >= 0) && (error_logging_module >= 0) && module_status[error_logging_module].active )
      {
          struct message_record outgoing_msg;
          char payload[MAX_PAYLOAD_LENGTH] = {0};
          int len = 0;
          int room;
          int written;

          payload[len++] = LOGLEVEL_DEBUG;
          // NOTE(review): make_log_prefix() is defined elsewhere; its return value is
          //assumed to be the number of bytes it appended -- confirm.
          len += make_log_prefix(payload + len, MAX_PAYLOAD_LENGTH - len);
          //Whatever it returned, keep the running length inside the buffer.
          if(len < 1)
          {
              len = 1;
          }
          if(len > MAX_PAYLOAD_LENGTH - 1)
          {
              len = MAX_PAYLOAD_LENGTH - 1;
          }
          room = MAX_PAYLOAD_LENGTH - len;

          va_start(ap, fmt); /* Initialize the va_list */
          written = vsnprintf(payload + len, room, fmt, ap);
          va_end(ap);

          // vsnprintf() reports the length the output WOULD have had, so on truncation
          //it is larger than what was stored; it is negative on an encoding error.
          //Count only the bytes that are really in the buffer.
          if(written < 0)
          {
              written = 0;
          }
          else if(written >= room)
          {
              written = room - 1;
          }
          len += written;

          prepare_message(&outgoing_msg, MAILBOX_BILLING_LOG, payload, len);
          if(send_message(commhub_fd, &outgoing_msg) == 0)
          {
              return 0;
          }
      }
  #endif
      //IF either that didn't work, or the conditions were not met, send it to syslog...
      va_start(ap, fmt); /* Initialize the va_list */
      vsyslog(LOG_DEBUG, fmt, ap);
      va_end(ap);
      return 0;
  }
  /*
   * Route a printf-style WARNING message to the best available sink.
   *
   * When SUPERVISOR_LOG_WARNING_TO_BILLDB is defined, the comm hub is connected and
   * the error logging module is alive, the formatted text is sent to
   * MAILBOX_BILLING_LOG, prefixed with LOGLEVEL_WARN and the output of
   * make_log_prefix(). If that path is unavailable or the send fails, the message
   * goes to syslog at LOG_WARNING instead.
   *
   * fmt - printf-style format string, followed by its arguments.
   * Returns 0 in every case.
   */
  int route_warning_message(char *fmt, ...)
  {
      va_list ap;
  #ifdef SUPERVISOR_LOG_WARNING_TO_BILLDB
      //IF... we have a connection to the commhub, and we have an instance of billdb under our care, and it is alive...
      if( (commhub_fd >= 0) && (error_logging_module >= 0) && module_status[error_logging_module].active )
      {
          struct message_record outgoing_msg;
          char payload[MAX_PAYLOAD_LENGTH] = {0};
          int len = 0;
          int room;
          int written;

          payload[len++] = LOGLEVEL_WARN;
          // NOTE(review): make_log_prefix() is defined elsewhere; its return value is
          //assumed to be the number of bytes it appended -- confirm.
          len += make_log_prefix(payload + len, MAX_PAYLOAD_LENGTH - len);
          //Whatever it returned, keep the running length inside the buffer.
          if(len < 1)
          {
              len = 1;
          }
          if(len > MAX_PAYLOAD_LENGTH - 1)
          {
              len = MAX_PAYLOAD_LENGTH - 1;
          }
          room = MAX_PAYLOAD_LENGTH - len;

          va_start(ap, fmt); /* Initialize the va_list */
          written = vsnprintf(payload + len, room, fmt, ap);
          va_end(ap);

          // vsnprintf() reports the length the output WOULD have had, so on truncation
          //it is larger than what was stored; it is negative on an encoding error.
          //Count only the bytes that are really in the buffer.
          if(written < 0)
          {
              written = 0;
          }
          else if(written >= room)
          {
              written = room - 1;
          }
          len += written;

          prepare_message(&outgoing_msg, MAILBOX_BILLING_LOG, payload, len);
          if(send_message(commhub_fd, &outgoing_msg) == 0)
          {
              return 0;
          }
      }
  #endif
      //IF either that didn't work, or the conditions were not met, send it to syslog...
      va_start(ap, fmt); /* Initialize the va_list */
      vsyslog(LOG_WARNING, fmt, ap);
      va_end(ap);
      return 0;
  }
  /*
   * Route a printf-style ERROR message to the best available sink.
   *
   * When SUPERVISOR_LOG_ERROR_TO_BILLDB is defined, the comm hub is connected and
   * the error logging module is alive, the formatted text is sent to
   * MAILBOX_BILLING_LOG, prefixed with LOGLEVEL_ERROR and the output of
   * make_log_prefix(). If that path is unavailable or the send fails, the message
   * goes to syslog at LOG_ERR instead.
   *
   * fmt - printf-style format string, followed by its arguments.
   * Returns 0 in every case.
   */
  int route_error_message(char *fmt, ...)
  {
      va_list ap;
  #ifdef SUPERVISOR_LOG_ERROR_TO_BILLDB
      //IF... we have a connection to the commhub, and we have an instance of billdb under our care, and it is alive...
      if( (commhub_fd >= 0) && (error_logging_module >= 0) && module_status[error_logging_module].active )
      {
          struct message_record outgoing_msg;
          char payload[MAX_PAYLOAD_LENGTH] = {0};
          int len = 0;
          int room;
          int written;

          payload[len++] = LOGLEVEL_ERROR;
          // NOTE(review): make_log_prefix() is defined elsewhere; its return value is
          //assumed to be the number of bytes it appended -- confirm.
          len += make_log_prefix(payload + len, MAX_PAYLOAD_LENGTH - len);
          //Whatever it returned, keep the running length inside the buffer.
          if(len < 1)
          {
              len = 1;
          }
          if(len > MAX_PAYLOAD_LENGTH - 1)
          {
              len = MAX_PAYLOAD_LENGTH - 1;
          }
          room = MAX_PAYLOAD_LENGTH - len;

          va_start(ap, fmt); /* Initialize the va_list */
          written = vsnprintf(payload + len, room, fmt, ap);
          va_end(ap);

          // vsnprintf() reports the length the output WOULD have had, so on truncation
          //it is larger than what was stored; it is negative on an encoding error.
          //Count only the bytes that are really in the buffer.
          if(written < 0)
          {
              written = 0;
          }
          else if(written >= room)
          {
              written = room - 1;
          }
          len += written;

          prepare_message(&outgoing_msg, MAILBOX_BILLING_LOG, payload, len);
          if(send_message(commhub_fd, &outgoing_msg) == 0)
          {
              return 0;
          }
      }
  #endif
      //IF either that didn't work, or the conditions were not met, send it to syslog...
      va_start(ap, fmt); /* Initialize the va_list */
      vsyslog(LOG_ERR, fmt, ap);
      va_end(ap);
      return 0;
  }
  167. void warn_num_proc()
  168. {
  169. route_warning_message("System has %d processes running!\n", system_stat_numproc);
  170. }
  171. void warn_load_avg()
  172. {
  173. route_warning_message("System load average is %.1f!\n", system_stat_loadavg);
  174. }
  175. void warn_mem_free()
  176. {
  177. route_warning_message("System is down to %d KB of free RAM\n", system_stat_freemem);
  178. }
  179. void monitor_system_status()
  180. {
  181. int retval;
  182. int kb_free = 0;
  183. int n_procs = 0;
  184. float la5 = 0;
  185. char line[LINE_BUFFER_SIZE] = {0};
  186. FILE *f;
  187. f = fopen("/proc/loadavg", "rb");
  188. if(f)
  189. {
  190. fgets(line, sizeof(line), f);
  191. retval = sscanf(line, "%f %*f %*f %*i/%i", &la5, &n_procs);
  192. if(retval == 2)
  193. {
  194. system_stat_loadavg = la5;
  195. system_stat_numproc = n_procs;
  196. }
  197. fclose(f);
  198. }
  199. else
  200. {
  201. route_error_message("Cannot read /proc/loadavg\n");
  202. }
  203. f = fopen("/proc/meminfo", "rb");
  204. if(f)
  205. {
  206. while(!feof(f))
  207. {
  208. fgets(line, sizeof(line), f);
  209. retval = sscanf(line, "MemFree: %i kB\n", &kb_free);
  210. if(retval == 1)
  211. {
  212. system_stat_freemem = kb_free;
  213. break;
  214. }
  215. }
  216. fclose(f);
  217. }
  218. else
  219. {
  220. route_error_message("Cannot read /proc/meminfo\n");
  221. }
  222. if(system_stat_numproc > SUPERVISOR_WARN_NUM_PROC)
  223. {
  224. warn_num_proc();
  225. }
  226. if(system_stat_loadavg > SUPERVISOR_WARN_LOAD_AVG)
  227. {
  228. warn_load_avg();
  229. }
  230. if(system_stat_freemem < SUPERVISOR_WARN_LOW_MEM)
  231. {
  232. warn_mem_free();
  233. }
  234. }
  //Emit a "heartbeat" line through the debug message route.
  void send_heartbeat()
  {
      route_debug_message("heartbeat\n");
  }
  238. int launch_module(int n)
  239. {
  240. pid_t fork_ret;
  241. int retval;
  242. int my_stdin, my_stdout, my_stderr;
  243. int new_stdin[2], new_stdout[2], new_stderr[2];
  244. int record_confirm_pipe[2];
  245. char record_confirm_msg[4] = {'O','K','\n','\0'};
  246. int record_confirm_expect;
  247. if( (n < 0) || (n >= NUM_MODULES) )
  248. {
  249. return -1;
  250. }
  251. RESET_WATCHDOG();
  252. //Save our actual standard I/O fd's
  253. my_stdin = dup(STDIN_H);
  254. my_stdout = dup(STDOUT_H);
  255. my_stderr = dup(STDERR_H);
  256. //Generate pipes to be said handles for the child process
  257. pipe(new_stdin);
  258. pipe(new_stdout);
  259. pipe(new_stderr);
  260. //close our actual handles in preparation for forking
  261. close(STDIN_H);
  262. close(STDOUT_H);
  263. close(STDERR_H);
  264. //Assign the "far" end of the pipes to our handles so that the child process will use them
  265. dup2(new_stdin[PIPE_R], STDIN_H);
  266. dup2(new_stdout[PIPE_W], STDOUT_H);
  267. dup2(new_stderr[PIPE_W], STDERR_H);
  268. //Create a pipe for record-keeping confirmation purposes (this prevents a race condition with SIGCHLD
  269. pipe(record_confirm_pipe);
  270. record_confirm_expect = strlen(record_confirm_msg);
  271. //Ask the kernel to create a new process to hold our new module
  272. fork_ret = fork();
  273. if(fork_ret == 0) //If we are the child process
  274. {
  275. //We don't need this end of the confirmation pipe
  276. close(record_confirm_pipe[PIPE_W]);
  277. // Wait for the parent process to confirm that we have been recorded before proceeding
  278. //with the exec attempt so that if it fails, the parent process will know what to do with the
  279. //SIGCHLD signal.
  280. retval = read(record_confirm_pipe[PIPE_R], record_confirm_msg, record_confirm_expect);
  281. //If the parent process can't confirm our existence, we're in deep shit...
  282. if(retval != record_confirm_expect)
  283. {
  284. fprintf(stderr, "Never got recordkeeping confirmation!\n");
  285. exit(EX_UNAVAILABLE);
  286. }
  287. //We are now done with our confirmation pipe
  288. close(record_confirm_pipe[PIPE_R]);
  289. //Close the child's copy of the parent's standard I/O fds
  290. close(my_stdin);
  291. close(my_stdout);
  292. close(my_stderr);
  293. //Close the parent's end of the pipes connecting the two processes
  294. close(new_stdin[PIPE_W]);
  295. close(new_stdout[PIPE_R]);
  296. close(new_stderr[PIPE_R]);
  297. //Attempt to exec the child process now...
  298. retval = execl(module_name[n], module_name[n], NULL);
  299. if(retval)
  300. {
  301. fprintf(stderr, "Cannot execl(\"%s\", \"%s\", NULL)!\n", module_name[n], module_name[n]);
  302. exit(EX_UNAVAILABLE);
  303. }
  304. exit(0);
  305. }
  306. else if(fork_ret != -1) //If we are the parent process
  307. {
  308. //Record our child PID
  309. module_status[n].pid = fork_ret;
  310. //Record our end of the child's standard file descriptors
  311. module_status[n].stdin_fd = new_stdin[PIPE_W];
  312. module_status[n].stdout_fd = new_stdout[PIPE_R];
  313. module_status[n].stderr_fd = new_stderr[PIPE_R];
  314. //Reset our list ping/pong times to "never"
  315. module_status[n].last_ping = 0;
  316. module_status[n].last_pong = 0;
  317. module_status[n].ping_lag = 0;
  318. module_status[n].ping_punish = PING_PUNISH_NONE;
  319. //Record our process start time
  320. module_status[n].last_start = time(NULL);
  321. //This process is not currently expected to die
  322. module_status[n].exit_expected = 0;
  323. //Flag this child module as active
  324. module_status[n].active = 1;
  325. //Close the opposite end of all of our pipes
  326. close(record_confirm_pipe[PIPE_R]);
  327. close(new_stdin[PIPE_R]);
  328. close(new_stdout[PIPE_W]);
  329. close(new_stderr[PIPE_W]);
  330. //Close our copy of the child's standard I/O file descriptors
  331. close(STDIN_H);
  332. close(STDOUT_H);
  333. close(STDERR_H);
  334. //Restore the parent's original standard I/O file descriptors
  335. dup2(my_stdin, STDIN_H);
  336. dup2(my_stdout, STDOUT_H);
  337. dup2(my_stderr, STDERR_H);
  338. //Unblock the child process by transmitting our confirmation message
  339. retval = write(record_confirm_pipe[PIPE_W], record_confirm_msg, record_confirm_expect);
  340. if(retval != record_confirm_expect)
  341. {
  342. route_error_message("Cannot transmit recordkeeping confirmation!\n");
  343. }
  344. //We are now done with our recordkeeping confirmation pipe
  345. close(record_confirm_pipe[PIPE_W]);
  346. }
  347. else //Otherwise we have somehow FAILED to fork()
  348. {
  349. //restore our stderr file descriptor
  350. close(STDERR_H);
  351. dup2(my_stderr, STDERR_H);
  352. route_error_message("Cannot fork()!\n");
  353. exit(EX_OSERR);
  354. }
  355. return 0;
  356. }
  357. void expected_exit(int n)
  358. {
  359. route_debug_message("Module %s [%d] has finished as expected.\n", module_name[n], (int)module_status[n].pid);
  360. //flag this module as "clean" for the purpose of respawn
  361. //(so we won't count the short runtime of a process we asked to terminate against it)
  362. module_status[n].last_start = 0;
  363. module_status[n].last_exit = 0;
  364. }
  365. void unexpected_exit(int n)
  366. {
  367. int age;
  368. //Record our last stop time
  369. module_status[n].last_exit = time(NULL);
  370. age = (int)module_status[n].last_exit - (int)module_status[n].last_start;
  371. route_warning_message("Module %s [%d] has stopped running unexpectedly after %d seconds.\n", module_name[n], (int)module_status[n].pid, age);
  372. }
  373. void reap_zombies()
  374. {
  375. int status = 0;
  376. pid_t retval = 0;
  377. int i;
  378. do
  379. {
  380. // Clear the zombie alert HERE so that if things terminate after the loop finishes,
  381. //we'll get flagged. This could result in an extra flag, but we'll never miss one.
  382. //and since we're using the WNOHANG option, there is no harm in going through the waitpid()
  383. //loop one time extra.
  384. zombie_alert = 0;
  385. retval = waitpid(-1, &status, WNOHANG); //Wait for ANY child process
  386. if(retval > 0) //If waitpid() actually waited for the specified process...
  387. {
  388. for(i = 0; i < NUM_MODULES; i++)
  389. {
  390. //Go through our table of modules looking for an active module with a
  391. //matching PID...
  392. if( module_status[i].active && (module_status[i].pid == retval) )
  393. {
  394. //Give us an opportunity to do some clever cleanup if it is merited
  395. //These functions also set the last_exit time (and may zero the last_start
  396. //to flag a process as "clean" for respawn purposes).
  397. if(module_status[i].exit_expected)
  398. {
  399. expected_exit(i);
  400. }
  401. else
  402. {
  403. unexpected_exit(i);
  404. }
  405. //Set this module inactive
  406. module_status[i].active = 0;
  407. //Set this module's last ping/pong times to "never"
  408. module_status[i].last_ping = 0;
  409. module_status[i].last_pong = 0;
  410. module_status[i].ping_lag = 0;
  411. module_status[i].ping_punish = PING_PUNISH_NONE;
  412. //No more process
  413. module_status[i].pid = 0;
  414. //Close our end of this module's standard I/O file descriptors
  415. close(module_status[i].stdin_fd);
  416. close(module_status[i].stdout_fd);
  417. close(module_status[i].stderr_fd);
  418. module_status[i].stdin_fd = -1;
  419. module_status[i].stdout_fd = -1;
  420. module_status[i].stderr_fd = -1;
  421. break;
  422. }
  423. }
  424. }
  425. } while(retval > 0);
  426. }
  427. void sigchld_handler(int signum, siginfo_t *info, void *data)
  428. {
  429. zombie_alert = 1;
  430. }
  431. void sighup_handler(int signum, siginfo_t *into, void *data)
  432. {
  433. hup_alert = 1;
  434. }
  435. void setup_child_handler()
  436. {
  437. struct sigaction sa = {{0}};
  438. sa.sa_sigaction = sigchld_handler;
  439. sa.sa_flags = SA_NOCLDSTOP | SA_SIGINFO;
  440. sigfillset(&sa.sa_mask);
  441. sigaction(SIGCHLD, &sa, NULL);
  442. }
  443. void setup_hup_handler()
  444. {
  445. struct sigaction sa = {{0}};
  446. sa.sa_sigaction = sighup_handler;
  447. sa.sa_flags = SA_SIGINFO;
  448. sigfillset(&sa.sa_mask);
  449. sigaction(SIGHUP, &sa, NULL);
  450. }
  451. void monitor_spawn_list()
  452. {
  453. int i;
  454. int module_runtime;
  455. time_t now;
  456. for(i = 0; i < NUM_MODULES; i++)
  457. {
  458. now = time(NULL);
  459. //If it is too soon to attempt spawning any new process, then we're done for the moment
  460. if( (now - last_global_spawn) < SUPERVISOR_GLOBAL_SPAWN_RATE_LIMIT ) break;
  461. //If this module is still active, we don't need to spawn it...
  462. if(module_status[i].active) continue;
  463. //This number will only be valid if last_start != 0
  464. module_runtime = module_status[i].last_exit - module_status[i].last_start;
  465. if(module_status[i].last_start == 0) //If this module either made an expected termination or has never been launcher
  466. {
  467. last_global_spawn = now; //then update our last global spawn time
  468. launch_module(i); //and launch it...
  469. route_debug_message("Spawned module %s, PID = %d\n", module_name[i], (int)module_status[i].pid);
  470. }
  471. else if( module_runtime < SUPERVISOR_RESPAWN_DELAY_THRESHOLD ) //otherwise, if it has run before, but terminated unexpectedly in a very short time
  472. {
  473. if( (now - module_status[i].last_exit) >= SUPERVISOR_RESPAWN_RATE_LIMIT) //and it has been long enough to try again
  474. {
  475. last_global_spawn = now; //then update our last global spawn time
  476. launch_module(i); //and launch it...
  477. route_warning_message("Respawned module %s, PID = %d [rate limited]\n", module_name[i], (int)module_status[i].pid);
  478. }
  479. }
  480. else //otherwise, the module has terminated unexpectedly, but it had run for a long time, so we can respawn it immediately
  481. {
  482. last_global_spawn = now; //then update our last global spawn time
  483. launch_module(i); //and launch it...
  484. route_debug_message("Respawned module %s, PID = %d\n", module_name[i], (int)module_status[i].pid);
  485. }
  486. }
  487. }
  488. void handle_dead_fd(int fd)
  489. {
  490. int i;
  491. close(fd);
  492. if(fd == commhub_fd)
  493. {
  494. commhub_fd = -1;
  495. return;
  496. }
  497. for(i = 0; i < NUM_MODULES; i++)
  498. {
  499. if(module_status[i].stdin_fd == fd)
  500. {
  501. module_status[i].stdin_fd = -1;
  502. return;
  503. }
  504. if(module_status[i].stdout_fd == fd)
  505. {
  506. module_status[i].stdout_fd = -1;
  507. return;
  508. }
  509. if(module_status[i].stderr_fd == fd)
  510. {
  511. module_status[i].stderr_fd = -1;
  512. return;
  513. }
  514. }
  515. }
  516. message_callback_return handle_pong_message(struct message_record *msg, void *param)
  517. {
  518. int i;
  519. // printf("Got PONG from PID %d\n", (int)msg->header.sender);
  520. //Iterate through all of the modules
  521. for(i = 0; i < NUM_MODULES; i++)
  522. {
  523. //If this module is active, and its PID matches the PID of the sender of this PONG message
  524. if( module_status[i].active && (msg->header.sender == module_status[i].pid) )
  525. {
  526. module_status[i].last_pong = get_usec_time();
  527. module_status[i].ping_lag = (module_status[i].last_pong - module_status[i].last_ping);
  528. // printf("PING->PONG latency %lld microseconds.\n", module_status[i].ping_lag);
  529. }
  530. }
  531. return MESSAGE_HANDLED_CONT;
  532. }
  533. void clear_ping_attempt()
  534. {
  535. int i;
  536. for(i = 0; i < NUM_MODULES; i++)
  537. {
  538. if(module_status[i].active)
  539. {
  540. module_status[i].last_ping = module_status[i].last_pong = 0;
  541. module_status[i].ping_punish = PING_PUNISH_NONE;
  542. }
  543. }
  544. }
  545. void send_ping_message()
  546. {
  547. struct message_record outgoing_msg;
  548. long long int usec_now;
  549. char target[MAILBOX_NAME_MAX + 1];
  550. int i;
  551. if(commhub_fd < 0)
  552. {
  553. return;
  554. }
  555. for(i = 0; i < NUM_MODULES; i++)
  556. {
  557. //If this module is active AND included in PING monitoring
  558. if( module_status[i].active && module_status[i].ping_requested )
  559. {
  560. usec_now = get_usec_time();
  561. //If it is not yet time to re-ping this module
  562. if( (usec_now - module_status[i].last_ping) < SUPERVISOR_PING_INTERVAL)
  563. {
  564. continue; //skip it for now
  565. }
  566. //If we have PING'd this host but are still waiting for a PONG back
  567. if(module_status[i].last_pong < module_status[i].last_ping)
  568. {
  569. continue; //Don't flood it with pings
  570. }
  571. //If we've never PING'd this host, set it's PONG time to the same as its PING
  572. //time so it doesn't get killed for not having responded to its first PING.
  573. if(module_status[i].last_ping == 0)
  574. {
  575. module_status[i].last_pong = usec_now;
  576. }
  577. module_status[i].last_ping = usec_now;
  578. //Turn our PID into a magic mailbox name that addresses that PID.
  579. sprintf(target, ">%d", (int)module_status[i].pid);
  580. //Prepare a message to that magic mailbox with a payload consisting of the mailbox
  581. //name of the PING notification
  582. prepare_message(&outgoing_msg, target, MAILBOX_PING, strlen(MAILBOX_PING));
  583. //And send it!
  584. send_message(commhub_fd, &outgoing_msg);
  585. }
  586. }
  587. }
  588. void send_hup_message()
  589. {
  590. struct message_record outgoing_msg;
  591. if(commhub_fd < 0)
  592. {
  593. return;
  594. }
  595. printf("Sending HUP\n");
  596. prepare_message(&outgoing_msg, MAILBOX_HUP, "", 0);
  597. send_message(commhub_fd, &outgoing_msg);
  598. }
  599. void ping_punish_warn(int n, long long int delta)
  600. {
  601. route_debug_message("Module %s [%d] hasn't responded to a PING in %d seconds...\n", module_name[n], (int)module_status[n].pid, (int)USEC_TO_SEC(delta));
  602. }
  603. void ping_punish_term(int n, long long int delta)
  604. {
  605. route_warning_message("Module %s [%d] hasn't responded to a PING in %d seconds... Sending SIGTERM\n", module_name[n], (int)module_status[n].pid, (int)USEC_TO_SEC(delta));
  606. kill(module_status[n].pid, SIGTERM);
  607. }
  608. void ping_punish_kill(int n, long long int delta)
  609. {
  610. route_error_message("Module %s [%d] hasn't responded to a PING in %d seconds... Sending SIGKILL\n", module_name[n], (int)module_status[n].pid, (int)USEC_TO_SEC(delta));
  611. kill(module_status[n].pid, SIGKILL);
  612. }
  613. void enforce_ping_policy()
  614. {
  615. int i;
  616. long long int delta = 0;
  617. for(i = 0; i < NUM_MODULES; i++)
  618. {
  619. //If this module is both active AND included in the ping monitoring
  620. if( module_status[i].active && module_status[i].ping_requested )
  621. {
  622. if(module_status[i].last_ping == 0) //if we haven't sent a PING to this one...
  623. {
  624. //clear any punishment that would otherwise be pending and get the next module
  625. module_status[i].ping_punish = PING_PUNISH_NONE;
  626. continue;
  627. }
  628. //compute just how late this module is...
  629. delta = get_usec_time() - module_status[i].last_pong;
  630. switch(module_status[i].ping_punish)
  631. {
  632. case PING_PUNISH_NONE:
  633. if(delta > SUPERVISOR_PING_WARN_TIME)
  634. {
  635. ping_punish_warn(i, delta);
  636. module_status[i].ping_punish++;
  637. }
  638. break;
  639. case PING_PUNISH_WARN:
  640. if(delta > SUPERVISOR_PING_TERM_TIME)
  641. {
  642. ping_punish_term(i, delta);
  643. module_status[i].ping_punish++;
  644. }
  645. break;
  646. case PING_PUNISH_TERM:
  647. if(delta > SUPERVISOR_PING_KILL_TIME)
  648. {
  649. ping_punish_kill(i, delta);
  650. module_status[i].ping_punish++;
  651. }
  652. break;
  653. default:
  654. route_error_message("At our wits end with module %s [%d], just plain won't die!\n", module_name[i], (int)module_status[i].pid);
  655. break;
  656. }
  657. }
  658. }
  659. }
  660. void maintain_ipc_hub_connect(char *progname)
  661. {
  662. struct message_record outgoing_msg;
  663. if(commhub_fd < 0) //if we have no connection to the communication hub
  664. {
  665. commhub_fd = connect_to_message_server(progname); //try and get one
  666. // printf("commhub_fd = %d\n", commhub_fd);
  667. if(commhub_fd >= 0) //if it worked
  668. {
  669. //Subscribe to the command mailboxes we act on
  670. prepare_message(&outgoing_msg, MAILBOX_SUBSCRIBE, MAILBOX_PONG, strlen(MAILBOX_TOKEN_MAG));
  671. send_message(commhub_fd,&outgoing_msg);
  672. //Subscribe to the relevant status management mailboxes
  673. subscribe_to_default_messages(commhub_fd);
  674. }
  675. }
  676. }
  677. void usage(char *progname)
  678. {
  679. fprintf(stderr,"%s: %d modules given to be supervised, minimum = 1, maximum = %d\n", progname, NUM_MODULES, SUPERVISOR_MAX_MODULES - 1);
  680. fprintf(stderr,"%s: Usage: %s [module ...] [--[no-]ping] [module ...]\n", progname, progname);
  681. fprintf(stderr,"%s: By default modules will be subjet to PING monitoring through\n", progname);
  682. fprintf(stderr,"%s: the IPC hub, --no-ping will effect all modules following it until a --ping\n", progname);
  683. fprintf(stderr,"\n");
  684. exit(EX_USAGE);
  685. }
  686. void handle_command_line(int argc, char **argv)
  687. {
  688. int i;
  689. int needs_ping = 1;
  690. NUM_MODULES = 0;
  691. //Walk all of the given command line arguments and add each one as a module we have been asked to supervice
  692. for(i = 1; i < argc; i++)
  693. {
  694. if( !strcmp(argv[i], "--no-ping") )
  695. {
  696. needs_ping = 0;
  697. }
  698. else if ( !strcmp(argv[i], "--ping") )
  699. {
  700. needs_ping = 1;
  701. }
  702. else
  703. {
  704. //Here we look for the last / in the module name
  705. char *foo = rindex(argv[i], '/');
  706. if(foo) //if we found one, advance past it
  707. {
  708. foo++;
  709. }
  710. else //otherwise
  711. {
  712. foo = argv[i]; //use our whole argument
  713. }
  714. if( !strcmp(foo, SUPERVISOR_SPECIAL_BILLDB) ) //make sure it is at the end
  715. {
  716. //if it is, this one is our guy!
  717. error_logging_module = NUM_MODULES;
  718. }
  719. module_status[NUM_MODULES].ping_requested = needs_ping;
  720. module_name[NUM_MODULES++] = argv[i];
  721. }
  722. if(NUM_MODULES >= SUPERVISOR_MAX_MODULES)
  723. {
  724. usage(argv[0]);
  725. }
  726. }
  727. if(NUM_MODULES == 0)
  728. {
  729. usage(argv[0]);
  730. }
  731. }
  732. int main(int argc, char **argv)
  733. {
  734. int poll_ret;
  735. int nfds;
  736. int i;
  737. struct message_record incoming_msg;
  738. char linebuffer[LINE_BUFFER_SIZE] = {0};
  739. int read_ret;
  740. int modnum;
  741. struct pollfd fds[1 + (2 * SUPERVISOR_MAX_MODULES)] = {{0}};
  742. int poll_to_module[1 + (2 * SUPERVISOR_MAX_MODULES)] = {0};
  743. //Parse our command line to figure out what modules we need to supervise
  744. handle_command_line(argc, argv);
  745. //Configure our default signal handlers
  746. configure_signal_handlers(argv[0]);
  747. //As well as our module-specific zombie-reaping flag setter
  748. setup_child_handler();
  749. //This listens for SIGHUP messages and flags us when one has come it.
  750. setup_hup_handler();
  751. //Register our default keep-up-with-system status callbacks
  752. register_system_status_callbacks();
  753. //Except we want to exempt ourselves from the MAILBOX_EXIT and
  754. //MAILBOX_PING so as not to start ping storms or kill ourselves...
  755. register_dispatch_callback(MAILBOX_EXIT, CALLBACK_PREPROCESS, ignore_message, NULL);
  756. register_dispatch_callback(MAILBOX_PING, CALLBACK_PREPROCESS, ignore_message, NULL);
  757. //Add our specific handlers
  758. register_dispatch_callback(MAILBOX_PONG, CALLBACK_USER(1), handle_pong_message, NULL);
  759. // This is the main processing loop which monitors system status and pumps I/O from
  760. //all of the client modules (debug and error messages) and from the IPC hub when it is up
  761. //as indicated by poll().
  762. while( exit_request_status == EXIT_REQUEST_NONE )
  763. {
  764. RESET_WATCHDOG();
  765. //-------------- SYSTEM STATE MAINTENANCE CODE HERE
  766. if( (time(NULL) - last_system_stat) >= SUPERVISOR_SYSTEM_STAT_INTERVAL)
  767. {
  768. monitor_system_status();
  769. send_heartbeat();
  770. last_system_stat = time(NULL);
  771. }
  772. //Do any maintenance on our spawn list as needed
  773. monitor_spawn_list();
  774. //Try and keep in touch with the IPC hub if possible
  775. maintain_ipc_hub_connect(argv[0]);
  776. //If the communications hub is up, we may need to maintain our PING states
  777. if(commhub_fd >= 0)
  778. {
  779. //Send out any PING messages that are required
  780. send_ping_message();
  781. //Deal with any delinquents who don't answer their damn pings
  782. enforce_ping_policy();
  783. if(hup_alert)
  784. {
  785. hup_alert = 0;
  786. send_hup_message();
  787. }
  788. }
  789. else //If there is no connection to the IPC hub
  790. {
  791. clear_ping_attempt(); //clear any PING attempt records we may have active
  792. }
  793. //If our SIGCHLD handler has notified us that there is at least 1 zombie to reap
  794. if(zombie_alert)
  795. {
  796. //Try and reap any zombies that may exist
  797. reap_zombies();
  798. }
  799. //-------------- POLL FILE DESCRIPTOR SET POPULATION HERE
  800. nfds = 0;
  801. //If we have a valid communication hub connection, add it to the poll set
  802. if(commhub_fd >= 0)
  803. {
  804. fds[nfds].fd = commhub_fd;
  805. fds[nfds].events = POLLIN;
  806. poll_to_module[nfds] = -1; //but flag it as NOT belonging to a child module
  807. nfds++;
  808. }
  809. //Then iterate through all of our modules and see if they need service
  810. for(i = 0; i < NUM_MODULES; i++)
  811. {
  812. if(module_status[i].active) //If this child module is flagged as active
  813. {
  814. if(module_status[i].stdout_fd >= 0) //and we have a stdout file descriptor for this child
  815. {
  816. fds[nfds].fd = module_status[i].stdout_fd; //add it to the poll set
  817. fds[nfds].events = POLLIN;
  818. poll_to_module[nfds] = i; //and remember which module it goes to
  819. nfds++;
  820. }
  821. if(module_status[i].stderr_fd >= 0) //if we have a stderr file descriptor for this child
  822. {
  823. fds[nfds].fd = module_status[i].stderr_fd; //add it to the poll set
  824. fds[nfds].events = POLLIN;
  825. poll_to_module[nfds] = i; //and remember which module it goes to
  826. nfds++;
  827. }
  828. }
  829. }
  830. if(nfds > 0) //If we have any file descriptors to poll
  831. {
  832. poll_ret = poll(fds, nfds, POLL_TIMEOUT); //poll them
  833. if(poll_ret < 0) //if poll returns error
  834. {
  835. if(errno == EINTR) //and it's just EINTR
  836. {
  837. continue; //try the body of the while again
  838. }
  839. else //if it is some other error
  840. {
  841. //scream bloody murder
  842. route_debug_message("Poll returned %d, errno = %d (%s)\n", poll_ret, errno, strerror(errno));
  843. sleep(1);
  844. }
  845. }
  846. }
  847. else //If we have zero functional file descriptors
  848. {
  849. sleep(1); //Pretend that we called poll and it timed out
  850. poll_ret = 0;
  851. }
  852. //-------------- I/O PUMPING CODE BELOW
  853. if(poll_ret == 0) //if there are no file descriptors flagged as needing any action
  854. {
  855. continue;
  856. }
  857. for(i = 0; i < nfds; i++)
  858. {
  859. modnum = poll_to_module[i];
  860. if( (fds[i].revents & POLLIN) && (modnum >= 0) ) //If we have input and it is from a module...
  861. {
  862. read_ret = read(fds[i].fd, linebuffer, sizeof(linebuffer) - 1);
  863. if(read_ret == 0)
  864. {
  865. handle_dead_fd(fds[i].fd); //clean up
  866. continue; //and skip this fd for now
  867. }
  868. else if( (read_ret < 0) && (errno != EINTR) )
  869. {
  870. handle_dead_fd(fds[i].fd); //clean up
  871. continue; //and skip this fd for now
  872. }
  873. //Terminate our read...
  874. linebuffer[read_ret] = '\0';
  875. if(fds[i].fd == module_status[modnum].stdout_fd)
  876. {
  877. //route_debug_message("%s [%d]: %s", module_name[modnum], (int)module_status[modnum].pid, linebuffer);
  878. }
  879. else if(fds[i].fd == module_status[modnum].stderr_fd)
  880. {
  881. route_error_message("%s [%d]: %s", module_name[modnum], (int)module_status[modnum].pid, linebuffer);
  882. }
  883. else
  884. {
  885. route_debug_message("File descriptor %d does not belong to module %d (%s)!\n", fds[i].fd, modnum, module_name[modnum]);
  886. }
  887. }
  888. else if( (fds[i].revents & POLLIN) && (fds[i].fd == commhub_fd) ) //If we have input and it is from the commhub...
  889. {
  890. read_ret = get_message(commhub_fd, &incoming_msg);
  891. if(read_ret < 0)
  892. {
  893. handle_dead_fd(fds[i].fd); //clean up
  894. continue; //and skip this fd for now
  895. }
  896. else
  897. {
  898. process_message(&incoming_msg); //This passes the received message through the callback list
  899. }
  900. }
  901. //If it looks like we have a closed or invalid descriptor
  902. if(fds[i].revents & (POLLERR | POLLHUP | POLLNVAL))
  903. {
  904. handle_dead_fd(fds[i].fd); //clean up
  905. continue; //and skip this fd for now
  906. }
  907. }
  908. }
  909. route_debug_message("Attempting graceful exit...\n");
  910. for(i = 0; i < NUM_MODULES; i++)
  911. {
  912. if(module_status[i].active)
  913. {
  914. route_debug_message("Sending SIGTERM to module %s [%d]\n", module_name[i], module_status[i].pid);
  915. kill(module_status[i].pid, SIGTERM);
  916. }
  917. }
  918. sleep(1);
  919. return 0;
  920. }