/* * Copyright (c) 2019 Clementine Computing LLC. * * This file is part of PopuFare. * * PopuFare is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * PopuFare is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with PopuFare. If not, see . * */ #include #include #include #include #include #include #include #include #include #include "../common/common_defs.h" #include "commhub.h" #define COMMHUB_POLL_TIME (10000) //10 seconds //#define COMMHUB_DEBUG int listener_socket = -1; //Our listening socket struct mailbox_record *mailboxes = NULL; //Our list of user mailboxes struct subscriber_record *wiretap_list = NULL; //Our list of clients subscribed for wiretap access struct subscriber_record *broadcast_list = NULL; //Our list of all clients not in wiretap mode (to prevent duplicate broadcasts) struct subscriber_record *master_list = NULL; //Our list of all clients (for fd/pid tracking) int num_clients = 0; //Our active client count struct pollfd fds[NUM_SUPPORTED_CLIENTS + 1]; //A buffer to hold the poll() file descriptor table int set_blocking_mode(int fd, int blocking) { int retval; int newflags; newflags = fcntl(fd, F_GETFL); if(newflags >= 0) { if(blocking) { newflags &= ~O_NONBLOCK; } else { newflags |= O_NONBLOCK; } retval = fcntl(fd, F_SETFL, newflags); if(retval) { printf("Cannot set O_NONBLOCK on socket %d to %s!\n", fd, blocking?"ON":"OFF"); return -1; } } else { printf("Cannot get flags on socket %d! [%d / %X]\n", fd, newflags, newflags); return -1; } return 0; } #ifdef COMMHUB_O_NONBLOCK #define DO_SEND_MESSAGE(ret, fd, msg) {set_blocking_mode((fd),0); ret = send_message((fd), (msg)); set_blocking_mode((fd),1);} #else #define DO_SEND_MESSAGE(ret, fd, msg) {ret = send_message((fd), (msg));} #endif void debug_traverse_single_list(struct subscriber_record *p) { while(p) { printf(" [FD: %d PID: %d Prog: %s]", p->clientfd, p->pid, p->progname); p = p->next; } printf("\n"); } //This function traverses the global state data structures and prints out a visual indication of state int debug_traverse() { struct mailbox_record *p; printf("-------------------------------------------------\n"); printf("ALL:"); debug_traverse_single_list(master_list); //iterate through the master list printf("WIRETAP:"); debug_traverse_single_list(wiretap_list); //iterate through the wiretap list printf("BROADCAST:"); debug_traverse_single_list(broadcast_list); //do the same for the broadcast list printf("---User Lists---\n"); p = mailboxes; //then iterate through each generic (user) mailbox while(p) { //display the mailbox name printf("%s:",p->mailbox_name); debug_traverse_single_list(p->clients); p = p->next; } printf("\n"); return 0; } void update_client_identifiers(struct subscriber_record *p, pid_t pid, char *progname) { if(!p) return; p->pid = pid; if(progname) //if we have a program name, store it { strncpy(p->progname, progname, MAX_MODULE_NAME_LENGTH); p->progname[MAX_MODULE_NAME_LENGTH - 1] = '\0'; } else //otherwise, set a blank string { memset(p->progname, '\0', MAX_MODULE_NAME_LENGTH); } } //This function adds a subscriber to a subscriber list (usually a mailbox or a 'special' list) //It is passed a file descriptor, optional PID, and a pointer to the listhead (so that we can insert at the head if we want) int add_client(int fd, pid_t pid, char *progname, struct subscriber_record **list) { struct subscriber_record *p, *q; q = NULL; p = *list; //iterate through all items while(p) { if(p->clientfd == fd) //the client is already on the list { //update the PID and progname anyway update_client_identifiers(p, pid, progname); return 0; //and return } q = p; //move our pointers along p = p->next; } //allocate a new subscriber_record structure p = (struct subscriber_record *) malloc(sizeof(struct subscriber_record)); if(p == NULL) return -1; //clear it and populate it memset(p,0,sizeof(struct subscriber_record)); p->clientfd = fd; p->next = NULL; update_client_identifiers(p, pid, progname); //set our pid and progname if(q) //if we're not at the list head { q->next = p; //insert at the end } else //if this is the list head (empty list) { *list = p; //replace the head with our new node. } return 0; } //This function deletes a client from a subscriber list (again we pass a pointer to the listhead). int del_client(int fd, struct subscriber_record **list) { struct subscriber_record *p, *q; q = NULL; p = *list; //iterate through our list while(p) { if(p->clientfd == fd) //if we find our client { if(q) //and we're not at the head of the list { q->next = p->next; //snip it out } else //otherwise { *list = p->next; //bump the listhead down one } free(p); //free the deleted node return 0; //and return } q=p; //advance our pointers p=p->next; } return -1; //client not found } //This function subscribes a client to a user mailbox. It is passed the client data (fd and pid) //as well as a mailbox name to search for. int subscribe_client(int fd, pid_t pid, char *progname, char *mailbox) { struct mailbox_record *p, *q; if(mailbox == NULL) { return -1; } q = NULL; p = mailboxes; //traverse the mailboxes list while(p) { if( !strncmp(mailbox, p->mailbox_name, MAILBOX_NAME_MAX) ) //if we found a match { return add_client(fd, pid, progname, &p->clients); //try and add the client } q = p; //advance our pointers p = p->next; } //if we have not found a mailbox, create one p = (struct mailbox_record *) malloc(sizeof(struct mailbox_record)); if(p == NULL) return -1; memset(p, 0, sizeof(struct mailbox_record)); //clear the memory strncpy(p->mailbox_name, mailbox, MAILBOX_NAME_MAX); //fill the mailbox name p->mailbox_name[MAILBOX_NAME_MAX] = '\0'; add_client(fd, pid, progname, &p->clients); //add our shiny new subscriber p->next = NULL; if(q) //if we're not adding to an empty list { q->next = p; //insert at the end } else //otherwise { mailboxes = p; //replace the listhead } return 0; } //This function unsubscriber a client from a user mailbox int unsubscribe_client(int fd, char *mailbox) { struct mailbox_record *p, *q; int retval; if(mailbox == NULL) { return -1; } q = NULL; p = mailboxes; //traverse the mailbox list while(p) { if( !strncmp(mailbox, p->mailbox_name, MAILBOX_NAME_MAX) ) //if we found a match { retval = del_client(fd, &p->clients); //try to unsubscribe if(p->clients == NULL) //if we just unsubscribed our last client, delete this list. { if(q) //if this is not the first list item { q->next = p->next; //snip it from the middle } else //otherwise { mailboxes = p->next; //snip from the beginning (tweaking the listhead) } free(p); //and free it } return retval; //return the result of the removal } q = p; //advance our pointers p = p->next; } return -1; //not found! } //This function scans the mailboxes to find the named mailbox, and if it is found //returns the client list associated with that mailbox. This is used in delivery. struct subscriber_record *find_mailbox(char *mailbox) { struct mailbox_record *p; if(mailbox == NULL) { return NULL; } p = mailboxes; //iterate through the mailbox list while(p) { if( !strncmp(mailbox, p->mailbox_name, MAILBOX_NAME_MAX) ) //if we found a match { return p->clients; //return the client list associated } p = p->next; //advance our pointer } return NULL; //not found } //This function walks all global state structures to purge all reference to a (disconnected) client int purge_client(int fd) { struct mailbox_record *p, *q; //remove the client from all system lists del_client(fd, &broadcast_list); del_client(fd, &wiretap_list); del_client(fd, &master_list); q = NULL; p = mailboxes; //then iterate through user mailboxes while(p) { del_client(fd, &p->clients); //removing the client when it is present if(p->clients == NULL) //if we just unsubscribed our last client on this list, delete the list. { struct mailbox_record *r = p; //save this pointer to delete after advancement if(q) //if we're deleting from the middle { q->next = p->next; //snip out } else //otherwise { mailboxes = p->next; //advance the listhead } p = p->next; //advance our pointer free(r); //free the deleted node continue; //continue with the advanced pointer } q = p; //advance our pointers p = p->next; } #ifdef COMMHUB_DEBUG printf("Purged client %d\n", fd); debug_traverse(); #endif return 0; } int create_listener(char *pathname) { int fd; struct sockaddr_un addr; int len; int retval; //create a UNIX domain socket of type SOCK_SEQPACKET fd = socket(AF_UNIX, SOCK_SEQPACKET, 0); if(fd < 0) return -1; //prepare our bind address structure addr.sun_family = AF_UNIX; strncpy(addr.sun_path, pathname, sizeof(addr.sun_path) - 1); addr.sun_path[sizeof(addr.sun_path) - 1]='\0'; //unlink any existing socket or file that lives there unlink(pathname); //calculate the address length len = strlen(pathname) + sizeof(addr.sun_family); //and perform the bind retval = bind(fd,(struct sockaddr *)&addr,len); if(retval) { close(fd); return -2; } //set the socket in listening mode... retval=listen(fd, 5); if(retval) { close(fd); return -3; } return fd; } //This function walks the master client list and rebuilds the file descriptor set used by poll() //to wait for input. int rebuild_client_fds() { struct subscriber_record *p = master_list; int i = 0; while(p) //iterate through our master list { if(i >= NUM_SUPPORTED_CLIENTS) //stopping if we have exceeded the space in the fds table. break; fds[i + 1].fd = p->clientfd; //set the fd for this table slot fds[i + 1].events = POLLIN; //specify that we're waiting for input i++; //count it p=p->next; //and advance our pointer } num_clients = i; //remember the number of clients we have so that we can reject new ones if they are return i; //in excess of our table size. } //This function accepts a new client on the listener_socket and either rejects it with an error (if our table is full) or //accepts it and places it in the master (and broadcast) lists. int accept_client() { int retval; int sendret = 0; struct message_record msg; //try to accept the new client retval = accept(listener_socket, NULL, NULL); //if we got a valid file descriptor if(retval >= 0) { if(num_clients >= NUM_SUPPORTED_CLIENTS) //if we are out of room { #ifdef COMMHUB_DEBUG printf("Cannot add client %d (MAXCLIENTS)\n", retval); #endif prepare_message(&msg,MAILBOX_ERROR,"MAXCLIENTS",10); //send an error DO_SEND_MESSAGE(sendret, retval, &msg); if (sendret < 0) { } close(retval); //close the socket return -1; //and return } //otherwise add it to our default lists (master, broadcast) with a bogus pid //which the client will (hopefully) correct with a HELLO message add_client(retval, -1, "", &master_list); add_client(retval, -1, "", &broadcast_list); #ifdef COMMHUB_DEBUG printf("Added Client %d\n", retval); debug_traverse(); #endif } else { if(errno != EINTR) perror("Accept client failed: "); } return 0; } //This helper function finds a client record by file descriptor struct subscriber_record *find_client_by_fd(struct subscriber_record *list, int fd) { struct subscriber_record *p = list; while(p) { if(p->clientfd == fd) break; p = p->next; } return p; } //This helper function delivers a message to every subscriber on the provided list. int deliver_message(struct message_record *msg, struct subscriber_record *dest) { struct subscriber_record *p = dest; int sendret; while(p) { DO_SEND_MESSAGE(sendret, p->clientfd, msg); if(sendret) { close(p->clientfd); } p = p->next; } return 0; } //This function is where the 'magic' happens. It picks out special messages (after first forwarding everything to the wiretap list) //Messages addressed to: // HELLO adds the client to the master and broadcast lists and updates its pid // >pid gets delivered to the client with the specified pid. // :modulename gets delivered to all clients registered with the supplied module name // SUBSCRIBE causes the sending client to be added to the named mailbox // UNSUBSCRIBE causes the sending client to be removed from the named mailbox // BROADCAST causes the message to be passed to all listeners // WIRETAP sets the wiretap state (on if payload = "ON" off otherwise) // PING replies with PONG // //All other messages are passed to their user mailbox and on to any subscribing clients. Messages with no receiver are dropped silently. // int route_message(int fd, struct message_record *msg) { struct subscriber_record dummy = {0}; struct subscriber_record *dest; int pid; int sendret; //Implement wiretap for debug if(wiretap_list) { deliver_message(msg, wiretap_list); } //Handle special mailboxes if( msg->header.mailbox_name[0] == '>' ) //-------------------------------------------- PID addressed message { pid = atoi( &msg->header.mailbox_name[1] ); //extract pid dest = master_list; //look it up in the master list while(dest) { if(dest->pid == pid) //if we have a match break; //stop looking dest = dest->next; //advance pointer } if(dest != NULL) { DO_SEND_MESSAGE(sendret, dest->clientfd, msg); //deliver the message if(sendret) { close(dest->clientfd); } } else { if(pid == getpid()) { //If this is a unicast PING to US... if( !strncmp((char *)msg->payload, MAILBOX_PING, MAILBOX_NAME_MAX) ) { struct message_record outgoing; prepare_message(&outgoing, MAILBOX_PONG, msg->payload, msg->header.payload_length); dest = find_mailbox(MAILBOX_PONG); if(dest) { deliver_message(&outgoing, dest); } deliver_message(&outgoing, wiretap_list); } } } return 0; } else if( msg->header.mailbox_name[0] == ':' ) //-------------------------------------------- module addressed message { dest = master_list; //start looking through the master list while(dest) { if(! strncmp(&msg->header.mailbox_name[1], dest->progname, MAX_MODULE_NAME_LENGTH) ) //if we have a match on module name { DO_SEND_MESSAGE(sendret, dest->clientfd, msg); //deliver the message (but keep looking) if(sendret) { close(dest->clientfd); } } dest = dest->next; //advance pointer } return 0; } else if( !strncmp(msg->header.mailbox_name, MAILBOX_HELLO, MAILBOX_NAME_MAX)) //------------------------- HELLO message { //a HELLO message sets the client pid, adds it to master and broadcast, and removes it from wiretap //the payload should contain the module name add_client(fd, msg->header.sender, (char *)msg->payload, &master_list); add_client(fd, msg->header.sender, (char *)msg->payload, &broadcast_list); del_client(fd, &wiretap_list); #ifdef COMMHUB_DEBUG printf("Associated client %d with PID %d (progname = %s)\n", fd, msg->header.sender, msg->payload); debug_traverse(); #endif return 0; //return } else if( !strncmp(msg->header.mailbox_name, MAILBOX_SUBSCRIBE, MAILBOX_NAME_MAX)) //------------------------- SUBSCRIBE message { //a SUBSCRIBE message subscribes the client to the specified mailbox dest = find_client_by_fd(master_list, fd); //look up this client in the master list to obtain progname if(!dest) //if this fd is not found (this should NEVER happen, but still...) avoid a null ponter by using a dummy record of all 0's { fprintf(stderr,"Request from client %d which appears not to be in our master list!\n", fd); dest = &dummy; } subscribe_client(fd, msg->header.sender, dest->progname, (char *)msg->payload); #ifdef COMMHUB_DEBUG printf("Added Client %d to %s\n", fd, msg->payload); debug_traverse(); #endif return 0; //return } else if( !strncmp(msg->header.mailbox_name, MAILBOX_UNSUBSCRIBE, MAILBOX_NAME_MAX)) //------------------------- UNSUBSCRIBE message { //an UNSUBSCRIBE message removes the client from the specified mailbox unsubscribe_client(fd, (char *)msg->payload); #ifdef COMMHUB_DEBUG printf("Removed Client %d from %s\n", fd, msg->payload); debug_traverse(); #endif return 0; //return } else if( !strncmp(msg->header.mailbox_name, MAILBOX_BROADCAST, MAILBOX_NAME_MAX)) //------------------------- BROADCAST message { //a BROADCAST message goes to everybody (except those who already got it through wiretap) deliver_message(msg, broadcast_list); return 0; } else if( !strncmp(msg->header.mailbox_name, MAILBOX_WIRETAP, MAILBOX_NAME_MAX)) //------------------------- WIRETAP message { //a WIRETAP message sets the wiretap mode. dest = find_client_by_fd(master_list, fd); //look up this client in the master list to obtain progname if(!dest) //if this fd is not found (this should NEVER happen, but still...) avoid a null ponter by using a dummy record of all 0's { fprintf(stderr,"Request from client %d which appears not to be in our master list!\n", fd); dest = &dummy; } if( !strncmp((char *)msg->payload, "ON", MAX_PAYLOAD_LENGTH) ) //if we are turning wiretap mode ON { add_client(fd, msg->header.sender, dest->progname, &wiretap_list); //add the client to the wiretap list del_client(fd, &broadcast_list); //and remove from the broadcast list #ifdef COMMHUB_DEBUG printf("Added Client %d to wiretap list\n", fd); debug_traverse(); #endif } else //if we are turning wiretap mode OFF { del_client(fd, &wiretap_list); //remove the client from the wiretap list add_client(fd, msg->header.sender, dest->progname, &broadcast_list);//and add to the broadcast list #ifdef COMMHUB_DEBUG printf("Removed Client %d from wiretap list\n", fd); debug_traverse(); #endif } return 0; } else if( !strncmp(msg->header.mailbox_name, MAILBOX_PING, MAILBOX_NAME_MAX)) //Even we need to respond to a PING message { struct message_record outgoing; prepare_message(&outgoing, MAILBOX_PONG, msg->payload, msg->header.payload_length); dest = find_mailbox(MAILBOX_PONG); if(dest) { deliver_message(&outgoing, dest); } deliver_message(&outgoing, wiretap_list); } //-------------------------------------------------------------------------------Normal delivery to user mailboxes... dest = find_mailbox(msg->header.mailbox_name); //look up the mailbox client list if(dest) //if it is non-NULL { deliver_message(msg, dest); //deliver the message to those clients } return 0; } //this function frees our data structures and closes our file handles politely before exit int cleanup() { struct mailbox_record *p, *q; struct subscriber_record *pp, *qq; q = NULL; p = mailboxes; //walk the list of user mailboxes while(p) { q = p; p = p->next; //advance the pointers qq = NULL; pp = q->clients; //walk the just-visited user mailbox while(pp) //and free all the subscriber records { qq = pp; pp = pp->next; free(qq); } free(q); //free the node } //walk the wiretap list freeing all nodes qq = NULL; pp = wiretap_list; while(pp) { qq = pp; pp = pp->next; free(qq); } //walk the broadcast list freeing all nodes qq = NULL; pp = broadcast_list; while(pp) { qq = pp; pp = pp->next; free(qq); } //walk the master list freeing all nodes and closing the file descriptors qq = NULL; pp = master_list; while(pp) { qq = pp; pp = pp->next; close(qq->clientfd); free(qq); } //NULL our all of our pointers mailboxes = NULL; broadcast_list = NULL; wiretap_list = NULL; master_list = NULL; close(listener_socket); num_clients = 0; listener_socket = -1; return 0; } //which poll revents flags signal that we need to close and clean up a socket... #define CLOSE_CONDITION (POLLERR | POLLNVAL | POLLHUP) int main(int argc, char **argv) { int i; int retval; int rebuild; struct message_record msg; long long int _usec_now, _usec_prv, _usec_del; _usec_del = 60000000; _usec_now = get_usec_time(); _usec_prv = _usec_now; //Install our signal handlers and watchdog configure_signal_handlers(argv[0]); //create our listening socket listener_socket = create_listener(COMMHUB_ADDRESS); //return an error if we can't if(listener_socket < 0) { printf("Can't create listener socket: %d\n", listener_socket); return -1; } //set our listener socket up in the poll data structure fds[0].fd = listener_socket; fds[0].events = POLLIN; //do the same for our (blank) client list rebuild_client_fds(); rebuild = 0; //while we have not yet received a signal telling us to stop while( exit_request_status == EXIT_REQUEST_NONE ) { //DEBUG _usec_now = get_usec_time(); if ((_usec_now - _usec_prv) > _usec_del) { printf("[%lli] ipc_server: heartbeat\n", get_usec_time()); _usec_prv = _usec_now; } //DEBUG RESET_WATCHDOG(); if(rebuild) //if we have been flagged to rebuild our client fd list { rebuild_client_fds(); //do so, and clear the flag rebuild = 0; } //poll for any I/O events that we need to service retval = poll(fds, num_clients + 1, COMMHUB_POLL_TIME); //if poll returned an error if(retval < 0) { if(errno == EINTR) //and it was just a signal interruption { continue; //continue } else //otherwise complain bitterly { perror("commserver: Poll failed:"); break; } } else if(retval == 0) //if poll returns 0, that means a timeout with no events { continue; //so go back and wait some more... } //if we get a new client if(fds[0].revents & POLLIN) { accept_client(); //accept the connection rebuild = 1; //flag a rebuild of the poll list continue; //and poll again } if(fds[0].revents & CLOSE_CONDITION) //if our server socket fails, complain { fprintf(stderr, "commserver: Server socket failed: revents = %d\n", fds[0].revents); break; } for(i=1; i <= num_clients; i++) //for each connected client { if(fds[i].revents & POLLIN) //check for input events... { retval = get_message(fds[i].fd, &msg); //if we have one, try and get the message if(retval == -1) //if that fails { purge_client(fds[i].fd); //that means the client has disconnected, so we purge them close(fds[i].fd); //close the socket rebuild = 1; //and flag a rebuild } else //if it worked { //DEBUG printf("[%lli] %s (fd:%i,pid:%i,n:%i)\n", (long long int)msg.header.usec_time, msg.header.mailbox_name, (int)msg.header.from_fd, (int)msg.header.sender, (int)msg.header.payload_length); //DEBUG route_message(fds[i].fd, &msg); //we route the message and keep on going } } if(fds[i].revents & CLOSE_CONDITION) //if we have an error condition on this socket { purge_client(fds[i].fd); //do the purge, close, and rebuild drill close(fds[i].fd); rebuild = 1; } } } cleanup(); //clean up our resources return 0; }