加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
ocpp_ws_client.c 29.27 KB
一键复制 编辑 原始数据 按行查看 历史
Baba 提交于 2020-05-17 21:10 . removed controller
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023
/**
* @file ocpp_ws_client.c
* @author Parikshit Tyagi
* @version 1.4
* @date 3 July 2019
* @brief This is the main library that lets a charging station connect to the CMS
* over WebSockets. It is multi-threaded, so don't forget to link with
* -lpthread when compiling with GCC.
* It mainly handles packet framing of the payload as described in the official
* WebSocket RFC (https://tools.ietf.org/html/rfc6455); for more details
* please refer to that link.
* Currently the library supports only unsecured WebSocket connections.
*
*/
#include "ocpp_ws_client.h"
#include "utils.h"
/*
 * Human-readable error messages, looked up by position.
 * libwsclient_new_error() maps each error code to one of these entries;
 * index 0 is the fallback for unrecognized codes and the list is
 * NULL-terminated. The order must stay in sync with that mapping.
 */
char *errors[] = {
    "Unknown error occured",
    "Error while getting address info",
    "Could not connect to any address returned by getaddrinfo",
    "Error receiving data in client run thread",
    "Error during libwsclient_close",
    "Error sending while handling control frame",
    "Received masked frame from server",
    "Got null pointer during message dispatch",
    "Attempted to send after close frame was sent",
    "Attempted to send during connect",
    "Attempted to send null payload",
    "Attempted to send too much data",
    "Error during send in libwsclient_send",
    "Remote end closed connection during handshake",
    "Problem receiving data during handshake",
    "Remote web server responded with bad HTTP status during handshake",
    "Remote web server did not respond with upgrade header during handshake",
    "Remote web server did not respond with connection header during handshake",
    "Remote web server did not specify the appropriate Sec-WebSocket-Accept header during handshake",
    NULL
};
/**
* @brief --> Entry point for initiating a websocket connection to a server.
*            Allocates a zeroed wsclient, initializes its two mutexes, stores a
*            private copy of the URI, marks the client as CLIENT_CONNECTING and
*            starts libwsclient_handshake_thread on it.
* @param --> const char *URI : URL of the websocket server to connect to
* @return --> the new client, or NULL when URI is NULL. Allocation, mutex-init
*             and thread-creation failures terminate the process via exit().
*/
wsclient *libwsclient_new(const char *URI)
{
    wsclient *client = NULL;
    size_t uri_len;

    printf("\nInside libws client\n");
    printf("\nCompleted the client connection\n");
    if (URI == NULL)
    {
        /* strlen(NULL) below would be undefined behaviour. */
        fprintf(stderr, "NULL URI passed to libwsclient_new.\n");
        return NULL;
    }
    client = (wsclient *)malloc(sizeof(wsclient));
    if (!client)
    {
        fprintf(stderr, "Unable to allocate memory in libwsclient_new.\n");
        exit(WS_EXIT_MALLOC);
    }
    memset(client, 0, sizeof(wsclient));
    if (pthread_mutex_init(&client->lock, NULL) != 0)
    {
        fprintf(stderr, "Unable to init mutex in libwsclient_new.\n");
        exit(WS_EXIT_PTHREAD_MUTEX_INIT);
    }
    if (pthread_mutex_init(&client->send_lock, NULL) != 0)
    {
        fprintf(stderr, "Unable to init send lock in libwsclient_new.\n");
        exit(WS_EXIT_PTHREAD_MUTEX_INIT);
    }
    uri_len = strlen(URI);
    pthread_mutex_lock(&client->lock);
    client->URI = (char *)malloc(uri_len + 1);
    if (!client->URI)
    {
        fprintf(stderr, "Unable to allocate memory in libwsclient_new.\n");
        exit(WS_EXIT_MALLOC);
    }
    /* Copies the terminating NUL as well. */
    memcpy(client->URI, URI, uri_len + 1);
    client->flags |= CLIENT_CONNECTING;
    pthread_mutex_unlock(&client->lock);
    /* The handshake runs on its own thread so this call returns immediately. */
    if (pthread_create(&client->handshake_thread, NULL, libwsclient_handshake_thread, (void *)client))
    {
        fprintf(stderr, "Unable to create handshake thread.\n");
        exit(WS_EXIT_PTHREAD_CREATE);
    }
    return client;
}
/**
* @brief --> This function intiates the websocket connection with the server, it follows the process that is mentioned in rfc6455
* @param --> void *ptr : This is the pointer which points towards URI, that is passed while calling it
*/
void *libwsclient_handshake_thread(void *ptr)
{
printf("Inside handshake thread\n");
wsclient *client = (wsclient *)ptr;
wsclient_error *err = NULL;
const char *URI = client->URI;
SHA1Context shactx;
const char *UUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
char pre_encode[256];
char sha1bytes[20];
char expected_base64[512];
char request_headers[1024];
char websocket_key[256];
char key_nonce[16];
char scheme[10];
char host[255];
char request_host[255];
char port[10];
char path[255];
char recv_buff[1024];
char *URI_copy = NULL, *p = NULL, *rcv = NULL, *tok = NULL;
int i, z, sockfd, n, flags = 0, headers_space = 1024;
URI_copy = (char *)malloc(strlen(URI) + 1);
if (!URI_copy)
{
fprintf(stderr, "Unable to allocate memory in libwclient handshake.\n");
exit(WS_EXIT_MALLOC);
}
memset(URI_copy, 0, strlen(URI) + 1);
strncpy(URI_copy, URI, strlen(URI));
p = strstr(URI_copy, "://");
if (p == NULL)
{
fprintf(stderr, "Malformed or missing scheme for URI.\n");
exit(WS_EXIT_BAD_SCHEME);
}
strncpy(scheme, URI_copy, p - URI_copy);
scheme[p - URI_copy] = '\0';
printf("\nScheme = %s\n", scheme);
if (strcmp(scheme, "ws") != 0 && strcmp(scheme, "wss") != 0)
{
fprintf(stderr, "Invalid scheme for URI: %s\n", scheme);
exit(WS_EXIT_BAD_SCHEME);
}
if (strcmp(scheme, "ws") == 0)
{
strncpy(port, "8080", 9); //server is at port 80 //Change this port to 80 when going to production
}
else
{
strncpy(port, "443", 9);
pthread_mutex_lock(&client->lock);
client->flags != CLIENT_IS_SSL;
pthread_mutex_unlock(&client->lock);
}
for(i=p-URI_copy+3,z=0;*(URI_copy+i) != '/' && *(URI_copy+i) != ':' && *(URI_copy+i) != '\0';i++,z++) {
host[z] = *(URI_copy+i);
}
host[z] = '\0';
if(*(URI_copy+i) == ':') {
i++;
p = strchr(URI_copy+i, '/');
if(!p)
p = strchr(URI_copy+i, '\0');
strncpy(port, URI_copy+i, (p - (URI_copy+i)));
port[p-(URI_copy+i)] = '\0';
i += p-(URI_copy+i);
}
if(*(URI_copy+i) == '\0') {
//end of URI request path will be /
strncpy(path, "/", 1);
} else {
strncpy(path, URI_copy+i, 254);
}
free(URI_copy);
printf("\npath = > %s\n", path);
sockfd = libwsclient_open_connection(host, port); //line 564
if (sockfd < 0)
{
if (client->onerror)
{
err = libwsclient_new_error(sockfd);
client->onerror(client, err);
free(err);
err = NULL;
}
return NULL;
}
printf("Socket Connected \n");
pthread_mutex_lock(&client->lock);
client->sockfd = sockfd;
pthread_mutex_unlock(&client->lock);
//generate NONCE for Handshake
get_nonce(key_nonce, sizeof(key_nonce));
base64_encode(key_nonce, sizeof(key_nonce), websocket_key, sizeof(websocket_key)); //encoding the Nonce with base64
memset(request_headers, 0, 1024);
if (strcmp(port, "80") != 0)
{
snprintf(request_host, 255, "%s:%s", host, port);
}
else
{
snprintf(request_host, 255, "%s", host);
}
//For OCPP connection always remember to put Sec-WebSocket-Protocol
/*
GET /webServices/ocpp/CP3211 HTTP/1.1
Host: some.server.com:33033
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: x3JJHMbDL1EzLkh9GBhXDw==
Sec-WebSocket-Protocol: ocpp1.6, ocpp1.5
Sec-WebSocket-Version: 13
*/
snprintf(request_headers, 1024, "GET %s HTTP/1.1\r\nUpgrade: websocket\r\nConnection: Upgrade\r\nSec-WebSocket-Protocol: ocpp1.6\r\nHost: %s\r\nSec-WebSocket-Key: %s\r\nSec-WebSocket-Version: 13\r\n\r\n", path, request_host, websocket_key);
printf("Request header = %s", request_headers);
n = _libwsclient_write(client, request_headers, strlen(request_headers));
z = 0;
memset(recv_buff, 0, 1024);
//TODO: actually handle data after \r\n\r\n in case server
// sends post-handshake data that gets coalesced in this recv
do
{
n = _libwsclient_read(client, recv_buff + z, 1023 - z);
z += n;
} while ((z < 4 || strstr(recv_buff, "\r\n\r\n") == NULL) && n > 0);
if (n == 0)
{
if (client->onerror)
{
err = libwsclient_new_error(WS_HANDSHAKE_REMOTE_CLOSED_ERR);
client->onerror(client, err);
free(err);
err = NULL;
}
return NULL;
}
if (n < 0)
{
if (client->onerror)
{
err = libwsclient_new_error(WS_HANDSHAKE_RECV_ERR);
err->extra_code = n;
client->onerror(client, err);
free(err);
}
return NULL;
}
//parse recv_buf for response headers and assure Accept matches expected value
rcv = (char *)malloc(strlen(recv_buff) + 1);
if (!rcv)
{
fprintf(stderr, "Unable to allocate memory in libwsclient_new.\n");
exit(WS_EXIT_MALLOC);
}
memset(rcv, 0, strlen(recv_buff) + 1);
strncpy(rcv, recv_buff, strlen(recv_buff));
memset(pre_encode, 0, 256);
snprintf(pre_encode, 256, "%s%s", websocket_key, UUID);
SHA1Reset(&shactx);
SHA1Input(&shactx, pre_encode, strlen(pre_encode));
SHA1Result(&shactx);
memset(pre_encode, 0, 256);
snprintf(pre_encode, 256, "%08x%08x%08x%08x%08x", shactx.Message_Digest[0], shactx.Message_Digest[1], shactx.Message_Digest[2], shactx.Message_Digest[3], shactx.Message_Digest[4]);
for (z = 0; z < (strlen(pre_encode) / 2); z++)
sscanf(pre_encode + (z * 2), "%02hhx", sha1bytes);
memset(expected_base64, 0, 512);
base64_encode(sha1bytes, 20, expected_base64, 512);
for (tok = strtok(rcv, "\r\n"); tok != NULL; tok = strtok(NULL, "\r\n"))
{
if (*tok == 'H' && *(tok + 1) == 'T' && *(tok + 2) == 'T' && *(tok + 3) == 'P')
{
p = strchr(tok, ' ');
p = strchr(p + 1, ' ');
*p = '\0';
if (strcmp(tok, "HTTP/1.1 101") != 0 && strcmp(tok, "HTTP/1.0 101") != 0)
{
if (client->onerror)
{
err = libwsclient_new_error(WS_HANDSHAKE_BAD_STATUS_ERR);
client->onerror(client, err);
free(err);
err = NULL;
}
return NULL;
}
flags |= REQUEST_VALID_STATUS;
}
else
{
p = strchr(tok, ' ');
*p = '\0';
if (strcmp(tok, "Upgrade:") == 0)
{
if (strcasecmp(p + 1, "websocket") == 0)
{
flags |= REQUEST_HAS_UPGRADE;
}
}
if (strcmp(tok, "Connection:") == 0)
{
if (strcasecmp(p + 1, "upgrade") == 0)
{
flags |= REQUEST_HAS_CONNECTION;
}
}
if (strcmp(tok, "Sec-WebSocket-Accept:") == 0)
{
if (strcmp(p + 1, expected_base64) == 0)
{
flags |= REQUEST_VALID_ACCEPT;
}
}
}
}
if (!flags & REQUEST_HAS_UPGRADE)
{
if (client->onerror)
{
err = libwsclient_new_error(WS_HANDSHAKE_NO_UPGRADE_ERR);
client->onerror(client, err);
free(err);
err = NULL;
}
return NULL;
}
if (!flags & REQUEST_HAS_CONNECTION)
{
if (client->onerror)
{
err = libwsclient_new_error(WS_HANDSHAKE_NO_CONNECTION_ERR);
client->onerror(client, err);
free(err);
err = NULL;
}
return NULL;
}
if (!flags & REQUEST_VALID_ACCEPT)
{
if (client->onerror)
{
err = libwsclient_new_error(WS_HANDSHAKE_BAD_ACCEPT_ERR);
client->onerror(client, err);
free(err);
err = NULL;
}
return NULL;
}
pthread_mutex_lock(&client->lock);
client->flags &= ~CLIENT_CONNECTING;
pthread_mutex_unlock(&client->lock);
if (client->onopen != NULL)
{
client->onopen(client);
}
return NULL;
}
/**
* @brief --> Resolves host/port with getaddrinfo and connects a stream socket to the
*            first returned address that accepts the connection.
* @param --> const char *host : host name or address to connect to
*            const char *port : port (service) string passed to getaddrinfo
* @return --> the connected socket descriptor, WS_OPEN_CONNECTION_ADDRINFO_ERR when
*             resolution fails, or WS_OPEN_CONNECTION_ADDRINFO_EXHAUSTED_ERR when no
*             returned address could be connected.
*/
int libwsclient_open_connection(const char *host, const char *port)
{
    struct addrinfo hints;
    struct addrinfo *results = NULL;
    struct addrinfo *candidate = NULL;
    int fd = -1;

    memset(&hints, 0, sizeof(hints));
    hints.ai_family = AF_UNSPEC;     /* IPv4 or IPv6 */
    hints.ai_socktype = SOCK_STREAM;

    if (getaddrinfo(host, port, &hints, &results) != 0)
    {
        return WS_OPEN_CONNECTION_ADDRINFO_ERR;
    }

    /* Try each address in turn; stop at the first successful connect. */
    candidate = results;
    while (candidate != NULL)
    {
        fd = socket(candidate->ai_family, candidate->ai_socktype, candidate->ai_protocol);
        if (fd != -1)
        {
            if (connect(fd, candidate->ai_addr, candidate->ai_addrlen) != -1)
            {
                break;
            }
            close(fd);
        }
        candidate = candidate->ai_next;
    }
    freeaddrinfo(results);

    /* candidate is only compared against NULL from here on, never dereferenced. */
    if (candidate == NULL)
    {
        return WS_OPEN_CONNECTION_ADDRINFO_EXHAUSTED_ERR;
    }
    return fd;
}
/**
* @brief --> Allocates a zeroed wsclient_error, stores errcode in it and points its
*            str field at the matching entry of errors[] (entry 0 for unknown codes).
* @param --> int errcode : error code to record
* @return --> the new error object; callers in this file release it with free().
*             If allocation fails the process exits with errcode as its status.
*/
wsclient_error *libwsclient_new_error(int errcode)
{
    /* Position i in this list corresponds to errors[i + 1]. */
    static const int known_codes[] = {
        WS_OPEN_CONNECTION_ADDRINFO_ERR,
        WS_OPEN_CONNECTION_ADDRINFO_EXHAUSTED_ERR,
        WS_RUN_THREAD_RECV_ERR,
        WS_DO_CLOSE_SEND_ERR,
        WS_HANDLE_CTL_FRAME_SEND_ERR,
        WS_COMPLETE_FRAME_MASKED_ERR,
        WS_DISPATCH_MESSAGE_NULL_PTR_ERR,
        WS_SEND_AFTER_CLOSE_FRAME_ERR,
        WS_SEND_DURING_CONNECT_ERR,
        WS_SEND_NULL_DATA_ERR,
        WS_SEND_DATA_TOO_LARGE_ERR,
        WS_SEND_SEND_ERR,
        WS_HANDSHAKE_REMOTE_CLOSED_ERR,
        WS_HANDSHAKE_RECV_ERR,
        WS_HANDSHAKE_BAD_STATUS_ERR,
        WS_HANDSHAKE_NO_UPGRADE_ERR,
        WS_HANDSHAKE_NO_CONNECTION_ERR,
        WS_HANDSHAKE_BAD_ACCEPT_ERR
    };
    size_t slot;
    wsclient_error *result = (wsclient_error *)malloc(sizeof(wsclient_error));

    if (result == NULL)
    {
        //one of the few places we will fail and exit
        fprintf(stderr, "Unable to allocate memory in libwsclient_new_error.\n");
        exit(errcode);
    }
    memset(result, 0, sizeof(wsclient_error));
    result->code = errcode;

    /* Start with the generic message and replace it if the code is recognized. */
    result->str = errors[0];
    for (slot = 0; slot < sizeof(known_codes) / sizeof(known_codes[0]); slot++)
    {
        if (known_codes[slot] == result->code)
        {
            result->str = errors[slot + 1];
            break;
        }
    }
    return result;
}
/* Pass `code` to the client's onerror callback, if one is registered. */
static void send_report_error(wsclient *client, int code)
{
    wsclient_error *err = NULL;

    if (!client->onerror)
        return;
    err = libwsclient_new_error(code);
    client->onerror(client, err);
    free(err);
}

/**
* @brief --> Sends a NUL-terminated string to the server as one masked text frame
*            (FIN set, opcode 0x1) as described in rfc6455. The whole frame is
*            written while holding client->send_lock.
* @param --> wsclient *client : websocket client struct
*            char *strdata : NUL-terminated data to send
* @return --> number of frame bytes written (header + mask + payload);
*             0 when the send is refused (close frame already sent, still
*             connecting, or strdata is NULL); -1 when the payload is too large
*             or a socket write fails. Each failure is also reported to onerror.
*/
int libwsclient_send(wsclient *client, char *strdata) {
    struct timeval tv;
    unsigned char mask[4];
    unsigned char *frame = NULL;
    unsigned int mask_int;
    size_t payload_len, header_len, frame_size, sent = 0, k;
    ssize_t n = 0;

    if(client->flags & CLIENT_SENT_CLOSE_FRAME) {
        send_report_error(client, WS_SEND_AFTER_CLOSE_FRAME_ERR);
        return 0;
    }
    if(client->flags & CLIENT_CONNECTING) {
        send_report_error(client, WS_SEND_DURING_CONNECT_ERR);
        return 0;
    }
    if(strdata == NULL) {
        send_report_error(client, WS_SEND_NULL_DATA_ERR);
        return 0;
    }

    /* Masking key: 4 bytes taken from rand(), reseeded from the current time. */
    gettimeofday(&tv, NULL);
    srand(tv.tv_usec * tv.tv_sec);
    mask_int = rand();
    memcpy(mask, &mask_int, 4);

    payload_len = strlen(strdata);
    /* Header is 2 bytes plus 0, 2 or 8 bytes of extended payload length. */
    if(payload_len <= 125) {
        header_len = 2;
    } else if(payload_len <= 0xffff) {
        header_len = 4;
    } else {
        header_len = 10;
    }
    if(payload_len > (size_t)-1 - header_len - 4) {
        /* header + mask + payload would not fit in a size_t */
        send_report_error(client, WS_SEND_DATA_TOO_LARGE_ERR);
        return -1;
    }
    frame_size = header_len + 4 + payload_len;

    frame = (unsigned char *)malloc(frame_size);
    if(!frame) {
        fprintf(stderr, "Unable to allocate memory in libwsclient_send.\n");
        exit(WS_EXIT_MALLOC);
    }

    frame[0] = 0x81; //FIN and text opcode.
    if(header_len == 2) {
        frame[1] = (unsigned char)(0x80 | payload_len); //payload length with mask bit on
    } else if(header_len == 4) {
        frame[1] = 0x80 | 126;
        frame[2] = (unsigned char)((payload_len >> 8) & 0xff);
        frame[3] = (unsigned char)(payload_len & 0xff);
    } else {
        frame[1] = 0x80 | 127;
        /* 64-bit length in network byte order, independent of host endianness. */
        for(k = 0; k < 8; k++)
            frame[2 + k] = (unsigned char)(((unsigned long long)payload_len >> (8 * (7 - k))) & 0xff);
    }
    memcpy(frame + header_len, mask, 4);
    header_len += 4; /* payload starts right after the masking key */
    for(k = 0; k < payload_len; k++)
        frame[header_len + k] = (unsigned char)strdata[k] ^ mask[k % 4];

    pthread_mutex_lock(&client->send_lock);
    while(sent < frame_size) {
        n = _libwsclient_write(client, frame + sent, frame_size - sent);
        if(n <= 0)
            break; /* never add a negative result to the unsigned byte count */
        sent += (size_t)n;
    }
    pthread_mutex_unlock(&client->send_lock);
    free(frame);

    if(n < 0) {
        send_report_error(client, WS_SEND_SEND_ERR);
        return -1;
    }
    return (int)sent;
}
/**
* @brief --> Waits for the handshake thread to finish, releases the stored URI and,
*            if a socket was opened, starts libwsclient_run_thread to receive data.
*            The handshake thread is joined whenever the URI is still present, so
*            it is joined (and the URI freed) even when the handshake completed
*            and cleared CLIENT_CONNECTING before this call.
* @param --> wsclient *c : websocket client returned by libwsclient_new
*/
void libwsclient_run(wsclient *c)
{
    if (c->URI != NULL)
    {
        /* A thread cannot join itself (e.g. when called from the onopen callback). */
        if (!pthread_equal(pthread_self(), c->handshake_thread))
        {
            pthread_join(c->handshake_thread, NULL);
        }
        pthread_mutex_lock(&c->lock);
        c->flags &= ~CLIENT_CONNECTING;
        free(c->URI);
        c->URI = NULL;
        pthread_mutex_unlock(&c->lock);
    }
    /* sockfd is zero-initialized by libwsclient_new and set once the socket connects. */
    if (c->sockfd)
    {
        if (pthread_create(&c->run_thread, NULL, libwsclient_run_thread, (void *)c))
        {
            fprintf(stderr, "Unable to create run thread.\n");
            exit(WS_EXIT_PTHREAD_CREATE);
        }
    }
}
/**
* @brief --> This function is called when libwsclient_run is called
* @param --> void *ptr : It is used for pointing towards websocket client
*/
void *libwsclient_run_thread(void *ptr)
{
wsclient *c = (wsclient *)ptr;
wsclient_error *err = NULL;
int sockfd;
char buf[1024];
int n, i;
do
{
memset(buf, 0, 1024);
n = _libwsclient_read(c, buf, 1024);
for (i = 0; i < n; i++)
libwsclient_in_data(c, buf[i]);
} while (n > 0);
if (n < 0)
{
if (c->onerror)
{
err = libwsclient_new_error(WS_RUN_THREAD_RECV_ERR);
err->extra_code = n;
c->onerror(c, err);
free(err);
err = NULL;
}
}
if (c->onclose)
{
c->onclose(c);
}
close(c->sockfd);
free(c);
return NULL;
}
/**
* @brief --> If any data comes from Websocket it is handled here
* @param --> wsclient *c : Websocket client
* char in : Char type data coming in
*/
inline void libwsclient_in_data(wsclient *c, char in){
wsclient_frame *current = NULL, *new = NULL;
unsigned char payload_len_short;
pthread_mutex_lock(&c->lock);
if(c->current_frame == NULL){
c->current_frame = (wsclient_frame *)malloc(sizeof(wsclient_frame));
memset(c->current_frame, 0, sizeof(wsclient_frame));
c->current_frame->payload_len = -1;
c->current_frame->rawdata_sz = FRAME_CHUNK_LENGTH;
c->current_frame->rawdata = (char *)malloc(c->current_frame->rawdata_sz);
memset(c->current_frame->rawdata, 0, c->current_frame->rawdata_sz);
}
current = c->current_frame;
if(current->rawdata_idx >= current->rawdata_sz){
current->rawdata_sz += FRAME_CHUNK_LENGTH;
current->rawdata = (char *)realloc(current->rawdata, current->rawdata_sz);
memset(current->rawdata + current->rawdata_idx, 0, current->rawdata_sz - current->rawdata_idx);
}
*(current->rawdata + current->rawdata_idx++) = in;
pthread_mutex_unlock(&c->lock);
if(libwsclient_complete_frame(c, current) == 1){
if(current->fin == 1){
//is control frame
if((current->opcode & 0x08) == 0x08){
libwsclient_handle_control_frame(c, current);
}
else {
libwsclient_dispatch_message(c, current);
c->current_frame = NULL;
}
}else{
new = (wsclient_frame *)malloc(sizeof(wsclient_frame));
memset(new, 0, sizeof(wsclient_frame));
new->payload_len = -1;
new->rawdata = (char *)malloc(FRAME_CHUNK_LENGTH);
memset(new->rawdata, 0, FRAME_CHUNK_LENGTH);
new->prev_frame = current;
current->next_frame = new;
c->current_frame = new;
}
}
}
/**
* @brief --> Parses the header of the frame received so far and reports whether the whole
*            frame (header + payload) has arrived. Fills in fin, opcode, payload_offset
*            and payload_len on the frame as soon as enough bytes are available.
*            A frame with the MASK bit set is reported via onerror
*            (WS_COMPLETE_FRAME_MASKED_ERR) and CLIENT_SHOULD_CLOSE is set on the client.
* @param --> wsclient *c : websocket client
*            wsclient_frame *frame : frame being assembled
* @return --> 1 when the frame is complete, 0 otherwise
*/
int libwsclient_complete_frame(wsclient *c, wsclient_frame *frame){
    wsclient_error *err = NULL;
    const unsigned char *raw = NULL;
    int payload_len_short, i;
    unsigned long long payload_len = 0;

    if(frame->rawdata_idx < 2){
        return 0;
    }
    /* Read header bytes as unsigned so bit tests and shifts are well defined. */
    raw = (const unsigned char *)frame->rawdata;
    frame->fin = (raw[0] & 0x80) == 0x80 ? 1 : 0;
    frame->opcode = raw[0] & 0x0f;
    frame->payload_offset = 2;
    if((raw[1] & 0x80) != 0x0){
        if(c->onerror){
            err = libwsclient_new_error(WS_COMPLETE_FRAME_MASKED_ERR);
            c->onerror(c, err);
            free(err);
            err = NULL;
        }
        pthread_mutex_lock(&c->lock);
        c->flags |= CLIENT_SHOULD_CLOSE;
        pthread_mutex_unlock(&c->lock);
        return 0;
    }
    payload_len_short = raw[1] & 0x7f;
    switch(payload_len_short){
    case 126:
        /* 16-bit extended length, most significant byte first */
        if(frame->rawdata_idx < 4){
            return 0;
        }
        payload_len = ((unsigned long long)raw[2] << 8) | raw[3];
        frame->payload_offset += 2;
        frame->payload_len = payload_len;
        break;
    case 127:
        /* 64-bit extended length, most significant byte first */
        if(frame->rawdata_idx < 10) {
            return 0;
        }
        for(i = 0; i < 8; i++) {
            payload_len = (payload_len << 8) | raw[2 + i];
        }
        frame->payload_offset += 8;
        frame->payload_len = payload_len;
        break;
    default:
        frame->payload_len = payload_len_short;
        break;
    }
    if(frame->rawdata_idx < frame->payload_offset + frame->payload_len) {
        return 0;
    }
    return 1;
}
/*
 * Build and send one masked control frame: FIN + opcode, MASK bit + length,
 * 4-byte masking key, masked payload (at most 125 bytes).
 * Returns the result of the last socket write (negative on error).
 */
static ssize_t control_frame_send(wsclient *c, unsigned char opcode, const char *payload, size_t payload_len)
{
    unsigned char frame[2 + 4 + 125];
    unsigned char mask[4];
    struct timeval tv;
    unsigned int mask_int;
    size_t total, done = 0, k;
    ssize_t n = 0;

    gettimeofday(&tv, NULL);
    srand(tv.tv_sec * tv.tv_usec);
    mask_int = rand();
    memcpy(mask, &mask_int, 4);

    frame[0] = (unsigned char)(0x80 | opcode);
    frame[1] = (unsigned char)(0x80 | payload_len); //mask bit on
    memcpy(frame + 2, mask, 4);
    for (k = 0; k < payload_len; k++)
        frame[6 + k] = (unsigned char)payload[k] ^ mask[k % 4];
    total = 6 + payload_len;

    pthread_mutex_lock(&c->send_lock);
    while (done < total) {
        n = _libwsclient_write(c, frame + done, total - done);
        if (n <= 0)
            break;
        done += (size_t)n;
    }
    pthread_mutex_unlock(&c->send_lock);
    return n;
}

/**
* @brief --> Reacts to a complete control frame received from the server, then resets the
*            frame so it can be reused for the next incoming frame.
*            Close (0x8): echoes the payload back in a masked close frame unless
*            CLIENT_SENT_CLOSE_FRAME is already set, and sets CLIENT_SHOULD_CLOSE.
*            Ping (0x9): answers with a masked pong (0xA) carrying the same payload.
*            Pong (0xA): ignored. Any other opcode is logged to stderr.
*            The whole function runs with c->lock held.
* @param --> wsclient *c : websocket client
*            wsclient_frame *ctl_frame : the completed control frame
*/
void libwsclient_handle_control_frame(wsclient *c, wsclient_frame *ctl_frame) {
    wsclient_error *err = NULL;
    wsclient_frame *ptr = NULL;
    const char *payload = ctl_frame->rawdata + ctl_frame->payload_offset;
    size_t payload_len;
    ssize_t n = 0;

    /* Control frames may carry at most 125 payload bytes; never echo more than that. */
    payload_len = ctl_frame->payload_len > 125 ? 125 : (size_t)ctl_frame->payload_len;

    pthread_mutex_lock(&c->lock);
    switch(ctl_frame->opcode) {
    case 0x8:
        //close frame
        if((c->flags & CLIENT_SENT_CLOSE_FRAME) == 0) {
            //server request close. Send close frame as acknowledgement.
            n = control_frame_send(c, 0x8, payload, payload_len);
        }
        c->flags |= CLIENT_SHOULD_CLOSE;
        break;
    case 0x9:
        //ping frame: reply with a pong carrying the same application data
        if((c->flags & CLIENT_SENT_CLOSE_FRAME) == 0) {
            n = control_frame_send(c, 0xA, payload, payload_len);
        }
        break;
    case 0xA:
        //pong frame: nothing to do
        break;
    default:
        fprintf(stderr, "Unhandled control frame received. Opcode: %d\n", ctl_frame->opcode);
        break;
    }
    if(n < 0) {
        if(c->onerror) {
            err = libwsclient_new_error(WS_HANDLE_CTL_FRAME_SEND_ERR);
            err->extra_code = n;
            c->onerror(c, err);
            free(err);
            err = NULL;
        }
    }

    ptr = ctl_frame->prev_frame; //This very well may be a NULL pointer, but just in case we preserve it.
    free(ctl_frame->rawdata);
    memset(ctl_frame, 0, sizeof(wsclient_frame));
    ctl_frame->prev_frame = ptr;
    ctl_frame->payload_len = -1;
    ctl_frame->rawdata = (char *)malloc(FRAME_CHUNK_LENGTH);
    if(!ctl_frame->rawdata) {
        fprintf(stderr, "Unable to allocate memory in libwsclient_handle_control_frame.\n");
        exit(WS_EXIT_MALLOC);
    }
    ctl_frame->rawdata_sz = FRAME_CHUNK_LENGTH;
    memset(ctl_frame->rawdata, 0, FRAME_CHUNK_LENGTH);
    pthread_mutex_unlock(&c->lock);
}
/**
* @brief --> Assembles a received message from its chain of frames and hands it to the
*            onmessage callback. `current` is the final frame; earlier fragments are
*            reached through prev_frame. The payloads are concatenated into one
*            NUL-terminated buffer, the frames are freed, and the message (payload
*            included) is freed again after the callback returns.
* @param --> wsclient *c : websocket client
*            wsclient_frame *current : last frame of the message
*/
void libwsclient_dispatch_message(wsclient *c, wsclient_frame *current) {
    unsigned long long message_payload_len = 0, message_offset = 0;
    int message_opcode;
    char *message_payload;
    wsclient_frame *first = NULL;
    wsclient_message *msg = NULL;
    wsclient_error *err = NULL;

    if(current == NULL) {
        if(c->onerror) {
            err = libwsclient_new_error(WS_DISPATCH_MESSAGE_NULL_PTR_ERR);
            c->onerror(c, err);
            free(err);
            err = NULL;
        }
        return;
    }

    /* Rewind to the first fragment of the message. */
    first = current;
    while(first->prev_frame != NULL) {
        first = first->prev_frame;
    }
    /* Sum every fragment exactly once so the buffer matches what is copied below. */
    for(current = first; current != NULL; current = current->next_frame) {
        message_payload_len += current->payload_len;
    }
    /* The opcode of a fragmented message is carried by its first frame. */
    message_opcode = first->opcode;

    message_payload = (char *)malloc(message_payload_len + 1);
    if(!message_payload) {
        fprintf(stderr, "Unable to allocate memory in libwsclient_dispatch_message.\n");
        exit(WS_EXIT_MALLOC);
    }
    memset(message_payload, 0, message_payload_len + 1);
    for(current = first; current != NULL; current = current->next_frame) {
        memcpy(message_payload + message_offset, current->rawdata + current->payload_offset, current->payload_len);
        message_offset += current->payload_len;
    }
    libwsclient_cleanup_frames(first);

    msg = (wsclient_message *)malloc(sizeof(wsclient_message));
    if(!msg) {
        fprintf(stderr, "Unable to allocate memory in libwsclient_dispatch_message.\n");
        exit(WS_EXIT_MALLOC);
    }
    memset(msg, 0, sizeof(wsclient_message));
    msg->opcode = message_opcode;
    msg->payload_len = message_offset;
    msg->payload = message_payload;
    if(c->onmessage != NULL) {
        c->onmessage(c, msg);
    } else {
        fprintf(stderr, "No onmessage call back registered with libwsclient.\n");
    }
    free(msg->payload);
    free(msg);
}
/**
* @brief --> Frees a chain of frames: starting at `first` and following next_frame,
*            releases each frame's rawdata buffer and then the frame itself.
* @param --> wsclient_frame *first : first frame of the chain (may be NULL)
*/
void libwsclient_cleanup_frames(wsclient_frame *first) {
    wsclient_frame *node = first;

    while(node != NULL) {
        /* Grab the successor before the node is released. */
        wsclient_frame *following = node->next_frame;

        free(node->rawdata); /* free(NULL) is a no-op */
        free(node);
        node = following;
    }
}
/**
* @brief --> Writes bytes to the client's socket with send() (flags 0).
* @param --> wsclient *c : websocket client
*            const void *buf : bytes to send
*            size_t length : number of bytes in buf
* @return --> the value returned by send()
*/
ssize_t _libwsclient_write(wsclient *c, const void *buf, size_t length)
{
    ssize_t written = send(c->sockfd, buf, length, 0);

    return written;
}
/**
* @brief --> Reads bytes from the client's socket with recv() (flags 0).
* @param --> wsclient *c : websocket client
*            void *buf : buffer that receives the data
*            size_t length : maximum number of bytes to read into buf
* @return --> the value returned by recv()
*/
ssize_t _libwsclient_read(wsclient *c, void *buf, size_t length)
{
    ssize_t received = recv(c->sockfd, buf, length, 0);

    return received;
}
/**
* @brief --> Shuts the client's threads down: sends SIGINT to helper_thread when that
*            handle is set, then waits for run_thread to finish when that handle is set.
* @param --> wsclient *client : websocket client
*/
void libwsclient_finish(wsclient *client){
    /* A zero handle means the corresponding thread was never recorded on the client. */
    if(client->helper_thread != 0){
        pthread_kill(client->helper_thread, SIGINT);
    }
    if(client->run_thread != 0){
        pthread_join(client->run_thread, NULL);
    }
}
/**
* @brief --> Registers the callback that libwsclient_run_thread invokes once its
*            receive loop has ended. The assignment is made under client->lock.
* @param --> wsclient *client : websocket client
*            int (*cb)(wsclient *c) : callback to store in client->onclose
*/
void libwsclient_onclose(wsclient *client, int (*cb)(wsclient *c))
{
    pthread_mutex_t *guard = &client->lock;

    pthread_mutex_lock(guard);
    client->onclose = cb;
    pthread_mutex_unlock(guard);
}
/**
* @brief --> Registers the callback that the handshake thread invokes after a
*            successful handshake. The assignment is made under client->lock.
* @param --> wsclient *client : websocket client
*            int (*cb)(wsclient *c) : callback to store in client->onopen
*/
void libwsclient_onopen(wsclient *client, int (*cb)(wsclient *c))
{
    pthread_mutex_t *guard = &client->lock;

    pthread_mutex_lock(guard);
    client->onopen = cb;
    pthread_mutex_unlock(guard);
}
/**
* @brief --> Registers the callback that libwsclient_dispatch_message invokes for every
*            completely received message. The assignment is made under client->lock.
* @param --> wsclient *client : websocket client
*            int (*cb)(wsclient *c, wsclient_message *msg) : callback to store in client->onmessage
*/
void libwsclient_onmessage(wsclient *client, int (*cb)(wsclient *c, wsclient_message *msg))
{
    pthread_mutex_t *guard = &client->lock;

    pthread_mutex_lock(guard);
    client->onmessage = cb;
    pthread_mutex_unlock(guard);
}
/**
* @brief --> Registers the callback that receives a wsclient_error whenever the library
*            reports a failure. The assignment is made under client->lock.
* @param --> wsclient *client : websocket client
*            int (*cb)(wsclient *c, wsclient_error *err) : callback to store in client->onerror
*/
void libwsclient_onerror(wsclient *client, int (*cb)(wsclient *c, wsclient_error *err))
{
    pthread_mutex_t *guard = &client->lock;

    pthread_mutex_lock(guard);
    client->onerror = cb;
    pthread_mutex_unlock(guard);
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化