From d5c873a780357928fcccb27f4378b456f0e45dd2 Mon Sep 17 00:00:00 2001 From: turret Date: Wed, 17 Jan 2024 19:13:16 +0000 Subject: net: basic ws handler includes heartbeat mechanism --- net/net.c | 114 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ net/ws.c | 61 +++++++++++++++++++++++++++++++++ 2 files changed, 175 insertions(+) create mode 100644 net/ws.c (limited to 'net') diff --git a/net/net.c b/net/net.c index 332aef3..bad34c6 100644 --- a/net/net.c +++ b/net/net.c @@ -1,5 +1,11 @@ +#include +#include +#include +#include +#include #include #include +#include #include #include @@ -10,12 +16,120 @@ #include extern int http_get(char *url); +extern void ws_handle_event(cJSON *event); +extern void ws_send_heartbeat(); +CURL *ws_handle; char *gateway_url; int net_subsystem(void) { print(LOG_INFO "net: starting net subsystem"); + /* Set handler for heartbeats */ + /* + struct sigaction *alrmhandler = malloc(sizeof(struct sigaction)); + memset(alrmhandler, 0, sizeof(struct sigaction)); + alrmhandler->sa_handler = &ws_send_heartbeat; + alrmhandler->sa_flags |= SA_RESTART; + sigaction(SIGALRM, alrmhandler, NULL); + free(alrmhandler); + */ + + if(!gateway_url) + panic("net: gateway url invalid"); + + ws_handle = curl_easy_init(); + + curl_easy_setopt(ws_handle, CURLOPT_URL, gateway_url); + curl_easy_setopt(ws_handle, CURLOPT_CONNECT_ONLY, 2L); + + print(LOG_INFO "net: opening ws"); + CURLcode ret = curl_easy_perform(ws_handle); + + if(ret > 0) + panic("net: cannot open websocket (curl errno %d)", ret); + + int ws_sockfd; + if((ret = curl_easy_getinfo(ws_handle, + CURLINFO_ACTIVESOCKET, &ws_sockfd)) != CURLE_OK) + panic("net: curl cannot get active socket (errno %d)", ret); + +/* struct pollfd ws_sockpoll = { + .fd = ws_sockfd, + .events = POLLIN + }; */ + char *inbuf = malloc(1<<16 * sizeof(char)); + size_t rlen; + const struct curl_ws_frame *meta; + + /* Block ALRM */ + sigset_t *set = malloc(sizeof(sigset_t)); + sigaddset(set, SIGALRM); + sigprocmask(SIG_BLOCK, set, NULL); + int alrmfd = signalfd(-1, set, 0); + free(set); + + struct pollfd pollarray[2] = { + { + .fd = ws_sockfd, + .events = POLLIN, + .revents = POLLIN + }, + { + .fd = alrmfd, + .events = POLLIN, + .revents = 0 + } + }; + + struct pollfd *sockpoll = &(pollarray[0]); + struct pollfd *alrmpoll = &(pollarray[1]); + + errno = 0; + do { + if((sockpoll->revents & POLLIN) == POLLIN) { + ret = curl_ws_recv(ws_handle, inbuf, 1<<16, &rlen, &meta); + if(ret == CURLE_AGAIN) + continue; + if(ret != CURLE_OK) { + print(LOG_ERR "net: encountered curl error while reading socket (curl errno %d)", ret); + break; + } + + /* TODO: partial frames */ + if((meta->offset | meta->bytesleft) > 0) { + print(LOG_ERR "net: dropped partial frame"); + continue; + } + + cJSON *event = cJSON_ParseWithLength(inbuf, rlen); + if(!event) { + print(LOG_ERR "net: dropped malformed frame"); + continue; + } + ws_handle_event(event); + cJSON_Delete(event); + } else if((sockpoll->revents & (POLLRDHUP | POLLERR | POLLHUP | POLLNVAL)) > 0) { + print(LOG_ERR "net: encountered error on socket (revents %d)", sockpoll->revents); + break; + } + + if((alrmpoll->revents & POLLIN) == POLLIN) { + struct signalfd_siginfo siginfo; + read(alrmfd, &siginfo, sizeof(struct signalfd_siginfo)); + ws_send_heartbeat(); + } + } while(poll(pollarray, 2, -1) >= 0); + if(errno > 0) { + print(LOG_ERR "net: error encountered while polling (errno %d)", errno); + } + + free(inbuf); + + curl_easy_cleanup(ws_handle); + + panic("net: websocket closed unexpectedly"); + return 0; } diff --git a/net/ws.c b/net/ws.c new file mode 100644 index 0000000..9ad857d --- /dev/null +++ b/net/ws.c @@ -0,0 +1,61 @@ +#include +#include +#include +#include +#include + +#include +#include + +#include + +extern CURL *ws_handle; +long last_sequence = -1; +struct timeval heartbeat_time; + +void ws_send_heartbeat() +{ + char buf[128] = "{\"op\":1,\"d\":null}"; + if(last_sequence > 0) + snprintf(buf, 128, "{\"op\":1,\"d\":%ld}", last_sequence); + size_t sent; + curl_ws_send(ws_handle, buf, strnlen(buf, 128), &sent, 0, CURLWS_TEXT); + + struct itimerval itimer; + getitimer(ITIMER_REAL, &itimer); + if(itimer.it_value.tv_sec < heartbeat_time.tv_sec - 2) { + itimer.it_value = heartbeat_time; + setitimer(ITIMER_REAL, &itimer, NULL); + } +} + +void ws_handle_event(cJSON *event) +{ + int op = cJSON_GetObjectItem(event, "op")->valueint; + switch(op) { + case 1: /* Heartbeat request */ + ws_send_heartbeat(); + break; + case 10: ; /* Hello */ + int heartbeat_wait = cJSON_GetObjectItem(cJSON_GetObjectItem(event, "d"), "heartbeat_interval")->valueint; + float jitter = (float)rand() / (RAND_MAX * 1.0f); + + heartbeat_time.tv_sec = heartbeat_wait / 1000; + heartbeat_time.tv_usec = (heartbeat_wait % 1000) * 1000; + struct timeval jitter_time = { + .tv_sec = heartbeat_time.tv_sec * jitter, + .tv_usec = heartbeat_time.tv_usec * jitter, + }; + struct itimerval new_itimer = { + .it_interval = heartbeat_time, + .it_value = jitter_time + }; + setitimer(ITIMER_REAL, &new_itimer, NULL); + break; + case 11: + break; + default: + print(LOG_ERR "ws: received unknown WS opcode %d", op); + break; + } +} -- cgit v1.2.3