| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751 |
- /*
- * Copyright (c) 2019 Clementine Computing LLC.
- *
- * This file is part of PopuFare.
- *
- * PopuFare is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * PopuFare is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with PopuFare. If not, see <https://www.gnu.org/licenses/>.
- *
- */
- #include <sys/socket.h>
- #include <sys/types.h>
- #include <sys/stat.h>
- #include <sys/un.h>
- #include <netinet/in.h>
- #include <arpa/inet.h>
- #include <stdio.h>
- #include <stdlib.h>
- #include <unistd.h>
- #include <errno.h>
- #include <poll.h>
- #include <time.h>
- #include "../common/common_defs.h"
- #include "../commhub/commhub.h"
- #include "../commhub/client_utils.h"
- #include "billdb.h"
- //----------GLOBAL STATE VARIABLES
- int flush_in_progress = 0; //This flag is used to tell if there is a flush in progress
- time_t last_sync_attempt = 0; //Time of the last connection attempt
- int commhub_fd = -1; //File descriptor of our connection to the comm hub
- int server_fd = -1; //File descriptor of our connection to the sync server
- bill_status real_bill_status = {0};
- time_t last_watermark_warning = 0; //The time of our last high watermark warning for the driver when space is low
- //This function attempts to connect to the bill server...
- int connect_to_bill_server()
- {
- int fd;
- int retval;
- struct sockaddr_in addr;
- fd = socket(PF_INET, SOCK_STREAM, 0);
- if(fd < 0)
- return -1;
- addr.sin_family = AF_INET;
- addr.sin_port = htons(BILL_SERVER_PORT);
- inet_aton(BILL_SERVER_IP, &addr.sin_addr);
- retval = connect(fd, (struct sockaddr *)&addr, sizeof(addr));
- if(retval < 0)
- {
- close(fd);
- return -2;
- }
- return fd;
- }
- int handle_watermark_warnings(billdb_context *ctx, int force)
- {
- time_t now = time(NULL);
- struct message_record error_message;
- if(!ctx)
- return -1;
- if(ctx->num_free_records < BILLING_CRITICAL_THRESHOLD) //if we have a critical high watermark condition
- {
- //and we are either allowed to force a message or it is time to deliver the next one
- if(force || ((now - last_watermark_warning) >= BILLING_CRITICAL_FREQUENCY))
- {
- if(commhub_fd >= 0)
- {
- format_log_message(&error_message, LOGLEVEL_ERROR, "LOG CRITICAL: Call Dispatch");
- send_message(commhub_fd, &error_message);
- format_driver_message(&error_message, LOGLEVEL_ERROR, "LOG CRITICAL: Call Dispatch");
- send_message(commhub_fd, &error_message);
- last_watermark_warning = now;
- }
- else
- {
- return -1;
- }
- }
- }
- else if(ctx->num_free_records < BILLING_HIGH_THRESHOLD) //if we have a high watermark condition
- {
- //and we are either allowed to force a message or it is time to deliver the next one
- if(force || ((now - last_watermark_warning) >= BILLING_HIGH_FREQUENCY))
- {
- if(commhub_fd >= 0)
- {
- format_log_message(&error_message, LOGLEVEL_ERROR, "LOG HALF FULL: Call Dispatch");
- send_message(commhub_fd, &error_message);
- format_driver_message(&error_message, LOGLEVEL_ERROR, "LOG HALF FULL: Call Dispatch");
- send_message(commhub_fd, &error_message);
- last_watermark_warning = now;
- }
- else
- {
- return -1;
- }
- }
- }
- return 0;
- }
- int send_next_log(int fd, billdb_context *ctx, int next_idx)
- {
- char buffer[BILLING_LINE_SIZE + 2] = {0};
- int n, i, ret;
- if(!ctx)
- return 0;
- if(!ctx->bills)
- return 0;
- if(next_idx < 0)
- return 0;
- //Grab the billing log lone
- strncpy(buffer, ctx->bills[next_idx].data, BILLING_LINE_SIZE);
- n = strlen(buffer);
- //if it's zero length, that means that we've struck it from the record
- //already (it was ack'd on this same run through the poll loop).
- if(n == 0)
- {
- return 0;
- }
- //Add a newline
- buffer[n++] = '\n';
- buffer[n] = '\0';
- i = 0;
- //send it!
- while(i < n)
- {
- ret = send(fd, buffer + i, n - i, 0);
- if( ret <= 0 )
- return -1;
- i += ret;
- }
- //And mark it as sent now, so that we will only revisit it once the timer has run out...
- ctx->last_tx[next_idx] = time(NULL);
- return 1;
- }
- int handle_bill_reply(char *line, billdb_context *ctx)
- {
- char buffer[LINE_BUFFER_SIZE];
- int input_idx = 0;
- int eol = 0;
- int retval;
- //Extract the first tab-delimited field from the input line...
- input_idx += get_field(buffer, line + input_idx, sizeof(buffer), &eol);
- //If that field is blank, then we ignore this line
- if( buffer[0] == '\0' )
- {
- return 0;
- }
- if( !strcasecmp(buffer, "ACK") )
- {
- if( eol )
- {
- // printf("ACK: Premature end of line!\n");
- return -1;
- }
- //Get the next field (this should be a checksum...)
- input_idx += get_field(buffer, line + input_idx, sizeof(buffer), &eol);
- real_bill_status.last_ack_time = time(NULL); //update ack time
- retval = clear_billing_entry(ctx, buffer); //clear the billing entry
- real_bill_status.unsynced_messages = NUM_BILLING_ENTRIES - ctx->num_free_records; //update buffer count
- return retval;
- }
- else if( !strcasecmp(buffer, "DUP") )
- {
- if( eol )
- {
- // printf("DUP: Premature end of line!\n");
- return -1;
- }
- //Get the next field (this should be a checksum...)
- input_idx += get_field(buffer, line + input_idx, sizeof(buffer), &eol);
- real_bill_status.last_ack_time = time(NULL); //update ack time
- retval = clear_billing_entry(ctx, buffer); //clear the billing entry
- real_bill_status.unsynced_messages = NUM_BILLING_ENTRIES - ctx->num_free_records; //update buffer count
- return retval;
- }
- else if( !strcasecmp(buffer, "IGN") )
- {
- if( eol )
- {
- // printf("IGN: Premature end of line!\n");
- return -1;
- }
- //Get the next field (this should be a checksum...)
- input_idx += get_field(buffer, line + input_idx, sizeof(buffer), &eol);
- real_bill_status.last_ack_time = time(NULL); //update ack time
- retval = clear_billing_entry(ctx, buffer); //clear the billing entry
- real_bill_status.unsynced_messages = NUM_BILLING_ENTRIES - ctx->num_free_records; //update buffer count
- return retval;
- }
- else
- {
- fprintf(stderr, "Unknown command \"%s\"\n", buffer);
- }
- return 0;
- }
- void maintain_ipc_hub_connect(char *progname)
- {
- struct message_record outgoing_msg;
- if(commhub_fd < 0) //if we have no connection to the communication hub
- {
- commhub_fd = connect_to_message_server(progname); //try and get one
- if(commhub_fd >= 0) //if it worked
- {
- //Subscribe to the basics
- subscribe_to_default_messages(commhub_fd);
- //Subscribe to the command mailboxes we act on
- prepare_message(&outgoing_msg, MAILBOX_SUBSCRIBE, MAILBOX_BILLING_LOG, strlen(MAILBOX_BILLING_LOG));
- send_message(commhub_fd,&outgoing_msg);
- prepare_message(&outgoing_msg, MAILBOX_SUBSCRIBE, MAILBOX_PADDLE_ACK, strlen(MAILBOX_PADDLE_ACK));
- send_message(commhub_fd,&outgoing_msg);
- //Request updated status information...
- prepare_message(&outgoing_msg, MAILBOX_STATUS_REQUEST, "", 0);
- send_message(commhub_fd,&outgoing_msg);
- }
- }
- }
- int send_bill_update(int force)
- {
- struct message_record outgoing_msg;
- //if EITHER the force flag is set, OR our record differs from the last one on file from the IPC hub...
- if( force || memcmp(&bill_stat, &real_bill_status, sizeof(real_bill_status)) )
- {
- if(commhub_fd >= 0)
- {
- //prepare and send it
- prepare_message(&outgoing_msg, MAILBOX_BILL_STATUS, &real_bill_status, sizeof(real_bill_status));
- send_message(commhub_fd,&outgoing_msg);
- }
- return 1;
- }
- return 0;
- }
- message_callback_return handle_billing_log_message(struct message_record *msg, void *param)
- {
- billdb_context *ctx = (billdb_context *)param;
- int retval = 0;
- int ignore = 0;
- // If this is a diagnostic log message, the first character will be a priority
- // code, otherwise for a billing log message it will be numeric.
- char priority = ((char *)msg->payload)[0];
- switch(priority)
- {
- case LOGLEVEL_DEBUG: //if this is a diagnostic message with a priority of LOGLEVEL_DEBUG
- //and there are fewer than REJECT_DIAG_DEBUG_THRESHOLD free buffer slots
- if(ctx->num_free_records < REJECT_DIAG_DEBUG_THRESHOLD)
- {
- ignore = 1; //then for safety, ignore this message
- }
- else
- {
- ignore = 0; //otherwise, if we have plenty of space, store it
- }
- break;
- case LOGLEVEL_WARN: //if this is a diagnostic message with a priority of LOGLEVEL_WARN
- //and there are fewer than REJECT_DIAG_WARN_THRESHOLD free buffer slots
- if(ctx->num_free_records < REJECT_DIAG_WARN_THRESHOLD)
- {
- ignore = 1; //then for safety, ignore this message
- }
- else
- {
- ignore = 0; //otherwise, if we have plenty of space, store it
- }
- break;
- case LOGLEVEL_ERROR: //if this is a diagnostic message with a priority of LOGLEVEL_ERROR
- //and there are fewer than REJECT_DIAG_ERROR_THRESHOLD free buffer slots
- if(ctx->num_free_records < REJECT_DIAG_ERROR_THRESHOLD)
- {
- ignore = 1; //then for safety, ignore this message
- }
- else
- {
- ignore = 0; //otherwise, if we have plenty of space, store it
- }
- break;
- default: //If this message does not start with a known diagnostic priority code, it is a billing message
- ignore = 0; //and therefore can never be ignored.
- break;
- }
- if(ignore) //If we've decided to ignore the message
- {
- // This means we are ignoring a message. This should only happen when buffer space
- // is scarce and the message is a diagnostic log message whith a priority that does not
- // justify potentially missing a billing entry later.
- }
- else //Otherwise, we want to process it as normal.
- {
- retval = add_billing_entry(ctx, (char *)msg->payload); //Attempt to add the message to the billing log
- switch(retval) //Test to see if that worked
- {
- case FAIL_FULL: //if the add failed
- case FAIL_MEM:
- if(commhub_fd >= 0) //and we can talk to the commhub
- {
- struct message_record error_message;
- format_driver_message(&error_message, LOGLEVEL_ERROR, "LOG FULL: Call Dispatch"); //notify the driver
- send_message(commhub_fd, &error_message);
- }
- break;
- default: //Otherwise, if the add succeded
- real_bill_status.unsynced_messages = NUM_BILLING_ENTRIES - ctx->num_free_records; //update buffer count
- break;
- }
- send_bill_update(0); //In either case, sent a status update if needed.
- }
- return MESSAGE_HANDLED_CONT;
- }
- message_callback_return handle_status_request_message(struct message_record *msg, void *param)
- {
- billdb_context *ctx = (billdb_context *)param;
- real_bill_status.unsynced_messages = NUM_BILLING_ENTRIES - ctx->num_free_records; //update buffer count
- //force a billing log status update if one is requested by a newly subscribing module
- send_bill_update(1);
- return MESSAGE_HANDLED_CONT;
- }
- int paddle_ack_flag = 0; //a variable to track if a paddle selection success has occurred (a driver has logged in)
- //this callback catches paddle selection acknowledge messages on the MAILBOX_PADDLE_ACK mailing list and
- //test to see if they report success, if so it sets the above flag so that we have the opportunity to warn a
- //newly logged in driver about a high-water-mark condition.
- message_callback_return handle_paddle_ack(struct message_record *msg, void *param)
- {
- set_paddle_req *payload = (set_paddle_req *)msg->payload;
- if(payload)
- {
- paddle_ack_flag = (payload->request == payload->result);
- }
- return MESSAGE_HANDLED_CONT;
- }
- int main(int argc, char **argv)
- {
- struct pollfd fds[2];
- int nfds = 0;
- int poll_return = 0;
- int read_return = 0;
- int next_sync_idx = -1;
- int i;
- struct message_record incoming_msg;
- struct message_record outgoing_msg;
- char input_line[LINE_BUFFER_SIZE] = {0};
- int input_idx = 0;
- int checked_idx = 0;
- #ifdef DEBUG_PRINT
- long long int _usec_now, _usec_prv, _usec_del;
- _usec_del = 60000000;
- _usec_now = get_usec_time();
- _usec_prv = _usec_now;
- #endif
- billdb_context ctx = {0};
- //------------------
- configure_signal_handlers(argv[0]);
- maintain_ipc_hub_connect(argv[0]);
- register_system_status_callbacks();
- //register our module-specific callback
- register_dispatch_callback(MAILBOX_BILLING_LOG, CALLBACK_USER(1), handle_billing_log_message, &ctx);
- register_dispatch_callback(MAILBOX_STATUS_REQUEST, CALLBACK_USER(2), handle_status_request_message, &ctx);
- register_dispatch_callback(MAILBOX_PADDLE_ACK, CALLBACK_USER(3), handle_paddle_ack, NULL);
- read_return = attach_to_billdb(&ctx);
- if( read_return < 0 )
- {
- fprintf(stdout, "Database is missing or corrupt. Attempting to format new database.\n");
- read_return = format_new_billdb();
- if( read_return < 0 )
- {
- fprintf(stderr, "Database cannot be created. Aborting!\n");
- return -1;
- }
- else
- {
- read_return = attach_to_billdb(&ctx);
- if( read_return < 0 )
- {
- fprintf(stderr, "New database is ALSO missing or corrupt. Aborting.\n");
- return -1;
- }
- }
- }
- while( exit_request_status == EXIT_REQUEST_NONE )
- {
- time_t now = time(NULL);
- RESET_WATCHDOG();
- #ifdef DEBUG_PRINT
- _usec_now = get_usec_time();
- if ((_usec_now - _usec_prv) > _usec_del) {
- printf("[%lli] billdb: heartbeat\n", get_usec_time());
- _usec_prv = _usec_now;
- }
- #endif
- maintain_ipc_hub_connect(argv[0]);
- if(server_fd < 0) //If we don't have a connection to the sync server...
- {
- if( (now - last_sync_attempt) > DEFAULT_CONNECT_RETRY ) //See if it is time to try again
- {
- if( tunnel_is_up() ) //and if the tunnel thinks it is up
- {
- server_fd = connect_to_bill_server(); //if so, try again...
- if(server_fd >= 0) //if it worked
- {
- input_idx = 0; //reset our buffer index
- last_sync_attempt = 0;
- }
- else
- {
- last_sync_attempt = now;
- }
- }
- }
- }
- //Every time through the loop check and see if we need to warn the driver of a high water mark condition
- //in the billing log (running out of space, and our warning frequency timer is up). If a driver has just
- //freshly logged in, we send a warning and reset our frequency timers.
- handle_watermark_warnings(&ctx, paddle_ack_flag);
- paddle_ack_flag = 0;
- nfds=0;
- //If we have a connection to the billing server
- if(server_fd >= 0)
- {
- //tell poll that we care about it
- fds[nfds].fd = server_fd;
- fds[nfds].events = POLLIN;
- //See if the database thinks we have anything that we need to try and sync right now...
- next_sync_idx = next_pending_entry(&ctx);
- if(next_sync_idx >= 0) //If so, ask poll() to unblock us when there is room in the pipe to the server.
- {
- fds[nfds].events |= POLLOUT;
- }
- //and advance the counter of active file descriptors
- nfds++;
- }
- if(commhub_fd >= 0)
- {
- fds[nfds].fd = commhub_fd;
- fds[nfds].events = POLLIN;
- nfds++;
- }
- if(nfds > 0)
- {
- poll_return = poll(fds, nfds, POLL_TIMEOUT);
- }
- else
- {
- usleep(POLL_TIMEOUT * 1000);
- poll_return = 0;
- }
- if(poll_return <= 0)
- {
- continue;
- }
- for(i=0; i < nfds; i++)
- {
- if( fds[i].fd == server_fd )
- {
- if(fds[i].revents & POLLIN)
- {
- read_return = recv(fds[i].fd, input_line + input_idx, sizeof(input_line) - input_idx, 0);
- // If the socket has closed politely (0), or had an error other than EINTR...
- //
- if( (read_return == 0) || ((read_return < 0) && (errno != EINTR)) )
- {
- close(server_fd);
- server_fd = -1;
- // EXPERIMENTAL: this might need to go when moving to production.
- // It's here to try and reduce the polling frequency when no connection
- // is present
- //
- usleep(POLL_TIMEOUT * 1000);
- break;
- }
- else
- {
- //this test is required otherwise an EINTR case would be treated as -1 bytes read, and bad stuff would happen...
- if(read_return > 0)
- {
- input_idx += read_return;
- do
- {
- //Advance until we either hit the end of the buffer, or we hit a line-terminator
- while(checked_idx < input_idx)
- {
- if( (input_line[checked_idx] == '\r') || (input_line[checked_idx] == '\n') )
- {
- break;
- }
- else
- {
- checked_idx++;
- }
- }
- //If we didn't hit the end of the input...
- if(checked_idx != input_idx)
- {
- int j,k;
- //Null terminate the line we got as a string...
- input_line[checked_idx] = '\0';
- //Do something useful with the string input_buffer...
- if( handle_bill_reply(input_line, &ctx) < 0 )
- {
- // printf("Command Failed: \"%s\"\n", input_line);
- }
- else
- {
- //If the server has ack'd a billing entry, that means that we may need up update the pass
- //associated with that billing entry...
- prepare_message(&outgoing_msg, MAILBOX_UPDATE_PASSES, "", 0);
- send_message(commhub_fd, &outgoing_msg);
- //Update our pass time status...
- send_bill_update(0);
- }
- //Now that we've done that, we can bump the rest of characters to the beginning of the next line...
- k = input_idx - (checked_idx + 1);
- for(j=0; j < k; j++)
- {
- input_line[j] = input_line[j + checked_idx + 1];
- }
- input_idx = j;
- checked_idx = 0;
- }
- //If we have hit an overflow condition such that our buffer is full and no newline has been received
- if(input_idx == sizeof(input_line))
- {
- close(server_fd);
- server_fd = -1;
- break;
- }
- } while(checked_idx < input_idx);
- }
- }
- }
- else if(fds[i].revents & (POLLERR | POLLHUP | POLLNVAL)) //If we've lost connection, break this loop and poll all over again
- {
- close(server_fd);
- server_fd = -1;
- break;
- }
- if(fds[i].revents & POLLOUT)
- {
- //send more logs here
- send_next_log(server_fd, &ctx, next_sync_idx);
- real_bill_status.last_sync_time = time(NULL);
- send_bill_update(0);
- }
- }
- else if( fds[i].fd == commhub_fd )
- {
- //If we've lost connection, break this loop and poll all over again
- if(fds[i].revents & (POLLERR | POLLHUP | POLLNVAL))
- {
- close(commhub_fd);
- commhub_fd = -1;
- break;
- }
- if(fds[i].revents & POLLIN)
- {
- read_return = get_message(commhub_fd, &incoming_msg);
- if( read_return < 0 )
- {
- close(commhub_fd);
- commhub_fd = -1;
- break;
- }
- process_message(&incoming_msg);
- }
- }
- }
- }
- printf("Detatching from Bill Database\n");
- detach_from_billdb(&ctx);
- printf("Closing connections\n");
- if(server_fd >= 0)
- {
- close(server_fd);
- server_fd = -1;
- }
- if(commhub_fd >= 0)
- {
- close(commhub_fd);
- server_fd = -1;
- }
- printf("Goodbye.\n");
- return 0;
- }
|