/*
* Copyright (c) 2019 Clementine Computing LLC.
*
* This file is part of PopuFare.
*
* PopuFare is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* PopuFare is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with PopuFare. If not, see <https://www.gnu.org/licenses/>.
*
*/
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include "../common/common_defs.h"
#include "commhub.h"
//Timeout handed to poll() in the main loop, in milliseconds (10 seconds)
#define COMMHUB_POLL_TIME (10000)
//Define COMMHUB_DEBUG to compile in the state dumps and trace printouts guarded by it
//#define COMMHUB_DEBUG
//Our listening socket (-1 while no listener is open)
int listener_socket = -1;
//Head of the list of user mailboxes; each mailbox carries its own subscriber list
struct mailbox_record *mailboxes = NULL;
//Head of the list of clients subscribed for wiretap access (they receive every routed message)
struct subscriber_record *wiretap_list = NULL;
//Head of the list of all clients not in wiretap mode (kept separate to prevent duplicate broadcasts)
struct subscriber_record *broadcast_list = NULL;
//Head of the list of all connected clients (for fd/pid/progname tracking)
struct subscriber_record *master_list = NULL;
//Number of client slots currently filled in fds[]; refreshed by rebuild_client_fds()
int num_clients = 0;
//The poll() file descriptor table: slot 0 is the listener, slots 1..num_clients are clients
struct pollfd fds[NUM_SUPPORTED_CLIENTS + 1];
//Switches a file descriptor between blocking and non-blocking I/O.
//  fd       - descriptor whose O_NONBLOCK status flag is changed
//  blocking - non-zero clears O_NONBLOCK (blocking I/O), zero sets it (non-blocking I/O)
//Returns 0 on success, or -1 (after printing a diagnostic) if the status flags
//cannot be read or written.
int set_blocking_mode(int fd, int blocking)
{
    int flags = fcntl(fd, F_GETFL);

    if(flags < 0)
    {
        printf("Cannot get flags on socket %d! [%d / %X]\n", fd, flags, (unsigned int)flags);
        return -1;
    }

    if(blocking)
    {
        flags &= ~O_NONBLOCK;
    }
    else
    {
        flags |= O_NONBLOCK;
    }

    if(fcntl(fd, F_SETFL, flags) != 0)
    {
        //blocking mode means O_NONBLOCK is OFF, so report the state we actually tried to set
        printf("Cannot set O_NONBLOCK on socket %d to %s!\n", fd, blocking ? "OFF" : "ON");
        return -1;
    }

    return 0;
}
//DO_SEND_MESSAGE(ret, fd, msg): sends msg to fd with send_message() and stores the result in ret.
//When COMMHUB_O_NONBLOCK is defined the descriptor is switched to non-blocking mode for the
//duration of the send and switched back to blocking mode afterwards.
//The do { } while(0) wrapper makes the macro expand to exactly one statement, so it remains
//correct when used as the body of an unbraced if/else. Note that fd is evaluated more than once.
#ifdef COMMHUB_O_NONBLOCK
#define DO_SEND_MESSAGE(ret, fd, msg) do { set_blocking_mode((fd), 0); (ret) = send_message((fd), (msg)); set_blocking_mode((fd), 1); } while(0)
#else
#define DO_SEND_MESSAGE(ret, fd, msg) do { (ret) = send_message((fd), (msg)); } while(0)
#endif
//Prints every record of one subscriber list on a single line, then ends the line.
//  p - head of the list to print (a NULL list prints only the newline)
void debug_traverse_single_list(struct subscriber_record *p)
{
    struct subscriber_record *node;

    for(node = p; node != NULL; node = node->next)
    {
        printf(" [FD: %d PID: %d Prog: %s]", node->clientfd, node->pid, node->progname);
    }

    printf("\n");
}
//Dumps the hub's global state to stdout: the master, wiretap and broadcast lists first,
//then every user mailbox together with its subscribers. Always returns 0.
int debug_traverse()
{
    struct mailbox_record *box;

    printf("-------------------------------------------------\n");

    //the three system lists
    printf("ALL:");
    debug_traverse_single_list(master_list);
    printf("WIRETAP:");
    debug_traverse_single_list(wiretap_list);
    printf("BROADCAST:");
    debug_traverse_single_list(broadcast_list);

    //one line per user mailbox: its name followed by its subscribers
    printf("---User Lists---\n");
    for(box = mailboxes; box != NULL; box = box->next)
    {
        printf("%s:",box->mailbox_name);
        debug_traverse_single_list(box->clients);
    }

    printf("\n");
    return 0;
}
//Stores the pid and program name in a subscriber record.
//  p        - record to update; a NULL record is ignored
//  pid      - process id to store
//  progname - program name to store, or NULL to blank the stored name
//The name is truncated so that it always ends in a NUL within MAX_MODULE_NAME_LENGTH bytes
//(assumes p->progname holds at least MAX_MODULE_NAME_LENGTH bytes - TODO confirm in commhub.h).
void update_client_identifiers(struct subscriber_record *p, pid_t pid, char *progname)
{
    if(p == NULL)
    {
        return;
    }

    p->pid = pid;

    if(progname == NULL)
    {
        //no name supplied: leave an empty string behind
        memset(p->progname, '\0', MAX_MODULE_NAME_LENGTH);
        return;
    }

    strncpy(p->progname, progname, MAX_MODULE_NAME_LENGTH);
    p->progname[MAX_MODULE_NAME_LENGTH - 1] = '\0';    //strncpy does not terminate on truncation
}
//Adds a subscriber to a subscriber list (a mailbox's client list or one of the system lists).
//  fd       - client file descriptor; this is the key that identifies a subscriber
//  pid      - process id to record for the client
//  progname - program name to record (may be NULL)
//  list     - address of the list head, so that an empty list can be given its first node
//If fd is already on the list its pid and progname are refreshed and no node is added.
//New nodes are appended at the tail. Returns 0 on success, -1 if allocation fails.
int add_client(int fd, pid_t pid, char *progname, struct subscriber_record **list)
{
    struct subscriber_record **link;    //the pointer that would have to change to append a node
    struct subscriber_record *node;

    for(link = list; *link != NULL; link = &(*link)->next)
    {
        if((*link)->clientfd == fd)
        {
            //already subscribed: just refresh the identifiers
            update_client_identifiers(*link, pid, progname);
            return 0;
        }
    }

    node = (struct subscriber_record *) malloc(sizeof(*node));
    if(node == NULL)
    {
        return -1;
    }

    memset(node, 0, sizeof(*node));
    node->clientfd = fd;
    node->next = NULL;
    update_client_identifiers(node, pid, progname);

    //link now addresses either the list head (empty list) or the last node's next pointer
    *link = node;
    return 0;
}
//Removes the first record whose descriptor equals fd from a subscriber list and frees it.
//  fd   - client file descriptor to look for
//  list - address of the list head (the head itself is rewritten when its node is removed)
//Returns 0 if a record was removed, -1 if fd was not on the list.
int del_client(int fd, struct subscriber_record **list)
{
    struct subscriber_record **link = list;

    while(*link != NULL)
    {
        struct subscriber_record *node = *link;

        if(node->clientfd == fd)
        {
            *link = node->next;    //unlink, whether at the head or in the middle
            free(node);
            return 0;
        }

        link = &node->next;
    }

    return -1;
}
//Subscribes a client to a user mailbox, creating the mailbox if it does not exist yet.
//  fd       - client file descriptor
//  pid      - client process id
//  progname - client program name (may be NULL)
//  mailbox  - name of the mailbox; only the first MAILBOX_NAME_MAX characters are significant
//Returns 0 on success, -1 if mailbox is NULL or memory cannot be allocated.
//A mailbox is only linked into the global list once its first subscriber has been stored,
//so a failed allocation never leaves an empty mailbox behind.
int subscribe_client(int fd, pid_t pid, char *progname, char *mailbox)
{
    struct mailbox_record **link;    //the pointer that would have to change to append a mailbox
    struct mailbox_record *box;

    if(mailbox == NULL)
    {
        return -1;
    }

    //an existing mailbox with this name simply gains a subscriber
    for(link = &mailboxes; *link != NULL; link = &(*link)->next)
    {
        if( !strncmp(mailbox, (*link)->mailbox_name, MAILBOX_NAME_MAX) )
        {
            return add_client(fd, pid, progname, &(*link)->clients);
        }
    }

    //otherwise build a new mailbox
    box = (struct mailbox_record *) malloc(sizeof(*box));
    if(box == NULL)
    {
        return -1;
    }

    memset(box, 0, sizeof(*box));
    strncpy(box->mailbox_name, mailbox, MAILBOX_NAME_MAX);
    //NOTE(review): assumes mailbox_name holds MAILBOX_NAME_MAX + 1 bytes - confirm in commhub.h
    box->mailbox_name[MAILBOX_NAME_MAX] = '\0';
    box->next = NULL;

    if(add_client(fd, pid, progname, &box->clients) != 0)
    {
        //could not store the subscriber: discard the mailbox instead of publishing it empty
        free(box);
        return -1;
    }

    *link = box;    //append at the tail (or become the head of an empty list)
    return 0;
}
//Unsubscribes a client from a user mailbox.
//  fd      - client file descriptor
//  mailbox - name of the mailbox to leave
//When the mailbox has no subscribers left afterwards it is unlinked and freed.
//Returns the result of the removal (0 removed, -1 client was not subscribed), or -1 if
//mailbox is NULL or no mailbox with that name exists.
int unsubscribe_client(int fd, char *mailbox)
{
    struct mailbox_record **link;

    if(mailbox == NULL)
    {
        return -1;
    }

    for(link = &mailboxes; *link != NULL; link = &(*link)->next)
    {
        struct mailbox_record *box = *link;
        int result;

        if(strncmp(mailbox, box->mailbox_name, MAILBOX_NAME_MAX) != 0)
        {
            continue;    //not the mailbox we are looking for
        }

        result = del_client(fd, &box->clients);

        if(box->clients == NULL)
        {
            //that was the last subscriber, so the mailbox itself goes away
            *link = box->next;
            free(box);
        }

        return result;
    }

    return -1;
}
//Looks up a user mailbox by name; used when delivering messages.
//  mailbox - name to search for (compared over at most MAILBOX_NAME_MAX characters)
//Returns the head of the matching mailbox's subscriber list, or NULL if mailbox is NULL
//or no mailbox has that name.
struct subscriber_record *find_mailbox(char *mailbox)
{
    struct mailbox_record *box;

    if(mailbox == NULL)
    {
        return NULL;
    }

    for(box = mailboxes; box != NULL; box = box->next)
    {
        if(strncmp(mailbox, box->mailbox_name, MAILBOX_NAME_MAX) == 0)
        {
            return box->clients;
        }
    }

    return NULL;
}
//Removes every reference to a (disconnected) client from the hub's global state: the three
//system lists and every user mailbox. Mailboxes left without subscribers are freed.
//  fd - file descriptor of the client to forget
//The descriptor itself is not closed here. Always returns 0.
int purge_client(int fd)
{
    struct mailbox_record **link;

    //system lists first
    del_client(fd, &broadcast_list);
    del_client(fd, &wiretap_list);
    del_client(fd, &master_list);

    //then every user mailbox
    link = &mailboxes;
    while(*link != NULL)
    {
        struct mailbox_record *box = *link;

        del_client(fd, &box->clients);

        if(box->clients == NULL)
        {
            //empty mailbox: unlink it; link already addresses its successor, so do not advance
            *link = box->next;
            free(box);
        }
        else
        {
            link = &box->next;
        }
    }

#ifdef COMMHUB_DEBUG
    printf("Purged client %d\n", fd);
    debug_traverse();
#endif
    return 0;
}
//Creates a listening UNIX-domain SOCK_SEQPACKET socket bound to a filesystem path.
//  pathname - path of the socket; anything already living at that path is unlinked first
//Returns the listening descriptor (>= 0) on success, or a negative code:
//  -1  the socket could not be created
//  -2  the path is unusable (NULL, empty, or too long for sun_path) or bind() failed
//  -3  listen() failed
//On failure no descriptor is left open.
int create_listener(char *pathname)
{
    struct sockaddr_un addr;
    int fd;

    //Refuse paths that cannot be represented exactly. A truncated path would bind to a
    //different name than the one we unlink and the one clients connect to.
    if(pathname == NULL || pathname[0] == '\0' || strlen(pathname) >= sizeof(addr.sun_path))
    {
        return -2;
    }

    fd = socket(AF_UNIX, SOCK_SEQPACKET, 0);
    if(fd < 0)
    {
        return -1;
    }

    //prepare the bind address; zeroing first guarantees a NUL-terminated path
    memset(&addr, 0, sizeof(addr));
    addr.sun_family = AF_UNIX;
    strncpy(addr.sun_path, pathname, sizeof(addr.sun_path) - 1);

    //remove any stale socket or file at that path so that bind() can create it
    unlink(pathname);

    //the whole structure is passed; the path inside it is NUL-terminated
    if(bind(fd, (struct sockaddr *)&addr, sizeof(addr)) != 0)
    {
        close(fd);
        return -2;
    }

    if(listen(fd, 5) != 0)
    {
        close(fd);
        return -3;
    }

    return fd;
}
//This function walks the master client list and rebuilds the file descriptor set used by poll()
//to wait for input.
int rebuild_client_fds()
{
struct subscriber_record *p = master_list;
int i = 0;
while(p) //iterate through our master list
{
if(i >= NUM_SUPPORTED_CLIENTS) //stopping if we have exceeded the space in the fds table.
break;
fds[i + 1].fd = p->clientfd; //set the fd for this table slot
fds[i + 1].events = POLLIN; //specify that we're waiting for input
i++; //count it
p=p->next; //and advance our pointer
}
num_clients = i; //remember the number of clients we have so that we can reject new ones if they are
return i; //in excess of our table size.
}
//Accepts a pending connection on listener_socket.
//If the client table is full the new client is sent a MAXCLIENTS error message and
//disconnected; otherwise it is placed on the master and broadcast lists with a placeholder
//pid of -1 and an empty program name, which the client is expected to correct with a HELLO.
//Returns 0 when a client was added or when accept() itself failed (anything other than
//EINTR is reported with perror), and -1 when the connection was accepted but had to be
//dropped (table full or out of memory). A dropped connection is always closed.
int accept_client()
{
    struct message_record msg;
    int sendret = 0;
    int clientfd;

    clientfd = accept(listener_socket, NULL, NULL);
    if(clientfd < 0)
    {
        if(errno != EINTR)
            perror("Accept client failed: ");
        return 0;
    }

    if(num_clients >= NUM_SUPPORTED_CLIENTS)    //out of room
    {
#ifdef COMMHUB_DEBUG
        printf("Cannot add client %d (MAXCLIENTS)\n", clientfd);
#endif
        prepare_message(&msg,MAILBOX_ERROR,"MAXCLIENTS",10);
        DO_SEND_MESSAGE(sendret, clientfd, &msg);
        (void)sendret;    //the notice is best effort; the client is dropped either way
        close(clientfd);
        return -1;
    }

    //A freshly accepted descriptor cannot legitimately own any records. If some exist they
    //belong to an earlier connection whose descriptor number has been reused, and must not
    //hand their subscriptions to the new client.
    purge_client(clientfd);

    if(add_client(clientfd, -1, "", &master_list) != 0)
    {
        close(clientfd);    //without a master record the descriptor would never be polled
        return -1;
    }

    if(add_client(clientfd, -1, "", &broadcast_list) != 0)
    {
        del_client(clientfd, &master_list);
        close(clientfd);
        return -1;
    }

#ifdef COMMHUB_DEBUG
    printf("Added Client %d\n", clientfd);
    debug_traverse();
#endif
    return 0;
}
//Finds a client record by file descriptor.
//  list - head of the subscriber list to search
//  fd   - descriptor to look for
//Returns the first matching record, or NULL if fd is not on the list.
struct subscriber_record *find_client_by_fd(struct subscriber_record *list, int fd)
{
    struct subscriber_record *node;

    for(node = list; node != NULL; node = node->next)
    {
        if(node->clientfd == fd)
        {
            return node;
        }
    }

    return NULL;
}
//This helper function delivers a message to every subscriber on the provided list.
int deliver_message(struct message_record *msg, struct subscriber_record *dest)
{
struct subscriber_record *p = dest;
int sendret;
while(p)
{
DO_SEND_MESSAGE(sendret, p->clientfd, msg);
if(sendret)
{
close(p->clientfd);
}
p = p->next;
}
return 0;
}
//This function is where the 'magic' happens. It picks out special messages (after first forwarding everything to the wiretap list)
//Messages addressed to:
// HELLO adds the client to the master and broadcast lists and updates its pid
// >pid gets delivered to the client with the specified pid.
// :modulename gets delivered to all clients registered with the supplied module name
// SUBSCRIBE causes the sending client to be added to the named mailbox
// UNSUBSCRIBE causes the sending client to be removed from the named mailbox
// BROADCAST causes the message to be passed to all listeners
// WIRETAP sets the wiretap state (on if payload = "ON" off otherwise)
// PING replies with PONG
//
//All other messages are passed to their user mailbox and on to any subscribing clients. Messages with no receiver are dropped silently.
//
int route_message(int fd, struct message_record *msg)
{
struct subscriber_record dummy = {0};
struct subscriber_record *dest;
int pid;
int sendret;
//Implement wiretap for debug
if(wiretap_list)
{
deliver_message(msg, wiretap_list);
}
//Handle special mailboxes
if( msg->header.mailbox_name[0] == '>' ) //-------------------------------------------- PID addressed message
{
pid = atoi( &msg->header.mailbox_name[1] ); //extract pid
dest = master_list; //look it up in the master list
while(dest)
{
if(dest->pid == pid) //if we have a match
break; //stop looking
dest = dest->next; //advance pointer
}
if(dest != NULL)
{
DO_SEND_MESSAGE(sendret, dest->clientfd, msg); //deliver the message
if(sendret)
{
close(dest->clientfd);
}
}
else
{
if(pid == getpid())
{
//If this is a unicast PING to US...
if( !strncmp((char *)msg->payload, MAILBOX_PING, MAILBOX_NAME_MAX) )
{
struct message_record outgoing;
prepare_message(&outgoing, MAILBOX_PONG, msg->payload, msg->header.payload_length);
dest = find_mailbox(MAILBOX_PONG);
if(dest)
{
deliver_message(&outgoing, dest);
}
deliver_message(&outgoing, wiretap_list);
}
}
}
return 0;
}
else if( msg->header.mailbox_name[0] == ':' ) //-------------------------------------------- module addressed message
{
dest = master_list; //start looking through the master list
while(dest)
{
if(! strncmp(&msg->header.mailbox_name[1], dest->progname, MAX_MODULE_NAME_LENGTH) ) //if we have a match on module name
{
DO_SEND_MESSAGE(sendret, dest->clientfd, msg); //deliver the message (but keep looking)
if(sendret)
{
close(dest->clientfd);
}
}
dest = dest->next; //advance pointer
}
return 0;
}
else if( !strncmp(msg->header.mailbox_name, MAILBOX_HELLO, MAILBOX_NAME_MAX)) //------------------------- HELLO message
{
//a HELLO message sets the client pid, adds it to master and broadcast, and removes it from wiretap
//the payload should contain the module name
add_client(fd, msg->header.sender, (char *)msg->payload, &master_list);
add_client(fd, msg->header.sender, (char *)msg->payload, &broadcast_list);
del_client(fd, &wiretap_list);
#ifdef COMMHUB_DEBUG
printf("Associated client %d with PID %d (progname = %s)\n", fd, msg->header.sender, msg->payload);
debug_traverse();
#endif
return 0; //return
}
else if( !strncmp(msg->header.mailbox_name, MAILBOX_SUBSCRIBE, MAILBOX_NAME_MAX)) //------------------------- SUBSCRIBE message
{
//a SUBSCRIBE message subscribes the client to the specified mailbox
dest = find_client_by_fd(master_list, fd); //look up this client in the master list to obtain progname
if(!dest) //if this fd is not found (this should NEVER happen, but still...) avoid a null ponter by using a dummy record of all 0's
{
fprintf(stderr,"Request from client %d which appears not to be in our master list!\n", fd);
dest = &dummy;
}
subscribe_client(fd, msg->header.sender, dest->progname, (char *)msg->payload);
#ifdef COMMHUB_DEBUG
printf("Added Client %d to %s\n", fd, msg->payload);
debug_traverse();
#endif
return 0; //return
}
else if( !strncmp(msg->header.mailbox_name, MAILBOX_UNSUBSCRIBE, MAILBOX_NAME_MAX)) //------------------------- UNSUBSCRIBE message
{
//an UNSUBSCRIBE message removes the client from the specified mailbox
unsubscribe_client(fd, (char *)msg->payload);
#ifdef COMMHUB_DEBUG
printf("Removed Client %d from %s\n", fd, msg->payload);
debug_traverse();
#endif
return 0; //return
}
else if( !strncmp(msg->header.mailbox_name, MAILBOX_BROADCAST, MAILBOX_NAME_MAX)) //------------------------- BROADCAST message
{
//a BROADCAST message goes to everybody (except those who already got it through wiretap)
deliver_message(msg, broadcast_list);
return 0;
}
else if( !strncmp(msg->header.mailbox_name, MAILBOX_WIRETAP, MAILBOX_NAME_MAX)) //------------------------- WIRETAP message
{
//a WIRETAP message sets the wiretap mode.
dest = find_client_by_fd(master_list, fd); //look up this client in the master list to obtain progname
if(!dest) //if this fd is not found (this should NEVER happen, but still...) avoid a null ponter by using a dummy record of all 0's
{
fprintf(stderr,"Request from client %d which appears not to be in our master list!\n", fd);
dest = &dummy;
}
if( !strncmp((char *)msg->payload, "ON", MAX_PAYLOAD_LENGTH) ) //if we are turning wiretap mode ON
{
add_client(fd, msg->header.sender, dest->progname, &wiretap_list); //add the client to the wiretap list
del_client(fd, &broadcast_list); //and remove from the broadcast list
#ifdef COMMHUB_DEBUG
printf("Added Client %d to wiretap list\n", fd);
debug_traverse();
#endif
}
else //if we are turning wiretap mode OFF
{
del_client(fd, &wiretap_list); //remove the client from the wiretap list
add_client(fd, msg->header.sender, dest->progname, &broadcast_list);//and add to the broadcast list
#ifdef COMMHUB_DEBUG
printf("Removed Client %d from wiretap list\n", fd);
debug_traverse();
#endif
}
return 0;
}
else if( !strncmp(msg->header.mailbox_name, MAILBOX_PING, MAILBOX_NAME_MAX)) //Even we need to respond to a PING message
{
struct message_record outgoing;
prepare_message(&outgoing, MAILBOX_PONG, msg->payload, msg->header.payload_length);
dest = find_mailbox(MAILBOX_PONG);
if(dest)
{
deliver_message(&outgoing, dest);
}
deliver_message(&outgoing, wiretap_list);
}
//-------------------------------------------------------------------------------Normal delivery to user mailboxes...
dest = find_mailbox(msg->header.mailbox_name); //look up the mailbox client list
if(dest) //if it is non-NULL
{
deliver_message(msg, dest); //deliver the message to those clients
}
return 0;
}
//Releases everything the hub owns before exit: frees every mailbox and subscriber record,
//closes every client descriptor found on the master list, closes the listening socket and
//resets the global state (list heads NULL, num_clients 0, listener_socket -1).
//Always returns 0.
int cleanup()
{
    struct mailbox_record *box;
    struct subscriber_record *sub;

    //user mailboxes: pop each one off the head, drain its subscribers, then free it
    while((box = mailboxes) != NULL)
    {
        mailboxes = box->next;

        while((sub = box->clients) != NULL)
        {
            box->clients = sub->next;
            free(sub);
        }

        free(box);
    }

    //wiretap list
    while((sub = wiretap_list) != NULL)
    {
        wiretap_list = sub->next;
        free(sub);
    }

    //broadcast list
    while((sub = broadcast_list) != NULL)
    {
        broadcast_list = sub->next;
        free(sub);
    }

    //master list: this is the one place where the client descriptors get closed
    while((sub = master_list) != NULL)
    {
        master_list = sub->next;
        close(sub->clientfd);
        free(sub);
    }

    //every list head is NULL at this point; finish with the listener and the counters
    close(listener_socket);
    num_clients = 0;
    listener_socket = -1;
    return 0;
}
//The poll() revents flags that mean a socket must be closed and cleaned up:
//an error condition (POLLERR), an invalid descriptor (POLLNVAL) or a hang-up (POLLHUP).
#define CLOSE_CONDITION (POLLERR | POLLNVAL | POLLHUP)
//Entry point of the hub: creates the listening socket on COMMHUB_ADDRESS and then services
//it and all connected clients with poll() until an exit is requested through
//exit_request_status or poll() / the listener fails.
//  argc - unused
//  argv - argv[0] is handed to configure_signal_handlers()
//Returns -1 if the listening socket cannot be created, 0 otherwise.
int main(int argc, char **argv)
{
    int i;
    int retval;
    int rebuild;
    struct message_record msg;
    long long int usec_now, usec_prev, usec_interval;

    (void)argc;

    usec_interval = 60000000;    //print a heartbeat at most once every 60 seconds
    usec_now = get_usec_time();
    usec_prev = usec_now;

    //Install our signal handlers and watchdog
    configure_signal_handlers(argv[0]);

    //create our listening socket
    listener_socket = create_listener(COMMHUB_ADDRESS);
    if(listener_socket < 0)
    {
        printf("Can't create listener socket: %d\n", listener_socket);
        return -1;
    }

    //slot 0 of the poll table is the listener; the (still empty) client slots follow
    fds[0].fd = listener_socket;
    fds[0].events = POLLIN;
    rebuild_client_fds();
    rebuild = 0;

    //while we have not yet received a signal telling us to stop
    while( exit_request_status == EXIT_REQUEST_NONE )
    {
        //DEBUG
        usec_now = get_usec_time();
        if ((usec_now - usec_prev) > usec_interval) {
            printf("[%lli] ipc_server: heartbeat\n", get_usec_time());
            usec_prev = usec_now;
        }
        //DEBUG

        RESET_WATCHDOG();

        if(rebuild)    //the client set changed since the table was last built
        {
            rebuild_client_fds();
            rebuild = 0;
        }

        //poll for any I/O events that we need to service
        retval = poll(fds, num_clients + 1, COMMHUB_POLL_TIME);
        if(retval < 0)
        {
            if(errno == EINTR)    //just a signal interruption: poll again
            {
                continue;
            }
            perror("commserver: Poll failed:");
            break;
        }
        if(retval == 0)    //timeout with no events
        {
            continue;
        }

        //a new client is knocking
        if(fds[0].revents & POLLIN)
        {
            accept_client();
            rebuild = 1;
            continue;    //client events found by this poll() are picked up by the next one
        }

        if(fds[0].revents & CLOSE_CONDITION)    //our server socket failed
        {
            fprintf(stderr, "commserver: Server socket failed: revents = %d\n", fds[0].revents);
            break;
        }

        for(i = 1; i <= num_clients; i++)    //for each connected client
        {
            int drop = 0;    //set when this client has to be purged and closed

            if(fds[i].revents & POLLIN)
            {
                retval = get_message(fds[i].fd, &msg);
                if(retval == -1)
                {
                    drop = 1;    //a failed read means the client has disconnected
                }
                else
                {
                    //DEBUG
                    printf("[%lli] %s (fd:%i,pid:%i,n:%i)\n",
                           (long long int)msg.header.usec_time,
                           msg.header.mailbox_name,
                           (int)msg.header.from_fd, (int)msg.header.sender, (int)msg.header.payload_length);
                    //DEBUG
                    route_message(fds[i].fd, &msg);
                }
            }

            if(fds[i].revents & CLOSE_CONDITION)    //error condition on this socket
            {
                drop = 1;
            }

            //Tear the client down exactly once, even when both the read failed and an
            //error flag is set, so the descriptor is never closed twice in one pass.
            if(drop)
            {
                purge_client(fds[i].fd);
                close(fds[i].fd);
                rebuild = 1;
            }
        }
    }

    cleanup();    //clean up our resources
    return 0;
}