|
|
@@ -2,17 +2,17 @@
|
|
|
* Copyright (c) 2019 Clementine Computing LLC.
|
|
|
*
|
|
|
* This file is part of PopuFare.
|
|
|
- *
|
|
|
+ *
|
|
|
* PopuFare is free software: you can redistribute it and/or modify
|
|
|
* it under the terms of the GNU Affero General Public License as published by
|
|
|
* the Free Software Foundation, either version 3 of the License, or
|
|
|
* (at your option) any later version.
|
|
|
- *
|
|
|
+ *
|
|
|
* PopuFare is distributed in the hope that it will be useful,
|
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
* GNU Affero General Public License for more details.
|
|
|
- *
|
|
|
+ *
|
|
|
* You should have received a copy of the GNU Affero General Public License
|
|
|
* along with PopuFare. If not, see <https://www.gnu.org/licenses/>.
|
|
|
*
|
|
|
@@ -42,16 +42,16 @@
|
|
|
|
|
|
//----------GLOBAL STATE VARIABLES
|
|
|
|
|
|
-int flush_in_progress = 0; //This flag is used to tell if there is a flush in progress
|
|
|
+int flush_in_progress = 0; //This flag is used to tell if there is a flush in progress
|
|
|
|
|
|
-time_t last_sync_attempt = 0; //Time of the last connection attempt
|
|
|
+time_t last_sync_attempt = 0; //Time of the last connection attempt
|
|
|
|
|
|
-int commhub_fd = -1; //File descriptor of our connection to the comm hub
|
|
|
-int server_fd = -1; //File descriptor of our connection to the sync server
|
|
|
+int commhub_fd = -1; //File descriptor of our connection to the comm hub
|
|
|
+int server_fd = -1; //File descriptor of our connection to the sync server
|
|
|
|
|
|
bill_status real_bill_status = {0};
|
|
|
|
|
|
-time_t last_watermark_warning = 0; //The time of our last high watermark warning for the driver when space is low
|
|
|
+time_t last_watermark_warning = 0; //The time of our last high watermark warning for the driver when space is low
|
|
|
|
|
|
//This function attempts to connect to the bill server...
|
|
|
int connect_to_bill_server()
|
|
|
@@ -84,11 +84,11 @@ int handle_watermark_warnings(billdb_context *ctx, int force)
|
|
|
{
|
|
|
time_t now = time(NULL);
|
|
|
struct message_record error_message;
|
|
|
-
|
|
|
+
|
|
|
if(!ctx)
|
|
|
return -1;
|
|
|
-
|
|
|
- if(ctx->num_free_records < BILLING_CRITICAL_THRESHOLD) //if we have a critical high watermark condition
|
|
|
+
|
|
|
+ if(ctx->num_free_records < BILLING_CRITICAL_THRESHOLD) //if we have a critical high watermark condition
|
|
|
{
|
|
|
//and we are either allowed to force a message or it is time to deliver the next one
|
|
|
if(force || ((now - last_watermark_warning) >= BILLING_CRITICAL_FREQUENCY))
|
|
|
@@ -107,7 +107,7 @@ int handle_watermark_warnings(billdb_context *ctx, int force)
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
- else if(ctx->num_free_records < BILLING_HIGH_THRESHOLD) //if we have a high watermark condition
|
|
|
+ else if(ctx->num_free_records < BILLING_HIGH_THRESHOLD) //if we have a high watermark condition
|
|
|
{
|
|
|
//and we are either allowed to force a message or it is time to deliver the next one
|
|
|
if(force || ((now - last_watermark_warning) >= BILLING_HIGH_FREQUENCY))
|
|
|
@@ -126,7 +126,7 @@ int handle_watermark_warnings(billdb_context *ctx, int force)
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
return 0;
|
|
|
}
|
|
|
|
|
|
@@ -134,21 +134,21 @@ int send_next_log(int fd, billdb_context *ctx, int next_idx)
|
|
|
{
|
|
|
char buffer[BILLING_LINE_SIZE + 2] = {0};
|
|
|
int n, i, ret;
|
|
|
-
|
|
|
+
|
|
|
if(!ctx)
|
|
|
return 0;
|
|
|
-
|
|
|
+
|
|
|
if(!ctx->bills)
|
|
|
return 0;
|
|
|
-
|
|
|
+
|
|
|
if(next_idx < 0)
|
|
|
return 0;
|
|
|
-
|
|
|
+
|
|
|
//Grab the billing log lone
|
|
|
strncpy(buffer, ctx->bills[next_idx].data, BILLING_LINE_SIZE);
|
|
|
n = strlen(buffer);
|
|
|
|
|
|
- //if it's zero length, that means that we've struck it from the record
|
|
|
+ //if it's zero length, that means that we've struck it from the record
|
|
|
//already (it was ack'd on this same run through the poll loop).
|
|
|
if(n == 0)
|
|
|
{
|
|
|
@@ -158,23 +158,23 @@ int send_next_log(int fd, billdb_context *ctx, int next_idx)
|
|
|
//Add a newline
|
|
|
buffer[n++] = '\n';
|
|
|
buffer[n] = '\0';
|
|
|
-
|
|
|
+
|
|
|
i = 0;
|
|
|
|
|
|
- //send it!
|
|
|
+ //send it!
|
|
|
while(i < n)
|
|
|
{
|
|
|
ret = send(fd, buffer + i, n - i, 0);
|
|
|
-
|
|
|
+
|
|
|
if( ret <= 0 )
|
|
|
return -1;
|
|
|
-
|
|
|
+
|
|
|
i += ret;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
//And mark it as sent now, so that we will only revisit it once the timer has run out...
|
|
|
ctx->last_tx[next_idx] = time(NULL);
|
|
|
-
|
|
|
+
|
|
|
return 1;
|
|
|
}
|
|
|
|
|
|
@@ -182,20 +182,20 @@ int handle_bill_reply(char *line, billdb_context *ctx)
|
|
|
{
|
|
|
|
|
|
char buffer[LINE_BUFFER_SIZE];
|
|
|
-
|
|
|
+
|
|
|
int input_idx = 0;
|
|
|
int eol = 0;
|
|
|
int retval;
|
|
|
-
|
|
|
+
|
|
|
//Extract the first tab-delimited field from the input line...
|
|
|
input_idx += get_field(buffer, line + input_idx, sizeof(buffer), &eol);
|
|
|
-
|
|
|
+
|
|
|
//If that field is blank, then we ignore this line
|
|
|
if( buffer[0] == '\0' )
|
|
|
{
|
|
|
return 0;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
if( !strcasecmp(buffer, "ACK") )
|
|
|
{
|
|
|
if( eol )
|
|
|
@@ -203,16 +203,16 @@ int handle_bill_reply(char *line, billdb_context *ctx)
|
|
|
// printf("ACK: Premature end of line!\n");
|
|
|
return -1;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
//Get the next field (this should be a checksum...)
|
|
|
input_idx += get_field(buffer, line + input_idx, sizeof(buffer), &eol);
|
|
|
-
|
|
|
- real_bill_status.last_ack_time = time(NULL); //update ack time
|
|
|
-
|
|
|
- retval = clear_billing_entry(ctx, buffer); //clear the billing entry
|
|
|
-
|
|
|
- real_bill_status.unsynced_messages = NUM_BILLING_ENTRIES - ctx->num_free_records; //update buffer count
|
|
|
-
|
|
|
+
|
|
|
+ real_bill_status.last_ack_time = time(NULL); //update ack time
|
|
|
+
|
|
|
+ retval = clear_billing_entry(ctx, buffer); //clear the billing entry
|
|
|
+
|
|
|
+ real_bill_status.unsynced_messages = NUM_BILLING_ENTRIES - ctx->num_free_records; //update buffer count
|
|
|
+
|
|
|
return retval;
|
|
|
}
|
|
|
else if( !strcasecmp(buffer, "DUP") )
|
|
|
@@ -222,16 +222,16 @@ int handle_bill_reply(char *line, billdb_context *ctx)
|
|
|
// printf("DUP: Premature end of line!\n");
|
|
|
return -1;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
//Get the next field (this should be a checksum...)
|
|
|
input_idx += get_field(buffer, line + input_idx, sizeof(buffer), &eol);
|
|
|
-
|
|
|
- real_bill_status.last_ack_time = time(NULL); //update ack time
|
|
|
-
|
|
|
- retval = clear_billing_entry(ctx, buffer); //clear the billing entry
|
|
|
-
|
|
|
- real_bill_status.unsynced_messages = NUM_BILLING_ENTRIES - ctx->num_free_records; //update buffer count
|
|
|
-
|
|
|
+
|
|
|
+ real_bill_status.last_ack_time = time(NULL); //update ack time
|
|
|
+
|
|
|
+ retval = clear_billing_entry(ctx, buffer); //clear the billing entry
|
|
|
+
|
|
|
+ real_bill_status.unsynced_messages = NUM_BILLING_ENTRIES - ctx->num_free_records; //update buffer count
|
|
|
+
|
|
|
return retval;
|
|
|
}
|
|
|
else if( !strcasecmp(buffer, "IGN") )
|
|
|
@@ -241,16 +241,16 @@ int handle_bill_reply(char *line, billdb_context *ctx)
|
|
|
// printf("IGN: Premature end of line!\n");
|
|
|
return -1;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
//Get the next field (this should be a checksum...)
|
|
|
input_idx += get_field(buffer, line + input_idx, sizeof(buffer), &eol);
|
|
|
|
|
|
- real_bill_status.last_ack_time = time(NULL); //update ack time
|
|
|
-
|
|
|
- retval = clear_billing_entry(ctx, buffer); //clear the billing entry
|
|
|
-
|
|
|
- real_bill_status.unsynced_messages = NUM_BILLING_ENTRIES - ctx->num_free_records; //update buffer count
|
|
|
-
|
|
|
+ real_bill_status.last_ack_time = time(NULL); //update ack time
|
|
|
+
|
|
|
+ retval = clear_billing_entry(ctx, buffer); //clear the billing entry
|
|
|
+
|
|
|
+ real_bill_status.unsynced_messages = NUM_BILLING_ENTRIES - ctx->num_free_records; //update buffer count
|
|
|
+
|
|
|
return retval;
|
|
|
}
|
|
|
else
|
|
|
@@ -265,24 +265,24 @@ void maintain_ipc_hub_connect(char *progname)
|
|
|
{
|
|
|
struct message_record outgoing_msg;
|
|
|
|
|
|
- if(commhub_fd < 0) //if we have no connection to the communication hub
|
|
|
+ if(commhub_fd < 0) //if we have no connection to the communication hub
|
|
|
{
|
|
|
- commhub_fd = connect_to_message_server(progname); //try and get one
|
|
|
-
|
|
|
+ commhub_fd = connect_to_message_server(progname); //try and get one
|
|
|
+
|
|
|
// printf("commhub_fd = %d\n", commhub_fd);
|
|
|
-
|
|
|
- if(commhub_fd >= 0) //if it worked
|
|
|
+
|
|
|
+ if(commhub_fd >= 0) //if it worked
|
|
|
{
|
|
|
//Subscribe to the basics
|
|
|
subscribe_to_default_messages(commhub_fd);
|
|
|
-
|
|
|
+
|
|
|
//Subscribe to the command mailboxes we act on
|
|
|
prepare_message(&outgoing_msg, MAILBOX_SUBSCRIBE, MAILBOX_BILLING_LOG, strlen(MAILBOX_BILLING_LOG));
|
|
|
send_message(commhub_fd,&outgoing_msg);
|
|
|
-
|
|
|
+
|
|
|
prepare_message(&outgoing_msg, MAILBOX_SUBSCRIBE, MAILBOX_PADDLE_ACK, strlen(MAILBOX_PADDLE_ACK));
|
|
|
send_message(commhub_fd,&outgoing_msg);
|
|
|
-
|
|
|
+
|
|
|
//Request updated status information...
|
|
|
prepare_message(&outgoing_msg, MAILBOX_STATUS_REQUEST, "", 0);
|
|
|
send_message(commhub_fd,&outgoing_msg);
|
|
|
@@ -294,7 +294,7 @@ void maintain_ipc_hub_connect(char *progname)
|
|
|
int send_bill_update(int force)
|
|
|
{
|
|
|
struct message_record outgoing_msg;
|
|
|
-
|
|
|
+
|
|
|
//if EITHER the force flag is set, OR our record differs from the last one on file from the IPC hub...
|
|
|
if( force || memcmp(&bill_stat, &real_bill_status, sizeof(real_bill_status)) )
|
|
|
{
|
|
|
@@ -302,12 +302,12 @@ int send_bill_update(int force)
|
|
|
{
|
|
|
//prepare and send it
|
|
|
prepare_message(&outgoing_msg, MAILBOX_BILL_STATUS, &real_bill_status, sizeof(real_bill_status));
|
|
|
- send_message(commhub_fd,&outgoing_msg);
|
|
|
+ send_message(commhub_fd,&outgoing_msg);
|
|
|
}
|
|
|
|
|
|
return 1;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
return 0;
|
|
|
}
|
|
|
|
|
|
@@ -317,117 +317,117 @@ message_callback_return handle_billing_log_message(struct message_record *msg, v
|
|
|
int retval = 0;
|
|
|
int ignore = 0;
|
|
|
|
|
|
-
|
|
|
- // If this is a diagnostic log message, the first character will be a priority
|
|
|
+
|
|
|
+ // If this is a diagnostic log message, the first character will be a priority
|
|
|
// code, otherwise for a billing log message it will be numeric.
|
|
|
- char priority = ((char *)msg->payload)[0];
|
|
|
+ char priority = ((char *)msg->payload)[0];
|
|
|
|
|
|
switch(priority)
|
|
|
{
|
|
|
- case LOGLEVEL_DEBUG: //if this is a diagnostic message with a priority of LOGLEVEL_DEBUG
|
|
|
+ case LOGLEVEL_DEBUG: //if this is a diagnostic message with a priority of LOGLEVEL_DEBUG
|
|
|
//and there are fewer than REJECT_DIAG_DEBUG_THRESHOLD free buffer slots
|
|
|
-
|
|
|
- if(ctx->num_free_records < REJECT_DIAG_DEBUG_THRESHOLD)
|
|
|
+
|
|
|
+ if(ctx->num_free_records < REJECT_DIAG_DEBUG_THRESHOLD)
|
|
|
{
|
|
|
- ignore = 1; //then for safety, ignore this message
|
|
|
+ ignore = 1; //then for safety, ignore this message
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
- ignore = 0; //otherwise, if we have plenty of space, store it
|
|
|
+ ignore = 0; //otherwise, if we have plenty of space, store it
|
|
|
}
|
|
|
break;
|
|
|
-
|
|
|
- case LOGLEVEL_WARN: //if this is a diagnostic message with a priority of LOGLEVEL_WARN
|
|
|
+
|
|
|
+ case LOGLEVEL_WARN: //if this is a diagnostic message with a priority of LOGLEVEL_WARN
|
|
|
//and there are fewer than REJECT_DIAG_WARN_THRESHOLD free buffer slots
|
|
|
-
|
|
|
- if(ctx->num_free_records < REJECT_DIAG_WARN_THRESHOLD)
|
|
|
+
|
|
|
+ if(ctx->num_free_records < REJECT_DIAG_WARN_THRESHOLD)
|
|
|
{
|
|
|
- ignore = 1; //then for safety, ignore this message
|
|
|
+ ignore = 1; //then for safety, ignore this message
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
- ignore = 0; //otherwise, if we have plenty of space, store it
|
|
|
+ ignore = 0; //otherwise, if we have plenty of space, store it
|
|
|
}
|
|
|
break;
|
|
|
-
|
|
|
- case LOGLEVEL_ERROR: //if this is a diagnostic message with a priority of LOGLEVEL_ERROR
|
|
|
+
|
|
|
+ case LOGLEVEL_ERROR: //if this is a diagnostic message with a priority of LOGLEVEL_ERROR
|
|
|
//and there are fewer than REJECT_DIAG_ERROR_THRESHOLD free buffer slots
|
|
|
-
|
|
|
- if(ctx->num_free_records < REJECT_DIAG_ERROR_THRESHOLD)
|
|
|
+
|
|
|
+ if(ctx->num_free_records < REJECT_DIAG_ERROR_THRESHOLD)
|
|
|
{
|
|
|
- ignore = 1; //then for safety, ignore this message
|
|
|
+ ignore = 1; //then for safety, ignore this message
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
- ignore = 0; //otherwise, if we have plenty of space, store it
|
|
|
+ ignore = 0; //otherwise, if we have plenty of space, store it
|
|
|
}
|
|
|
break;
|
|
|
-
|
|
|
- default: //If this message does not start with a known diagnostic priority code, it is a billing message
|
|
|
- ignore = 0; //and therefore can never be ignored.
|
|
|
+
|
|
|
+ default: //If this message does not start with a known diagnostic priority code, it is a billing message
|
|
|
+ ignore = 0; //and therefore can never be ignored.
|
|
|
break;
|
|
|
}
|
|
|
|
|
|
|
|
|
- if(ignore) //If we've decided to ignore the message
|
|
|
+ if(ignore) //If we've decided to ignore the message
|
|
|
{
|
|
|
- // This means we are ignoring a message. This should only happen when buffer space
|
|
|
- // is scarce and the message is a diagnostic log message whith a priority that does not
|
|
|
+ // This means we are ignoring a message. This should only happen when buffer space
|
|
|
+ // is scarce and the message is a diagnostic log message whith a priority that does not
|
|
|
// justify potentially missing a billing entry later.
|
|
|
}
|
|
|
- else //Otherwise, we want to process it as normal.
|
|
|
- {
|
|
|
- retval = add_billing_entry(ctx, (char *)msg->payload); //Attempt to add the message to the billing log
|
|
|
+ else //Otherwise, we want to process it as normal.
|
|
|
+ {
|
|
|
+ retval = add_billing_entry(ctx, (char *)msg->payload); //Attempt to add the message to the billing log
|
|
|
|
|
|
- switch(retval) //Test to see if that worked
|
|
|
+ switch(retval) //Test to see if that worked
|
|
|
{
|
|
|
- case FAIL_FULL: //if the add failed
|
|
|
+ case FAIL_FULL: //if the add failed
|
|
|
case FAIL_MEM:
|
|
|
- if(commhub_fd >= 0) //and we can talk to the commhub
|
|
|
+ if(commhub_fd >= 0) //and we can talk to the commhub
|
|
|
{
|
|
|
struct message_record error_message;
|
|
|
-
|
|
|
- format_driver_message(&error_message, LOGLEVEL_ERROR, "LOG FULL: Call Dispatch"); //notify the driver
|
|
|
+
|
|
|
+ format_driver_message(&error_message, LOGLEVEL_ERROR, "LOG FULL: Call Dispatch"); //notify the driver
|
|
|
send_message(commhub_fd, &error_message);
|
|
|
}
|
|
|
break;
|
|
|
-
|
|
|
- default: //Otherwise, if the add succeded
|
|
|
+
|
|
|
+ default: //Otherwise, if the add succeded
|
|
|
real_bill_status.unsynced_messages = NUM_BILLING_ENTRIES - ctx->num_free_records; //update buffer count
|
|
|
break;
|
|
|
}
|
|
|
|
|
|
- send_bill_update(0); //In either case, sent a status update if needed.
|
|
|
+ send_bill_update(0); //In either case, sent a status update if needed.
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
return MESSAGE_HANDLED_CONT;
|
|
|
}
|
|
|
|
|
|
message_callback_return handle_status_request_message(struct message_record *msg, void *param)
|
|
|
{
|
|
|
billdb_context *ctx = (billdb_context *)param;
|
|
|
-
|
|
|
+
|
|
|
real_bill_status.unsynced_messages = NUM_BILLING_ENTRIES - ctx->num_free_records; //update buffer count
|
|
|
-
|
|
|
+
|
|
|
//force a billing log status update if one is requested by a newly subscribing module
|
|
|
send_bill_update(1);
|
|
|
return MESSAGE_HANDLED_CONT;
|
|
|
}
|
|
|
|
|
|
-int paddle_ack_flag = 0; //a variable to track if a paddle selection success has occurred (a driver has logged in)
|
|
|
+int paddle_ack_flag = 0; //a variable to track if a paddle selection success has occurred (a driver has logged in)
|
|
|
|
|
|
//this callback catches paddle selection acknowledge messages on the MAILBOX_PADDLE_ACK mailing list and
|
|
|
-//test to see if they report success, if so it sets the above flag so that we have the opportunity to warn a
|
|
|
+//test to see if they report success, if so it sets the above flag so that we have the opportunity to warn a
|
|
|
//newly logged in driver about a high-water-mark condition.
|
|
|
message_callback_return handle_paddle_ack(struct message_record *msg, void *param)
|
|
|
{
|
|
|
set_paddle_req *payload = (set_paddle_req *)msg->payload;
|
|
|
-
|
|
|
+
|
|
|
if(payload)
|
|
|
{
|
|
|
- paddle_ack_flag = (payload->request == payload->result);
|
|
|
+ paddle_ack_flag = (payload->request == payload->result);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
return MESSAGE_HANDLED_CONT;
|
|
|
}
|
|
|
|
|
|
@@ -439,7 +439,7 @@ int main(int argc, char **argv)
|
|
|
int read_return = 0;
|
|
|
|
|
|
int next_sync_idx = -1;
|
|
|
-
|
|
|
+
|
|
|
int i;
|
|
|
|
|
|
struct message_record incoming_msg;
|
|
|
@@ -450,26 +450,26 @@ int main(int argc, char **argv)
|
|
|
int checked_idx = 0;
|
|
|
|
|
|
billdb_context ctx = {0};
|
|
|
-
|
|
|
-
|
|
|
+
|
|
|
+
|
|
|
//------------------
|
|
|
|
|
|
- configure_signal_handlers(argv[0]);
|
|
|
+ configure_signal_handlers(argv[0]);
|
|
|
maintain_ipc_hub_connect(argv[0]);
|
|
|
register_system_status_callbacks();
|
|
|
-
|
|
|
+
|
|
|
//register our module-specific callback
|
|
|
register_dispatch_callback(MAILBOX_BILLING_LOG, CALLBACK_USER(1), handle_billing_log_message, &ctx);
|
|
|
register_dispatch_callback(MAILBOX_STATUS_REQUEST, CALLBACK_USER(2), handle_status_request_message, &ctx);
|
|
|
register_dispatch_callback(MAILBOX_PADDLE_ACK, CALLBACK_USER(3), handle_paddle_ack, NULL);
|
|
|
-
|
|
|
+
|
|
|
read_return = attach_to_billdb(&ctx);
|
|
|
-
|
|
|
+
|
|
|
if( read_return < 0 )
|
|
|
{
|
|
|
fprintf(stdout, "Database is missing or corrupt. Attempting to format new database.\n");
|
|
|
read_return = format_new_billdb();
|
|
|
-
|
|
|
+
|
|
|
if( read_return < 0 )
|
|
|
{
|
|
|
fprintf(stderr, "Database cannot be created. Aborting!\n");
|
|
|
@@ -478,7 +478,7 @@ int main(int argc, char **argv)
|
|
|
else
|
|
|
{
|
|
|
read_return = attach_to_billdb(&ctx);
|
|
|
-
|
|
|
+
|
|
|
if( read_return < 0 )
|
|
|
{
|
|
|
fprintf(stderr, "New database is ALSO missing or corrupt. Aborting.\n");
|
|
|
@@ -486,28 +486,32 @@ int main(int argc, char **argv)
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
while( exit_request_status == EXIT_REQUEST_NONE )
|
|
|
{
|
|
|
time_t now = time(NULL);
|
|
|
-
|
|
|
+
|
|
|
RESET_WATCHDOG();
|
|
|
-
|
|
|
+
|
|
|
+ //DEBUG
|
|
|
+ printf("[%lli] billdb: heartbeat\n", get_usec_time());
|
|
|
+ //DEBUG
|
|
|
+
|
|
|
maintain_ipc_hub_connect(argv[0]);
|
|
|
-
|
|
|
- if(server_fd < 0) //If we don't have a connection to the sync server...
|
|
|
+
|
|
|
+ if(server_fd < 0) //If we don't have a connection to the sync server...
|
|
|
{
|
|
|
- if( (now - last_sync_attempt) > DEFAULT_CONNECT_RETRY ) //See if it is time to try again
|
|
|
+ if( (now - last_sync_attempt) > DEFAULT_CONNECT_RETRY ) //See if it is time to try again
|
|
|
{
|
|
|
- if( tunnel_is_up() ) //and if the tunnel thinks it is up
|
|
|
+ if( tunnel_is_up() ) //and if the tunnel thinks it is up
|
|
|
{
|
|
|
- server_fd = connect_to_bill_server(); //if so, try again...
|
|
|
-
|
|
|
+ server_fd = connect_to_bill_server(); //if so, try again...
|
|
|
+
|
|
|
// printf("server_fd = %d\n", server_fd);
|
|
|
-
|
|
|
- if(server_fd >= 0) //if it worked
|
|
|
+
|
|
|
+ if(server_fd >= 0) //if it worked
|
|
|
{
|
|
|
- input_idx = 0; //reset our buffer index
|
|
|
+ input_idx = 0; //reset our buffer index
|
|
|
last_sync_attempt = 0;
|
|
|
}
|
|
|
else
|
|
|
@@ -523,33 +527,33 @@ int main(int argc, char **argv)
|
|
|
//freshly logged in, we send a warning and reset our frequency timers.
|
|
|
handle_watermark_warnings(&ctx, paddle_ack_flag);
|
|
|
paddle_ack_flag = 0;
|
|
|
-
|
|
|
+
|
|
|
nfds=0;
|
|
|
-
|
|
|
+
|
|
|
//If we have a connection to the billing server
|
|
|
if(server_fd >= 0)
|
|
|
{
|
|
|
//tell poll that we care about it
|
|
|
fds[nfds].fd = server_fd;
|
|
|
fds[nfds].events = POLLIN;
|
|
|
-
|
|
|
+
|
|
|
//See if the database thinks we have anything that we need to try and sync right now...
|
|
|
next_sync_idx = next_pending_entry(&ctx);
|
|
|
-
|
|
|
- if(next_sync_idx >= 0) //If so, ask poll() to unblock us when there is room in the pipe to the server.
|
|
|
+
|
|
|
+ if(next_sync_idx >= 0) //If so, ask poll() to unblock us when there is room in the pipe to the server.
|
|
|
{
|
|
|
fds[nfds].events |= POLLOUT;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
//and advance the counter of active file descriptors
|
|
|
nfds++;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
if(commhub_fd >= 0)
|
|
|
{
|
|
|
fds[nfds].fd = commhub_fd;
|
|
|
fds[nfds].events = POLLIN;
|
|
|
-
|
|
|
+
|
|
|
nfds++;
|
|
|
}
|
|
|
|
|
|
@@ -567,22 +571,32 @@ int main(int argc, char **argv)
|
|
|
{
|
|
|
continue;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
+ //DEBUG
|
|
|
+ printf("## cp.nfd %i\n", (int)nfds);
|
|
|
+
|
|
|
for(i=0; i < nfds; i++)
|
|
|
{
|
|
|
if( fds[i].fd == server_fd )
|
|
|
{
|
|
|
-
|
|
|
+
|
|
|
if(fds[i].revents & POLLIN)
|
|
|
{
|
|
|
-
|
|
|
+
|
|
|
read_return = recv(fds[i].fd, input_line + input_idx, sizeof(input_line) - input_idx, 0);
|
|
|
-
|
|
|
+
|
|
|
//If the socket has closed politely (0), or had an error other than EINTR...
|
|
|
if( (read_return == 0) || ((read_return < 0) && (errno != EINTR)) )
|
|
|
{
|
|
|
+
|
|
|
+ //DEBUG
|
|
|
+ printf("## bang\n");
|
|
|
+
|
|
|
close(server_fd);
|
|
|
server_fd = -1;
|
|
|
+
|
|
|
+ usleep(POLL_TIMEOUT * 1000);
|
|
|
+
|
|
|
break;
|
|
|
}
|
|
|
else
|
|
|
@@ -591,7 +605,7 @@ int main(int argc, char **argv)
|
|
|
if(read_return > 0)
|
|
|
{
|
|
|
input_idx += read_return;
|
|
|
-
|
|
|
+
|
|
|
do
|
|
|
{
|
|
|
//Advance until we either hit the end of the buffer, or we hit a line-terminator
|
|
|
@@ -606,15 +620,15 @@ int main(int argc, char **argv)
|
|
|
checked_idx++;
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
//If we didn't hit the end of the input...
|
|
|
if(checked_idx != input_idx)
|
|
|
{
|
|
|
int j,k;
|
|
|
-
|
|
|
+
|
|
|
//Null terminate the line we got as a string...
|
|
|
input_line[checked_idx] = '\0';
|
|
|
-
|
|
|
+
|
|
|
//Do something useful with the string input_buffer...
|
|
|
|
|
|
if( handle_bill_reply(input_line, &ctx) < 0 )
|
|
|
@@ -630,20 +644,20 @@ int main(int argc, char **argv)
|
|
|
|
|
|
//Update our pass time status...
|
|
|
send_bill_update(0);
|
|
|
- }
|
|
|
-
|
|
|
+ }
|
|
|
+
|
|
|
//Now that we've done that, we can bump the rest of characters to the beginning of the next line...
|
|
|
k = input_idx - (checked_idx + 1);
|
|
|
-
|
|
|
+
|
|
|
for(j=0; j < k; j++)
|
|
|
{
|
|
|
input_line[j] = input_line[j + checked_idx + 1];
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
input_idx = j;
|
|
|
checked_idx = 0;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
//If we have hit an overflow condition such that our buffer is full and no newline has been received
|
|
|
if(input_idx == sizeof(input_line))
|
|
|
{
|
|
|
@@ -651,7 +665,7 @@ int main(int argc, char **argv)
|
|
|
server_fd = -1;
|
|
|
break;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
} while(checked_idx < input_idx);
|
|
|
}
|
|
|
}
|
|
|
@@ -663,7 +677,7 @@ int main(int argc, char **argv)
|
|
|
break;
|
|
|
}
|
|
|
|
|
|
-
|
|
|
+
|
|
|
if(fds[i].revents & POLLOUT)
|
|
|
{
|
|
|
//send more logs here
|
|
|
@@ -671,36 +685,36 @@ int main(int argc, char **argv)
|
|
|
real_bill_status.last_sync_time = time(NULL);
|
|
|
send_bill_update(0);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
}
|
|
|
else if( fds[i].fd == commhub_fd )
|
|
|
- {
|
|
|
+ {
|
|
|
//If we've lost connection, break this loop and poll all over again
|
|
|
- if(fds[i].revents & (POLLERR | POLLHUP | POLLNVAL))
|
|
|
+ if(fds[i].revents & (POLLERR | POLLHUP | POLLNVAL))
|
|
|
{
|
|
|
close(commhub_fd);
|
|
|
commhub_fd = -1;
|
|
|
break;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
if(fds[i].revents & POLLIN)
|
|
|
{
|
|
|
// printf("Trying to read from hub...\n");
|
|
|
read_return = get_message(commhub_fd, &incoming_msg);
|
|
|
-
|
|
|
+
|
|
|
if( read_return < 0 )
|
|
|
{
|
|
|
close(commhub_fd);
|
|
|
commhub_fd = -1;
|
|
|
break;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
process_message(&incoming_msg);
|
|
|
-
|
|
|
+
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
}
|
|
|
- }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
printf("Detatching from Bill Database\n");
|
|
|
@@ -720,7 +734,7 @@ int main(int argc, char **argv)
|
|
|
}
|
|
|
|
|
|
printf("Goodbye.\n");
|
|
|
-
|
|
|
+
|
|
|
|
|
|
return 0;
|
|
|
}
|