bill_communication.c 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742
  1. /*
  2. * Copyright (c) 2019 Clementine Computing LLC.
  3. *
  4. * This file is part of PopuFare.
  5. *
  6. * PopuFare is free software: you can redistribute it and/or modify
  7. * it under the terms of the GNU Affero General Public License as published by
  8. * the Free Software Foundation, either version 3 of the License, or
  9. * (at your option) any later version.
  10. *
  11. * PopuFare is distributed in the hope that it will be useful,
  12. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. * GNU Affero General Public License for more details.
  15. *
  16. * You should have received a copy of the GNU Affero General Public License
  17. * along with PopuFare. If not, see <https://www.gnu.org/licenses/>.
  18. *
  19. */
  20. #include <sys/socket.h>
  21. #include <sys/types.h>
  22. #include <sys/stat.h>
  23. #include <sys/un.h>
  24. #include <netinet/in.h>
  25. #include <arpa/inet.h>
  26. #include <stdio.h>
  27. #include <stdlib.h>
  28. #include <unistd.h>
  29. #include <errno.h>
  30. #include <poll.h>
  31. #include <time.h>
  32. #include "../common/common_defs.h"
  33. #include "../commhub/commhub.h"
  34. #include "../commhub/client_utils.h"
  35. #include "billdb.h"
  36. //----------GLOBAL STATE VARIABLES
  37. int flush_in_progress = 0; //This flag is used to tell if there is a flush in progress
  38. time_t last_sync_attempt = 0; //Time of the last connection attempt
  39. int commhub_fd = -1; //File descriptor of our connection to the comm hub
  40. int server_fd = -1; //File descriptor of our connection to the sync server
  41. bill_status real_bill_status = {0};
  42. time_t last_watermark_warning = 0; //The time of our last high watermark warning for the driver when space is low
  43. //This function attempts to connect to the bill server...
  44. int connect_to_bill_server()
  45. {
  46. int fd;
  47. int retval;
  48. struct sockaddr_in addr;
  49. fd = socket(PF_INET, SOCK_STREAM, 0);
  50. if(fd < 0)
  51. return -1;
  52. addr.sin_family = AF_INET;
  53. addr.sin_port = htons(BILL_SERVER_PORT);
  54. inet_aton(BILL_SERVER_IP, &addr.sin_addr);
  55. retval = connect(fd, (struct sockaddr *)&addr, sizeof(addr));
  56. if(retval < 0)
  57. {
  58. close(fd);
  59. return -2;
  60. }
  61. return fd;
  62. }
  63. int handle_watermark_warnings(billdb_context *ctx, int force)
  64. {
  65. time_t now = time(NULL);
  66. struct message_record error_message;
  67. if(!ctx)
  68. return -1;
  69. if(ctx->num_free_records < BILLING_CRITICAL_THRESHOLD) //if we have a critical high watermark condition
  70. {
  71. //and we are either allowed to force a message or it is time to deliver the next one
  72. if(force || ((now - last_watermark_warning) >= BILLING_CRITICAL_FREQUENCY))
  73. {
  74. if(commhub_fd >= 0)
  75. {
  76. format_log_message(&error_message, LOGLEVEL_ERROR, "LOG CRITICAL: Call Dispatch");
  77. send_message(commhub_fd, &error_message);
  78. format_driver_message(&error_message, LOGLEVEL_ERROR, "LOG CRITICAL: Call Dispatch");
  79. send_message(commhub_fd, &error_message);
  80. last_watermark_warning = now;
  81. }
  82. else
  83. {
  84. return -1;
  85. }
  86. }
  87. }
  88. else if(ctx->num_free_records < BILLING_HIGH_THRESHOLD) //if we have a high watermark condition
  89. {
  90. //and we are either allowed to force a message or it is time to deliver the next one
  91. if(force || ((now - last_watermark_warning) >= BILLING_HIGH_FREQUENCY))
  92. {
  93. if(commhub_fd >= 0)
  94. {
  95. format_log_message(&error_message, LOGLEVEL_ERROR, "LOG HALF FULL: Call Dispatch");
  96. send_message(commhub_fd, &error_message);
  97. format_driver_message(&error_message, LOGLEVEL_ERROR, "LOG HALF FULL: Call Dispatch");
  98. send_message(commhub_fd, &error_message);
  99. last_watermark_warning = now;
  100. }
  101. else
  102. {
  103. return -1;
  104. }
  105. }
  106. }
  107. return 0;
  108. }
  109. int send_next_log(int fd, billdb_context *ctx, int next_idx)
  110. {
  111. char buffer[BILLING_LINE_SIZE + 2] = {0};
  112. int n, i, ret;
  113. if(!ctx)
  114. return 0;
  115. if(!ctx->bills)
  116. return 0;
  117. if(next_idx < 0)
  118. return 0;
  119. //Grab the billing log lone
  120. strncpy(buffer, ctx->bills[next_idx].data, BILLING_LINE_SIZE);
  121. n = strlen(buffer);
  122. //if it's zero length, that means that we've struck it from the record
  123. //already (it was ack'd on this same run through the poll loop).
  124. if(n == 0)
  125. {
  126. return 0;
  127. }
  128. //Add a newline
  129. buffer[n++] = '\n';
  130. buffer[n] = '\0';
  131. i = 0;
  132. //send it!
  133. while(i < n)
  134. {
  135. ret = send(fd, buffer + i, n - i, 0);
  136. if( ret <= 0 )
  137. return -1;
  138. i += ret;
  139. }
  140. //And mark it as sent now, so that we will only revisit it once the timer has run out...
  141. ctx->last_tx[next_idx] = time(NULL);
  142. return 1;
  143. }
  144. int handle_bill_reply(char *line, billdb_context *ctx)
  145. {
  146. char buffer[LINE_BUFFER_SIZE];
  147. int input_idx = 0;
  148. int eol = 0;
  149. int retval;
  150. //Extract the first tab-delimited field from the input line...
  151. input_idx += get_field(buffer, line + input_idx, sizeof(buffer), &eol);
  152. //If that field is blank, then we ignore this line
  153. if( buffer[0] == '\0' )
  154. {
  155. return 0;
  156. }
  157. if( !strcasecmp(buffer, "ACK") )
  158. {
  159. if( eol )
  160. {
  161. // printf("ACK: Premature end of line!\n");
  162. return -1;
  163. }
  164. //Get the next field (this should be a checksum...)
  165. input_idx += get_field(buffer, line + input_idx, sizeof(buffer), &eol);
  166. real_bill_status.last_ack_time = time(NULL); //update ack time
  167. retval = clear_billing_entry(ctx, buffer); //clear the billing entry
  168. real_bill_status.unsynced_messages = NUM_BILLING_ENTRIES - ctx->num_free_records; //update buffer count
  169. return retval;
  170. }
  171. else if( !strcasecmp(buffer, "DUP") )
  172. {
  173. if( eol )
  174. {
  175. // printf("DUP: Premature end of line!\n");
  176. return -1;
  177. }
  178. //Get the next field (this should be a checksum...)
  179. input_idx += get_field(buffer, line + input_idx, sizeof(buffer), &eol);
  180. real_bill_status.last_ack_time = time(NULL); //update ack time
  181. retval = clear_billing_entry(ctx, buffer); //clear the billing entry
  182. real_bill_status.unsynced_messages = NUM_BILLING_ENTRIES - ctx->num_free_records; //update buffer count
  183. return retval;
  184. }
  185. else if( !strcasecmp(buffer, "IGN") )
  186. {
  187. if( eol )
  188. {
  189. // printf("IGN: Premature end of line!\n");
  190. return -1;
  191. }
  192. //Get the next field (this should be a checksum...)
  193. input_idx += get_field(buffer, line + input_idx, sizeof(buffer), &eol);
  194. real_bill_status.last_ack_time = time(NULL); //update ack time
  195. retval = clear_billing_entry(ctx, buffer); //clear the billing entry
  196. real_bill_status.unsynced_messages = NUM_BILLING_ENTRIES - ctx->num_free_records; //update buffer count
  197. return retval;
  198. }
  199. else
  200. {
  201. fprintf(stderr, "Unknown command \"%s\"\n", buffer);
  202. }
  203. return 0;
  204. }
  205. void maintain_ipc_hub_connect(char *progname)
  206. {
  207. struct message_record outgoing_msg;
  208. if(commhub_fd < 0) //if we have no connection to the communication hub
  209. {
  210. commhub_fd = connect_to_message_server(progname); //try and get one
  211. // printf("commhub_fd = %d\n", commhub_fd);
  212. if(commhub_fd >= 0) //if it worked
  213. {
  214. //Subscribe to the basics
  215. subscribe_to_default_messages(commhub_fd);
  216. //Subscribe to the command mailboxes we act on
  217. prepare_message(&outgoing_msg, MAILBOX_SUBSCRIBE, MAILBOX_BILLING_LOG, strlen(MAILBOX_BILLING_LOG));
  218. send_message(commhub_fd,&outgoing_msg);
  219. prepare_message(&outgoing_msg, MAILBOX_SUBSCRIBE, MAILBOX_PADDLE_ACK, strlen(MAILBOX_PADDLE_ACK));
  220. send_message(commhub_fd,&outgoing_msg);
  221. //Request updated status information...
  222. prepare_message(&outgoing_msg, MAILBOX_STATUS_REQUEST, "", 0);
  223. send_message(commhub_fd,&outgoing_msg);
  224. }
  225. }
  226. }
  227. int send_bill_update(int force)
  228. {
  229. struct message_record outgoing_msg;
  230. //if EITHER the force flag is set, OR our record differs from the last one on file from the IPC hub...
  231. if( force || memcmp(&bill_stat, &real_bill_status, sizeof(real_bill_status)) )
  232. {
  233. if(commhub_fd >= 0)
  234. {
  235. //prepare and send it
  236. prepare_message(&outgoing_msg, MAILBOX_BILL_STATUS, &real_bill_status, sizeof(real_bill_status));
  237. send_message(commhub_fd,&outgoing_msg);
  238. }
  239. return 1;
  240. }
  241. return 0;
  242. }
  243. message_callback_return handle_billing_log_message(struct message_record *msg, void *param)
  244. {
  245. billdb_context *ctx = (billdb_context *)param;
  246. int retval = 0;
  247. int ignore = 0;
  248. // If this is a diagnostic log message, the first character will be a priority
  249. // code, otherwise for a billing log message it will be numeric.
  250. char priority = ((char *)msg->payload)[0];
  251. switch(priority)
  252. {
  253. case LOGLEVEL_DEBUG: //if this is a diagnostic message with a priority of LOGLEVEL_DEBUG
  254. //and there are fewer than REJECT_DIAG_DEBUG_THRESHOLD free buffer slots
  255. if(ctx->num_free_records < REJECT_DIAG_DEBUG_THRESHOLD)
  256. {
  257. ignore = 1; //then for safety, ignore this message
  258. }
  259. else
  260. {
  261. ignore = 0; //otherwise, if we have plenty of space, store it
  262. }
  263. break;
  264. case LOGLEVEL_WARN: //if this is a diagnostic message with a priority of LOGLEVEL_WARN
  265. //and there are fewer than REJECT_DIAG_WARN_THRESHOLD free buffer slots
  266. if(ctx->num_free_records < REJECT_DIAG_WARN_THRESHOLD)
  267. {
  268. ignore = 1; //then for safety, ignore this message
  269. }
  270. else
  271. {
  272. ignore = 0; //otherwise, if we have plenty of space, store it
  273. }
  274. break;
  275. case LOGLEVEL_ERROR: //if this is a diagnostic message with a priority of LOGLEVEL_ERROR
  276. //and there are fewer than REJECT_DIAG_ERROR_THRESHOLD free buffer slots
  277. if(ctx->num_free_records < REJECT_DIAG_ERROR_THRESHOLD)
  278. {
  279. ignore = 1; //then for safety, ignore this message
  280. }
  281. else
  282. {
  283. ignore = 0; //otherwise, if we have plenty of space, store it
  284. }
  285. break;
  286. default: //If this message does not start with a known diagnostic priority code, it is a billing message
  287. ignore = 0; //and therefore can never be ignored.
  288. break;
  289. }
  290. if(ignore) //If we've decided to ignore the message
  291. {
  292. // This means we are ignoring a message. This should only happen when buffer space
  293. // is scarce and the message is a diagnostic log message whith a priority that does not
  294. // justify potentially missing a billing entry later.
  295. }
  296. else //Otherwise, we want to process it as normal.
  297. {
  298. retval = add_billing_entry(ctx, (char *)msg->payload); //Attempt to add the message to the billing log
  299. switch(retval) //Test to see if that worked
  300. {
  301. case FAIL_FULL: //if the add failed
  302. case FAIL_MEM:
  303. if(commhub_fd >= 0) //and we can talk to the commhub
  304. {
  305. struct message_record error_message;
  306. format_driver_message(&error_message, LOGLEVEL_ERROR, "LOG FULL: Call Dispatch"); //notify the driver
  307. send_message(commhub_fd, &error_message);
  308. }
  309. break;
  310. default: //Otherwise, if the add succeded
  311. real_bill_status.unsynced_messages = NUM_BILLING_ENTRIES - ctx->num_free_records; //update buffer count
  312. break;
  313. }
  314. send_bill_update(0); //In either case, sent a status update if needed.
  315. }
  316. return MESSAGE_HANDLED_CONT;
  317. }
  318. message_callback_return handle_status_request_message(struct message_record *msg, void *param)
  319. {
  320. billdb_context *ctx = (billdb_context *)param;
  321. real_bill_status.unsynced_messages = NUM_BILLING_ENTRIES - ctx->num_free_records; //update buffer count
  322. //force a billing log status update if one is requested by a newly subscribing module
  323. send_bill_update(1);
  324. return MESSAGE_HANDLED_CONT;
  325. }
  326. int paddle_ack_flag = 0; //a variable to track if a paddle selection success has occurred (a driver has logged in)
  327. //this callback catches paddle selection acknowledge messages on the MAILBOX_PADDLE_ACK mailing list and
  328. //test to see if they report success, if so it sets the above flag so that we have the opportunity to warn a
  329. //newly logged in driver about a high-water-mark condition.
  330. message_callback_return handle_paddle_ack(struct message_record *msg, void *param)
  331. {
  332. set_paddle_req *payload = (set_paddle_req *)msg->payload;
  333. if(payload)
  334. {
  335. paddle_ack_flag = (payload->request == payload->result);
  336. }
  337. return MESSAGE_HANDLED_CONT;
  338. }
  339. int main(int argc, char **argv)
  340. {
  341. struct pollfd fds[2];
  342. int nfds = 0;
  343. int poll_return = 0;
  344. int read_return = 0;
  345. int next_sync_idx = -1;
  346. int i;
  347. struct message_record incoming_msg;
  348. struct message_record outgoing_msg;
  349. char input_line[LINE_BUFFER_SIZE] = {0};
  350. int input_idx = 0;
  351. int checked_idx = 0;
  352. billdb_context ctx = {0};
  353. //------------------
  354. configure_signal_handlers(argv[0]);
  355. maintain_ipc_hub_connect(argv[0]);
  356. register_system_status_callbacks();
  357. //register our module-specific callback
  358. register_dispatch_callback(MAILBOX_BILLING_LOG, CALLBACK_USER(1), handle_billing_log_message, &ctx);
  359. register_dispatch_callback(MAILBOX_STATUS_REQUEST, CALLBACK_USER(2), handle_status_request_message, &ctx);
  360. register_dispatch_callback(MAILBOX_PADDLE_ACK, CALLBACK_USER(3), handle_paddle_ack, NULL);
  361. read_return = attach_to_billdb(&ctx);
  362. if( read_return < 0 )
  363. {
  364. fprintf(stdout, "Database is missing or corrupt. Attempting to format new database.\n");
  365. read_return = format_new_billdb();
  366. if( read_return < 0 )
  367. {
  368. fprintf(stderr, "Database cannot be created. Aborting!\n");
  369. return -1;
  370. }
  371. else
  372. {
  373. read_return = attach_to_billdb(&ctx);
  374. if( read_return < 0 )
  375. {
  376. fprintf(stderr, "New database is ALSO missing or corrupt. Aborting.\n");
  377. return -1;
  378. }
  379. }
  380. }
  381. while( exit_request_status == EXIT_REQUEST_NONE )
  382. {
  383. time_t now = time(NULL);
  384. RESET_WATCHDOG();
  385. //DEBUG
  386. printf("[%lli] billdb: heartbeat\n", get_usec_time());
  387. //DEBUG
  388. maintain_ipc_hub_connect(argv[0]);
  389. if(server_fd < 0) //If we don't have a connection to the sync server...
  390. {
  391. if( (now - last_sync_attempt) > DEFAULT_CONNECT_RETRY ) //See if it is time to try again
  392. {
  393. if( tunnel_is_up() ) //and if the tunnel thinks it is up
  394. {
  395. server_fd = connect_to_bill_server(); //if so, try again...
  396. if(server_fd >= 0) //if it worked
  397. {
  398. input_idx = 0; //reset our buffer index
  399. last_sync_attempt = 0;
  400. }
  401. else
  402. {
  403. last_sync_attempt = now;
  404. }
  405. }
  406. }
  407. }
  408. //Every time through the loop check and see if we need to warn the driver of a high water mark condition
  409. //in the billing log (running out of space, and our warning frequency timer is up). If a driver has just
  410. //freshly logged in, we send a warning and reset our frequency timers.
  411. handle_watermark_warnings(&ctx, paddle_ack_flag);
  412. paddle_ack_flag = 0;
  413. nfds=0;
  414. //If we have a connection to the billing server
  415. if(server_fd >= 0)
  416. {
  417. //tell poll that we care about it
  418. fds[nfds].fd = server_fd;
  419. fds[nfds].events = POLLIN;
  420. //See if the database thinks we have anything that we need to try and sync right now...
  421. next_sync_idx = next_pending_entry(&ctx);
  422. if(next_sync_idx >= 0) //If so, ask poll() to unblock us when there is room in the pipe to the server.
  423. {
  424. fds[nfds].events |= POLLOUT;
  425. }
  426. //and advance the counter of active file descriptors
  427. nfds++;
  428. }
  429. if(commhub_fd >= 0)
  430. {
  431. fds[nfds].fd = commhub_fd;
  432. fds[nfds].events = POLLIN;
  433. nfds++;
  434. }
  435. if(nfds > 0)
  436. {
  437. poll_return = poll(fds, nfds, POLL_TIMEOUT);
  438. }
  439. else
  440. {
  441. usleep(POLL_TIMEOUT * 1000);
  442. poll_return = 0;
  443. }
  444. if(poll_return <= 0)
  445. {
  446. continue;
  447. }
  448. for(i=0; i < nfds; i++)
  449. {
  450. if( fds[i].fd == server_fd )
  451. {
  452. if(fds[i].revents & POLLIN)
  453. {
  454. read_return = recv(fds[i].fd, input_line + input_idx, sizeof(input_line) - input_idx, 0);
  455. // If the socket has closed politely (0), or had an error other than EINTR...
  456. //
  457. if( (read_return == 0) || ((read_return < 0) && (errno != EINTR)) )
  458. {
  459. close(server_fd);
  460. server_fd = -1;
  461. // EXPERIMENTAL: this might need to go when moving to production.
  462. // It's here to try and reduce the polling frequency when no connection
  463. // is present
  464. //
  465. usleep(POLL_TIMEOUT * 1000);
  466. break;
  467. }
  468. else
  469. {
  470. //this test is required otherwise an EINTR case would be treated as -1 bytes read, and bad stuff would happen...
  471. if(read_return > 0)
  472. {
  473. input_idx += read_return;
  474. do
  475. {
  476. //Advance until we either hit the end of the buffer, or we hit a line-terminator
  477. while(checked_idx < input_idx)
  478. {
  479. if( (input_line[checked_idx] == '\r') || (input_line[checked_idx] == '\n') )
  480. {
  481. break;
  482. }
  483. else
  484. {
  485. checked_idx++;
  486. }
  487. }
  488. //If we didn't hit the end of the input...
  489. if(checked_idx != input_idx)
  490. {
  491. int j,k;
  492. //Null terminate the line we got as a string...
  493. input_line[checked_idx] = '\0';
  494. //Do something useful with the string input_buffer...
  495. if( handle_bill_reply(input_line, &ctx) < 0 )
  496. {
  497. // printf("Command Failed: \"%s\"\n", input_line);
  498. }
  499. else
  500. {
  501. //If the server has ack'd a billing entry, that means that we may need up update the pass
  502. //associated with that billing entry...
  503. prepare_message(&outgoing_msg, MAILBOX_UPDATE_PASSES, "", 0);
  504. send_message(commhub_fd, &outgoing_msg);
  505. //Update our pass time status...
  506. send_bill_update(0);
  507. }
  508. //Now that we've done that, we can bump the rest of characters to the beginning of the next line...
  509. k = input_idx - (checked_idx + 1);
  510. for(j=0; j < k; j++)
  511. {
  512. input_line[j] = input_line[j + checked_idx + 1];
  513. }
  514. input_idx = j;
  515. checked_idx = 0;
  516. }
  517. //If we have hit an overflow condition such that our buffer is full and no newline has been received
  518. if(input_idx == sizeof(input_line))
  519. {
  520. close(server_fd);
  521. server_fd = -1;
  522. break;
  523. }
  524. } while(checked_idx < input_idx);
  525. }
  526. }
  527. }
  528. else if(fds[i].revents & (POLLERR | POLLHUP | POLLNVAL)) //If we've lost connection, break this loop and poll all over again
  529. {
  530. close(server_fd);
  531. server_fd = -1;
  532. break;
  533. }
  534. if(fds[i].revents & POLLOUT)
  535. {
  536. //send more logs here
  537. send_next_log(server_fd, &ctx, next_sync_idx);
  538. real_bill_status.last_sync_time = time(NULL);
  539. send_bill_update(0);
  540. }
  541. }
  542. else if( fds[i].fd == commhub_fd )
  543. {
  544. //If we've lost connection, break this loop and poll all over again
  545. if(fds[i].revents & (POLLERR | POLLHUP | POLLNVAL))
  546. {
  547. close(commhub_fd);
  548. commhub_fd = -1;
  549. break;
  550. }
  551. if(fds[i].revents & POLLIN)
  552. {
  553. read_return = get_message(commhub_fd, &incoming_msg);
  554. if( read_return < 0 )
  555. {
  556. close(commhub_fd);
  557. commhub_fd = -1;
  558. break;
  559. }
  560. process_message(&incoming_msg);
  561. }
  562. }
  563. }
  564. }
  565. printf("Detatching from Bill Database\n");
  566. detach_from_billdb(&ctx);
  567. printf("Closing connections\n");
  568. if(server_fd >= 0)
  569. {
  570. close(server_fd);
  571. server_fd = -1;
  572. }
  573. if(commhub_fd >= 0)
  574. {
  575. close(commhub_fd);
  576. server_fd = -1;
  577. }
  578. printf("Goodbye.\n");
  579. return 0;
  580. }