/*
* Copyright (c) 2019 Clementine Computing LLC.
*
* This file is part of PopuFare.
*
* PopuFare is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* PopuFare is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with PopuFare. If not, see <https://www.gnu.org/licenses/>.
*
*/
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include "../common/common_defs.h"
#include "../commhub/commhub.h"
#include "../commhub/client_utils.h"
#include "billdb.h"
//----------GLOBAL STATE VARIABLES
// File-scope state shared by the helper functions, the message callbacks and main().
int flush_in_progress = 0; //This flag is used to tell if there is a flush in progress (not referenced in the visible part of this file)
time_t last_sync_attempt = 0; //Time of the last failed attempt to connect to the bill server (reset to 0 once a connection succeeds)
int commhub_fd = -1; //File descriptor of our connection to the comm hub (-1 when not connected)
int server_fd = -1; //File descriptor of our connection to the sync server (-1 when not connected)
bill_status real_bill_status = {0}; //Our current billing status; send_bill_update() compares it to bill_stat and publishes it on MAILBOX_BILL_STATUS
time_t last_watermark_warning = 0; //The time of our last high watermark warning for the driver when space is low
//Attempts to open a TCP connection to the bill server at
//BILL_SERVER_IP:BILL_SERVER_PORT.
//
//Returns the connected socket descriptor (>= 0) on success.
//Returns -1 if the socket could not be created or if BILL_SERVER_IP is not a
//valid IPv4 address string, and -2 if connect() failed.
//No descriptor is left open on any failure path.
int connect_to_bill_server()
{
    int fd;
    //Zero the whole structure first so that the padding bytes (sin_zero) are
    //well defined instead of holding whatever was on the stack.
    struct sockaddr_in addr = {0};

    fd = socket(PF_INET, SOCK_STREAM, 0);
    if(fd < 0)
    {
        return -1;
    }

    addr.sin_family = AF_INET;
    addr.sin_port = htons(BILL_SERVER_PORT);

    //inet_aton() returns 0 when the address string is invalid; in that case
    //there is nothing sensible to connect to, so give up cleanly.
    if(inet_aton(BILL_SERVER_IP, &addr.sin_addr) == 0)
    {
        close(fd);
        return -1;
    }

    if(connect(fd, (struct sockaddr *)&addr, sizeof(addr)) < 0)
    {
        close(fd);
        return -2;
    }

    return fd;
}
//Warns the driver (and the log) when the billing database is running low on
//free records.
//
// ctx   - billing database context; only ctx->num_free_records is read here
// force - non-zero to send a warning now regardless of the warning timer
//
//Two levels are recognised: "critical" (fewer free records than
//BILLING_CRITICAL_THRESHOLD) and "high" (fewer than BILLING_HIGH_THRESHOLD).
//Each level has its own minimum interval between warnings, measured from
//last_watermark_warning.
//
//Returns 0 when no warning was needed or a warning was sent, and -1 when ctx
//is NULL or a warning was due but there is no connection to the comm hub.
int handle_watermark_warnings(billdb_context *ctx, int force)
{
    time_t now = time(NULL);
    struct message_record warning;
    int critical;
    int due;

    if(!ctx)
    {
        return -1;
    }

    //Work out which (if any) watermark condition applies
    critical = (ctx->num_free_records < BILLING_CRITICAL_THRESHOLD);
    if(!critical && !(ctx->num_free_records < BILLING_HIGH_THRESHOLD))
    {
        return 0; //plenty of space left, nothing to say
    }

    //A warning is due if we are forced, or if the timer for this level has run out
    if(critical)
    {
        due = force || ((now - last_watermark_warning) >= BILLING_CRITICAL_FREQUENCY);
    }
    else
    {
        due = force || ((now - last_watermark_warning) >= BILLING_HIGH_FREQUENCY);
    }

    if(!due)
    {
        return 0;
    }

    //We have something to say but nobody to say it to
    if(commhub_fd < 0)
    {
        return -1;
    }

    //Send the warning twice: once as a log message, once as a driver message
    if(critical)
    {
        format_log_message(&warning, LOGLEVEL_ERROR, "LOG CRITICAL: Call Dispatch");
        send_message(commhub_fd, &warning);
        format_driver_message(&warning, LOGLEVEL_ERROR, "LOG CRITICAL: Call Dispatch");
        send_message(commhub_fd, &warning);
    }
    else
    {
        format_log_message(&warning, LOGLEVEL_ERROR, "LOG HALF FULL: Call Dispatch");
        send_message(commhub_fd, &warning);
        format_driver_message(&warning, LOGLEVEL_ERROR, "LOG HALF FULL: Call Dispatch");
        send_message(commhub_fd, &warning);
    }

    //Restart the warning timer
    last_watermark_warning = now;
    return 0;
}
//Transmits one pending billing record to the bill server as a single
//newline-terminated line.
//
// fd       - connected socket to the bill server
// ctx      - billing database context; the text sent is ctx->bills[next_idx].data
// next_idx - index of the record to send; a negative value means "nothing to send"
//
//Returns 1 if the whole line was sent (ctx->last_tx[next_idx] is then stamped
//with the current time), 0 if there was nothing to send (NULL context, NULL
//record table, negative index, or an empty record), and -1 if send() failed.
//
//NOTE(review): next_idx is not checked against an upper bound here; presumably
//callers only pass values obtained from next_pending_entry() - confirm.
int send_next_log(int fd, billdb_context *ctx, int next_idx)
{
    //One spare byte for the newline and one for the terminator. The buffer is
    //zero-filled and at most BILLING_LINE_SIZE bytes are copied into it, so it
    //is always NUL-terminated even if the record itself is not.
    char buffer[BILLING_LINE_SIZE + 2] = {0};
    int len;
    int sent = 0;
    ssize_t ret;

    if(!ctx)
        return 0;
    if(!ctx->bills)
        return 0;
    if(next_idx < 0)
        return 0;

    //Grab the billing log line
    strncpy(buffer, ctx->bills[next_idx].data, BILLING_LINE_SIZE);
    len = strlen(buffer);

    //if it's zero length, that means that we've struck it from the record
    //already (it was ack'd on this same run through the poll loop).
    if(len == 0)
    {
        return 0;
    }

    //Add a newline
    buffer[len++] = '\n';
    buffer[len] = '\0';

    //send it! send() may accept only part of the buffer, so keep going until
    //every byte has been handed to the socket.
    while(sent < len)
    {
        ret = send(fd, buffer + sent, len - sent, 0);

        //A signal arriving before any data was transferred is not a failure
        //of the connection; just try again.
        if((ret < 0) && (errno == EINTR))
        {
            continue;
        }

        if(ret <= 0)
        {
            return -1;
        }

        sent += (int)ret;
    }

    //And mark it as sent now, so that we will only revisit it once the timer has run out...
    ctx->last_tx[next_idx] = time(NULL);
    return 1;
}
//Processes one reply line received from the bill server.
//
// line - NUL-terminated reply line; fields are pulled out of it with get_field()
// ctx  - billing database context
//
//The first field is the command. "ACK", "DUP" and "IGN" (case-insensitive) are
//all handled the same way: the second field (per the original comments, a
//checksum) is passed to clear_billing_entry(), the last-ack time is refreshed
//and the unsynced message count is recomputed.
//
//Returns 0 for a blank line or an unknown command (the latter is reported on
//stderr), -1 if a known command has no second field, and otherwise whatever
//clear_billing_entry() returned.
int handle_bill_reply(char *line, billdb_context *ctx)
{
    char field[LINE_BUFFER_SIZE];
    int offset = 0;
    int at_eol = 0;
    int status;

    //Extract the first tab-delimited field (the command) from the input line...
    offset += get_field(field, line + offset, sizeof(field), &at_eol);

    //A blank command means there is nothing to do for this line
    if(field[0] == '\0')
    {
        return 0;
    }

    //Anything other than the three commands we understand gets reported and dropped
    if( (strcasecmp(field, "ACK") != 0) &&
        (strcasecmp(field, "DUP") != 0) &&
        (strcasecmp(field, "IGN") != 0) )
    {
        fprintf(stderr, "Unknown command \"%s\"\n", field);
        return 0;
    }

    //All three commands require a second field; if the line already ended, it is malformed
    if(at_eol)
    {
        return -1;
    }

    //Get the next field (this should be a checksum...)
    offset += get_field(field, line + offset, sizeof(field), &at_eol);

    real_bill_status.last_ack_time = time(NULL); //update ack time
    status = clear_billing_entry(ctx, field); //clear the billing entry
    real_bill_status.unsynced_messages = NUM_BILLING_ENTRIES - ctx->num_free_records; //update buffer count

    return status;
}
void maintain_ipc_hub_connect(char *progname)
{
struct message_record outgoing_msg;
if(commhub_fd < 0) //if we have no connection to the communication hub
{
commhub_fd = connect_to_message_server(progname); //try and get one
if(commhub_fd >= 0) //if it worked
{
//Subscribe to the basics
subscribe_to_default_messages(commhub_fd);
//Subscribe to the command mailboxes we act on
prepare_message(&outgoing_msg, MAILBOX_SUBSCRIBE, MAILBOX_BILLING_LOG, strlen(MAILBOX_BILLING_LOG));
send_message(commhub_fd,&outgoing_msg);
prepare_message(&outgoing_msg, MAILBOX_SUBSCRIBE, MAILBOX_PADDLE_ACK, strlen(MAILBOX_PADDLE_ACK));
send_message(commhub_fd,&outgoing_msg);
//Request updated status information...
prepare_message(&outgoing_msg, MAILBOX_STATUS_REQUEST, "", 0);
send_message(commhub_fd,&outgoing_msg);
}
}
}
int send_bill_update(int force)
{
struct message_record outgoing_msg;
//if EITHER the force flag is set, OR our record differs from the last one on file from the IPC hub...
if( force || memcmp(&bill_stat, &real_bill_status, sizeof(real_bill_status)) )
{
if(commhub_fd >= 0)
{
//prepare and send it
prepare_message(&outgoing_msg, MAILBOX_BILL_STATUS, &real_bill_status, sizeof(real_bill_status));
send_message(commhub_fd,&outgoing_msg);
}
return 1;
}
return 0;
}
//Callback for messages arriving on MAILBOX_BILLING_LOG.
//
// msg   - the incoming message; its payload is treated as a text line
// param - the billdb_context registered with the callback
//
//The first payload character decides how the message is treated. If it is one
//of the diagnostic priority codes (LOGLEVEL_DEBUG / LOGLEVEL_WARN /
//LOGLEVEL_ERROR) the message is dropped when the number of free records is
//below the matching REJECT_DIAG_*_THRESHOLD, so that scarce space is kept for
//billing entries. Any other first character marks a billing message, which is
//never dropped.
//
//Messages that are kept are handed to add_billing_entry(). If that reports
//FAIL_FULL or FAIL_MEM the driver is notified (when the comm hub is
//reachable); otherwise the unsynced message count is refreshed. A status
//update is then sent if the status changed.
//
//Always returns MESSAGE_HANDLED_CONT.
message_callback_return handle_billing_log_message(struct message_record *msg, void *param)
{
    billdb_context *ctx = (billdb_context *)param;
    int add_result = 0;
    int drop = 0;

    // If this is a diagnostic log message, the first character will be a priority
    // code, otherwise for a billing log message it will be numeric.
    char priority = ((char *)msg->payload)[0];

    //Diagnostics are dropped when free space falls below the threshold for their priority
    switch(priority)
    {
        case LOGLEVEL_DEBUG:
            drop = (ctx->num_free_records < REJECT_DIAG_DEBUG_THRESHOLD);
            break;

        case LOGLEVEL_WARN:
            drop = (ctx->num_free_records < REJECT_DIAG_WARN_THRESHOLD);
            break;

        case LOGLEVEL_ERROR:
            drop = (ctx->num_free_records < REJECT_DIAG_ERROR_THRESHOLD);
            break;

        default: //no known diagnostic priority code, so this is a billing message
            drop = 0; //and therefore can never be ignored.
            break;
    }

    // Dropping should only happen when buffer space is scarce and the message is a
    // diagnostic log message with a priority that does not justify potentially
    // missing a billing entry later.
    if(drop)
    {
        return MESSAGE_HANDLED_CONT;
    }

    //Attempt to add the message to the billing log
    add_result = add_billing_entry(ctx, (char *)msg->payload);

    if( (add_result == FAIL_FULL) || (add_result == FAIL_MEM) )
    {
        //The add failed; notify the driver if we can talk to the commhub
        if(commhub_fd >= 0)
        {
            struct message_record error_message;

            format_driver_message(&error_message, LOGLEVEL_ERROR, "LOG FULL: Call Dispatch");
            send_message(commhub_fd, &error_message);
        }
    }
    else
    {
        //The add succeeded; update buffer count
        real_bill_status.unsynced_messages = NUM_BILLING_ENTRIES - ctx->num_free_records;
    }

    //In either case, send a status update if needed.
    send_bill_update(0);

    return MESSAGE_HANDLED_CONT;
}
//Callback for MAILBOX_STATUS_REQUEST messages.
//
// msg   - the incoming request (its contents are not examined)
// param - the billdb_context registered with the callback
//
//Refreshes the unsynced message count from the database context and then
//forces a billing status update, so that a newly subscribing module gets the
//current status even if it has not changed.
//
//Always returns MESSAGE_HANDLED_CONT.
message_callback_return handle_status_request_message(struct message_record *msg, void *param)
{
    billdb_context *db = (billdb_context *)param;

    //update buffer count
    real_bill_status.unsynced_messages = NUM_BILLING_ENTRIES - db->num_free_records;

    //publish unconditionally, since somebody explicitly asked for it
    send_bill_update(1);

    return MESSAGE_HANDLED_CONT;
}
//Set to 1 when a paddle selection success has been seen (a driver has logged
//in), 0 otherwise. main() reads it once per loop pass and then clears it.
int paddle_ack_flag = 0;

//Callback for paddle selection acknowledge messages on MAILBOX_PADDLE_ACK.
//
// msg   - the incoming message; its payload is read as a set_paddle_req
// param - unused
//
//The acknowledgement counts as a success when the requested paddle equals the
//resulting paddle; paddle_ack_flag is set to 1 in that case and to 0
//otherwise. This gives us the opportunity to warn a newly logged in driver
//about a high-water-mark condition.
//
//Always returns MESSAGE_HANDLED_CONT.
message_callback_return handle_paddle_ack(struct message_record *msg, void *param)
{
    set_paddle_req *ack = (set_paddle_req *)msg->payload;

    //No payload: leave the flag as it is
    if(!ack)
    {
        return MESSAGE_HANDLED_CONT;
    }

    paddle_ack_flag = (ack->request == ack->result) ? 1 : 0;

    return MESSAGE_HANDLED_CONT;
}
//Entry point of the billing log sync client.
//
//Start-up: installs signal handlers, connects to the comm hub, registers the
//message callbacks and attaches to the billing database (formatting a new one
//if the existing one cannot be attached).
//
//Main loop (runs until an exit is requested): keeps the comm hub and bill
//server connections alive, issues low-space warnings, polls both connections,
//sends pending billing records to the server when it is writable, parses the
//server's line-oriented replies with handle_bill_reply(), and dispatches comm
//hub messages to the registered callbacks.
//
//Returns 0 on a normal shutdown and -1 if no usable database could be
//attached or created.
int main(int argc, char **argv)
{
    struct pollfd fds[2];
    int nfds = 0;
    int poll_return = 0;
    int read_return = 0;
    int next_sync_idx = -1;
    int i;
    struct message_record incoming_msg;
    struct message_record outgoing_msg;
    char input_line[LINE_BUFFER_SIZE] = {0}; //accumulates bytes received from the bill server
    int input_idx = 0;   //number of valid bytes currently held in input_line
    int checked_idx = 0; //how far into input_line we have already scanned for a line terminator

#ifdef DEBUG_PRINT
    long long int _usec_now, _usec_prv, _usec_del;
    _usec_del = 60000000;
    _usec_now = get_usec_time();
    _usec_prv = _usec_now;
#endif

    billdb_context ctx = {0};

    //------------------
    configure_signal_handlers(argv[0]);
    maintain_ipc_hub_connect(argv[0]);
    register_system_status_callbacks();

    //register our module-specific callbacks
    register_dispatch_callback(MAILBOX_BILLING_LOG, CALLBACK_USER(1), handle_billing_log_message, &ctx);
    register_dispatch_callback(MAILBOX_STATUS_REQUEST, CALLBACK_USER(2), handle_status_request_message, &ctx);
    register_dispatch_callback(MAILBOX_PADDLE_ACK, CALLBACK_USER(3), handle_paddle_ack, NULL);

    //Attach to the billing database, creating a fresh one if that fails
    read_return = attach_to_billdb(&ctx);
    if( read_return < 0 )
    {
        fprintf(stdout, "Database is missing or corrupt. Attempting to format new database.\n");
        read_return = format_new_billdb();
        if( read_return < 0 )
        {
            fprintf(stderr, "Database cannot be created. Aborting!\n");
            return -1;
        }
        else
        {
            read_return = attach_to_billdb(&ctx);
            if( read_return < 0 )
            {
                fprintf(stderr, "New database is ALSO missing or corrupt. Aborting.\n");
                return -1;
            }
        }
    }

    while( exit_request_status == EXIT_REQUEST_NONE )
    {
        time_t now = time(NULL);

        RESET_WATCHDOG();

#ifdef DEBUG_PRINT
        _usec_now = get_usec_time();
        if ((_usec_now - _usec_prv) > _usec_del) {
            printf("[%lli] billdb: heartbeat\n", get_usec_time());
            _usec_prv = _usec_now;
        }
#endif

        maintain_ipc_hub_connect(argv[0]);

        if(server_fd < 0) //If we don't have a connection to the sync server...
        {
            if( (now - last_sync_attempt) > DEFAULT_CONNECT_RETRY ) //See if it is time to try again
            {
                if( tunnel_is_up() ) //and if the tunnel thinks it is up
                {
                    server_fd = connect_to_bill_server(); //if so, try again...
                    if(server_fd >= 0) //if it worked
                    {
                        //Start the new connection with an empty receive buffer. Both
                        //indices must be reset together: a stale checked_idx left over
                        //from the previous connection could exceed input_idx (or even
                        //equal the buffer size after an overflow disconnect) and make
                        //the line parser write outside input_line.
                        input_idx = 0;
                        checked_idx = 0;
                        last_sync_attempt = 0;
                    }
                    else
                    {
                        last_sync_attempt = now;
                    }
                }
            }
        }

        //Every time through the loop check and see if we need to warn the driver of a high water mark condition
        //in the billing log (running out of space, and our warning frequency timer is up). If a driver has just
        //freshly logged in, we send a warning and reset our frequency timers.
        handle_watermark_warnings(&ctx, paddle_ack_flag);
        paddle_ack_flag = 0;

        nfds = 0;

        //If we have a connection to the billing server
        if(server_fd >= 0)
        {
            //tell poll that we care about it
            fds[nfds].fd = server_fd;
            fds[nfds].events = POLLIN;

            //See if the database thinks we have anything that we need to try and sync right now...
            next_sync_idx = next_pending_entry(&ctx);
            if(next_sync_idx >= 0) //If so, ask poll() to unblock us when there is room in the pipe to the server.
            {
                fds[nfds].events |= POLLOUT;
            }

            //and advance the counter of active file descriptors
            nfds++;
        }

        if(commhub_fd >= 0)
        {
            fds[nfds].fd = commhub_fd;
            fds[nfds].events = POLLIN;
            nfds++;
        }

        if(nfds > 0)
        {
            poll_return = poll(fds, nfds, POLL_TIMEOUT);
        }
        else
        {
            //Nothing to poll; sleep for the same interval so we don't spin
            usleep(POLL_TIMEOUT * 1000);
            poll_return = 0;
        }

        //Timeout or poll() error: go around again
        if(poll_return <= 0)
        {
            continue;
        }

        for(i = 0; i < nfds; i++)
        {
            if( fds[i].fd == server_fd )
            {
                if(fds[i].revents & POLLIN)
                {
                    read_return = recv(fds[i].fd, input_line + input_idx, sizeof(input_line) - input_idx, 0);

                    // If the socket has closed politely (0), or had an error other than EINTR...
                    //
                    if( (read_return == 0) || ((read_return < 0) && (errno != EINTR)) )
                    {
                        close(server_fd);
                        server_fd = -1;

                        // EXPERIMENTAL: this might need to go when moving to production.
                        // It's here to try and reduce the polling frequency when no connection
                        // is present
                        //
                        usleep(POLL_TIMEOUT * 1000);
                        break;
                    }
                    else
                    {
                        //this test is required otherwise an EINTR case would be treated as -1 bytes read, and bad stuff would happen...
                        if(read_return > 0)
                        {
                            input_idx += read_return;

                            //Pull every complete line out of the buffer
                            do
                            {
                                //Advance until we either hit the end of the buffer, or we hit a line-terminator
                                while(checked_idx < input_idx)
                                {
                                    if( (input_line[checked_idx] == '\r') || (input_line[checked_idx] == '\n') )
                                    {
                                        break;
                                    }
                                    else
                                    {
                                        checked_idx++;
                                    }
                                }

                                //If we didn't hit the end of the input...
                                if(checked_idx != input_idx)
                                {
                                    int j, k;

                                    //Null terminate the line we got as a string...
                                    input_line[checked_idx] = '\0';

                                    //Do something useful with the string input_line...
                                    if( handle_bill_reply(input_line, &ctx) < 0 )
                                    {
                                        // printf("Command Failed: \"%s\"\n", input_line);
                                    }
                                    else
                                    {
                                        //If the server has ack'd a billing entry, that means that we may need up update the pass
                                        //associated with that billing entry... (only if the comm hub is reachable, as with
                                        //every other send in this file)
                                        if(commhub_fd >= 0)
                                        {
                                            prepare_message(&outgoing_msg, MAILBOX_UPDATE_PASSES, "", 0);
                                            send_message(commhub_fd, &outgoing_msg);
                                        }

                                        //Update our pass time status...
                                        send_bill_update(0);
                                    }

                                    //Now that we've done that, we can bump the rest of characters to the beginning of the next line...
                                    k = input_idx - (checked_idx + 1);
                                    for(j = 0; j < k; j++)
                                    {
                                        input_line[j] = input_line[j + checked_idx + 1];
                                    }
                                    input_idx = j;
                                    checked_idx = 0;
                                }

                                //If we have hit an overflow condition such that our buffer is full and no newline has been received
                                if(input_idx == sizeof(input_line))
                                {
                                    close(server_fd);
                                    server_fd = -1;
                                    break; //note: this only leaves the do/while, not the for loop
                                }
                            } while(checked_idx < input_idx);
                        }
                    }
                }
                else if(fds[i].revents & (POLLERR | POLLHUP | POLLNVAL)) //If we've lost connection, break this loop and poll all over again
                {
                    close(server_fd);
                    server_fd = -1;
                    break;
                }

                //The overflow path above may have closed the connection without
                //leaving the for loop, so make sure we still have a socket before
                //trying to write to it.
                if( (server_fd >= 0) && (fds[i].revents & POLLOUT) )
                {
                    //send more logs here
                    send_next_log(server_fd, &ctx, next_sync_idx);
                    real_bill_status.last_sync_time = time(NULL);
                    send_bill_update(0);
                }
            }
            else if( fds[i].fd == commhub_fd )
            {
                //If we've lost connection, break this loop and poll all over again
                if(fds[i].revents & (POLLERR | POLLHUP | POLLNVAL))
                {
                    close(commhub_fd);
                    commhub_fd = -1;
                    break;
                }

                if(fds[i].revents & POLLIN)
                {
                    read_return = get_message(commhub_fd, &incoming_msg);
                    if( read_return < 0 )
                    {
                        close(commhub_fd);
                        commhub_fd = -1;
                        break;
                    }

                    //Hand the message to whichever callbacks are registered for it
                    process_message(&incoming_msg);
                }
            }
        }
    }

    printf("Detatching from Bill Database\n");
    detach_from_billdb(&ctx);

    printf("Closing connections\n");
    if(server_fd >= 0)
    {
        close(server_fd);
        server_fd = -1;
    }
    if(commhub_fd >= 0)
    {
        close(commhub_fd);
        commhub_fd = -1; //mark the comm hub descriptor (not the server one) as closed
    }

    printf("Goodbye.\n");
    return 0;
}