/*
* Copyright (c) 2019 Clementine Computing LLC.
*
* This file is part of PopuFare.
*
* PopuFare is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* PopuFare is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with PopuFare. If not, see <https://www.gnu.org/licenses/>.
*
*/
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include "../common/common_defs.h"
#include "../commhub/commhub.h"
#include "../commhub/client_utils.h"
#include "passdb_slim.h"
#include "rules.h"
#include "rfid_decoder.h"
// --------------------- DEBUG -----------------------------
// Nonzero enables extra "# ..." trace output on stdout; main() sets it to 1 when the first command line argument is "-d".
int g_debug_flag = 0;
#include
// Appends one printf-style formatted message to the file named fn.
//
//   fn   Path of the log file; it is opened in append mode and closed again on every call.
//   msg  printf-style format string, followed by the arguments it consumes.
//
// The message is silently dropped when fn or msg is NULL, or when the file cannot be opened.
void debug_print_file(char *fn, char *msg, ... )
{
    va_list p_arg;
    FILE *fp;

    if( (fn == NULL) || (msg == NULL) )
    {
        return;
    }

    fp = fopen(fn, "a");
    if(fp == NULL)
    {
        return;
    }

    //Only start walking the argument list once we know we will finish it, so that every va_start() is
    //paired with a va_end() on every path out of this function.
    va_start(p_arg, msg);
    vfprintf(fp, msg, p_arg);
    va_end(p_arg);

    fclose(fp);
}
// --------------------- DEBUG -----------------------------
//----------GLOBAL STATE VARIABLES
int flush_in_progress = 0; //This flag is used to tell if there is a flush in progress
time_t last_sync_attempt = 0; //Time of the last sync attempt in seconds since epoch
time_t last_sync_ack = 0; //Time of the last ack from the server (update, delete, or nop)
int commhub_fd = -1; //File descriptor of our connection to the comm hub
int server_fd = -1; //File descriptor of our connection to the sync server
int zflush_in_progress = 0; //Nonzero while the raw ZFLUSH data blob is still being downloaded from the server
void *zflush_data = NULL; //malloc()'d buffer holding the downloaded ZFLUSH blob (NULL when none is allocated)
int zflush_data_size = 0; //Total size in bytes of the ZFLUSH blob, as announced by the ZFLUSH command
int zflush_idx = 0; //Number of bytes of the ZFLUSH blob received so far
time_t last_piu_update = 0; //Last PIU status update
pass_status real_pass_status = {0}; //Our own current pass status; send_pass_state() publishes it to the other modules
// Refreshes the two status lines we own on the PIU display.
//
// Line 0 shows "month/day hour:minute AM|PM STATUS" using a 12-hour clock. STATUS is "BUSY" when busy is nonzero;
//otherwise it is derived, in this order, from the flush state ("APPLY", "SAVING", "LOADING"), from whether the rules
//are loaded ("LOADING"), and from whether a driver is logged in ("SEE DRIVER"), falling back to "READY".
// Line 1 shows flush progress text while a download/apply/write is under way and is blanked otherwise.
//
// Nothing is sent when we have no connection to the comm hub. Always returns 0.
int update_piu_status_message(int busy)
{
    struct message_record outgoing;
    struct tm t;
    time_t now = time(NULL);
    const char *status = "READY";
    const char *meridiem;
    int hour;

    if(busy)
    {
        status = "BUSY";
    }
    else if(real_pass_status.flush_status == FLUSH_STATUS_APPLY)
    {
        status = "APPLY";
    }
    else if(real_pass_status.flush_status == FLUSH_STATUS_WRITE)
    {
        status = "SAVING";
    }
    else if( !rules_loaded() ||
             (real_pass_status.flush_status == FLUSH_STATUS_LEGACY) ||
             (real_pass_status.flush_status == FLUSH_STATUS_DOWNLOAD) )
    {
        status = "LOADING";
    }
    else if( !driver_stat.logged_in_driver )
    {
        status = "SEE DRIVER";
    }

    //Without a hub connection there is nobody to tell
    if(commhub_fd < 0)
    {
        return 0;
    }

    localtime_r(&now, &t);

    //Fold the 24-hour clock onto a 12-hour dial: 0 and 12 both read as "12"
    hour = t.tm_hour % 12;
    if(hour == 0)
    {
        hour = 12;
    }
    meridiem = (t.tm_hour < 12) ? "AM" : "PM";

    format_piu_message(&outgoing, 0, 0, 0, "%d/%d %d:%02d %s %s", t.tm_mon + 1, t.tm_mday, hour, t.tm_min, meridiem, status);
    send_message(commhub_fd, &outgoing);

    //Second line: progress text for whichever flush phase we are in, or blank
    switch(real_pass_status.flush_status)
    {
        case FLUSH_STATUS_DOWNLOAD:
            format_piu_message(&outgoing, 1, 0, 0, "Downloading %d%%", real_pass_status.progress_indicator);
            break;

        case FLUSH_STATUS_APPLY:
            format_piu_message(&outgoing, 1, 0, 0, "Applying %d%%", real_pass_status.progress_indicator);
            break;

        case FLUSH_STATUS_WRITE:
            format_piu_message(&outgoing, 1, 0, 0, "Saving Database");
            break;

        default:
            format_piu_message(&outgoing, 1, 0, 0, "");
            break;
    }
    send_message(commhub_fd, &outgoing);

    return 0;
}
// This function attempts to open a TCP connection to the pass server at PASS_SERVER_IP : PASS_SERVER_PORT.
//
// This function returns:
//
//  >= 0   The file descriptor of the connected socket (the caller owns it and must close() it)
//  == -1  socket() failed
//  == -2  connect() failed
//  == -3  PASS_SERVER_IP could not be parsed as a dotted-quad IPv4 address
//
int connect_to_pass_server()
{
    struct sockaddr_in addr = {0};      //zeroed so that padding (sin_zero) is never sent to the kernel uninitialized
    int fd;

    addr.sin_family = AF_INET;
    addr.sin_port = htons(PASS_SERVER_PORT);

    //inet_aton() returns 0 on a malformed address; connecting to whatever was left in sin_addr would be meaningless
    if(inet_aton(PASS_SERVER_IP, &addr.sin_addr) == 0)
    {
        fprintf(stderr, "Invalid pass server address \"%s\"\n", PASS_SERVER_IP);
        return -3;
    }

    fd = socket(PF_INET, SOCK_STREAM, 0);
    if(fd < 0)
    {
        return -1;
    }

    if(connect(fd, (struct sockaddr *)&addr, sizeof(addr)) < 0)
    {
        close(fd);      //don't leak the descriptor on a failed attempt; the caller will retry later
        return -2;
    }

    return fd;
}
// Throws away the local pass database and starts over with a freshly formatted, empty one.
//
// The context is detached from the current database, new database files are formatted, and the context is attached
//again. Returns 0 on success, or the failing status from attach_to_passdb() (also reported on stderr).
int do_database_flush(passdb_slim_context *ctx)
{
    int attach_status;

    //Let go of the old database before it is replaced underneath us
    detach_from_passdb(ctx);

    //Lay down fresh, empty database files
    //format_new_passdb();
    format_new_passdbs();

    //...and pick the new database up again
    attach_status = attach_to_passdb(ctx);

    if( DB_FAIL(attach_status) )
    {
        fprintf(stderr, "Error (%d) attaching to pass database during flush attempt\n", attach_status);
        return attach_status;
    }

    printf("Pass database flushed!\n");
    return 0;
}
// Sends a "QUERY <seq>" line to the sync server, where <seq> is the sequence number held in ctx->seq.
//
//   fd   Connected socket to the sync server (a negative value is rejected)
//   ctx  Pass database context supplying the sequence number
//
// Returns 0 once the whole line has been written, or -1 if fd is invalid, the line could not be formatted,
//or send() failed or made no progress.
int send_query_message(int fd, passdb_slim_context *ctx)
{
    char query[LINE_BUFFER_SIZE];
    int sent = 0;
    int len;
    int retval;

    if(fd < 0)
    {
        return -1;
    }

    //Bounded formatting; the cast makes the argument match %llu whatever the exact integer type of seq is
    len = snprintf(query, sizeof(query), "QUERY\t%llu\n", (unsigned long long)ctx->seq);
    if( (len < 0) || ((size_t)len >= sizeof(query)) )
    {
        return -1;
    }

    // printf("Sending: \"%s\"\n", query);

    //send() may accept only part of the buffer, so keep going until all of it is out
    while(sent < len)
    {
        retval = send(fd, query + sent, (size_t)(len - sent), 0);

        if( (retval < 0) && (errno == EINTR) )
        {
            continue;       //interrupted by a signal before anything was written: just try again
        }

        if(retval <= 0)
        {
            return -1;
        }

        sent += retval;
    }

    return 0;
}
// Sends a "QUERY 0" line to the sync server.
//
//   fd   Connected socket to the sync server (a negative value is rejected)
//
// Returns 0 once the whole line has been written, or -1 if fd is invalid or send() failed or made no progress.
int send_flushreq_message(int fd)
{
    static const char request[] = "QUERY\t0\n";         //constant text, so no formatting buffer is needed
    const int len = (int)(sizeof(request) - 1);         //do not transmit the terminating nul
    int sent = 0;
    int retval;

    if(fd < 0)
    {
        return -1;
    }

    //send() may accept only part of the buffer, so keep going until all of it is out
    while(sent < len)
    {
        retval = send(fd, request + sent, (size_t)(len - sent), 0);

        if( (retval < 0) && (errno == EINTR) )
        {
            continue;       //interrupted by a signal before anything was written: just try again
        }

        if(retval <= 0)
        {
            return -1;
        }

        sent += retval;
    }

    return 0;
}
// Publishes our local buspass state structure (real_pass_status) to the other modules on MAILBOX_PASS_STATUS.
//
// The send is skipped when it would be redundant: unless force is set, nothing is sent while real_pass_status is
//byte-for-byte identical to pass_stat, the last status message we received back from ourselves via the IPC hub
//(which should model what the other modules currently believe our status to be).
//
// This function returns:
//
//  <  0   Failure in communication (the value returned by send_message())
//  == 0   No news to send (force was not set and our status has not changed)
//  == 1   News was successfully sent.
//
// Callers use the "== 1" case to decide whether the PIU display is worth refreshing, so that the serial port is not
//flooded with redundant change notifications.
//
int send_pass_state(int force)
{
    struct message_record outgoing_msg;
    int send_result;

    //Nothing new to report and nobody insisting: stay quiet
    if( !force && (memcmp(&real_pass_status, &pass_stat, sizeof(real_pass_status)) == 0) )
    {
        return 0;
    }

    prepare_message(&outgoing_msg, MAILBOX_PASS_STATUS, &real_pass_status, sizeof(real_pass_status));
    send_result = send_message(commhub_fd, &outgoing_msg);

    if(send_result < 0)
    {
        return send_result;
    }

    return 1;
}
// This function updates our local buspass state structure (real_pass_status) from its two parameters and from the
//ZFLUSH download counters, and then sends a copy of that status to the other modules if it differs from the last
//state they saw (using the send_pass_state() function above).
//
//   flush_status   One of the FLUSH_STATUS_* values; any value not handled below is treated as FLUSH_STATUS_NORMAL
//   progress       Progress indicator in percent. For FLUSH_STATUS_DOWNLOAD a value of 0 means "work it out from
//                  zflush_idx / zflush_data_size". It is ignored (reported as 0) for FLUSH_STATUS_NORMAL.
//
// Returns whatever send_pass_state(0) returns: < 0 on communication failure, 0 if nothing changed, 1 if news was sent.
int update_pass_state(int flush_status, int progress)
{
    real_pass_status.progress_indicator = 0;

    switch(flush_status)
    {
        case FLUSH_STATUS_DOWNLOAD:     //This means we're in the process of downloading a ZFLUSH dataset
            real_pass_status.flush_status = FLUSH_STATUS_DOWNLOAD;

            if(progress != 0)           //if a progress indicator is supplied, simply use it...
            {
                real_pass_status.progress_indicator = progress;
            }
            else if(zflush_data_size != 0)  //otherwise calculate one, if that is possible
            {
                //download count / expected size as a percentage; the multiplication is done in 64 bits because
                //100 * zflush_idx no longer fits in an int once roughly 21 MB have been received
                real_pass_status.progress_indicator = (int)((100LL * zflush_idx) / zflush_data_size);
            }
            break;

        case FLUSH_STATUS_APPLY:        //In the case of APPLY, WRITE, or LEGACY, just take the progress number on faith
        case FLUSH_STATUS_WRITE:
        case FLUSH_STATUS_LEGACY:
            real_pass_status.flush_status = flush_status;
            real_pass_status.progress_indicator = progress;
            break;

        default:                        //Otherwise, just let the other modules know we're feeling normal today...
            real_pass_status.flush_status = FLUSH_STATUS_NORMAL;
            break;
    }

    //inform other modules of any changes...
    return send_pass_state(0);
}
int do_zflush(passdb_slim_context *ctx, void *data, int len);
// Cleans up after a ZFLUSH attempt, or aborts one that is still in progress: the download buffer is released and
//all ZFLUSH bookkeeping (and the flush_in_progress flag) is reset. Always returns 0.
int zflush_cleanup()
{
    //Release the download buffer first; free() ignores a NULL pointer
    free(zflush_data);
    zflush_data = NULL;
    zflush_data_size = 0;

    //...then forget that any flush was ever under way
    zflush_idx = 0;
    zflush_in_progress = 0;
    flush_in_progress = 0;

    return 0;
}
int handle_pass_update_request(char *line, passdb_slim_context *ctx, int sync)
{
rider_record foo = {0};
char buffer[LINE_BUFFER_SIZE];
int input_idx = 0;
int eol = 0;
//Extract the first tab-delimited field from the input line...
input_idx += get_field(buffer, line + input_idx, sizeof(buffer), &eol);
//If that field is blank, then we ignore this line
if( buffer[0] == '\0' )
{
return 0;
}
if (g_debug_flag)
{
printf("# request received '%s'\n", buffer);
}
if( !strcasecmp(buffer, "UPDATE") ) //========================================== UPDATE
{
if (g_debug_flag)
{
printf("# UPDATE request received '%s'\n", buffer);
}
//------------------------------------------------------------------ seq
if( eol )
{
fprintf(stderr, "UPDATE: Premature end of line!\n");
return -1;
}
//Get the next field (this should be sequence number...)
input_idx += get_field(buffer, line + input_idx, sizeof(buffer), &eol);
foo.seq = strtoull(buffer, NULL, 10);
if( foo.seq == 0 )
{
fprintf(stderr, "UPDATE: Blank or zero sequence number not allowed!\n");
return -1;
}
//------------------------------------------------------------------ ID
if( eol )
{
fprintf(stderr, "UPDATE: Premature end of line!\n");
return -1;
}
//Get the next field (this should be Rider ID...)
input_idx += get_field(buffer, line + input_idx, sizeof(buffer), &eol);
foo.id = strtoull(buffer, NULL, 10);
if( foo.id == 0 )
{
fprintf(stderr, "UPDATE: Blank or zero rider ID not allowed!\n");
return -1;
}
//------------------------------------------------------------------ Magstripe
if( eol )
{
fprintf(stderr, "UPDATE: Premature end of line!\n");
return -1;
}
//Get the next field (this should be the Magstripe Value...)
input_idx += get_field(foo.magstripe_value, line + input_idx, sizeof(foo.magstripe_value), &eol);
//------------------------------------------------------------------ RFID
if( eol )
{
fprintf(stderr, "UPDATE: Premature end of line!\n");
return -1;
}
//Get the next field (this should be the RFID Value...)
input_idx += get_field(foo.rfid_value, line + input_idx, sizeof(foo.rfid_value), &eol);
//------------------------------------------------------------------ Rule Name
if( eol )
{
fprintf(stderr, "UPDATE: Premature end of line!\n");
return -1;
}
//Get the next field (this should be the Rule Name...)
input_idx += get_field(foo.rule_name, line + input_idx, sizeof(foo.rule_name), &eol);
//------------------------------------------------------------------ Rule Parameter
if( eol )
{
fprintf(stderr, "UPDATE: Premature end of line!\n");
return -1;
}
//Get the next field (this should be the Rule Parameter...)
input_idx += get_field(foo.rule_param, line + input_idx, sizeof(foo.rule_param), &eol);
//If we have extra fields, that's BAD NEWS!
if( !eol )
{
fprintf(stderr, "UPDATE: Too many fields!\n");
return -1;
}
//If everything else is okay, go ahead and update our local database
last_sync_ack = time(NULL);
return update_rider(ctx, &foo, sync); //returning the status of that operation
}
else if( !strcasecmp(buffer, "DELETE") ) //========================================== DELETE
{
//------------------------------------------------------------------ seq
if( eol )
{
fprintf(stderr, "DELETE: Premature end of line!\n");
return -1;
}
//Get the next field (this should be sequence number...)
input_idx += get_field(buffer, line + input_idx, sizeof(buffer), &eol);
foo.seq = strtoull(buffer, NULL, 10);
if( foo.seq == 0 )
{
fprintf(stderr, "DELETE: Blank or zero sequence number not allowed!\n");
return -1;
}
//------------------------------------------------------------------ ID
if( eol )
{
fprintf(stderr, "DELETE: Premature end of line!\n");
return -1;
}
//Get the next field (this should be Rider ID...)
input_idx += get_field(buffer, line + input_idx, sizeof(buffer), &eol);
foo.id = strtoull(buffer, NULL, 10);
if( foo.id == 0 )
{
fprintf(stderr, "DELETE: Blank or zero rider ID not allowed!\n");
return -1;
}
//If we have extra fields, that's BAD NEWS!
if( !eol )
{
fprintf(stderr, "DELETE: Too many fields!\n");
return -1;
}
//If everything else is okay, go ahead and update our local database
last_sync_ack = time(NULL);
return delete_rider(ctx, &foo, sync); //returning the status of that operation
}
else if( !strcasecmp(buffer, "FLUSH") ) //========================================== FLUSH
{
// DEBUG
printf("DEBUG: FLUSH RECEIVED\n");
flush_in_progress = 1;
if(update_pass_state(FLUSH_STATUS_LEGACY, 0) > 0)
{
update_piu_status_message(0);
}
fprintf(stderr, "Legacy Flush Requested!\n");
last_sync_ack = time(NULL);
return do_database_flush(ctx);
}
else if( !strcasecmp(buffer, "FLUSHDONE") ) //========================================== FLUSHDONE
{
// DEBUG
printf("DEBUG: FLUSHDONE RECEIVED\n");
flush_in_progress = 0;
if(update_pass_state(FLUSH_STATUS_NORMAL, 0) > 0)
{
update_piu_status_message(0);
}
fprintf(stderr, "Legacy Flush Done!\n");
last_sync_ack = time(NULL);
return 0;
}
else if( !strcasecmp(buffer, "ZFLUSH") ) //========================================== FLUSH
{
// DEBUG
printf("DEBUG: ZFLUSH RECEIVED\n");
//------------------------------------------------------------------ seq
if( eol )
{
fprintf(stderr, "ZFLUSH: Premature end of line!\n");
return -1;
}
//Get the next field (this should be sequence number...)
input_idx += get_field(buffer, line + input_idx, sizeof(buffer), &eol);
zflush_data_size = strtoull(buffer, NULL, 10);
zflush_data = malloc(zflush_data_size);
if(zflush_data == NULL)
{
fprintf(stderr, "Can't malloc %d bytes for ZFLUSH!\n", zflush_data_size);
return FAIL_MEM;
}
printf("ZFLUSH: Downloading %d bytes.\n", zflush_data_size);
zflush_idx = 0;
zflush_in_progress = 1;
flush_in_progress = 1;
if(update_pass_state(FLUSH_STATUS_DOWNLOAD, 0) > 0) //Let update_pass_state() calculate our download percentage
{
update_piu_status_message(0);
}
last_sync_ack = time(NULL);
return 0;
}
else if( !strcasecmp(buffer, "ZFLUSHDONE") ) //========================================== FLUSHDONE
{
// DEBUG
printf("DEBUG: ZFLUSHDONE RECEIVED\n");
fprintf(stderr, "ZFlush applying...\n");
if(update_pass_state(FLUSH_STATUS_APPLY, 0) > 0) //Mark that we're starting an apply operation
{
update_piu_status_message(0);
}
do_zflush(ctx, zflush_data, zflush_data_size);
zflush_cleanup();
if(update_pass_state(FLUSH_STATUS_NORMAL, 0) > 0)
{
update_piu_status_message(0);
}
fprintf(stderr, "ZFlush Done!\n");
//DEBUG
fprintf(stderr, "CONSISTENCY CHECK: %i\n" , passdb_slim_consistency_check( ctx ) );
last_sync_ack = time(NULL);
return 0;
}
else if( !strcasecmp(buffer, "NOP") ) //========================================== NOP (this just updates ACK time)
{
last_sync_ack = time(NULL);
return 0;
}
else
{
fprintf(stderr, "Unrecognized command: %s\n", buffer);
return -1;
}
}
// Applies a downloaded ZFLUSH dataset: the local pass database is wiped, the zlib-compressed blob is inflated a
//buffer-full at a time, and every newline-terminated line that comes out is fed to handle_pass_update_request().
//
//   ctx    The pass database to rebuild
//   data   The compressed dataset
//   len    Its size in bytes (must be > 0)
//
// While it runs, the watchdog is reset, pending IPC messages are serviced, and APPLY progress is published.
// On success the connection to the sync server (server_fd) is closed so that the main loop reconnects.
//
// Returns 0 on success; FAIL_DATABASE if there is no data, if zlib reports an error (including a truncated stream),
//or if a decompressed line does not fit in the line buffer; otherwise the failing status from do_database_flush()
//or handle_pass_update_request().
int do_zflush(passdb_slim_context *ctx, void *data, int len)
{
    char line[LINE_BUFFER_SIZE] = {0};
    z_stream foo = {0};
    int zretval;
    int retval;
    int i,j,k;
    int nout;
    struct message_record incoming_msg;

    //With nothing to inflate we would wipe the database for no reason (and divide by zero computing progress)
    if( (data == NULL) || (len <= 0) )
    {
        fprintf(stderr, "ZFLUSH: No compressed data to apply!\n");
        return FAIL_DATABASE;
    }

    foo.zalloc = Z_NULL;
    foo.zfree = Z_NULL;
    foo.opaque = Z_NULL;
    foo.next_in = data;
    foo.avail_in = len;
    foo.next_out = (void *)line;
    foo.avail_out = sizeof(line);
    foo.total_out = 0;

    // printf("do_zflush(%p, %p, %d)\n", ctx, data, len);

    retval = inflateInit(&foo);
    if(retval != Z_OK)
    {
        fprintf(stderr, "inflateInit failed: Zlib error %d\n", retval);
        return FAIL_DATABASE;
    }

    retval = do_database_flush(ctx);
    if( DB_FAIL(retval) )
    {
        fprintf(stderr, "Database failure during ZFLUSH while performing flush\n");
        inflateEnd(&foo);       //release the zlib state set up by inflateInit()
        return retval;
    }

    do
    {
        RESET_WATCHDOG();

        //Respond to PING messages from the supervisor process lest we get killed off too early...
        while(message_socket_status(commhub_fd, MSG_RECV) & MSG_RECV)
        {
            if(get_message(commhub_fd, &incoming_msg) >= 0)
            {
                process_message(&incoming_msg);
            }
        }

        zretval = inflate(&foo, Z_NO_FLUSH);

        // Anything other than Z_OK / Z_STREAM_END is fatal here. That includes Z_BUF_ERROR: we always offer free
        //output space, so it can only mean the input ran out before the end of the stream (a truncated dataset),
        //and since no more input will ever arrive, looping again would spin forever.
        if( (zretval != Z_OK) && (zretval != Z_STREAM_END) )
        {
            fprintf(stderr, "Error inflating data: Zlib error %d\n", zretval);
            inflateEnd(&foo);
            return FAIL_DATABASE;
        }

        //The number of output bytes generated is the size of the buffer - number of free bytes
        nout = sizeof(line) - foo.avail_out;

        // printf("-- %d output bytes\n", nout);

        j = 0;

        //Scan through this buffer looking for EOL characters
        for(i=0; i < nout; i++)
        {
            if(line[i] == '\n')     //if we've found one
            {
                line[i] = '\0';     //make it a terminating nul

                //DEBUG
                //printf("DEBUG: ZFLUSH->%s\n", line + j);

                //handle our pass update
                #ifdef ZFLUSH_SLOW_BUT_SAFE
                //In slow but safe mode, we msync() (or write()) the memory page containing each record after each update.
                //This is hell on the flash, and it takes a long time, but it means that we cannot POSSIBLY write a lower
                //sequence number record to the physical storage media before having physically written all previous
                //sequence numbers in the batch.
                retval = handle_pass_update_request(line + j, ctx, 1);
                #else
                //In fast mode, we apply all of our update to the memory window without calling msync() or write(), and then
                //(see below) when we're done rewriting the mmap()'d window for the local database, then we make one call
                //to msync() or write() and splat it all down to flash at once. This gets us a better compression ratio
                //on the large chunks of file (like the Cornell passes) which change infrequently and it produces less
                //filesystem fragmentation, but it means that it is theoretically possible for a power event to cause a
                //partial write to corrupt the local database (requiring a new flush). That being said, when running on
                //top of JFFS2, this should cause file truncation, which will be easily detectable and trigger another
                //flush request next time the application comes up.
                retval = handle_pass_update_request(line + j, ctx, 0);
                #endif

                if( DB_FAIL(retval) )
                {
                    inflateEnd(&foo);
                    fprintf(stderr, "Database failure during ZFLUSH while processing line \"%s\"\n", line + j);
                    return retval;
                }

                //mark the first character of the next line
                j = i + 1;
            }
        }

        //k will be equal to the number of leftover bytes (the start of a line whose end has not been inflated yet)
        k = i - j;

        // printf("-- i = %d j = %d k = %d\n", i, j, k);

        //copy those leftovers back to the beginning (the two regions may overlap, hence memmove)
        memmove(line, line + j, (size_t)k);

        //and update our output buffer structure
        foo.next_out = (void *)(line + k);
        foo.avail_out = sizeof(line) - k;

        if(foo.avail_out == 0)
        {
            fprintf(stderr, "Line too long in decompressed ZFLUSH data!\n");
            inflateEnd(&foo);
            return FAIL_DATABASE;
        }

        // Update the other modules with our status (APPLY) and a progress indicator based on the amount of compressed data
        //read as a percentage of the total ZFLUSH data blob. (Computed in 64 bits so the multiplication cannot overflow.)
        if(update_pass_state(FLUSH_STATUS_APPLY, (int)(((unsigned long long)foo.total_in * 100) / (unsigned long long)len)) > 0)
        {
            update_piu_status_message(0);
        }
    } while(zretval != Z_STREAM_END);

    //Note that any bytes left over after the final newline of the dataset (k > 0 here) are not processed.

    inflateEnd(&foo);

    //In fast mode, we have to ensure that we write our big block of changes back to secondary storage all at once
    #ifndef ZFLUSH_SLOW_BUT_SAFE
    if(update_pass_state(FLUSH_STATUS_WRITE, 0) > 0)    //Let the other modules know we're performing the final write...
    {
        update_piu_status_message(0);
    }

    fprintf(stderr, "Doing one-shot file write...\n");

    // This resets the watchdog timer for a double-long timeframe (120 seconds these days...) allowing the write to be
    //slow as dirt (this is rarely needed, but if the flash was badly fragmented it can take some time to coalesce free
    //blocks and possibly perform some erase cycles).
    RESET_WATCHDOG_LONG(2);

    // This calls msync() if we're working with a non-broken implementation of mmap() (i.e. non-jffs2) or write()
    //to take our updated state from RAM and save it to the underlying file.
    sync_all_riders(ctx);

    fprintf(stderr, "Done.\n");
    #endif

    // After completing a ZFLUSH operation, we want to hang up on the server (and reconnect asap). The server should
    //also hang up on us after completing transmission of a ZFLUSH dataset. This effectively clears the buffers going both
    //ways which prevents the case where a high communication (or server-side database) lag causes two "QUERY 0" messages
    //to be issued in a row, the first one triggering (correctly) a ZFLUSH which will be completed, then the server will read
    //the second (now obsolete) "QUERY 0" message and respond to it with another (now just useless and annoying) ZFLUSH.
    close(server_fd);   //We just do a rude close... The server does a proper shutdown(fd, 2), and the FIN will have reached
                        //us LONG before we execute this...
    server_fd = -1;     //Mark the TCP connection to the server as dead so we flush buffers and restart the connection next
                        //time through the loop.

    return 0;
}
// Dispatch callback for MAILBOX_UPDATE_PASSES.
// Resets the time of our last sync attempt to "never" (0). Neither the message nor param is examined.
message_callback_return handle_update_passes_message(struct message_record *msg, void *param)
{
    (void)msg;
    (void)param;

    last_sync_attempt = (time_t)0;

    return MESSAGE_HANDLED_CONT;
}
// Dispatch callback for MAILBOX_FLUSH_PASSES.
// Sends a flush request to the sync server, but only when all of the following hold:
//   1: We have a connection to the server
//   2: That connection is real (i.e. the SSH tunnel is in a known good state)
//   3: We are not currently in the middle of doing a flush
// Otherwise the request is dropped. Neither the message nor param is examined.
message_callback_return handle_flush_passes_message(struct message_record *msg, void *param)
{
    (void)msg;
    (void)param;

    //DEBUG
    printf("DEBUG: hanlde_flush_passes_message server_fd %i, tunnel_is_up() %i real_pass_stats.flush_stats %i ==? (FLUSH_STATUS_NORMAL %i)\n",
           server_fd, tunnel_is_up(), real_pass_status.flush_status, FLUSH_STATUS_NORMAL);

    if(server_fd < 0)                                           //no connection to the server
    {
        return MESSAGE_HANDLED_CONT;
    }

    if( !(tunnel_is_up()) )                                     //tunnel is not in a known good state
    {
        return MESSAGE_HANDLED_CONT;
    }

    if(real_pass_status.flush_status != FLUSH_STATUS_NORMAL)    //a flush is already under way
    {
        return MESSAGE_HANDLED_CONT;
    }

    send_flushreq_message(server_fd);

    return MESSAGE_HANDLED_CONT;
}
// Dispatch callback for MAILBOX_STATUS_REQUEST.
// Publishes our pass status unconditionally: the send is forced even if it looks redundant, because odds are that
//the requesting module has only just come on line and holds no current status data for us.
message_callback_return handle_status_request_message(struct message_record *msg, void *param)
{
    const int force_send = 1;

    (void)msg;
    (void)param;

    send_pass_state(force_send);

    return MESSAGE_HANDLED_CONT;
}
// Reports a credential that was not found in the pass database.
// Three messages are built: a billing record carrying cred, a driver message, and a passenger (PIU) message. They are
//sent in that order when we have a comm hub connection.
// Returns 0 if the messages were handed to the comm hub, -1 if there is no comm hub connection.
int reject_unknown_card(char *cred)
{
    struct message_record bill_msg;
    struct message_record user_msg;
    struct message_record rider_msg;

    format_piu_message(&rider_msg, 1, PIU_PRIORITY_FARE, PASSENGER_MESSAGE_DURATION, "%s %s", REJECT_STR, "UNKNOWN CARD");
    format_driver_message(&user_msg, LOGLEVEL_REJECT, "Unknown card");
    format_billing_message(&bill_msg, REJECT_STR, "UNKNOWN-CARD", "", "Unknown card", cred, 0, 0);

    if(commhub_fd < 0)
    {
        return -1;
    }

    send_message(commhub_fd, &bill_msg);
    send_message(commhub_fd, &user_msg);
    send_message(commhub_fd, &rider_msg);

    return 0;
}
#ifdef REJECT_IF_NO_DRIVER
// Rejects a card presented while the bus is not ready to take fares (see the callers for the exact condition).
// A driver message ("Not Logged In!") and a passenger message ("SEE DRIVER") are built and sent in that order
//when we have a comm hub connection.
// Returns 0 if the messages were handed to the comm hub, -1 if there is no comm hub connection.
int reject_no_driver()
{
    struct message_record user_msg;
    struct message_record rider_msg;

    format_piu_message(&rider_msg, 1, PIU_PRIORITY_FARE, PASSENGER_MESSAGE_DURATION, "%s", "SEE DRIVER");
    format_driver_message(&user_msg, LOGLEVEL_REJECT, "Not Logged In!");

    if(commhub_fd < 0)
    {
        return -1;
    }

    send_message(commhub_fd, &user_msg);
    send_message(commhub_fd, &rider_msg);

    return 0;
}
#endif
message_callback_return handle_token_mag_message(struct message_record *msg, void *param)
{
int idx;
char cred[LINE_BUFFER_SIZE] = {0};
passdb_slim_context *ctx = (passdb_slim_context *)param;
if(!ctx)
return MESSAGE_HANDLED_CONT;
update_piu_status_message(1);
#ifdef REJECT_IF_NO_DRIVER
if( (driver_stat.logged_in_driver <= 0) || (stop_stat.paddle <= 0) )
{
reject_no_driver();
return MESSAGE_HANDLED_CONT;
}
#endif
idx = smart_find_mag(ctx, (char *)msg->payload, cred);
if(idx < 0)
{
reject_unknown_card(cred);
}
else
{
process_rider(ctx, idx, cred);
}
return MESSAGE_HANDLED_CONT;
}
message_callback_return handle_token_rfid_message(struct message_record *msg, void *param)
{
int idx;
char cred[LINE_BUFFER_SIZE] = {0};
passdb_slim_context *ctx = (passdb_slim_context *)param;
if(!ctx)
return MESSAGE_HANDLED_CONT;
update_piu_status_message(1);
#ifdef REJECT_IF_NO_DRIVER
if( (driver_stat.logged_in_driver <= 0) || (stop_stat.paddle <= 0) )
{
reject_no_driver();
return MESSAGE_HANDLED_CONT;
}
#endif
idx = smart_find_rf(ctx, (char *)msg->payload, cred);
if(idx < 0)
{
reject_unknown_card(cred);
}
else
{
process_rider(ctx, idx, cred);
}
return MESSAGE_HANDLED_CONT;
}
// Dispatch callback (registered by main() for MAILBOX_GPS_STATUS and MAILBOX_STOP_STATUS).
// Gives the anti-passback cache a chance to flush itself via apb_flush_if_needed().
// Neither the message nor param is examined.
message_callback_return update_anti_passback_cache(struct message_record *msg, void *param)
{
    (void)msg;
    (void)param;

    apb_flush_if_needed();

    return MESSAGE_HANDLED_CONT;
}
message_callback_return handle_rule_call(struct message_record *msg, void *param)
{
process_driver_rulecall( (driver_rulecall *)msg->payload );
return MESSAGE_HANDLED_CONT;
}
// Dispatch callback for MAILBOX_PASSDB_PULSE.
// Writes a one-line "PASSDB_PULSE" summary of the database counters held in the context (param) to stderr: the
//equipment number, the sum of the three credential counters followed by each of them, the active/free counts, the
//three bank counters, and the rule parameter usage out of RULEPARAM_DB_MAX. The message itself is not examined.
message_callback_return handle_pulse_callback(struct message_record *msg, void *param)
{
    passdb_slim_context *ctx = (passdb_slim_context *)param;
    int total_creds = ctx->n_one_cred + ctx->n_two_cred + ctx->n_spillover;

    (void)msg;

    fprintf(stderr, "PASSDB_PULSE: EQ# %i active %i (%i,%i,%i) [a%i,f%i] b(%i,%i,%i) rule(%i/%i)\n",
            get_equip_num(),
            total_creds,
            ctx->n_one_cred,
            ctx->n_two_cred,
            ctx->n_spillover,
            ctx->num_active,
            ctx->num_free,
            ctx->n_one_cred_bank,
            ctx->n_two_cred_bank,
            ctx->n_spillover_bank,
            ctx->ruleparam_db->n,
            RULEPARAM_DB_MAX);

    return MESSAGE_HANDLED_CONT;
}
// Dispatch callback for MAILBOX_PASSDB_CONSISTENCY.
// Runs passdb_slim_consistency_check() on the context (param) and reports the result on stderr as a
//"PASSDB_CONSIST" line; a result of exactly 1 is labelled "(ok)", anything else "FAIL".
// The message itself is not examined.
message_callback_return handle_consistency_check_callback(struct message_record *msg, void *param)
{
    passdb_slim_context *ctx = (passdb_slim_context *)param;
    const char *verdict;
    int result;

    (void)msg;

    result = passdb_slim_consistency_check( ctx );

    if(result == 1)
    {
        verdict = "(ok)";
    }
    else
    {
        verdict = "FAIL";
    }

    fprintf(stderr, "PASSDB_CONSIST: %i %s\n", result, verdict);

    return MESSAGE_HANDLED_CONT;
}
// Makes sure we are connected to the IPC communication hub.
// If we already hold a connection (commhub_fd >= 0) this does nothing. Otherwise one connection attempt is made
//under the name progname; if it succeeds we subscribe to every mailbox this module acts on, subscribe to the
//default status mailboxes, and ask the other modules for their current status.
void maintain_ipc_hub_connect(char *progname)
{
    struct message_record outgoing_msg;

    //Already connected: nothing to maintain
    if(commhub_fd >= 0)
    {
        return;
    }

    commhub_fd = connect_to_message_server(progname);

    // printf("commhub_fd = %d\n", commhub_fd);

    //The attempt failed; the caller will call us again later
    if(commhub_fd < 0)
    {
        return;
    }

    //Subscribe to the command mailboxes we act on
    prepare_message(&outgoing_msg, MAILBOX_SUBSCRIBE, MAILBOX_TOKEN_MAG, strlen(MAILBOX_TOKEN_MAG));
    send_message(commhub_fd, &outgoing_msg);

    prepare_message(&outgoing_msg, MAILBOX_SUBSCRIBE, MAILBOX_TOKEN_RFID, strlen(MAILBOX_TOKEN_RFID));
    send_message(commhub_fd, &outgoing_msg);

    prepare_message(&outgoing_msg, MAILBOX_SUBSCRIBE, MAILBOX_FLUSH_PASSES, strlen(MAILBOX_FLUSH_PASSES));
    send_message(commhub_fd, &outgoing_msg);

    prepare_message(&outgoing_msg, MAILBOX_SUBSCRIBE, MAILBOX_UPDATE_PASSES, strlen(MAILBOX_UPDATE_PASSES));
    send_message(commhub_fd, &outgoing_msg);

    prepare_message(&outgoing_msg, MAILBOX_SUBSCRIBE, MAILBOX_RULE_CALL, strlen(MAILBOX_RULE_CALL));
    send_message(commhub_fd, &outgoing_msg);

    //Database diagnostics mailboxes
    prepare_message(&outgoing_msg, MAILBOX_SUBSCRIBE, MAILBOX_PASSDB_CONSISTENCY, strlen(MAILBOX_PASSDB_CONSISTENCY));
    send_message(commhub_fd, &outgoing_msg);

    prepare_message(&outgoing_msg, MAILBOX_SUBSCRIBE, MAILBOX_PASSDB_PULSE, strlen(MAILBOX_PASSDB_PULSE));
    send_message(commhub_fd, &outgoing_msg);

    //Subscribe to the relevant status management mailboxes
    subscribe_to_default_messages(commhub_fd);

    //Request updated status information...
    prepare_message(&outgoing_msg, MAILBOX_STATUS_REQUEST, "", 0);
    send_message(commhub_fd, &outgoing_msg);
}
int main(int argc, char **argv)
{
struct pollfd fds[2];
int nfds = 0;
int poll_return = 0;
int read_return = 0;
int i;
struct message_record incoming_msg;
char input_line[LINE_BUFFER_SIZE] = {0};
int input_idx = 0;
int checked_idx = 0;
passdb_slim_context ctx = {0};
//------------------
if (argc > 1)
if (strncmp(argv[1], "-d", 4)==0)
g_debug_flag = 1;
if (g_debug_flag)
{
printf("# DEBUGGING on\n");
}
configure_signal_handlers(argv[0]);
maintain_ipc_hub_connect(argv[0]);
read_return = attach_to_passdb(&ctx);
if( read_return < 0 )
{
fprintf(stderr, "Database is missing or corrupt. Attempting to format new database.\n");
//read_return = format_new_passdb();
read_return = format_new_passdbs();
if( read_return < 0 )
{
fprintf(stderr, "Database cannot be created. Aborting!\n");
return -1;
}
else
{
read_return = attach_to_passdb(&ctx);
if( read_return < 0 )
{
fprintf(stderr, "New database is ALSO missing or corrupt. Aborting.\n");
return -1;
}
}
}
//Register our default keep-up-with-system status callbacks
register_system_status_callbacks();
//Add our module-specific callbacks
register_dispatch_callback(MAILBOX_UPDATE_PASSES, CALLBACK_USER(1), handle_update_passes_message, NULL);
register_dispatch_callback(MAILBOX_FLUSH_PASSES, CALLBACK_USER(2), handle_flush_passes_message, NULL);
register_dispatch_callback(MAILBOX_STATUS_REQUEST, CALLBACK_USER(3), handle_status_request_message, NULL);
register_dispatch_callback(MAILBOX_TOKEN_RFID, CALLBACK_USER(4), handle_token_rfid_message, &ctx);
register_dispatch_callback(MAILBOX_TOKEN_MAG, CALLBACK_USER(5), handle_token_mag_message, &ctx);
register_dispatch_callback(MAILBOX_RULE_CALL, CALLBACK_USER(6), handle_rule_call, &ctx);
//Handle status updates which require us to check expiration of the anti-passback cache
register_dispatch_callback(MAILBOX_GPS_STATUS, CALLBACK_USER(6), update_anti_passback_cache, NULL);
register_dispatch_callback(MAILBOX_STOP_STATUS, CALLBACK_USER(7), update_anti_passback_cache, NULL);
register_dispatch_callback(MAILBOX_PASSDB_CONSISTENCY, CALLBACK_USER(8), handle_consistency_check_callback, &ctx);
register_dispatch_callback(MAILBOX_PASSDB_PULSE, CALLBACK_USER(9), handle_pulse_callback, &ctx);
while( exit_request_status == EXIT_REQUEST_NONE )
{
time_t now = time(NULL);
RESET_WATCHDOG();
maintain_ipc_hub_connect(argv[0]);
if( (now - last_piu_update) > 0)
{
update_piu_status_message(0);
last_piu_update = now;
}
if(server_fd < 0) //If we don't have a connection to the sync server...
{
if( (now - last_sync_attempt) > DEFAULT_CONNECT_RETRY ) //See if it is time to try again
{
if( tunnel_is_up() ) //and if the tunnel thinks it is up
{
server_fd = connect_to_pass_server(); //if so, try again...
// printf("server_fd = %d\n", server_fd);
if(server_fd >= 0) //if it worked
{
input_idx = 0; //reset our buffer index
last_sync_attempt = 0;
zflush_cleanup();;
}
else
{
last_sync_attempt = now;
}
}
}
}
if(!rules_loaded())
{
load_rules(RULES_FILE);
}
if(rfid_pattern_loaded() <= 0)
{
load_rfid_decode_patterns(RFID_PATTERN_FILE);
}
if(hup_request_status)
{
hup_request_status = 0;
unload_rules();
load_rules(RULES_FILE);
load_rfid_decode_patterns(RFID_PATTERN_FILE);
}
nfds=0;
if(server_fd >= 0)
{
fds[nfds].fd = server_fd;
fds[nfds].events = POLLIN;
if( (now - last_sync_attempt) > DEFAULT_PASS_SYNC_RETRY )
{
// printf("We want to sync...\n");
fds[nfds].events |= POLLOUT;
}
nfds++;
}
if(commhub_fd >= 0)
{
fds[nfds].fd = commhub_fd;
fds[nfds].events = POLLIN;
nfds++;
}
if(nfds > 0)
{
poll_return = poll(fds, nfds, POLL_TIMEOUT);
}
else
{
usleep(POLL_TIMEOUT * 1000);
poll_return = 0;
}
if(poll_return <= 0)
{
continue;
}
for(i=0; i < nfds; i++)
{
if( fds[i].fd == server_fd )
{
if(fds[i].revents & POLLIN) //If we just got some input...
{
if(zflush_in_progress) //And we're in the middle of downloading ZFLUSH data
{
//Read it into our ZFLUSH buffer
read_return = recv(fds[i].fd, zflush_data + zflush_idx, zflush_data_size - zflush_idx, 0);
//If the socket has closed politely (0), or had an error other than EINTR...
if( (read_return == 0) || ((read_return < 0) && (errno != EINTR)) )
{
printf("Lost touch with server. This may be OK if we just finished downloading ZFLUSH data\n");
close(server_fd);
server_fd = -1;
break;
}
else
{
if(read_return > 0) //Otherwise, if we got some real data
{
zflush_idx += read_return; //advance our download pointer
//We want to let the update_pass_state() function compute our progress bar...
update_pass_state(FLUSH_STATUS_DOWNLOAD, 0);
update_piu_status_message(0);
last_sync_attempt = now; //remember our last sync time
if(zflush_idx == zflush_data_size) //if we've got the whole shebang...
{
//Flag that we're done downloading zflush blob and returning to command mode...
zflush_in_progress = 0;
// We don't have to do anything else about this, because the next thing we will hear
//from the server is ZFLUSHDONE which will cause us to apply our downloaded zflush data.
}
}
}
}
else //If we are not in the middle of a ZFLUSH
{
read_return = recv(fds[i].fd, input_line + input_idx, sizeof(input_line) - input_idx, 0);
//If the socket has closed politely (0), or had an error other than EINTR...
if( (read_return == 0) || ((read_return < 0) && (errno != EINTR)) )
{
printf("Lost touch with server.\n");
close(server_fd);
server_fd = -1;
break;
}
else
{
if(read_return > 0) //and we got some real data
{
input_idx += read_return; //advance our input index
do //loop through the data we've received...
{
if(zflush_in_progress) //if we have begun a ZFLUSH at some point,
{
if(input_idx >= zflush_data_size)
{
int j,k;
memcpy(zflush_data, input_line, zflush_data_size);
zflush_idx = zflush_data_size;
zflush_in_progress = 0;
k = input_idx - zflush_data_size;
for(j = 0; j < k; j++)
{
input_line[j] = input_line[j + zflush_data_size];
}
input_idx = j;
checked_idx = 0;
}
else
{
// Steal the rest of our input buffer and tack it on to the beginning of the zflush
//buffer. We do this because otherwise we'd have the beginning of the zflush data blob
//sitting around collecting dust in the line buffer and we'd download the rest into the
//zflush buffer and be unable to process it because the beginning would still be
//sitting around in this other buffer.
memcpy(zflush_data, input_line, input_idx);
zflush_idx = input_idx; //advance the zflush buffer index appropriately
checked_idx = input_idx = 0; //clear our line buffer index as it is now empty
last_sync_attempt = now; //mark our last sync time as now
//printf("Added %d leftover bytes to zflush_data\n", input_idx);
break; //break out of the do loop since we're done with our input buffer
}
}
//Advance until we either hit the end of the buffer, or we hit a line-terminator
while(checked_idx < input_idx)
{
if( (input_line[checked_idx] == '\r') || (input_line[checked_idx] == '\n') )
{
break;
}
else
{
checked_idx++;
}
}
//If we didn't hit the end of the input... (meaning we got a whole line...)
if(checked_idx != input_idx)
{
int j,k;
//Null terminate the line we got as a string...
input_line[checked_idx] = '\0';
//printf("Got Command \"%s\"\n", input_line);
// ------------ DEBUG -------------
//debug_print_file("/tmp/passdb.debug", "handling update request: '%s'\n", input_line);
// ------------ DEBUG -------------
if( handle_pass_update_request(input_line, &ctx, 1) < 0 )
{
// printf("Command Failed: \"%s\"\n", input_line);
// ------------ DEBUG -------------
//debug_print_file("/tmp/passdb.debug", " FAILED! '%s'\n", input_line);
// ------------ DEBUG -------------
}
else
{
real_pass_status.last_ack_time = now;
}
last_sync_attempt = now; //remember this server contact time
real_pass_status.last_sync_time = now; //add it to our passdb status
send_pass_state(0); //update other modules if this represents a change
// Now that we've done that, we can bump the rest of characters to the beginning
//of the next line...
k = input_idx - (checked_idx + 1);
//copy those characters to the beginning of the buffer...
for(j=0; j < k; j++)
{
input_line[j] = input_line[j + checked_idx + 1];
}
input_idx = j; //Move the index to point to the next free byte in the buffer
checked_idx = 0; // Move the 'checked' index to the beginning so that next time
//we start counting index 0 as the beginning of the next line when
//it is time to start scanning for EOL.
}
// If we have hit an overflow condition such that our buffer is full and no newline
//has been received, we want to hang up on the server and try again because something is
//seriously borked...
if(input_idx == sizeof(input_line))
{
// ------------ DEBUG -------------
//debug_print_file("/tmp/passdb.debug", " input overrun!\n");
// ------------ DEBUG -------------
fprintf(stderr, "Input overrun from server (line too long).\n");
close(server_fd);
server_fd = -1;
break;
}
} while(checked_idx < input_idx); // While we have unchecked data (that we have yet to check
//for containing an end-of-line.
}
}
}
}
else //Be sure we process all input before closing a remotely closed socket...
{
//If we've lost connection, break this loop and poll all over again
if(fds[i].revents & (POLLERR | POLLHUP | POLLNVAL))
{
printf("Lost touch with server.\n");
close(server_fd);
server_fd = -1;
break;
}
}
if(fds[i].revents & POLLOUT) //If we have flagged that we intend to send a query and poll says it's OK...
{
// printf("Trying to write to server...\n");
if(real_pass_status.flush_status == FLUSH_STATUS_NORMAL)
{
send_query_message(server_fd, &ctx);
real_pass_status.last_sync_time = now;
send_pass_state(0);
}
else
{
printf("Skipping query opportunity because we are mid-flush.\n");
}
//and then update our last sync attempt time
last_sync_attempt = now;
}
}
else if( fds[i].fd == commhub_fd )
{
//If we've lost connection, break this loop and poll all over again
if(fds[i].revents & (POLLERR | POLLHUP | POLLNVAL))
{
close(commhub_fd);
commhub_fd = -1;
break;
}
if(fds[i].revents & POLLIN)
{
// printf("Trying to read from hub...\n");
read_return = get_message(commhub_fd, &incoming_msg);
if( read_return < 0 )
{
close(commhub_fd);
commhub_fd = -1;
break;
}
// ------------ DEBUG -------------
//debug_print_file("/tmp/passdb.debug", "processing incoming_msg: '%s', '%s'\n",
// incoming_msg.header.mailbox_name, incoming_msg.payload);
// ------------ DEBUG -------------
process_message(&incoming_msg); //This passes the received message through the callback list
}
}
}
}
printf("Detatching from Pass Database\n");
detach_from_passdb(&ctx);
printf("Closing connections\n");
if(server_fd >= 0)
{
close(server_fd);
server_fd = -1;
}
if(commhub_fd >= 0)
{
close(commhub_fd);
server_fd = -1;
}
printf("Goodbye.\n");
return 0;
}