加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
0259-cleancode-add-rpc_async_call-remove-rpc_msg_arg.sock.patch 23.91 KB
一键复制 编辑 原始数据 按行查看 历史
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693
From 710bb34f1b46e0df4d82fe46fc36e729160ea1d7 Mon Sep 17 00:00:00 2001
From: Lemmy Huang <hlm3280@163.com>
Date: Sun, 1 Sep 2024 00:21:52 +0800
Subject: [PATCH] cleancode: add rpc_async_call, remove rpc_msg_arg.socklen,
fix some format
Signed-off-by: Lemmy Huang <hlm3280@163.com>
---
src/lstack/core/lstack_lwip.c | 27 +----
src/lstack/core/lstack_protocol_stack.c | 25 ++---
src/lstack/core/lstack_thread_rpc.c | 124 +++++++++++----------
src/lstack/include/lstack_lwip.h | 27 +++--
src/lstack/include/lstack_protocol_stack.h | 13 +--
src/lstack/include/lstack_thread_rpc.h | 25 +++--
6 files changed, 117 insertions(+), 124 deletions(-)
diff --git a/src/lstack/core/lstack_lwip.c b/src/lstack/core/lstack_lwip.c
index 3454961..91f4838 100644
--- a/src/lstack/core/lstack_lwip.c
+++ b/src/lstack/core/lstack_lwip.c
@@ -588,23 +588,6 @@ bool do_lwip_replenish_sendring(struct protocol_stack *stack, struct lwip_sock *
return replenish_again;
}
-int do_lwip_send(struct protocol_stack *stack, int32_t fd, struct lwip_sock *sock,
- size_t len, int32_t flags)
-{
- ssize_t ret;
- /* send all send_ring, so len set lwip send max. */
- if (NETCONN_IS_UDP(sock)) {
- ret = lwip_send(fd, sock, len, flags);
- } else {
- ret = lwip_send(fd, sock, UINT16_MAX, flags);
- }
- if (ret < 0 && (errno == ENOTCONN || errno == ECONNRESET || errno == ECONNABORTED)) {
- return -1;
- }
-
- return do_lwip_replenish_sendring(stack, sock);
-}
-
static inline void free_recv_ring_readover(struct rte_ring *ring)
{
void *pbufs[SOCK_RECV_RING_SIZE];
@@ -753,10 +736,10 @@ static inline void notice_stack_tcp_send(struct lwip_sock *sock, int32_t fd, int
static inline void notice_stack_udp_send(struct lwip_sock *sock, int32_t fd, int32_t len, int32_t flags)
{
- __sync_fetch_and_add(&sock->call_num, 1);
- while (rpc_call_udp_send(&sock->stack->rpc_queue, fd, len, flags) < 0) {
- usleep(1000); // 1000: wait 1ms to exec again
- }
+ __sync_fetch_and_add(&sock->call_num, 1);
+ while (rpc_call_udp_send(&sock->stack->rpc_queue, fd, len, flags) < 0) {
+ usleep(1000); // 1000: wait 1ms to exec again
+ }
}
static inline void notice_stack_send(struct lwip_sock *sock, int32_t fd, int32_t len, int32_t flags)
@@ -875,7 +858,7 @@ ssize_t do_lwip_send_to_stack(int32_t fd, const void *buf, size_t len, int32_t f
// send = 0 : tcp peer close connection ?
if (send <= 0) {
return send;
- }
+ }
}
notice_stack_send(sock, fd, send, flags);
diff --git a/src/lstack/core/lstack_protocol_stack.c b/src/lstack/core/lstack_protocol_stack.c
index bcca1a7..f1eeba1 100644
--- a/src/lstack/core/lstack_protocol_stack.c
+++ b/src/lstack/core/lstack_protocol_stack.c
@@ -88,7 +88,7 @@ struct protocol_stack *get_protocol_stack(void)
return g_stack_p;
}
-struct protocol_stack *get_protocol_stack_by_fd(int32_t fd)
+struct protocol_stack *get_protocol_stack_by_fd(int fd)
{
struct lwip_sock *sock = lwip_get_socket(fd);
if (POSIX_IS_CLOSED(sock)) {
@@ -468,7 +468,7 @@ END:
return NULL;
}
-int stack_polling(uint32_t wakeup_tick)
+int stack_polling(unsigned wakeup_tick)
{
int force_quit;
struct cfg_params *cfg = get_global_cfg_params();
@@ -536,12 +536,11 @@ int stack_polling(uint32_t wakeup_tick)
static void* gazelle_stack_thread(void *arg)
{
struct thread_params *t_params = (struct thread_params*) arg;
-
uint16_t queue_id = t_params->queue_id;
- uint32_t wakeup_tick = 0;
-
- struct protocol_stack *stack = stack_thread_init(arg);
+ struct protocol_stack *stack;
+ unsigned wakeup_tick = 0;
+ stack = stack_thread_init(arg);
free(arg);
if (stack == NULL) {
LSTACK_LOG(ERR, LSTACK, "stack_thread_init failed queue_id=%hu\n", queue_id);
@@ -607,7 +606,7 @@ static int stack_group_init_mempool(void)
return 0;
}
-int32_t stack_group_init(void)
+int stack_group_init(void)
{
struct protocol_stack_group *stack_group = get_protocol_stack_group();
stack_group->stack_num = 0;
@@ -632,7 +631,7 @@ int32_t stack_group_init(void)
return 0;
}
-int32_t stack_setup_app_thread(void)
+int stack_setup_app_thread(void)
{
static PER_THREAD int first_flags = 1;
static _Atomic uint32_t queue_id = 0;
@@ -660,21 +659,21 @@ int32_t stack_setup_app_thread(void)
return 0;
}
-int32_t stack_setup_thread(void)
+int stack_setup_thread(void)
{
- int32_t ret;
+ int ret, i;
char name[PATH_MAX];
int queue_num = get_global_cfg_params()->num_queue;
struct thread_params *t_params[PROTOCOL_STACK_MAX] = {NULL};
int process_index = get_global_cfg_params()->process_idx;
- for (uint32_t i = 0; i < queue_num; ++i) {
+ for (i = 0; i < queue_num; ++i) {
t_params[i] = malloc(sizeof(struct thread_params));
if (t_params[i] == NULL) {
goto OUT1;
}
}
- for (uint32_t i = 0; i < queue_num; i++) {
+ for (i = 0; i < queue_num; i++) {
if (get_global_cfg_params()->seperate_send_recv) {
if (i % 2 == 0) {
ret = sprintf_s(name, sizeof(name), "%s_%d_%d", LSTACK_RECV_THREAD_NAME, process_index, i / 2);
@@ -714,7 +713,7 @@ int32_t stack_setup_thread(void)
return 0;
OUT1:
- for (int32_t i = 0; i < queue_num; ++i) {
+ for (i = 0; i < queue_num; ++i) {
if (t_params[i] != NULL) {
free(t_params[i]);
}
diff --git a/src/lstack/core/lstack_thread_rpc.c b/src/lstack/core/lstack_thread_rpc.c
index 3e9889a..b4a5953 100644
--- a/src/lstack/core/lstack_thread_rpc.c
+++ b/src/lstack/core/lstack_thread_rpc.c
@@ -45,20 +45,20 @@ static struct rpc_msg *get_rpc_msg(struct rpc_msg_pool *rpc_pool)
return msg;
}
-static void rpc_msg_init(struct rpc_msg *msg, rpc_msg_func func, struct rpc_msg_pool *pool)
+__rte_always_inline
+static void rpc_msg_init(struct rpc_msg *msg, rpc_func_t func, struct rpc_msg_pool *pool)
{
msg->func = func;
msg->rpcpool = pool;
- msg->sync_flag = 1;
msg->recall_flag = 0;
pthread_spin_init(&msg->lock, PTHREAD_PROCESS_PRIVATE);
}
-static struct rpc_msg *rpc_msg_alloc(rpc_msg_func func)
+static struct rpc_msg *rpc_msg_alloc(rpc_func_t func)
{
- struct rpc_msg *msg = NULL;
+ struct rpc_msg *msg;
- if (g_rpc_pool == NULL) {
+ if (unlikely(g_rpc_pool == NULL)) {
g_rpc_pool = calloc(1, sizeof(struct rpc_msg_pool));
if (g_rpc_pool == NULL) {
LSTACK_LOG(INFO, LSTACK, "g_rpc_pool calloc failed\n");
@@ -66,8 +66,8 @@ static struct rpc_msg *rpc_msg_alloc(rpc_msg_func func)
exit(-1);
}
- g_rpc_pool->mempool = create_mempool("rpc_pool", get_global_cfg_params()->rpc_msg_max, sizeof(struct rpc_msg),
- 0, rte_gettid());
+ g_rpc_pool->mempool =
+ create_mempool("rpc_pool", get_global_cfg_params()->rpc_msg_max, sizeof(struct rpc_msg), 0, rte_gettid());
if (g_rpc_pool->mempool == NULL) {
LSTACK_LOG(INFO, LSTACK, "rpc_pool create failed, errno is %d\n", errno);
g_rpc_stats.call_alloc_fail++;
@@ -76,12 +76,12 @@ static struct rpc_msg *rpc_msg_alloc(rpc_msg_func func)
}
msg = get_rpc_msg(g_rpc_pool);
- if (msg == NULL) {
+ if (unlikely(msg == NULL)) {
g_rpc_stats.call_alloc_fail++;
return NULL;
}
- rpc_msg_init(msg, func, g_rpc_pool);
+ rpc_msg_init(msg, func, g_rpc_pool);
return msg;
}
@@ -97,8 +97,9 @@ static void rpc_msg_free(struct rpc_msg *msg)
}
__rte_always_inline
-static void rpc_call(rpc_queue *queue, struct rpc_msg *msg)
+static void rpc_async_call(rpc_queue *queue, struct rpc_msg *msg)
{
+ msg->sync_flag = 0;
lockless_queue_mpsc_push(queue, &msg->queue_node);
}
@@ -108,7 +109,9 @@ static int rpc_sync_call(rpc_queue *queue, struct rpc_msg *msg)
int ret;
pthread_spin_trylock(&msg->lock);
- rpc_call(queue, msg);
+
+ msg->sync_flag = 1;
+ lockless_queue_mpsc_push(queue, &msg->queue_node);
// waiting stack unlock
pthread_spin_lock(&msg->lock);
@@ -123,7 +126,7 @@ int rpc_msgcnt(rpc_queue *queue)
return lockless_queue_count(queue);
}
-static struct rpc_msg *rpc_msg_alloc_except(rpc_msg_func func)
+static struct rpc_msg *rpc_msg_alloc_except(rpc_func_t func)
{
struct rpc_msg *msg = calloc(1, sizeof(struct rpc_msg));
if (msg == NULL) {
@@ -146,14 +149,14 @@ int rpc_call_stack_exit(rpc_queue *queue)
return -1;
}
- rpc_call(queue, msg);
+ rpc_async_call(queue, msg);
return 0;
}
int rpc_poll_msg(rpc_queue *queue, int max_num)
{
int force_quit = 0;
- struct rpc_msg *msg = NULL;
+ struct rpc_msg *msg;
while (max_num--) {
lockless_queue_node *node = lockless_queue_mpsc_pop(queue);
@@ -163,24 +166,24 @@ int rpc_poll_msg(rpc_queue *queue, int max_num)
msg = container_of(node, struct rpc_msg, queue_node);
- if (msg->func) {
+ if (likely(msg->func)) {
msg->func(msg);
} else {
g_rpc_stats.call_null++;
}
- if (msg->func == stack_exit_by_rpc) {
+ if (unlikely(msg->func == stack_exit_by_rpc)) {
force_quit = 1;
}
+ if (msg->recall_flag) {
+ msg->recall_flag = 0;
+ continue;
+ }
- if (!msg->recall_flag) {
- if (msg->sync_flag) {
- pthread_spin_unlock(&msg->lock);
- } else {
- rpc_msg_free(msg);
- }
+ if (msg->sync_flag) {
+ pthread_spin_unlock(&msg->lock);
} else {
- msg->recall_flag = 0;
+ rpc_msg_free(msg);
}
}
@@ -204,7 +207,7 @@ static void callback_close(struct rpc_msg *msg)
if (sock && __atomic_load_n(&sock->call_num, __ATOMIC_ACQUIRE) > 0) {
msg->recall_flag = 1;
- rpc_call(&stack->rpc_queue, msg); /* until stack_send recall finish */
+ rpc_async_call(&stack->rpc_queue, msg); /* until stack_send recall finish */
return;
}
@@ -223,7 +226,7 @@ static void callback_shutdown(struct rpc_msg *msg)
if (sock && __atomic_load_n(&sock->call_num, __ATOMIC_ACQUIRE) > 0) {
msg->recall_flag = 1;
- rpc_call(&stack->rpc_queue, msg);
+ rpc_async_call(&stack->rpc_queue, msg);
return;
}
@@ -237,7 +240,7 @@ static void callback_shutdown(struct rpc_msg *msg)
static void callback_bind(struct rpc_msg *msg)
{
- msg->result = lwip_bind(msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].cp, msg->args[MSG_ARG_2].socklen);
+ msg->result = lwip_bind(msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].cp, msg->args[MSG_ARG_2].u);
if (msg->result != 0) {
LSTACK_LOG(ERR, LSTACK, "tid %ld, fd %d failed %ld\n", get_stack_tid(), msg->args[MSG_ARG_0].i, msg->result);
}
@@ -265,7 +268,7 @@ static void callback_create_shadow_fd(struct rpc_msg *msg)
{
int fd = msg->args[MSG_ARG_0].i;
struct sockaddr *addr = msg->args[MSG_ARG_1].p;
- socklen_t addr_len = msg->args[MSG_ARG_2].socklen;
+ socklen_t addr_len = msg->args[MSG_ARG_2].u;
int clone_fd = 0;
struct lwip_sock *sock = lwip_get_socket(fd);
@@ -337,7 +340,7 @@ static void callback_accept(struct rpc_msg *msg)
static void callback_connect(struct rpc_msg *msg)
{
- msg->result = lwip_connect(msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].p, msg->args[MSG_ARG_2].socklen);
+ msg->result = lwip_connect(msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].p, msg->args[MSG_ARG_2].u);
if (msg->result < 0) {
msg->result = -errno;
}
@@ -391,7 +394,7 @@ int rpc_call_bind(rpc_queue *queue, int fd, const struct sockaddr *addr, socklen
msg->args[MSG_ARG_0].i = fd;
msg->args[MSG_ARG_1].cp = addr;
- msg->args[MSG_ARG_2].socklen = addrlen;
+ msg->args[MSG_ARG_2].u = addrlen;
return rpc_sync_call(queue, msg);
}
@@ -418,7 +421,7 @@ int rpc_call_shadow_fd(rpc_queue *queue, int fd, const struct sockaddr *addr, so
msg->args[MSG_ARG_0].i = fd;
msg->args[MSG_ARG_1].cp = addr;
- msg->args[MSG_ARG_2].socklen = addrlen;
+ msg->args[MSG_ARG_2].u = addrlen;
return rpc_sync_call(queue, msg);
}
@@ -447,7 +450,7 @@ int rpc_call_connect(rpc_queue *queue, int fd, const struct sockaddr *addr, sock
msg->args[MSG_ARG_0].i = fd;
msg->args[MSG_ARG_1].cp = addr;
- msg->args[MSG_ARG_2].socklen = addrlen;
+ msg->args[MSG_ARG_2].u = addrlen;
int ret = rpc_sync_call(queue, msg);
if (ret < 0) {
@@ -486,7 +489,7 @@ static void callback_getsockopt(struct rpc_msg *msg)
static void callback_setsockopt(struct rpc_msg *msg)
{
msg->result = lwip_setsockopt(msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].i, msg->args[MSG_ARG_2].i,
- msg->args[MSG_ARG_3].cp, msg->args[MSG_ARG_4].socklen);
+ msg->args[MSG_ARG_3].cp, msg->args[MSG_ARG_4].u);
if (msg->result != 0) {
LSTACK_LOG(ERR, LSTACK, "tid %ld, fd %d, level %d, optname %d, fail %ld\n", get_stack_tid(),
msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].i, msg->args[MSG_ARG_2].i, msg->result);
@@ -548,7 +551,7 @@ int rpc_call_setsockopt(rpc_queue *queue, int fd, int level, int optname, const
msg->args[MSG_ARG_1].i = level;
msg->args[MSG_ARG_2].i = optname;
msg->args[MSG_ARG_3].cp = optval;
- msg->args[MSG_ARG_4].socklen = optlen;
+ msg->args[MSG_ARG_4].u = optlen;
return rpc_sync_call(queue, msg);
}
@@ -556,13 +559,13 @@ int rpc_call_setsockopt(rpc_queue *queue, int fd, int level, int optname, const
static void callback_tcp_send(struct rpc_msg *msg)
{
int fd = msg->args[MSG_ARG_0].i;
- size_t len = msg->args[MSG_ARG_1].size;
+ size_t len = UINT16_MAX; /* ignore msg->args[MSG_ARG_1].size; */
struct protocol_stack *stack = get_protocol_stack();
- int replenish_again;
+ int ret;
+ msg->result = -1;
struct lwip_sock *sock = lwip_get_socket(fd);
- if (POSIX_IS_CLOSED(sock)) {
- msg->result = -1;
+ if (unlikely(POSIX_IS_CLOSED(sock))) {
return;
}
@@ -570,16 +573,18 @@ static void callback_tcp_send(struct rpc_msg *msg)
calculate_sock_latency(&stack->latency, sock, GAZELLE_LATENCY_WRITE_RPC_MSG);
}
- replenish_again = do_lwip_send(stack, sock->conn->callback_arg.socket, sock, len, 0);
- if (replenish_again < 0) {
+ ret = lwip_send(fd, sock, len, 0);
+ if (unlikely(ret < 0) && (errno == ENOTCONN || errno == ECONNRESET || errno == ECONNABORTED)) {
__sync_fetch_and_sub(&sock->call_num, 1);
return;
}
+ msg->result = 0;
- if (NETCONN_IS_DATAOUT(sock) || replenish_again > 0) {
+ ret = do_lwip_replenish_sendring(stack, sock);
+ if (ret > 0 || NETCONN_IS_DATAOUT(sock)) {
if (__atomic_load_n(&sock->call_num, __ATOMIC_ACQUIRE) == 1) {
msg->recall_flag = 1;
- rpc_call(&stack->rpc_queue, msg);
+ rpc_async_call(&stack->rpc_queue, msg);
return;
}
}
@@ -593,11 +598,11 @@ static void callback_udp_send(struct rpc_msg *msg)
int fd = msg->args[MSG_ARG_0].i;
size_t len = msg->args[MSG_ARG_1].size;
struct protocol_stack *stack = get_protocol_stack();
- int replenish_again;
+ int ret;
+ msg->result = -1;
struct lwip_sock *sock = lwip_get_socket(fd);
- if (POSIX_IS_CLOSED(sock)) {
- msg->result = -1;
+ if (unlikely(POSIX_IS_CLOSED(sock))) {
return;
}
@@ -605,8 +610,15 @@ static void callback_udp_send(struct rpc_msg *msg)
calculate_sock_latency(&stack->latency, sock, GAZELLE_LATENCY_WRITE_RPC_MSG);
}
- replenish_again = do_lwip_send(stack, sock->conn->callback_arg.socket, sock, len, 0);
- if ((replenish_again > 0) && (__atomic_load_n(&sock->call_num, __ATOMIC_ACQUIRE) == 1)) {
+ ret = lwip_send(fd, sock, len, 0);
+ if (unlikely(ret < 0) && (errno == ENOTCONN || errno == ECONNRESET || errno == ECONNABORTED)) {
+ __sync_fetch_and_sub(&sock->call_num, 1);
+ return;
+ }
+ msg->result = 0;
+
+ ret = do_lwip_replenish_sendring(stack, sock);
+ if (ret > 0 && (__atomic_load_n(&sock->call_num, __ATOMIC_ACQUIRE) == 1)) {
rpc_call_replenish(&stack->rpc_queue, sock);
return;
}
@@ -629,9 +641,8 @@ int rpc_call_udp_send(rpc_queue *queue, int fd, size_t len, int flags)
msg->args[MSG_ARG_0].i = fd;
msg->args[MSG_ARG_1].size = len;
msg->args[MSG_ARG_2].i = flags;
- msg->sync_flag = 0;
- rpc_call(queue, msg);
+ rpc_async_call(queue, msg);
return 0;
}
@@ -649,9 +660,8 @@ int rpc_call_tcp_send(rpc_queue *queue, int fd, size_t len, int flags)
msg->args[MSG_ARG_0].i = fd;
msg->args[MSG_ARG_1].size = len;
msg->args[MSG_ARG_2].i = flags;
- msg->sync_flag = 0;
- rpc_call(queue, msg);
+ rpc_async_call(queue, msg);
return 0;
}
@@ -663,7 +673,7 @@ static void callback_replenish_sendring(struct rpc_msg *msg)
msg->result = do_lwip_replenish_sendring(stack, sock);
if (msg->result == true) {
msg->recall_flag = 1;
- rpc_call(&stack->rpc_queue, msg);
+ rpc_async_call(&stack->rpc_queue, msg);
}
}
@@ -675,9 +685,8 @@ int rpc_call_replenish(rpc_queue *queue, void *sock)
}
msg->args[MSG_ARG_0].p = sock;
- msg->sync_flag = 0;
- rpc_call(queue, msg);
+ rpc_async_call(queue, msg);
return 0;
}
@@ -713,16 +722,17 @@ static void callback_clean_epoll(struct rpc_msg *msg)
list_del_node(&wakeup->wakeup_list[stack->stack_idx]);
}
-void rpc_call_clean_epoll(rpc_queue *queue, void *wakeup)
+int rpc_call_clean_epoll(rpc_queue *queue, void *wakeup)
{
struct rpc_msg *msg = rpc_msg_alloc(callback_clean_epoll);
if (msg == NULL) {
- return;
+ return -1;
}
msg->args[MSG_ARG_0].p = wakeup;
rpc_sync_call(queue, msg);
+ return 0;
}
static void callback_arp(struct rpc_msg *msg)
@@ -740,11 +750,9 @@ int rpc_call_arp(rpc_queue *queue, void *mbuf)
return -1;
}
- msg->sync_flag = 0;
msg->args[MSG_ARG_0].p = mbuf;
- rpc_call(queue, msg);
-
+ rpc_async_call(queue, msg);
return 0;
}
diff --git a/src/lstack/include/lstack_lwip.h b/src/lstack/include/lstack_lwip.h
index 0c7bb62..dcb7dac 100644
--- a/src/lstack/include/lstack_lwip.h
+++ b/src/lstack/include/lstack_lwip.h
@@ -29,15 +29,23 @@ unsigned same_node_ring_count(struct lwip_sock *sock);
#define NETCONN_IS_OUTIDLE(sock) gazelle_ring_readable_count((sock)->send_ring)
#define NETCONN_IS_UDP(sock) (NETCONNTYPE_GROUP(netconn_type((sock)->conn)) == NETCONN_UDP)
-void do_lwip_clone_sockopt(struct lwip_sock *dst_sock, struct lwip_sock *src_sock);
-
+/* lwip api */
struct pbuf *do_lwip_tcp_get_from_sendring(struct lwip_sock *sock, uint16_t remain_size);
struct pbuf *do_lwip_udp_get_from_sendring(struct lwip_sock *sock, uint16_t remain_size);
void do_lwip_get_from_sendring_over(struct lwip_sock *sock);
-bool do_lwip_replenish_sendring(struct protocol_stack *stack, struct lwip_sock *sock);
ssize_t do_lwip_read_from_lwip(struct lwip_sock *sock, int32_t flags, uint8_t apiflags);
-/* app write/read ring */
+/* lwip api */
+void do_lwip_free_pbuf(struct pbuf *pbuf);
+struct pbuf *do_lwip_alloc_pbuf(pbuf_layer layer, uint16_t length, pbuf_type type);
+
+/* lwip api */
+void do_lwip_add_recvlist(int32_t fd);
+/* stack api */
+void do_lwip_read_recvlist(struct protocol_stack *stack, uint32_t max_num);
+
+
+/* app api */
ssize_t do_lwip_sendmsg_to_stack(struct lwip_sock *sock, int32_t s,
const struct msghdr *message, int32_t flags);
ssize_t do_lwip_recvmsg_from_stack(int32_t s, const struct msghdr *message, int32_t flags);
@@ -47,17 +55,14 @@ ssize_t do_lwip_send_to_stack(int32_t fd, const void *buf, size_t len, int32_t f
ssize_t do_lwip_read_from_stack(int32_t fd, void *buf, size_t len, int32_t flags,
struct sockaddr *addr, socklen_t *addrlen);
-void do_lwip_read_recvlist(struct protocol_stack *stack, uint32_t max_num);
-void do_lwip_add_recvlist(int32_t fd);
-int do_lwip_send(struct protocol_stack *stack, int32_t fd, struct lwip_sock *sock,
- size_t len, int32_t flags);
+/* stack api */
+bool do_lwip_replenish_sendring(struct protocol_stack *stack, struct lwip_sock *sock);
+
+void do_lwip_clone_sockopt(struct lwip_sock *dst_sock, struct lwip_sock *src_sock);
uint32_t do_lwip_get_conntable(struct gazelle_stat_lstack_conn_info *conn, uint32_t max_num);
uint32_t do_lwip_get_connnum(void);
-void do_lwip_free_pbuf(struct pbuf *pbuf);
-struct pbuf *do_lwip_alloc_pbuf(pbuf_layer layer, uint16_t length, pbuf_type type);
-
void read_same_node_recv_list(struct protocol_stack *stack);
#endif
diff --git a/src/lstack/include/lstack_protocol_stack.h b/src/lstack/include/lstack_protocol_stack.h
index fdd5388..08a3901 100644
--- a/src/lstack/include/lstack_protocol_stack.h
+++ b/src/lstack/include/lstack_protocol_stack.h
@@ -31,11 +31,8 @@
#include "lstack_tx_cache.h"
#define SOCK_RECV_RING_SIZE (get_global_cfg_params()->recv_ring_size)
-#define SOCK_RECV_FREE_THRES (32)
#define SOCK_RECV_RING_SIZE_MAX (2048)
#define SOCK_SEND_RING_SIZE_MAX (2048)
-#define SOCK_SEND_REPLENISH_THRES (16)
-#define WAKEUP_MAX_NUM (32)
#define MBUFPOOL_RESERVE_NUM (get_global_cfg_params()->nic.rxqueue_size + 1024)
@@ -113,7 +110,7 @@ struct protocol_stack_group {
long get_stack_tid(void);
struct protocol_stack *get_protocol_stack(void);
-struct protocol_stack *get_protocol_stack_by_fd(int32_t fd);
+struct protocol_stack *get_protocol_stack_by_fd(int fd);
struct protocol_stack *get_bind_protocol_stack(void);
struct protocol_stack_group *get_protocol_stack_group(void);
@@ -121,13 +118,13 @@ int get_min_conn_stack(struct protocol_stack_group *stack_group);
void bind_to_stack_numa(struct protocol_stack *stack);
void thread_bind_stack(struct protocol_stack *stack);
-int32_t stack_group_init(void);
+int stack_group_init(void);
void stack_group_exit(void);
void stack_exit(void);
-int32_t stack_setup_thread(void);
-int32_t stack_setup_app_thread(void);
+int stack_setup_thread(void);
+int stack_setup_app_thread(void);
-int stack_polling(uint32_t wakeup_tick);
+int stack_polling(unsigned wakeup_tick);
#endif
diff --git a/src/lstack/include/lstack_thread_rpc.h b/src/lstack/include/lstack_thread_rpc.h
index c2654bb..6f8e03e 100644
--- a/src/lstack/include/lstack_thread_rpc.h
+++ b/src/lstack/include/lstack_thread_rpc.h
@@ -32,8 +32,6 @@ struct rpc_stats {
uint64_t call_alloc_fail;
};
-struct rpc_msg;
-typedef void (*rpc_msg_func)(struct rpc_msg *msg);
union rpc_msg_arg {
int i;
unsigned int u;
@@ -41,22 +39,25 @@ union rpc_msg_arg {
unsigned long ul;
void *p;
const void *cp;
- socklen_t socklen;
size_t size;
};
-struct rpc_msg_pool {
- struct rte_mempool *mempool;
-};
+
+struct rpc_msg;
+typedef void (*rpc_func_t)(struct rpc_msg *msg);
struct rpc_msg {
- pthread_spinlock_t lock; /* msg handler unlock notice sender msg process done */
int8_t sync_flag : 1;
int8_t recall_flag : 1;
- int64_t result; /* func return val */
- lockless_queue_node queue_node;
- struct rpc_msg_pool *rpcpool;
- rpc_msg_func func; /* msg handle func hook */
+ long result; /* func return val */
+ rpc_func_t func; /* msg handle func hook */
union rpc_msg_arg args[RPM_MSG_ARG_SIZE]; /* resolve by type */
+
+ struct rpc_msg_pool {
+ struct rte_mempool *mempool;
+ } *rpcpool;
+
+ pthread_spinlock_t lock; /* msg handler unlock notice sender msg process done */
+ lockless_queue_node queue_node;
};
static inline void rpc_queue_init(rpc_queue *queue)
@@ -92,7 +93,7 @@ int rpc_call_udp_send(rpc_queue *queue, int fd, size_t len, int flags);
int rpc_call_replenish(rpc_queue *queue, void *sock);
int rpc_call_recvlistcnt(rpc_queue *queue);
-void rpc_call_clean_epoll(rpc_queue *queue, void *wakeup);
+int rpc_call_clean_epoll(rpc_queue *queue, void *wakeup);
int rpc_call_arp(rpc_queue *queue, void *mbuf);
int rpc_call_conntable(rpc_queue *queue, void *conn_table, unsigned max_conn);
--
2.33.0
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化