加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
sbalance.c 50.30 KB
一键复制 编辑 原始数据 按行查看 历史

#include "sbatomic.h"
#include "sbutil.h"
#include "sbmain.h"
#include "sbparsehost.h"
#include <netinet/tcp.h>
#include <fcntl.h>
#include <string.h>
#include <sys/wait.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <errno.h>
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <arpa/inet.h>
#include <time.h>
#include <stdarg.h>
unsigned long session_id = 1;
/* total live session */
int sb_accept_lock_mutex = 0;
/**** NOTICE:
1. event handler can use sb_finish_session and other function don't use sb_finish_session(even in event handler in which call other event handler)
2. in event handler when call other function(not event handler) find error, the other function will make why_finish reason
3. event handler just return SB_OK or SB_ERROR
4. when connecting server and occur error, write handler will be called
5. after connected server and occur error, sb_finish_session will be called
6. when client use shutdown sbalance will treat as client's use close, so the client will not recv any data
****/
/***
bug fix:
1. in process_event, when epoll_wait return EPOLLERR, we can't |EPOLLIN|EPOLLOUT and directly call read or write handler, a. sometimes read or write handler is null b. like connecting stage, want to call write's handler but call read's handler
**/
/**
add feature:
1. in sb_session struct add dns_list_node member for dns parse wait using
**/
int sb_accept_event(struct sb_cycle_env *env, struct sb_event *ev);
int sb_proxy_accept_event(struct sb_cycle_env *env, struct sb_event *ev);
int sb_transfer_read_not_send_event(struct sb_cycle_env *env, struct sb_event *e);
int sb_add_listen_event(struct sb_cycle_env *env);
int sb_del_listen_event(struct sb_cycle_env *env);
static int sig_cmd;
static int exiting = 0;
/*********** timer manage start **********/
extern void sb_process_expire_timer(struct sb_cycle_env *env);
extern unsigned long sb_find_lastest_timer();
extern void sb_timer_pop_event(struct sb_event *ev);
extern void sb_timer_push_event(struct sb_event *event);
/*********** timer manage end **********/
/*********** epoll start ************/
/* global var */
int epoll_fd;
struct epoll_event *epoll_wait_events;
#define EPOLL_MAX_WAITEVENTS_NUM 512
static int sb_epoll_init(struct sb_cycle_env *env)
{
int len;
epoll_fd = epoll_create(1024);
if (epoll_fd == -1) {
return SB_ERROR;
}
len = EPOLL_MAX_WAITEVENTS_NUM * sizeof(struct epoll_event);
epoll_wait_events = (struct epoll_event*)malloc(len);
if (epoll_wait_events == NULL) {
return SB_ERROR;
}
memset(epoll_wait_events, 0x00, len);
return SB_OK;
}
static int sb_epoll_done(struct sb_cycle_env *env)
{
if (close(epoll_fd) == -1)
return SB_ERROR;
epoll_fd = -1;
free(epoll_wait_events);
return SB_OK;
}
static int sb_epoll_add_event(struct sb_event* ev, int event)
{
struct sb_connection *c = NULL;
struct sb_event *e = NULL;
int events = 0;
int prev = 0;
int op = 0;
struct epoll_event ee;
events = event;
c = ev->data;
if (event & EPOLLIN) {
e = &c->write;
prev = EPOLLOUT;
}
else if (event & EPOLLOUT) {
e = &c->read;
prev = EPOLLIN;
}
if (e->active) {
op = EPOLL_CTL_MOD;
events = prev | event;
}
else {
op = EPOLL_CTL_ADD;
}
memset(&ee, 0x00, sizeof(struct epoll_event));
ee.events = (events | EPOLLET | EPOLLERR);
ee.data.ptr = c;
if (epoll_ctl(epoll_fd, op, c->sockfd, &ee) == -1) {
return SB_ERROR;
}
ev->active = 1;
ev->event_type = EVENT_TYPE_SOCKET;
return SB_OK;
}
static int sb_epoll_add_listen_event(struct sb_event* ev)
{
struct sb_listen *li = NULL;
int op = 0;
struct epoll_event ee;
li = ev->data;
if (ev->active) {
return 0;
}
op = EPOLL_CTL_ADD;
memset(&ee, 0x00, sizeof(struct epoll_event));
ee.events = EPOLLIN | EPOLLET | EPOLLERR;
ee.data.ptr = (void*)((unsigned long)li|0x1);
if (epoll_ctl(epoll_fd, op, li->listen_fd, &ee) == -1) {
return SB_ERROR;
}
ev->active = 1;
ev->event_type = EVENT_TYPE_LISTEN;
return SB_OK;
}
static int sb_epoll_del_event(struct sb_event* ev, int event)
{
struct sb_connection *c = NULL;
struct sb_event *e = NULL;
int events = 0;
int prev = 0;
int op = 0;
struct epoll_event ee;
c = ev->data;
if (event == EPOLLIN) {
e = &c->write;
prev = EPOLLOUT;
}
else if (event == EPOLLOUT) {
e = &c->read;
prev = EPOLLIN;
}
if (e->active) {
op = EPOLL_CTL_MOD;
events = prev;
}
else {
op = EPOLL_CTL_DEL;
}
memset(&ee, 0x00, sizeof(struct epoll_event));
ee.events = events | EPOLLET | EPOLLERR;
ee.data.ptr = c;
if (epoll_ctl(epoll_fd, op, c->sockfd, &ee) == -1) {
return SB_ERROR;
}
ev->active = 0;
return 0;
}
static int sb_epoll_del_listen_event(struct sb_event* ev)
{
struct sb_listen *li = NULL;
int op = 0;
struct epoll_event ee;
li = ev->data;
/*
ev->handler = NULL;
*/
if (ev->active == 0) {
return 0;
}
op = EPOLL_CTL_DEL;
memset(&ee, 0x00, sizeof(struct epoll_event));
ee.events = EPOLLIN | EPOLLET | EPOLLERR;
ee.data.ptr = NULL;
if (epoll_ctl(epoll_fd, op, li->listen_fd, &ee) == -1) {
return SB_ERROR;
}
ev->active = 0;
return 0;
}
/* @RETURN the number of wait events or SB_OK, SB_ERROR */
static int sb_epoll_process_event(struct sb_cycle_env *env, int timeout, void *arg)
{
int err = 0;
int events_num;
int i = 0;
struct sb_event *e = NULL;
void *event_data_ptr = NULL;
struct sb_connection *c = NULL;
struct sb_session *sess = NULL;
struct sb_listen *li = NULL;
int is_listen_type = 0;
int events;
events_num = epoll_wait(epoll_fd, epoll_wait_events, EPOLL_MAX_WAITEVENTS_NUM, timeout);
err = (events_num == -1) ? errno : 0;
if (err) {
if (err == EINTR) {
return SB_OK;
}
else {
return SB_ERROR;
}
}
for (i = 0; i < events_num; i++) {
event_data_ptr = epoll_wait_events[i].data.ptr;
events = epoll_wait_events[i].events;
/* if recive err force read or write handler can be called */
if (events & (EPOLLERR | EPOLLHUP))
events |= (EPOLLIN | EPOLLOUT);
/*******
if (events & (EPOLLERR | EPOLLHUP)) {
events |= (EPOLLIN | EPOLLOUT);
if (((unsigned long)event_data_ptr & 0x1) == 1) {
li = (struct sb_listen*)((unsigned long)event_data_ptr & ~1);
e = &li->listen_event;
if (!e->handler) {
InfoOutput("listen ip[%s]-port[%d] event's handler is null", li->listen_address.ip, li->listen_address.port);
}
else
e->handler(env, e);
}
else {
c = (struct sb_connection*)event_data_ptr;
sess = c->session;
if (sess->had_finish) {
continue;
}
if (c->connect_type == CONNECT_TYPE_SERVER) {
(void)sb_finish_session(env, sess);
ErrorOutput("client ip[%s] port[%d] occur error when connected", c->net_address.ip, c->net_address.port);
}
}
*********/
/* TODO: put all events into list and later in main loop process handler */
/* because a connection has two event's so read/write just process correspoding event */
if (events & EPOLLIN) {
if (((unsigned long)event_data_ptr & 0x1) == 1) {
li = (struct sb_listen*)((unsigned long)event_data_ptr & ~1);
e = &li->listen_event;
is_listen_type = 1;
}
else {
c = (struct sb_connection*)event_data_ptr;
sess = c->session;
if (sess->had_finish) {
continue;
}
e = &c->read;
}
if (sb_event_is_active(e)) {
if (!e->handler) {
if (is_listen_type) {
FatalOutput("listen ip[%s]-port[%d] event's handler is null", li->listen_address.ip, li->listen_address.port);
}
else {
FatalOutput("socket ip[%s]-port[%d] event's read handler is null", c->net_address.ip, c->net_address.port);
}
}
e->handler(env, e);
}
}
if (events & EPOLLOUT) {
c = (struct sb_connection*)event_data_ptr;
sess = c->session;
if (sess->had_finish){
continue;
}
e = &c->write;
if (sb_event_is_active(e)) {
if (!e->handler) {
InfoOutput("socket ip[%s]-port[%d] event's write handler is null", c->net_address.ip, c->net_address.port);
}
e->handler(env, e);
}
}
}
return events_num;
}
struct sb_event_action event_action =
{
sb_epoll_init,
sb_epoll_done,
sb_epoll_add_event,
sb_epoll_del_event,
sb_epoll_add_listen_event,
sb_epoll_del_listen_event,
sb_epoll_process_event
};
/*********** epoll start ************/
int sb_add_read_event(struct sb_event *ev, int read_handler(struct sb_cycle_env*, struct sb_event*))
{
ev->handler = read_handler;
return add_event(ev, EPOLLIN);
}
int sb_add_write_event(struct sb_event *ev, int write_handler(struct sb_cycle_env*, struct sb_event*))
{
ev->handler = write_handler;
return add_event(ev, EPOLLOUT);
}
int sb_del_read_event(struct sb_event *ev)
{
ev->handler = NULL;
return del_event(ev, EPOLLIN);
}
int sb_del_write_event(struct sb_event *ev)
{
ev->handler = NULL;
return del_event(ev, EPOLLOUT);
}
/*********** session manager start *******/
void sb_list_push_session(struct sb_session **list, struct sb_session *sess)
{
if (*list == NULL) {
sess->next = sess;
sess->prev = sess;
*list = sess;
}
else {
/* push in first */
sess->next = *list;
sess->prev = (*list)->prev;
(*list)->prev = sess;
sess->prev->next = sess;
(*list) = sess;
}
}
struct sb_session* sb_list_pop_session(struct sb_session **list)
{
struct sb_session *sess = NULL;
if (*list == NULL)
return NULL;
sess = *list;
sb_list_delete_session(list, sess);
/* zero the reused session */
memset(sess, 0x00, sizeof(struct sb_session));
INIT_LIST_HEAD(&sess->dns_list_node);
return sess;
}
void sb_list_delete_session(struct sb_session **list, struct sb_session *sess)
{
if (sess == NULL)
return;
if (*list == NULL) {
FatalOutput("delete session from list, but list is null");
return;
}
/* only one node */
if (sess->next == sess && sess->prev == sess)
*list = NULL;
else {
int is_first = 0;
struct sb_session *prev;
struct sb_session *next;
prev = sess->prev;
next = sess->next;
/* the deleted node is the first one */
if (*list == sess) {
is_first = 1;
}
next->prev = prev;
prev->next = next;
if (is_first == 1)
*list = next;
}
sess->next = NULL;
sess->prev = NULL;
}
void sb_merge_reused_session(struct sb_cycle_env *env)
{
struct sb_session *list_head = NULL;
struct sb_session *new_list_head = NULL;
struct sb_session *temp_ptr = NULL;
new_list_head = env->new_reuse_session_list;
if (new_list_head == NULL) return;
if (env->reuse_session_list == NULL) {
env->reuse_session_list = new_list_head;
}
else {
list_head = env->reuse_session_list;
list_head->prev->next = new_list_head;
new_list_head->prev->next = list_head;
temp_ptr = list_head->prev;
list_head->prev = new_list_head->prev;
new_list_head->prev = temp_ptr;
}
env->new_reuse_session_list = NULL;
}
struct sb_session* sb_new_session(struct sb_cycle_env *env)
{
int err = 0;
struct sb_session *sess;
sess = (struct sb_session*)malloc(sizeof(struct sb_session));
if (sess == NULL) {
ErrorOutput("malloc failed");
return NULL;
}
memset(sess, 0x00, sizeof(struct sb_session));
sess->client.session = sess;
sess->server.session = sess;
sess->client.read.data = (void*)&sess->client;
sess->client.write.data = (void*)&sess->client;
sess->server.read.data = (void*)&sess->server;
sess->server.write.data = (void*)&sess->server;
sess->client.connect_type = CONNECT_TYPE_CLIENT;
sess->server.connect_type = CONNECT_TYPE_SERVER;
INIT_LIST_HEAD(&sess->dns_list_node);
return sess;
}
void sb_destory_session(struct sb_session *sess)
{
if (sess)
free(sess);
sess = NULL;
}
struct sb_session* sb_get_session(struct sb_cycle_env *env)
{
struct sb_session *sess = NULL;
sess = sb_list_pop_session(&env->reuse_session_list);
if (sess == NULL) {
sess = sb_new_session(env);
}
else {
env->reuse_sessoion_num--;
}
if (sess == NULL)
return NULL;
else {
sb_list_push_session(&env->active_session_list, sess);
env->active_session_num++;
sess->sessionid = session_id++;
}
return sess;
}
/*********** session manager end *******/
void sb_push_accept_delay_timer(struct sb_cycle_env* env)
{
env->accept_delay_timer.timeout_time = sb_get_msec_timeout_time(env->accept_delay_timeout);
if (env->accept_delay_timer.in_timer_set == 0) {
sb_timer_push_event(&env->accept_delay_timer);
}
}
/* @RETURN 3: equal 2: port equal 1: ip equal */
int sb_check_listening_equal(struct sb_listen *listen1, struct sb_listen *listen2)
{
int rc = 0;
if (strcmp(listen1->listen_address.ip, listen2->listen_address.ip) == 0) {
rc = 1;
}
if (listen1->listen_address.port == listen2->listen_address.port) {
rc += 2;
}
return rc;
}
int sb_write_empty_event(struct sb_cycle_env *env, struct sb_event *ev)
{
int rc = 0;
InfoOutput("sb_write_empty_event\n");
return 0;
}
int sb_read_empty_event(struct sb_cycle_env *env, struct sb_event *ev)
{
int rc = 0;
InfoOutput("sb_read_empty_event\n");
return 0;
}
/* @RETURN always SB_OK */
int sb_finish_session(struct sb_cycle_env *env, struct sb_session *sess)
{
struct sb_connection *cli_c;
struct sb_connection *ser_c;
env->total_session++;
if (sess->had_finish){
return SB_OK;
}
sess->had_finish = 1;
cli_c = &sess->client;
ser_c = &sess->server;
/* unregister event */
if (sb_event_is_active(&cli_c->read)) {
sb_del_read_event(&cli_c->read);
}
if (sb_event_is_active(&cli_c->write)) {
sb_del_write_event(&cli_c->write);
}
if (sb_event_is_active(&ser_c->read)) {
sb_del_read_event(&ser_c->read);
}
if (sb_event_is_active(&ser_c->write)) {
sb_del_write_event(&ser_c->write);
}
/* close client and server socket */
if (sess->client.sockfd > 0) {
close(sess->client.sockfd);
sess->client.sockfd = 0;
}
if (sess->server.sockfd > 0) {
close(sess->server.sockfd);
sess->server.sockfd = 0;
}
/* delete timer */
if (cli_c->read.in_timer_set) {
sb_timer_pop_event(&cli_c->read);
}
if (cli_c->write.in_timer_set) {
sb_timer_pop_event(&cli_c->write);
}
if (ser_c->read.in_timer_set) {
sb_timer_pop_event(&ser_c->read);
}
if (ser_c->write.in_timer_set) {
sb_timer_pop_event(&ser_c->write);
}
/* mv session from active list to reuse list */
sb_list_delete_session(&env->active_session_list, sess);
env->active_session_num--;
sb_list_push_session(&env->new_reuse_session_list, sess);
env->reuse_sessoion_num++;
/* remove session from dns parse waiting list */
if (sess->dns_list_node.next != &sess->dns_list_node) {
list_del_init(&sess->dns_list_node);
}
return SB_OK;
}
/* @RETURN (1. NULL: not allow 2. server) */
struct sb_forward_rule* sb_get_rule_by_client_addr(struct sb_cycle_env *env, char *ip, int port)
{
int ip1 = -1, ip2 = -1, ip3 = -1, ip4 = -1;
int i = 0, j;
struct sb_forward_rule *rule = NULL;
struct sb_conf_range_address *range_addr = NULL;
sscanf(ip, "%d.%d.%d.%d", &ip1, &ip2, &ip3, &ip4);
for (i = 0; i < env->rule_num; i++) {
rule = env->rules[i];
for (j = 0; j < rule->client_num; j++) {
range_addr = &rule->client_addr[j];
if (range_addr->port == port || range_addr->port == CONF_INT_ANY) {
if (range_addr->ip1_start == CONF_INT_ANY) {
return rule;
}
else if (
( range_addr->ip1_start <= ip1 && ip1 <= range_addr->ip1_end) &&
( range_addr->ip2_start <= ip2 && ip2 <= range_addr->ip2_end) &&
( range_addr->ip3_start <= ip3 && ip3 <= range_addr->ip3_end) &&
( range_addr->ip4_start <= ip4 && ip4 <= range_addr->ip4_end)
)
{
return rule;
}
}
}
}
return NULL;
}
struct sb_conf_server* sb_get_server(struct sb_cycle_env *env, struct sb_forward_rule *rule, char *ip, int port)
{
char ip_port[128];
memset(ip_port, 0x00, sizeof(ip_port));
sprintf(ip_port, "%s:%d", ip, port);
return sb_load_balance_get_server(env, rule, ip_port);
}
/* EVENT HANDLER: @RETURN SB_TIMEOUT, SB_ERROR, SB_OK */
int sb_test_connect_complete_event(struct sb_cycle_env *env, struct sb_event *e)
{
int err = 0, len = sizeof(int);
int r;
struct sb_connection *c;
struct sb_session *sess;
c = (struct sb_connection*)e->data;
sess = c->session;
if (e->timeout) {
if (c->write.in_timer_set) {
sb_timer_pop_event(&c->write);
ErrorOutput("delete timeout");
}
/**** (void)sb_finish_session(env, sess); *****/
(void)sb_connect_server_failed(env, sess);
ErrorOutput("client[%s]-server[%s] connect timtout", sess->client.net_address.ip, sess->server.net_address.ip);
return SB_TIMEOUT;
}
r = getsockopt(c->sockfd, SOL_SOCKET, SO_ERROR, (void *) &err, &len);
if (r || err) {
/**** (void)sb_finish_session(env, sess); ****/
(void)sb_connect_server_failed(env, sess);
ErrorOutput("cli[%s]-ser[%s] make transfer link failed, err[%d]", sess->client.net_address.ip, sess->server.net_address.ip, err);
return SB_ERROR;
}
else {
/* ok, finally success */
err = sb_connect_server_ok(env, sess);
if (err) {
if (err == SB_DONE) {
ErrorOutput("client[%s]-server[%s], client request close before connect server ok", sess->client.net_address.ip, sess->server.net_address.ip);
return SB_OK;
}
else {
(void)sb_finish_session(env, sess);
ErrorOutput("client[%s]-server[%s], in client'read occur error before connect server ok", sess->client.net_address.ip, sess->server.net_address.ip);
return SB_ERROR;
}
}
else {
DebugOutput("client[%s]-server[%s] make transfer link success", sess->client.net_address.ip, sess->server.net_address.ip);
return SB_OK;
}
}
}
/*
after connect, register timer, event, change event handler
@RETURN: SB_OK, SB_ERROR, SB_DONE(when client close socket and send no data)
*/
/* 1. add server's read event(not need add client's read event, had be added in previous)
2. before connect server ok had read some data, add server's write event
(of course, when server's write event is in epoll[sb_test_connect_complete_event],
firstly delete it, then add[we are in EPOLLET mode])
3. before connect server ok, client use short tcp connection and close or shutdown socket,
4. when client close socket and send no data return SB_DONE, the caller should finish the session
*/
int sb_connect_server_ok(struct sb_cycle_env *env, struct sb_session *sess)
{
struct sb_event *cli_re;
struct sb_event *ser_re;
struct sb_event *ser_we;
struct sb_event *cli_we;
int err;
cli_re = &sess->client.read;
cli_we = &sess->client.write;
ser_re = &sess->server.read;
ser_we = &sess->server.write;
env->connecting_server_num--;
sess->connected_stage = 1;
/* delete server write timer */
if (ser_we->in_timer_set) {
sb_timer_pop_event(ser_we);
}
/* client shutdown(normal close or read error) */
if (cli_re->error) {
sess->why_finish = FINISH_REASON_CLIENTREADERR;
return SB_ERROR;
}
else if (cli_re->shutdown) {
/* NOTICE: must del event and add event, use epoll ET mode, possible write event had raise, origin is sb_test_connect_complete_event
and just change handler will not trigger event */
if (sb_event_is_active(ser_we)) {
if (sb_del_write_event(ser_we)) {
ErrorOutput("connect server ok, but delete write event failed\n");
sess->why_finish = FINISH_REASON_PROCESSEVENTERR;
return SB_ERROR;
}
}
if (sess->cs_used != 0) { /* between connecting server's time window, read some client data, now change server's write event */
/* change server's handler: sb_test_connect_complete_event -> sb_transfer_write_event */
if (sb_add_write_event(ser_we, sb_transfer_write_event)) {
ErrorOutput("connect server ok, but add write event failed\n");
sess->why_finish = FINISH_REASON_PROCESSEVENTERR;
return SB_ERROR;
}
return SB_OK;
}
else {
/* notice: the event handler that call this function will finish session
so here not need finish the session */
return SB_DONE;
}
}
else {
/* add server read event */
if (sb_add_read_event(ser_re, sb_transfer_read_event)) {
ErrorOutput("add read event failed\n");
sess->why_finish = FINISH_REASON_PROCESSEVENTERR;
return SB_ERROR;
}
/* change server's handler: sb_transfer_read_not_send_event -> sb_transfer_read_event */
cli_re->handler = sb_transfer_read_event;
/* NOTICE: must del event and add event, use epoll ET mode, possible write event had raise
and just change handler will not trigger event */
/* must check if active, when in sb_accept_event's sb_try_connect_server function success, server write not register */
if (sb_event_is_active(ser_we)) {
if (sb_del_write_event(ser_we)) {
ErrorOutput("connect server ok, but delete write event failed\n");
sess->why_finish = FINISH_REASON_PROCESSEVENTERR;
return SB_ERROR;
}
}
if (sess->cs_used != 0) { /* between connecting server's time window, read some client data, now change server's write event */
/* change server's handler: sb_test_connect_complete_event -> sb_transfer_write_event */
if (sb_add_write_event(ser_we, sb_transfer_write_event)) {
ErrorOutput("connect server ok, but add write event failed\n");
sess->why_finish = FINISH_REASON_PROCESSEVENTERR;
return SB_ERROR;
}
return SB_OK;
}
return SB_OK;
}
}
/* @RETURN: SB_AGAIN: had failed and now try again */
int sb_connect_server_failed(struct sb_cycle_env *env, struct sb_session *sess)
{
int rc = 0;
if (sess->had_failed == 0) {
struct sb_conf_server *server = NULL;
sess->had_failed = 1;
server = sb_get_server(env, sess->select_rule, sess->client.net_address.ip, sess->client.net_address.port);
if (!server) {
sess->why_finish = FINISH_REASON_CANNOTGETSERVER;
return sb_finish_session(env, sess);
}
else {
/* @RETURN SB_DECLINE(resource not enough or client ip not allow), SB_ERROR,
SB_CONNECTERR(connect error), SB_OK(connect ok), SB_AGAIN(in progress) */
sess->select_server = server;
if ((rc = sb_try_connect_server(env, sess))) {
return sb_finish_session(env, sess);
}
else {
/* only here can live again */
return rc;
}
}
}
else {
sess->why_finish = FINISH_REASON_CONNECTSERVERERR;
return sb_finish_session(env, sess);
}
}
int sb_dump_session_buf(struct sb_session *sess)
{
int rc = 0;
strcpy(sess->cs_buf, "GET / HTTP/1.0\r\n\r\n");
sess->cs_used = strlen("GET / HTTP/1.0\r\n\r\n");
return 0;
}
int sb_get_rule_and_server(struct sb_cycle_env *env, struct sb_session *sess)
{
struct sb_connection *client = &sess->client;
sess->select_rule = sb_get_rule_by_client_addr(env, client->net_address.ip, client->net_address.port);
if (sess->select_rule == NULL) {
DebugOutput("client %s:%d not allow\n", client->net_address.ip, client->net_address.port);
sess->why_finish = FINISH_REASON_CLIENTNOTALLOW;
return SB_DECLINE;
}
sess->select_server = sb_get_server(env, sess->select_rule, client->net_address.ip, client->net_address.port);
if (sess->select_server == NULL) {
ErrorOutput("rule %s, ip[%s]:port[%d] can't get server\n", sess->select_rule->rule_name, client->net_address.ip,
client->net_address.port);
return SB_CONNECTERR;
}
return SB_OK;
}
/* use session's server ip address and port */
/* add server write timer, the handler is sb_test_connect_complete_event */
/* @RETURN SB_DECLINE(resource not enough or client ip not allow), SB_ERROR,
SB_CONNECTERR(connect error), SB_OK(connect ok), SB_AGAIN(in progress) */
int sb_try_connect_server(struct sb_cycle_env *env, struct sb_session *sess)
{
int fd = 0;
struct sockaddr_in addr;
unsigned int addr_len;
struct sb_connection *c; /* server connection */
struct sb_connection *client; /* client connection */
struct sb_event *we;
struct sb_event *re;
int err;
c = &sess->server;
client = &sess->client;
we = &c->write;
re = &c->read;
if (sess->had_failed == 0) {
c->sockfd = 0;
if ((fd = socket(AF_INET, SOCK_STREAM, 0)) < 0 ) {
if (errno == EMFILE || errno == ENFILE) {
/* delay accept */
if (env->accept_delay_timeout > 0) {
sb_push_accept_delay_timer(env);
}
ErrorOutput("too many fd, access system limit, ip[%s]'s request is cut", sess->client.net_address.ip);
}
else
ErrorOutput("connect server, get socket fd failed, ip[%s]'s request is cut", sess->client.net_address.ip);
return SB_ERROR;
}
c->sockfd = fd;
}
/***********
sess->select_rule = sb_get_rule_by_client_addr(env, client->net_address.ip, client->net_address.port);
if (sess->select_rule == NULL) {
DebugOutput("client %s:%d not allow\n", client->net_address.ip, client->net_address.port);
sess->why_finish = FINISH_REASON_CLIENTNOTALLOW;
return SB_DECLINE;
}
sess->select_server = sb_get_server(env, sess->select_rule, client->net_address.ip, client->net_address.port);
if (sess->select_server == NULL) {
ErrorOutput("rule %s, ip[%s]:port[%d] can't get server\n", sess->select_rule->rule_name, client->net_address.ip,
client->net_address.port);
return SB_CONNECTERR;
}
*********/
/* sb_conf_simple_address -> sb_net_address */
strcpy(c->net_address.ip, sess->select_server->simple_addr.simple_ip);
c->net_address.port = sess->select_server->simple_addr.port;
memset(&addr, 0x00, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_port = htons(c->net_address.port);
if (inet_pton(AF_INET, c->net_address.ip, &addr.sin_addr.s_addr) != 1) {
ErrorOutput("inet_pton failed, errno[%d]", errno);
sess->why_finish = FINISH_REASON_SYSTEMCALLERR;
return SB_ERROR;
}
addr_len = sizeof(struct sockaddr_in);
memcpy(&c->net_address.sockaddr, &addr, addr_len);
sb_set_nonblocking(c->sockfd);
sb_set_reuseaddr(c->sockfd, 1);
sb_set_tcpkeepalive(c->sockfd);
if (connect(c->sockfd, (struct sockaddr*)&addr, sizeof(addr)) == -1) {
if (errno != EINPROGRESS) {
ErrorOutput("connect server[%s] failed, errno[%d]", sess->server.net_address.ip, errno);
sess->why_finish = FINISH_REASON_CONNECTSERVERERR;
return SB_CONNECTERR;
}
}
/* can't know the connect result
so register the server's read event and set handler */
if (errno == EINPROGRESS) {
/* re->handler = sb_test_connect_complete_event; */
if (sb_event_is_active(we)) {
if (sb_del_write_event(we)) {
ErrorOutput("connect ip[%s]:port[%d] delete write event failed, errno[%d]", c->net_address.ip, c->net_address.port, errno);
sess->why_finish = FINISH_REASON_PROCESSEVENTERR;
return SB_ERROR;
}
}
if (sb_add_write_event(we, sb_test_connect_complete_event)) {
ErrorOutput("connect ip[%s]:port[%d] add write event failed, errno[%d]", c->net_address.ip, c->net_address.port, errno);
sess->why_finish = FINISH_REASON_PROCESSEVENTERR;
return SB_ERROR;
}
/* add timer, the handler is sb_test_connect_complete_event */
we->timeout_time = sb_get_msec_timeout_time(sess->select_rule->connect_timeout);
sb_timer_push_event(we);
if (sess->had_failed == 0)
env->connecting_server_num++;
return SB_AGAIN;
}
else { /* connect success */
err = sb_connect_server_ok(env, sess);
if (err) {
if (err == SB_DONE) {
DebugOutput("client[%s]-server[%s], client request close before connect server ok",
sess->client.net_address.ip, sess->server.net_address.ip);
return SB_OK;
}
else {
(void)sb_finish_session(env, sess);
ErrorOutput("client[%s]-server[%s], in client'read occur error before connect server ok",
sess->client.net_address.ip, sess->server.net_address.ip);
return SB_ERROR;
}
}
else {
InfoOutput("client[%s]:[%d]--[%d]local[%d]--server[%s]:[%d] link success", sess->client.net_address.ip,
sess->client.net_address.port, sess->client.sockfd,sess->server.sockfd,
sess->server.net_address.ip, sess->server.net_address.port);
return SB_OK;
}
}
}
/* EVENT HANDLER */
int sb_accept_event(struct sb_cycle_env *env, struct sb_event *ev)
{
int err;
int new_fd;
struct sockaddr_in new_addr;
int new_addr_len;
struct sb_session *new_session;
struct sb_listen *li;
li = ev->data;
while(1) {
/* eat too many resource need wait */
if (env->accept_dalay_flag == 1)
break;
new_addr_len = sizeof(struct sockaddr_in);
new_fd = accept(li->listen_fd, (struct sockaddr*)&new_addr, &new_addr_len);
if (new_fd == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
DebugOutput("accept EAGAIN");
}
else if (errno == EMFILE || errno == ENFILE) {
/* need wait for a moment */
/* delay accept */
if (env->accept_delay_timeout > 0) {
sb_push_accept_delay_timer(env);
}
ErrorOutput("too many fd, access the system limit");
}
else {
ErrorOutput("listen ip[%s]-port[%d]-listen_fd[%d] accept failed",
li->listen_address.ip,
li->listen_address.port,
li->listen_fd );
}
break;
}
else {
struct sb_connection *cli_c;
struct sb_connection *ser_c;
struct sb_event *cli_re;
struct sb_event *cli_we;
struct sb_event *ser_re;
struct sb_event *ser_we;
/* try two times */
new_session = sb_get_session(env);
if (new_session == NULL) {
new_session = sb_get_session(env);
}
if (new_session == NULL) {
ErrorOutput("get session failed");
/* close the new socket */
/* can't get session, need wait for a moment */
close(new_fd);
break;
}
cli_c = &new_session->client;
cli_c->connect_type = CONNECT_TYPE_CLIENT;
ser_c = &new_session->server;
ser_c->connect_type = CONNECT_TYPE_SERVER;
cli_c->session = new_session;
ser_c->session = new_session;
cli_re = &cli_c->read;
cli_re->data = cli_c;
cli_we = &cli_c->write;
cli_we->data = cli_c;
ser_re = &ser_c->read;
ser_re->data = ser_c;
ser_we = &ser_c->write;
ser_we->data = ser_c;
/* set cli sockpt */
sb_set_nonblocking(new_fd);
sb_set_reuseaddr(new_fd, 1);
sb_set_tcpkeepalive(new_fd);
/* get client address info */
cli_c->sockfd = new_fd;
memcpy(&cli_c->net_address.sockaddr, &new_addr, new_addr_len);
strcpy(cli_c->net_address.ip, inet_ntoa(new_addr.sin_addr));
cli_c->net_address.port = ntohs(new_addr.sin_port );
/* select rule and server */
if ((err = sb_get_rule_and_server(env, new_session))) {
(void)sb_finish_session(env, new_session);
continue;
}
InfoOutput("listen ip[%s]-port[%d] accept connect ip[%s]-port[%d]", li->listen_address.ip, li->listen_address.port,
cli_c->net_address.ip, cli_c->net_address.port);
/* add client's read event, sometime connect take a long time and client close connection before
connect server or success */
if (sb_add_read_event(cli_re, sb_transfer_read_not_send_event)) {
new_session->why_finish = FINISH_REASON_PROCESSEVENTERR;
(void)sb_finish_session(env, new_session);
}
/* connect to server */
/* @RETURN SB_DECLINE(resource not enough or client ip not allow), SB_ERROR(error)
SB_CONNECTERR(connect error), SB_OK(connect ok), SB_AGAIN(in progress) */
err = sb_try_connect_server(env, new_session);
if (err == SB_DECLINE || err == SB_ERROR) {
(void)sb_finish_session(env, new_session);
}
else if (err == SB_CONNECTERR) {
(void)sb_connect_server_failed(env, new_session);
}
else {
/* success */
}
}
}
return SB_OK;
}
unsigned int sb_get_cs_available_len(struct sb_session *sess)
{
return (MAXIOBUFLEN - sess->cs_used);
}
char* sb_get_cs_available_start_ptr(struct sb_session *sess)
{
return (sess->cs_buf + sess->cs_used);
}
void sb_add_cs_used_len(struct sb_session *sess, unsigned int len)
{
sess->cs_used += len;
}
unsigned int sb_get_sc_available_len(struct sb_session *sess)
{
return (MAXIOBUFLEN_2 - sess->sc_used);
}
char* sb_get_sc_available_start_ptr(struct sb_session *sess)
{
return (sess->sc_buf + sess->sc_used);
}
void sb_add_sc_used_len(struct sb_session *sess, unsigned int len)
{
sess->sc_used += len;
}
/*
@RETURN: 1. SB_BUFNOTENOUGH 2. SB_AGAIN 3. SB_ERROR(errno) 4. SB_DONE(fixme:client use shutdown(SHUT_WR) 5.SB_TIMEOUT
*/
int sb_recv(struct sb_connection *c)
{
int rc;
unsigned int available;
unsigned int recv_len;
int sockfd;
struct sb_session *sess = NULL;
char *ptr = NULL;
sess = c->session;
rc = SB_OK;
sockfd = c->sockfd;
recv_len = 0;
if (c->connect_type == CONNECT_TYPE_CLIENT) {
ptr = sb_get_cs_available_start_ptr(sess);
available = sb_get_cs_available_len(sess);
}
else if(c->connect_type == CONNECT_TYPE_SERVER) {
ptr = sb_get_sc_available_start_ptr(sess);
available = sb_get_sc_available_len(sess);
}
for (;;) {
if (available <= 0) {
rc = SB_BUFNOTENOUGH;
break;
}
rc = recv(sockfd, ptr + recv_len, available, 0);
if (rc == -1) {
if (errno == EINTR)
continue;
else {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
rc = SB_AGAIN;
}
/***
else if (errno == ETIMEDOUT) {
rc = SB_TIMEOUT;
}
***/
else {
ErrorOutput("[%s]:[%d]-socket[%d] occur recv failed, errno[%d]", c->net_address.ip, c->net_address.port, c->sockfd, errno);
rc = SB_ERROR;
}
break;
}
}
else if (rc == 0) {
rc = SB_DONE;
break;
}
else {
recv_len += rc;
available -= rc;
}
}
#if 0
int i;
int cnt=0;
if (c->connect_type == CONNECT_TYPE_SERVER) {
printf("RECV FROM CLIENT++++++++++++++++++\n");
}
else {
printf("RECV FROM CLIENT---------------\n");
FILE *file = fopen("sb.log", "a+");
fprintf(file, "SESSIONID:%lu:", sess->sessionid);
fwrite(ptr, 1, recv_len, file);
fclose(file);
}
#endif
if (c->connect_type == CONNECT_TYPE_CLIENT) {
sess->cs_used += recv_len;
}
else if(c->connect_type == CONNECT_TYPE_SERVER) {
sess->sc_used += recv_len;
}
return rc;
}
/*
@RETURN: 1. SB_AGAIN 2. SB_ERROR(errno) *3. SB_DONE(no use)* 4. SB_OK
*/
int sb_send(struct sb_connection *c)
{
int rc;
int sockfd;
struct sb_session *sess = NULL;
unsigned int sent_len;
unsigned int used = 0;
char *origin_ptr = NULL, *ptr = NULL;
sess = c->session;
rc = SB_OK;
sockfd = c->sockfd;
sent_len = 0;
if (c->connect_type == CONNECT_TYPE_CLIENT) {
origin_ptr = ptr = sess->sc_buf;
used = sess->sc_used;
}
else if(c->connect_type == CONNECT_TYPE_SERVER) {
origin_ptr = ptr = sess->cs_buf;
used = sess->cs_used;
}
for (;;) {
if (used == 0) {
rc = SB_OK;
break;
}
rc = send(sockfd, ptr + sent_len, used, 0);
if (rc == -1) {
if (errno == EINTR)
continue;
else {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
rc = SB_AGAIN;
}
else {
ErrorOutput("connect[%s] occur session failed, err[%d]", c->net_address.ip, errno);
rc = SB_ERROR;
}
break;
}
}
else if (rc == 0) {
/* ATTENTION: */
rc = SB_ERROR;
break;
}
else {
used -= rc;
sent_len += rc;
}
}
/***
int i;
int cnt=0;
if (c->connect_type == CONNECT_TYPE_CLIENT) {
printf("SEND TO CLIENT++++++++++++++++++\n");
}
else
printf("SEND TO SERVER---------------\n");
for (i = 0; i < sent_len; i++) {
if (origin_ptr[i] >= 32 && origin_ptr[i] <= 126) {
printf("%c ", origin_ptr[i]);
}
else{
printf("0x%x ", (unsigned char)origin_ptr[i]);
}
cnt++;
if (cnt==32) {
printf("\n");
}
}
printf("\n");
if (c->connect_type == CONNECT_TYPE_CLIENT)
printf("SEND TO CLIENT END===================\n");
else
printf("SEND TO SERVER END===================\n");
**/
if (c->connect_type == CONNECT_TYPE_CLIENT) {
sess->sc_used = used;
}
else if(c->connect_type == CONNECT_TYPE_SERVER) {
sess->cs_used = used;
}
if (used > 0) {
memmove(origin_ptr, origin_ptr + sent_len, used);
}
return rc;
}
int sb_linux_recv(struct sb_connection *c, char *buf, int available)
{
int err = 0;
int n;
do {
n = recv(c->sockfd, buf, available, 0);
if (n >= 0) {
return n;
}
err = errno;
if (err != EAGAIN && err != EINTR) {
ErrorOutput("connect[%s] occur recv failed, err[%d]", c->net_address.ip, err);
break;
}
} while(err == EINTR);
/* err can be 0, EAGAIN */
return err;
}
int sb_linux_send(struct sb_connection *c, char *buf, int len)
{
int err = 0;
int n;
do {
n = send(c->sockfd, buf, len, 0);
if (n >= 0) {
return n;
}
err = errno;
if (err != EAGAIN && err != EINTR) {
ErrorOutput("connect[%s] occur send failed, err[%d]", c->net_address.ip, err);
break;
}
} while (err == EINTR);
return err;
}
/* EVENT HANDLER[not use] */
int sb_transfer_read_discard_event(struct sb_cycle_env *env, struct sb_event *e)
{
struct sb_connection *in;
struct sb_connection *out;
struct sb_session *sess;
int r, rc, rc2;
int n;
char tmp_buf[1024];
in = e->data;
sess = in->session;
if (in->connect_type == CONNECT_TYPE_CLIENT)
out = &sess->server;
else if (in->connect_type == CONNECT_TYPE_SERVER)
out = &sess->client;
for (;;) {
n = recv(in->sockfd, tmp_buf, 1024, 0);
if (n == -1) {
if (errno == EINTR)
continue;
else if(errno = EAGAIN) {
/* delete this side's read event */
if (sb_del_read_event(&in->read)) {
ErrorOutput("connect ip[%s]:port[%d] delete read event failed, errno[%d]", in->net_address.ip, in->net_address.port, errno);
sess->why_finish = FINISH_REASON_PROCESSEVENTERR;
return SB_ERROR;
}
}
else {
(void)sb_finish_session(env, sess);
ErrorOutput("[%s]<--->[%s] discard transfer data failed, err[%d]", in->net_address.ip, out->net_address.ip, errno);
}
}
else if (n > 0) {
/* discard data */
continue;
}
else {
/* n == 0, finish session */
(void)sb_finish_session(env, sess);
InfoOutput("[%s]<--->[%s] complete when read discard event!!", in->net_address.ip, out->net_address.ip);
}
break;
}
return SB_OK;
}
/* EVENT HANDLER: only read not send, when accept client's connection use this handler and after connect server success
the handler will change to sb_transfer_read_event.
notice: when read error, not finish the whole session
*/
int sb_transfer_read_not_send_event(struct sb_cycle_env *env, struct sb_event *e)
{
struct sb_connection *in;
struct sb_connection *out;
struct sb_session *sess;
int r, rc, rc2;
int n;
in = e->data;
sess = in->session;
if (in->connect_type == CONNECT_TYPE_CLIENT)
out = &sess->server;
else if (in->connect_type == CONNECT_TYPE_SERVER)
out = &sess->client;
/* @RETURN: 1. SB_BUFNOTENOUGH 2. SB_AGAIN 3. SB_ERROR(errno) 4. SB_DONE */
rc = sb_recv(in);
if (in->connect_type == CONNECT_TYPE_CLIENT) {
sb_dump_session_buf(sess);
}
if (rc == SB_DONE) {
in->read.shutdown = 1;
close(in->sockfd);
if (sb_del_read_event(&in->read)) {
ErrorOutput("connect ip[%s]:port[%d] delete read event failed, errno[%d]", in->net_address.ip, in->net_address.port, errno);
sess->why_finish = FINISH_REASON_PROCESSEVENTERR;
return SB_ERROR;
}
in->sockfd = 0;
InfoOutput("[%s]<--->[%s] client' read finish before connect return", in->net_address.ip, out->net_address.ip);
}
else if(rc == SB_BUFNOTENOUGH) { /* remain data in tcp-internal buf */
in->read.read_ready = 1;
}
else if(rc == SB_AGAIN) { /* recv ok, send buf is full */
in->read.read_ready = 0;
}
else if (rc == SB_ERROR) {
/*** Here can't finish session at now, should make 'shutdown' flag and wait for connect server system call
no matter connect's result(success or fail) find client's read error will finish the whole session
***/
in->read.shutdown = 1;
in->read.error = 1;
close(in->sockfd);
if (sb_del_read_event(&in->read)) {
ErrorOutput("connect ip[%s]:port[%d] delete read event failed, errno[%d]", in->net_address.ip, in->net_address.port, errno);
sess->why_finish = FINISH_REASON_PROCESSEVENTERR;
return SB_ERROR;
}
in->sockfd = 0;
ErrorOutput("[%s]<--->[%s] client' read error before connect return", in->net_address.ip, out->net_address.ip);
return SB_ERROR;
}
return SB_OK;
}
/* EVENT HANDLER */
int sb_transfer_delay_read_event(struct sb_cycle_env *env, struct sb_event *e)
{
struct sb_connection *c;
c = e->data;
c->read.force_delay_read = 1;
c->read.read_ready = 1;
return SB_OK;
}
/* EVENT HANDLER */
/* 1. when buf has data, add write event
2. when send all data in buf, delete write event */
int sb_transfer_read_event(struct sb_cycle_env *env, struct sb_event *e)
{
struct sb_connection *in = NULL;
struct sb_connection *out = NULL;
struct sb_session *sess = NULL;
int r, rc, rc2;
in = e->data;
sess = in->session;
if (in->connect_type == CONNECT_TYPE_CLIENT)
out = &sess->server;
else if (in->connect_type == CONNECT_TYPE_SERVER)
out = &sess->client;
if (out == NULL) {
sess->why_finish = FINISH_REASON_INTERNALERR;
FatalOutput("connect_type unset");
(void)sb_finish_session(env, sess);
return SB_ERROR;
}
for (;;) {
/* @RETURN: 1. SB_BUFNOTENOUGH 2. SB_AGAIN 3. SB_ERROR(errno) 4. SB_DONE */
rc = sb_recv(in);
/**
if (in->connect_type == CONNECT_TYPE_CLIENT) {
sb_dump_session_buf(sess);
}
***/
if (rc == SB_DONE || rc == SB_BUFNOTENOUGH || rc == SB_AGAIN) {
/* @RETURN: 1. SB_AGAIN 2. SB_ERROR(errno) 4. SB_OK */
rc2 = sb_send(out);
if (rc2 == SB_AGAIN) { /* remain data in session's buf */
if (rc == SB_DONE) {
in->read.shutdown = 1;
}
else if(rc == SB_BUFNOTENOUGH) { /* remain data in tcp-internal buf */
in->read.read_ready = 1;
}
else if(rc == SB_AGAIN) { /* recv ok, send buf is full */
in->read.read_ready = 0;
}
if (!sb_event_is_active(&out->write)) {
if (sb_add_write_event(&out->write, sb_transfer_write_event)) {
sess->why_finish = FINISH_REASON_PROCESSEVENTERR;
(void)sb_finish_session(env, sess);
return SB_ERROR;
}
}
r = SB_OK;
}
else if (rc2 == SB_OK) { /* send whole buf data, notice: one side shutdown passively, another forced shutdown */
if (rc == SB_DONE) {
/* success finish session */
sess->why_finish = FINISH_REASON_SUCESS_TRANSFER;
env->total_success++;
(void)sb_finish_session(env, sess);
InfoOutput("[%s-request close side]<--->[%s] complete", in->net_address.ip, out->net_address.ip);
}
else if (rc == SB_BUFNOTENOUGH) {
/* CONTINUE, should recv and send data */
DebugOutput("[%s]<--->[%s] buf not enough", in->net_address.ip, out->net_address.ip);
continue;
}
else if (rc == SB_AGAIN) {
/* all recv data send out completely, should add in's read event */
in->read.read_ready = 0;
}
if (sb_event_is_active(&out->write)) {
if (sb_del_write_event(&out->write)) {
ErrorOutput("connect ip[%s]:port[%d] delete write event failed, errno[%d]", out->net_address.ip, out->net_address.port, errno);
sess->why_finish = FINISH_REASON_PROCESSEVENTERR;
return SB_ERROR;
}
}
r = SB_OK;
}
else if (rc2 == SB_ERROR) { /* send error */
if (in->connect_type == CONNECT_TYPE_CLIENT)
sess->why_finish = FINISH_REASON_SERVERWRITEERR;
else
sess->why_finish = FINISH_REASON_CLIENTWRITEERR;
(void)sb_finish_session(env, sess);
ErrorOutput("[%s]<--->[%s-error side] occur send failed", in->net_address.ip, out->net_address.ip);
r = SB_ERROR;
}
}
else if(rc == SB_ERROR) {
if (in->connect_type == CONNECT_TYPE_CLIENT)
sess->why_finish = FINISH_REASON_CLIENTREADERR;
else
sess->why_finish = FINISH_REASON_CLIENTWRITEERR;
(void)sb_finish_session(env, sess);
ErrorOutput("[%s]<--->[%s] occur recv failed", in->net_address.ip, out->net_address.ip);
r = SB_ERROR;
}
break;
}
return r;
}
/* EVENT HANDLER */
/* when send all data in buf, write event will be deleted */
int sb_transfer_write_event(struct sb_cycle_env *env, struct sb_event *e)
{
struct sb_connection *in;
struct sb_connection *out;
struct sb_session *sess;
unsigned int available_len = 0;
int rc;
out = e->data;
sess = out->session;
if (out->connect_type == CONNECT_TYPE_CLIENT)
in = &sess->server;
else if (out->connect_type == CONNECT_TYPE_SERVER)
in = &sess->client;
/* @RETURN: 1. SB_AGAIN 2. SB_ERROR(errno) 4. SB_OK */
rc = sb_send(out);
if (rc == SB_ERROR) {
if (in->connect_type == CONNECT_TYPE_CLIENT)
sess->why_finish = FINISH_REASON_CLIENTWRITEERR;
else
sess->why_finish = FINISH_REASON_SERVERWRITEERR;
(void)sb_finish_session(env, sess);
ErrorOutput("[%s]<--->[%s] occur send failed, err[%d]", in->net_address.ip, out->net_address.ip, errno);
return SB_ERROR;
}
else if (rc == SB_OK) { /* send complete */
/* notice: send all buf, we should delete write event */
sb_del_write_event(&out->write);
if (in->read.shutdown) {
sess->why_finish = FINISH_REASON_SUCESS_TRANSFER;
env->total_success++;
(void)sb_finish_session(env, sess);
ErrorOutput("[%s-request close]<--->[%s] complete", in->net_address.ip, out->net_address.ip);
}
else if(in->read.read_ready) {
/* because not enough buffer in previous, now read mannually */
if (sb_event_is_active(&in->read)) {
sb_transfer_read_event(env, &in->read);
}
}
}
else {
if (out->connect_type == CONNECT_TYPE_CLIENT)
available_len = sb_get_sc_available_len(sess);
else if(out->connect_type == CONNECT_TYPE_SERVER)
available_len = sb_get_cs_available_len(sess);
if(available_len > 0 && in->read.read_ready) {
/* send some data, because not enough buffer in previous, now read mannually */
if (sb_event_is_active(&in->read)) {
sb_transfer_read_event(env, &in->read);
}
}
}
return SB_OK;
}
int sb_accept_delay_timer(struct sb_cycle_env *env, struct sb_event *e)
{
env->accept_dalay_flag = 0;
if (e->in_timer_set) {
sb_timer_pop_event(e);
}
return SB_OK;
}
int sb_init_listen_handler(struct sb_cycle_env *env)
{
int i, j;
struct sb_forward_rule *rule;
struct sb_listen *li;
struct sb_event *e;
if (env->http_proxy == 1) {
if (env->rule_num > 1) {
ErrorOutput("only one httpproxy can be allow");
return SB_ERROR;
}
}
printf("total_listenfd_num%d\n", env->total_listenfd_num);
for (i = 0; i < env->total_listenfd_num; i++) {
/* set listen handler */
li = &env->listen[i];
printf("fd %d\n", li->listen_fd);
e = &li->listen_event;
if (env->http_proxy) {
e->handler = sb_proxy_accept_event;
}
else {
e->handler = sb_accept_event;
}
}
return SB_OK;
}
int sb_process_signal_command(struct sb_cycle_env *env)
{
int rc, i, j;
struct sb_forward_rule *rule;
struct sb_listen *li;
if (sig_cmd == SB_SYS_CMD_QUICKEXIT) {
for (i = 0; i < env->total_listenfd_num; i++) {
li = &env->listen[i];
close(li->listen_fd);
}
SystemInfo("quick exit...");
exit(1);
}
else if (sig_cmd == SB_SYS_CMD_SLOWEXIT) {
for (i = 0; i < env->total_listenfd_num; i++) {
li = &env->listen[i];
close(li->listen_fd);
}
SystemInfo("slow exit...");
sb_set_process_title("sbworker/%d exiting", env->workerno);
sleep(5);
exiting = 1;
}
else if(sig_cmd == SB_SYS_CMD_SHOW_STATUS) {
sb_conf_show(env);
sig_cmd = 0;
}
}
int sb_try_get_accept_lock(struct sb_cycle_env *env)
{
if (sb_spintrylock(env->accept_lock)) {
if (sb_add_listen_event(env)) {
return SB_ERROR;
}
sb_accept_lock_mutex = 1;
}
return SB_OK;
}
int sb_free_accept_lock(struct sb_cycle_env *env)
{
sb_spinunlock(env->accept_lock);
sb_accept_lock_mutex = 0;
if (sb_del_listen_event(env)) {
return SB_ERROR;
}
return SB_OK;
}
int sb_server_loop(struct sb_cycle_env *env)
{
/* 1. find the lastest timer
2. use muti I/O interface - set the timeout ad the step 1's time
3. process muti I/O's report active fd
4. find all expire timer and process timer */
int rc;
int err;
int wait_num;
unsigned long delta;
unsigned long last_timeout;
sb_update_time(&cache_time, cache_str_time);
while (1)
{
if (!exiting) {
sb_process_signal_command(env);
}
last_timeout = sb_find_lastest_timer(env);
if (exiting == 0 && env->workers_num > 1) {
if (sb_try_get_accept_lock(env)) {
ErrorOutput("get accept lock error");
exit(1);
}
}
delta = sb_current_msec_time(env);
wait_num = process_event(env, (int)last_timeout, NULL);
sb_update_time(&cache_time, cache_str_time);
/* free lock */
if (sb_accept_lock_mutex) {
if (sb_free_accept_lock(env)) {
ErrorOutput("free accept lock error");
exit(1);
}
}
delta = sb_current_msec_time(env) - delta;
if (env->http_proxy) {
rc = sb_proxy_dns_recv_resp(env);
if (rc) {
ErrorOutput("dns process failed");
exit(1);
}
}
/* only delta bigger than 5ms */
if (delta >= 5) {
sb_process_expire_timer(env);
sb_update_time(&cache_time, cache_str_time);
}
sb_merge_reused_session(env);
if (exiting) {
if (sb_timer_is_empty()){
/* TODO */
SystemInfo("pid[%d] exit...", env->pid);
exit(1);
}
}
}
return SB_OK;
}
int sb_add_listen_event(struct sb_cycle_env *env)
{
int rc = 0;
int i;
for (i = 0; i < env->total_listenfd_num; i++) {
rc = add_listen_event(&env->listen[i].listen_event);
if (rc) {
ErrorOutput("add listen event failed");
return SB_ERROR;
}
}
return SB_OK;
}
int sb_del_listen_event(struct sb_cycle_env *env)
{
int rc = 0;
int i;
for (i = 0; i < env->total_listenfd_num; i++) {
rc = del_listen_event(&env->listen[i].listen_event);
if (rc) {
ErrorOutput("add listen event failed");
return SB_ERROR;
}
}
return SB_OK;
}
int sb_init_worker_env(struct sb_cycle_env *env)
{
exiting = 0;
env->accept_delay_timeout = SYS_RECOURCE_POOR_ACCEPT_DELAY;
/* set handler */
env->accept_delay_timer.handler = sb_accept_delay_timer;
sb_accept_lock_mutex = 0;
return SB_OK;
}
static void sb_signal_handler(int signo)
{
if (sig_cmd != 0) {
return;
}
SystemInfo("catch signo[%d]", signo);
if (signo == SIGUSR1) {
sig_cmd = SB_SYS_CMD_QUICKEXIT;
}
else if (signo == SIGUSR2) {
sig_cmd = SB_SYS_CMD_SLOWEXIT;
}
else {
SystemInfo("unknow signal");
}
}
void sb_signal_init()
{
int signo;
sigset_t sgset;
struct sigaction act;
/* cancel signale mask that inherit from parent */
sigemptyset(&sgset);
sigprocmask(SIG_SETMASK, &sgset, NULL);
for (signo = 1; signo <= 64; signo++) {
if (signo == SIGKILL || signo == SIGSTOP) {
continue;
}
signal(signo, SIG_IGN);
}
signal(SIGINT, SIG_DFL);
signal(SIGHUP, SIG_DFL);
memset(&act, 0x00, sizeof(struct sigaction));
act.sa_handler = sb_signal_handler;
sigaction(SIGUSR1, &act, NULL);
sigaction(SIGUSR2, &act, NULL);
}
int sb_worker_main(struct sb_cycle_env *g_env)
{
struct sb_listen *li;
int rc = 0;
int i = 0;
sb_update_time(&cache_time, cache_str_time);
rc = sb_init_worker_env(g_env);
if (rc) {
ErrorOutput("init environment failed");
exit(1);
}
SystemInfo("init worker environment success!");
rc = init_event(g_env);
if (rc) {
ErrorOutput("init event manager failed");
exit(1);
}
SystemInfo("init event manager success!");
/* init listen */
if (sb_init_listen_handler(g_env)) {
ErrorOutput("init listen handler failed");
exit(1);
}
sb_signal_init();
SystemInfo("init signal success!");
/* start main loop */
sb_server_loop(g_env);
SystemInfo("start sbalance exit");
return 0;
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化