/* * Copyright (c) 2019 Clementine Computing LLC. * * This file is part of PopuFare. * * PopuFare is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * PopuFare is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with PopuFare. If not, see . * */ #include #include #include #include #include #include #include #include #include #include #include #include #include "../common/common_defs.h" #include "../commhub/commhub.h" #include "../commhub/client_utils.h" #include "billdb.h" //----------GLOBAL STATE VARIABLES int flush_in_progress = 0; //This flag is used to tell if there is a flush in progress time_t last_sync_attempt = 0; //Time of the last connection attempt int commhub_fd = -1; //File descriptor of our connection to the comm hub int server_fd = -1; //File descriptor of our connection to the sync server bill_status real_bill_status = {0}; time_t last_watermark_warning = 0; //The time of our last high watermark warning for the driver when space is low //This function attempts to connect to the bill server... int connect_to_bill_server() { int fd; int retval; struct sockaddr_in addr; fd = socket(PF_INET, SOCK_STREAM, 0); if(fd < 0) return -1; addr.sin_family = AF_INET; addr.sin_port = htons(BILL_SERVER_PORT); inet_aton(BILL_SERVER_IP, &addr.sin_addr); retval = connect(fd, (struct sockaddr *)&addr, sizeof(addr)); if(retval < 0) { close(fd); return -2; } return fd; } int handle_watermark_warnings(billdb_context *ctx, int force) { time_t now = time(NULL); struct message_record error_message; if(!ctx) return -1; if(ctx->num_free_records < BILLING_CRITICAL_THRESHOLD) //if we have a critical high watermark condition { //and we are either allowed to force a message or it is time to deliver the next one if(force || ((now - last_watermark_warning) >= BILLING_CRITICAL_FREQUENCY)) { if(commhub_fd >= 0) { format_log_message(&error_message, LOGLEVEL_ERROR, "LOG CRITICAL: Call Dispatch"); send_message(commhub_fd, &error_message); format_driver_message(&error_message, LOGLEVEL_ERROR, "LOG CRITICAL: Call Dispatch"); send_message(commhub_fd, &error_message); last_watermark_warning = now; } else { return -1; } } } else if(ctx->num_free_records < BILLING_HIGH_THRESHOLD) //if we have a high watermark condition { //and we are either allowed to force a message or it is time to deliver the next one if(force || ((now - last_watermark_warning) >= BILLING_HIGH_FREQUENCY)) { if(commhub_fd >= 0) { format_log_message(&error_message, LOGLEVEL_ERROR, "LOG HALF FULL: Call Dispatch"); send_message(commhub_fd, &error_message); format_driver_message(&error_message, LOGLEVEL_ERROR, "LOG HALF FULL: Call Dispatch"); send_message(commhub_fd, &error_message); last_watermark_warning = now; } else { return -1; } } } return 0; } int send_next_log(int fd, billdb_context *ctx, int next_idx) { char buffer[BILLING_LINE_SIZE + 2] = {0}; int n, i, ret; if(!ctx) return 0; if(!ctx->bills) return 0; if(next_idx < 0) return 0; //Grab the billing log lone strncpy(buffer, ctx->bills[next_idx].data, BILLING_LINE_SIZE); n = strlen(buffer); //if it's zero length, that means that we've struck it from the record //already (it was ack'd on this same run through the poll loop). if(n == 0) { return 0; } //Add a newline buffer[n++] = '\n'; buffer[n] = '\0'; i = 0; //send it! while(i < n) { ret = send(fd, buffer + i, n - i, 0); if( ret <= 0 ) return -1; i += ret; } //And mark it as sent now, so that we will only revisit it once the timer has run out... ctx->last_tx[next_idx] = time(NULL); return 1; } int handle_bill_reply(char *line, billdb_context *ctx) { char buffer[LINE_BUFFER_SIZE]; int input_idx = 0; int eol = 0; int retval; //Extract the first tab-delimited field from the input line... input_idx += get_field(buffer, line + input_idx, sizeof(buffer), &eol); //If that field is blank, then we ignore this line if( buffer[0] == '\0' ) { return 0; } if( !strcasecmp(buffer, "ACK") ) { if( eol ) { // printf("ACK: Premature end of line!\n"); return -1; } //Get the next field (this should be a checksum...) input_idx += get_field(buffer, line + input_idx, sizeof(buffer), &eol); real_bill_status.last_ack_time = time(NULL); //update ack time retval = clear_billing_entry(ctx, buffer); //clear the billing entry real_bill_status.unsynced_messages = NUM_BILLING_ENTRIES - ctx->num_free_records; //update buffer count return retval; } else if( !strcasecmp(buffer, "DUP") ) { if( eol ) { // printf("DUP: Premature end of line!\n"); return -1; } //Get the next field (this should be a checksum...) input_idx += get_field(buffer, line + input_idx, sizeof(buffer), &eol); real_bill_status.last_ack_time = time(NULL); //update ack time retval = clear_billing_entry(ctx, buffer); //clear the billing entry real_bill_status.unsynced_messages = NUM_BILLING_ENTRIES - ctx->num_free_records; //update buffer count return retval; } else if( !strcasecmp(buffer, "IGN") ) { if( eol ) { // printf("IGN: Premature end of line!\n"); return -1; } //Get the next field (this should be a checksum...) input_idx += get_field(buffer, line + input_idx, sizeof(buffer), &eol); real_bill_status.last_ack_time = time(NULL); //update ack time retval = clear_billing_entry(ctx, buffer); //clear the billing entry real_bill_status.unsynced_messages = NUM_BILLING_ENTRIES - ctx->num_free_records; //update buffer count return retval; } else { fprintf(stderr, "Unknown command \"%s\"\n", buffer); } return 0; } void maintain_ipc_hub_connect(char *progname) { struct message_record outgoing_msg; if(commhub_fd < 0) //if we have no connection to the communication hub { commhub_fd = connect_to_message_server(progname); //try and get one // printf("commhub_fd = %d\n", commhub_fd); if(commhub_fd >= 0) //if it worked { //Subscribe to the basics subscribe_to_default_messages(commhub_fd); //Subscribe to the command mailboxes we act on prepare_message(&outgoing_msg, MAILBOX_SUBSCRIBE, MAILBOX_BILLING_LOG, strlen(MAILBOX_BILLING_LOG)); send_message(commhub_fd,&outgoing_msg); prepare_message(&outgoing_msg, MAILBOX_SUBSCRIBE, MAILBOX_PADDLE_ACK, strlen(MAILBOX_PADDLE_ACK)); send_message(commhub_fd,&outgoing_msg); //Request updated status information... prepare_message(&outgoing_msg, MAILBOX_STATUS_REQUEST, "", 0); send_message(commhub_fd,&outgoing_msg); } } } int send_bill_update(int force) { struct message_record outgoing_msg; //if EITHER the force flag is set, OR our record differs from the last one on file from the IPC hub... if( force || memcmp(&bill_stat, &real_bill_status, sizeof(real_bill_status)) ) { if(commhub_fd >= 0) { //prepare and send it prepare_message(&outgoing_msg, MAILBOX_BILL_STATUS, &real_bill_status, sizeof(real_bill_status)); send_message(commhub_fd,&outgoing_msg); } return 1; } return 0; } message_callback_return handle_billing_log_message(struct message_record *msg, void *param) { billdb_context *ctx = (billdb_context *)param; int retval = 0; int ignore = 0; // If this is a diagnostic log message, the first character will be a priority // code, otherwise for a billing log message it will be numeric. char priority = ((char *)msg->payload)[0]; switch(priority) { case LOGLEVEL_DEBUG: //if this is a diagnostic message with a priority of LOGLEVEL_DEBUG //and there are fewer than REJECT_DIAG_DEBUG_THRESHOLD free buffer slots if(ctx->num_free_records < REJECT_DIAG_DEBUG_THRESHOLD) { ignore = 1; //then for safety, ignore this message } else { ignore = 0; //otherwise, if we have plenty of space, store it } break; case LOGLEVEL_WARN: //if this is a diagnostic message with a priority of LOGLEVEL_WARN //and there are fewer than REJECT_DIAG_WARN_THRESHOLD free buffer slots if(ctx->num_free_records < REJECT_DIAG_WARN_THRESHOLD) { ignore = 1; //then for safety, ignore this message } else { ignore = 0; //otherwise, if we have plenty of space, store it } break; case LOGLEVEL_ERROR: //if this is a diagnostic message with a priority of LOGLEVEL_ERROR //and there are fewer than REJECT_DIAG_ERROR_THRESHOLD free buffer slots if(ctx->num_free_records < REJECT_DIAG_ERROR_THRESHOLD) { ignore = 1; //then for safety, ignore this message } else { ignore = 0; //otherwise, if we have plenty of space, store it } break; default: //If this message does not start with a known diagnostic priority code, it is a billing message ignore = 0; //and therefore can never be ignored. break; } if(ignore) //If we've decided to ignore the message { // This means we are ignoring a message. This should only happen when buffer space // is scarce and the message is a diagnostic log message whith a priority that does not // justify potentially missing a billing entry later. } else //Otherwise, we want to process it as normal. { retval = add_billing_entry(ctx, (char *)msg->payload); //Attempt to add the message to the billing log switch(retval) //Test to see if that worked { case FAIL_FULL: //if the add failed case FAIL_MEM: if(commhub_fd >= 0) //and we can talk to the commhub { struct message_record error_message; format_driver_message(&error_message, LOGLEVEL_ERROR, "LOG FULL: Call Dispatch"); //notify the driver send_message(commhub_fd, &error_message); } break; default: //Otherwise, if the add succeded real_bill_status.unsynced_messages = NUM_BILLING_ENTRIES - ctx->num_free_records; //update buffer count break; } send_bill_update(0); //In either case, sent a status update if needed. } return MESSAGE_HANDLED_CONT; } message_callback_return handle_status_request_message(struct message_record *msg, void *param) { billdb_context *ctx = (billdb_context *)param; real_bill_status.unsynced_messages = NUM_BILLING_ENTRIES - ctx->num_free_records; //update buffer count //force a billing log status update if one is requested by a newly subscribing module send_bill_update(1); return MESSAGE_HANDLED_CONT; } int paddle_ack_flag = 0; //a variable to track if a paddle selection success has occurred (a driver has logged in) //this callback catches paddle selection acknowledge messages on the MAILBOX_PADDLE_ACK mailing list and //test to see if they report success, if so it sets the above flag so that we have the opportunity to warn a //newly logged in driver about a high-water-mark condition. message_callback_return handle_paddle_ack(struct message_record *msg, void *param) { set_paddle_req *payload = (set_paddle_req *)msg->payload; if(payload) { paddle_ack_flag = (payload->request == payload->result); } return MESSAGE_HANDLED_CONT; } int main(int argc, char **argv) { struct pollfd fds[2]; int nfds = 0; int poll_return = 0; int read_return = 0; int next_sync_idx = -1; int i; struct message_record incoming_msg; struct message_record outgoing_msg; char input_line[LINE_BUFFER_SIZE] = {0}; int input_idx = 0; int checked_idx = 0; billdb_context ctx = {0}; //------------------ configure_signal_handlers(argv[0]); maintain_ipc_hub_connect(argv[0]); register_system_status_callbacks(); //register our module-specific callback register_dispatch_callback(MAILBOX_BILLING_LOG, CALLBACK_USER(1), handle_billing_log_message, &ctx); register_dispatch_callback(MAILBOX_STATUS_REQUEST, CALLBACK_USER(2), handle_status_request_message, &ctx); register_dispatch_callback(MAILBOX_PADDLE_ACK, CALLBACK_USER(3), handle_paddle_ack, NULL); read_return = attach_to_billdb(&ctx); if( read_return < 0 ) { fprintf(stdout, "Database is missing or corrupt. Attempting to format new database.\n"); read_return = format_new_billdb(); if( read_return < 0 ) { fprintf(stderr, "Database cannot be created. Aborting!\n"); return -1; } else { read_return = attach_to_billdb(&ctx); if( read_return < 0 ) { fprintf(stderr, "New database is ALSO missing or corrupt. Aborting.\n"); return -1; } } } while( exit_request_status == EXIT_REQUEST_NONE ) { time_t now = time(NULL); RESET_WATCHDOG(); //DEBUG printf("[%lli] billdb: heartbeat\n", get_usec_time()); //DEBUG maintain_ipc_hub_connect(argv[0]); if(server_fd < 0) //If we don't have a connection to the sync server... { if( (now - last_sync_attempt) > DEFAULT_CONNECT_RETRY ) //See if it is time to try again { if( tunnel_is_up() ) //and if the tunnel thinks it is up { server_fd = connect_to_bill_server(); //if so, try again... // printf("server_fd = %d\n", server_fd); if(server_fd >= 0) //if it worked { input_idx = 0; //reset our buffer index last_sync_attempt = 0; } else { last_sync_attempt = now; } } } } //Every time through the loop check and see if we need to warn the driver of a high water mark condition //in the billing log (running out of space, and our warning frequency timer is up). If a driver has just //freshly logged in, we send a warning and reset our frequency timers. handle_watermark_warnings(&ctx, paddle_ack_flag); paddle_ack_flag = 0; nfds=0; //If we have a connection to the billing server if(server_fd >= 0) { //tell poll that we care about it fds[nfds].fd = server_fd; fds[nfds].events = POLLIN; //See if the database thinks we have anything that we need to try and sync right now... next_sync_idx = next_pending_entry(&ctx); if(next_sync_idx >= 0) //If so, ask poll() to unblock us when there is room in the pipe to the server. { fds[nfds].events |= POLLOUT; } //and advance the counter of active file descriptors nfds++; } if(commhub_fd >= 0) { fds[nfds].fd = commhub_fd; fds[nfds].events = POLLIN; nfds++; } if(nfds > 0) { poll_return = poll(fds, nfds, POLL_TIMEOUT); } else { usleep(POLL_TIMEOUT * 1000); poll_return = 0; } if(poll_return <= 0) { continue; } //DEBUG printf("## cp.nfd %i\n", (int)nfds); for(i=0; i < nfds; i++) { if( fds[i].fd == server_fd ) { if(fds[i].revents & POLLIN) { read_return = recv(fds[i].fd, input_line + input_idx, sizeof(input_line) - input_idx, 0); //If the socket has closed politely (0), or had an error other than EINTR... if( (read_return == 0) || ((read_return < 0) && (errno != EINTR)) ) { //DEBUG printf("## bang\n"); close(server_fd); server_fd = -1; usleep(POLL_TIMEOUT * 1000); break; } else { //this test is required otherwise an EINTR case would be treated as -1 bytes read, and bad stuff would happen... if(read_return > 0) { input_idx += read_return; do { //Advance until we either hit the end of the buffer, or we hit a line-terminator while(checked_idx < input_idx) { if( (input_line[checked_idx] == '\r') || (input_line[checked_idx] == '\n') ) { break; } else { checked_idx++; } } //If we didn't hit the end of the input... if(checked_idx != input_idx) { int j,k; //Null terminate the line we got as a string... input_line[checked_idx] = '\0'; //Do something useful with the string input_buffer... if( handle_bill_reply(input_line, &ctx) < 0 ) { // printf("Command Failed: \"%s\"\n", input_line); } else { //If the server has ack'd a billing entry, that means that we may need up update the pass //associated with that billing entry... prepare_message(&outgoing_msg, MAILBOX_UPDATE_PASSES, "", 0); send_message(commhub_fd, &outgoing_msg); //Update our pass time status... send_bill_update(0); } //Now that we've done that, we can bump the rest of characters to the beginning of the next line... k = input_idx - (checked_idx + 1); for(j=0; j < k; j++) { input_line[j] = input_line[j + checked_idx + 1]; } input_idx = j; checked_idx = 0; } //If we have hit an overflow condition such that our buffer is full and no newline has been received if(input_idx == sizeof(input_line)) { close(server_fd); server_fd = -1; break; } } while(checked_idx < input_idx); } } } else if(fds[i].revents & (POLLERR | POLLHUP | POLLNVAL)) //If we've lost connection, break this loop and poll all over again { close(server_fd); server_fd = -1; break; } if(fds[i].revents & POLLOUT) { //send more logs here send_next_log(server_fd, &ctx, next_sync_idx); real_bill_status.last_sync_time = time(NULL); send_bill_update(0); } } else if( fds[i].fd == commhub_fd ) { //If we've lost connection, break this loop and poll all over again if(fds[i].revents & (POLLERR | POLLHUP | POLLNVAL)) { close(commhub_fd); commhub_fd = -1; break; } if(fds[i].revents & POLLIN) { // printf("Trying to read from hub...\n"); read_return = get_message(commhub_fd, &incoming_msg); if( read_return < 0 ) { close(commhub_fd); commhub_fd = -1; break; } process_message(&incoming_msg); } } } } printf("Detatching from Bill Database\n"); detach_from_billdb(&ctx); printf("Closing connections\n"); if(server_fd >= 0) { close(server_fd); server_fd = -1; } if(commhub_fd >= 0) { close(commhub_fd); server_fd = -1; } printf("Goodbye.\n"); return 0; }