加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
swoole_async.c 17.61 KB
一键复制 编辑 原始数据 按行查看 历史
韩天峰 提交于 2014-11-25 18:18 . sync 1.7.8
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714
/*
+----------------------------------------------------------------------+
| Swoole |
+----------------------------------------------------------------------+
| This source file is subject to version 2.0 of the Apache license, |
| that is bundled with this package in the file LICENSE, and is |
| available through the world-wide-web at the following url: |
| http://www.apache.org/licenses/LICENSE-2.0.html |
| If you did not receive a copy of the Apache2.0 license and are unable|
| to obtain it through the world-wide-web, please send a note to |
| license@php.net so we can mail you a copy immediately. |
+----------------------------------------------------------------------+
| Author: Tianfeng Han <mikan.tenny@gmail.com> |
+----------------------------------------------------------------------+
*/
#include "php_swoole.h"
#include "php_streams.h"
#include "php_network.h"
typedef struct
{
zval *callback;
zval *filename;
int fd;
off_t offset;
uint16_t type;
uint8_t once;
char *file_content;
uint32_t content_length;
} swoole_async_file_request;
typedef struct {
zval *callback;
zval *domain;
} swoole_async_dns_request;
static void php_swoole_check_aio();
static void php_swoole_aio_onComplete(swAio_event *event);
static char php_swoole_aio_init = 0;
static swHashMap *php_swoole_open_files;
void swoole_async_init(int module_number TSRMLS_DC)
{
bzero(&SwooleAIO, sizeof(SwooleAIO));
REGISTER_LONG_CONSTANT("SWOOLE_AIO_BASE", SW_AIO_BASE, CONST_CS | CONST_PERSISTENT);
REGISTER_LONG_CONSTANT("SWOOLE_AIO_GCC", SW_AIO_GCC, CONST_CS | CONST_PERSISTENT);
REGISTER_LONG_CONSTANT("SWOOLE_AIO_LINUX", SW_AIO_LINUX, CONST_CS | CONST_PERSISTENT);
php_swoole_open_files = swHashMap_new(SW_HASHMAP_INIT_BUCKET_N, NULL);
if (php_swoole_open_files == NULL)
{
php_error_docref(NULL TSRMLS_CC, E_ERROR, "create hashmap failed.");
}
}
static void php_swoole_check_aio()
{
if (php_swoole_aio_init == 0)
{
php_swoole_check_reactor();
swAio_init();
SwooleAIO.callback = php_swoole_aio_onComplete;
php_swoole_try_run_reactor();
php_swoole_aio_init = 1;
}
}
static void php_swoole_aio_onComplete(swAio_event *event)
{
int isEOF = SW_FALSE;
int64_t ret;
zval *retval = NULL, *zcallback = NULL, *zwriten = NULL;
zval *zcontent = NULL;
zval **args[2];
swoole_async_file_request *file_req = NULL;
swoole_async_dns_request *dns_req = NULL;
TSRMLS_FETCH_FROM_CTX(sw_thread_ctx ? sw_thread_ctx : NULL);
if (event->type == SW_AIO_DNS_LOOKUP)
{
dns_req = (swoole_async_dns_request *) event->req;
if (dns_req->callback == NULL)
{
php_error_docref(NULL TSRMLS_CC, E_WARNING, "swoole_async: onAsyncComplete callback not found[2]");
return;
}
zcallback = dns_req->callback;
}
else
{
if (zend_hash_find(&php_sw_aio_callback, (char *)&(event->fd), sizeof(event->fd), (void**) &file_req) != SUCCESS)
{
php_error_docref(NULL TSRMLS_CC, E_WARNING, "swoole_async: onAsyncComplete callback not found[1]");
return;
}
if (file_req->callback == NULL && file_req->type == SW_AIO_READ)
{
php_error_docref(NULL TSRMLS_CC, E_WARNING, "swoole_async: onAsyncComplete callback not found[2]");
return;
}
zcallback = file_req->callback;
}
ret = event->ret;
if (ret < 0)
{
php_error_docref(NULL TSRMLS_CC, E_WARNING, "swoole_async: Aio Error: %s[%d]", strerror(event->error), event->error);
}
else if (file_req != NULL)
{
if (ret == 0)
{
bzero(event->buf, event->nbytes);
isEOF = SW_TRUE;
}
else if (file_req->once == 1 && ret < file_req->content_length)
{
php_error_docref(NULL TSRMLS_CC, E_WARNING, "swoole_async: ret_length[%d] < req->length[%d].", (int) ret, file_req->content_length);
}
else if (event->type == SW_AIO_READ)
{
file_req->offset += event->ret;
}
}
if (event->type == SW_AIO_READ)
{
MAKE_STD_ZVAL(zcontent);
args[0] = &file_req->filename;
args[1] = &zcontent;
ZVAL_STRINGL(zcontent, event->buf, ret, 0);
}
else if (event->type == SW_AIO_WRITE)
{
MAKE_STD_ZVAL(zwriten);
args[0] = &file_req->filename;
args[1] = &zwriten;
ZVAL_LONG(zwriten, ret);
if (file_req->once != 1)
{
if (SwooleAIO.mode == SW_AIO_LINUX)
{
free(event->buf);
}
else
{
efree(event->buf);
}
}
}
else if(event->type == SW_AIO_DNS_LOOKUP)
{
MAKE_STD_ZVAL(zcontent);
args[0] = &dns_req->domain;
if (ret < 0)
{
ZVAL_STRING(zcontent, "", 0);
}
else
{
ZVAL_STRING(zcontent, event->buf, 0);
}
args[1] = &zcontent;
}
else
{
php_error_docref(NULL TSRMLS_CC, E_WARNING, "swoole_async: onAsyncComplete unknow event type");
return;
}
if (zcallback)
{
if (call_user_function_ex(EG(function_table), NULL, zcallback, &retval, 2, args, 0, NULL TSRMLS_CC) == FAILURE)
{
php_error_docref(NULL TSRMLS_CC, E_WARNING, "swoole_async: onAsyncComplete handler error");
return;
}
}
//readfile/writefile
if (file_req != NULL)
{
//只操作一次,完成后释放缓存区并关闭文件
if (file_req->once == 1)
{
close_file:
zval_ptr_dtor(&file_req->callback);
zval_ptr_dtor(&file_req->filename);
if (SwooleAIO.mode == SW_AIO_LINUX)
{
free(event->buf);
}
else
{
efree(event->buf);
}
close(event->fd);
//remove from hashtable
zend_hash_del(&php_sw_aio_callback, (char *)&(event->fd), sizeof(event->fd));
}
else if(file_req->type == SW_AIO_WRITE)
{
if (retval != NULL && !Z_BVAL_P(retval))
{
swHashMap_del(php_swoole_open_files, Z_STRVAL_P(file_req->filename), Z_STRLEN_P(file_req->filename));
goto close_file;
}
}
else
{
if (!Z_BVAL_P(retval) || isEOF)
{
goto close_file;
}
else if (SwooleAIO.read(event->fd, event->buf, event->nbytes, file_req->offset) < 0)
{
php_error_docref(NULL TSRMLS_CC, E_WARNING, "swoole_async: continue to read failed. Error: %s[%d]", strerror(event->error), event->error);
}
}
}
else if(dns_req != NULL)
{
zval_ptr_dtor(&dns_req->callback);
zval_ptr_dtor(&dns_req->domain);
efree(dns_req);
efree(event->buf);
}
if (zcontent != NULL)
{
efree(zcontent);
}
if (zwriten != NULL)
{
zval_ptr_dtor(&zwriten);
}
if (retval != NULL)
{
zval_ptr_dtor(&retval);
}
if (php_sw_in_client && SwooleG.main_reactor->event_num == 1 && SwooleAIO.task_num == 0)
{
SwooleG.running = 0;
}
}
PHP_FUNCTION(swoole_async_read)
{
#ifdef ZTS
if (sw_thread_ctx == NULL)
{
TSRMLS_SET_CTX(sw_thread_ctx);
}
#endif
zval *cb;
zval *filename;
long trunk_len = 8192;
int open_flag = O_RDONLY;
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "zz|l", &filename, &cb, &trunk_len) == FAILURE)
{
return;
}
convert_to_string(filename);
if (SwooleAIO.mode == SW_AIO_LINUX)
{
open_flag |= O_DIRECT;
}
int fd = open(Z_STRVAL_P(filename), open_flag, 0644);
if (fd < 0)
{
php_error_docref(NULL TSRMLS_CC, E_WARNING, "open file[%s] failed. Error: %s[%d]", Z_STRVAL_P(filename), strerror(errno), errno);
RETURN_FALSE;
}
void *fcnt;
if (SwooleAIO.mode == SW_AIO_LINUX)
{
int buf_len = trunk_len + (sysconf(_SC_PAGESIZE) - (trunk_len % sysconf(_SC_PAGESIZE)));
if (posix_memalign((void **) &fcnt, sysconf(_SC_PAGESIZE), buf_len))
{
php_error_docref(NULL TSRMLS_CC, E_WARNING, "posix_memalign failed. Error: %s[%d]", strerror(errno), errno);
RETURN_FALSE;
}
}
else
{
fcnt = emalloc(trunk_len);
if (fcnt == NULL)
{
php_error_docref(NULL TSRMLS_CC, E_WARNING, "malloc failed. Error: %s[%d]", strerror(errno), errno);
RETURN_FALSE;
}
}
//printf("buf_len=%d|addr=%p\n", buf_len, fcnt);
//printf("pagesize=%d|st_size=%d\n", sysconf(_SC_PAGESIZE), buf_len);
swoole_async_file_request req;
req.fd = fd;
req.filename = filename;
req.callback = cb;
req.file_content = fcnt;
req.once = 0;
req.type = SW_AIO_READ;
req.content_length = trunk_len;
req.offset = 0;
Z_ADDREF_PP(&cb);
Z_ADDREF_PP(&filename);
if (zend_hash_update(&php_sw_aio_callback, (char * )&fd, sizeof(fd), &req, sizeof(swoole_async_file_request), NULL) == FAILURE)
{
php_error_docref(NULL TSRMLS_CC, E_WARNING, "add to hashtable[1] failed");
RETURN_FALSE;
}
php_swoole_check_aio();
SW_CHECK_RETURN(SwooleAIO.read(fd, fcnt, trunk_len, 0));
RETURN_TRUE;
}
PHP_FUNCTION(swoole_async_write)
{
#ifdef ZTS
if (sw_thread_ctx == NULL)
{
TSRMLS_SET_CTX(sw_thread_ctx);
}
#endif
zval *cb = NULL;
zval *filename;
char *fcnt;
int fcnt_len = 0;
int fd;
off_t offset = -1;
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "zs|lz", &filename, &fcnt, &fcnt_len, &offset, &cb) == FAILURE)
{
return;
}
convert_to_string(filename);
char *wt_cnt;
int open_flag = O_WRONLY | O_CREAT;
if (SwooleAIO.mode == SW_AIO_LINUX)
{
if (posix_memalign((void **) &wt_cnt, sysconf(_SC_PAGESIZE), fcnt_len))
{
php_error_docref(NULL TSRMLS_CC, E_WARNING, "posix_memalign failed. Error: %s[%d]", strerror(errno), errno);
RETURN_FALSE;
}
open_flag |= O_DIRECT;
}
else
{
wt_cnt = fcnt;
wt_cnt = emalloc(fcnt_len);
}
swoole_async_file_request *req = swHashMap_find(php_swoole_open_files, Z_STRVAL_P(filename), Z_STRLEN_P(filename));
if (req == NULL)
{
fd = open(Z_STRVAL_P(filename), open_flag, 0644);
if (fd < 0)
{
php_error_docref(NULL TSRMLS_CC, E_WARNING, "open file failed. Error: %s[%d]", strerror(errno), errno);
RETURN_FALSE;
}
swoole_async_file_request new_req;
new_req.fd = fd;
new_req.filename = filename;
new_req.callback = cb;
new_req.file_content = wt_cnt;
new_req.once = 0;
new_req.type = SW_AIO_WRITE;
new_req.content_length = fcnt_len;
if (offset < 0)
{
struct stat file_stat;
if (fstat(fd, &file_stat) < 0)
{
php_error_docref(NULL TSRMLS_CC, E_WARNING, "fstat() failed. Error: %s[%d]", strerror(errno), errno);
RETURN_FALSE;
}
offset = file_stat.st_size - 1;
new_req.offset = offset + fcnt_len;
}
else
{
new_req.offset = 0;
}
if (cb != NULL)
{
Z_ADDREF_PP(&cb);
}
if (zend_hash_update(&php_sw_aio_callback, (char *)&fd, sizeof(fd), (void **) &new_req, sizeof(new_req), (void **) &req) == FAILURE)
{
php_error_docref(NULL TSRMLS_CC, E_WARNING, "add to hashtable[1] failed");
RETURN_FALSE;
}
swHashMap_add(php_swoole_open_files, Z_STRVAL_P(filename), Z_STRLEN_P(filename), req, NULL);
}
else
{
if (offset < 0)
{
offset = req->offset;
req->offset += fcnt_len;
}
fd = req->fd;
}
//swTrace("buf_len=%d|addr=%p", buf_len, fcnt);
//swTrace("pagesize=%d|st_size=%d", sysconf(_SC_PAGESIZE), buf_len);
memcpy(wt_cnt, fcnt, fcnt_len);
php_swoole_check_aio();
SW_CHECK_RETURN(SwooleAIO.write(fd, wt_cnt, fcnt_len, offset));
RETURN_TRUE;
}
PHP_FUNCTION(swoole_async_readfile)
{
#ifdef ZTS
if (sw_thread_ctx == NULL)
{
TSRMLS_SET_CTX(sw_thread_ctx);
}
#endif
zval *cb;
zval *filename;
int open_flag = O_RDONLY;
if (SwooleAIO.mode == SW_AIO_LINUX)
{
open_flag |= O_DIRECT;
}
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "zz", &filename, &cb) == FAILURE)
{
return;
}
convert_to_string(filename);
int fd = open(Z_STRVAL_P(filename), open_flag, 0644);
if (fd < 0)
{
php_error_docref(NULL TSRMLS_CC, E_WARNING, "open file[%s] failed. Error: %s[%d]", Z_STRVAL_P(filename), strerror(errno), errno);
RETURN_FALSE;
}
struct stat file_stat;
if (fstat(fd, &file_stat) < 0)
{
php_error_docref(NULL TSRMLS_CC, E_WARNING, "fstat failed. Error: %s[%d]", strerror(errno), errno);
RETURN_FALSE;
}
if (file_stat.st_size <= 0)
{
php_error_docref(NULL TSRMLS_CC, E_WARNING, "file is empty.");
RETURN_FALSE;
}
if (file_stat.st_size > SW_AIO_MAX_FILESIZE)
{
php_error_docref(NULL TSRMLS_CC, E_WARNING, "file_size[size=%ld|max_size=%d] is too big. Please use swoole_async_read.",
(long int) file_stat.st_size, SW_AIO_MAX_FILESIZE);
RETURN_FALSE;
}
void *fcnt;
int buf_len;
if (SwooleAIO.mode == SW_AIO_LINUX)
{
buf_len = file_stat.st_size + (sysconf(_SC_PAGESIZE) - (file_stat.st_size % sysconf(_SC_PAGESIZE)));
if (posix_memalign((void **) &fcnt, sysconf(_SC_PAGESIZE), buf_len))
{
php_error_docref(NULL TSRMLS_CC, E_WARNING, "posix_memalign failed. Error: %s[%d]", strerror(errno), errno);
RETURN_FALSE;
}
}
else
{
buf_len = file_stat.st_size;
fcnt = emalloc(buf_len);
if (fcnt == NULL)
{
php_error_docref(NULL TSRMLS_CC, E_WARNING, "malloc failed. Error: %s[%d]", strerror(errno), errno);
RETURN_FALSE;
}
}
//printf("buf_len=%d|addr=%p\n", buf_len, fcnt);
//printf("pagesize=%d|st_size=%d\n", sysconf(_SC_PAGESIZE), buf_len);
swoole_async_file_request req;
req.fd = fd;
req.filename = filename;
req.callback = cb;
req.file_content = fcnt;
req.once = 1;
req.type = SW_AIO_READ;
req.content_length = file_stat.st_size;
req.offset = 0;
Z_ADDREF_PP(&cb);
Z_ADDREF_PP(&filename);
if (zend_hash_update(&php_sw_aio_callback, (char * )&fd, sizeof(fd), &req, sizeof(swoole_async_file_request), NULL) == FAILURE)
{
php_error_docref(NULL TSRMLS_CC, E_WARNING, "add to hashtable failed");
RETURN_FALSE;
}
php_swoole_check_aio();
SW_CHECK_RETURN(SwooleAIO.read(fd, fcnt, buf_len, 0));
}
PHP_FUNCTION(swoole_async_writefile)
{
#ifdef ZTS
if (sw_thread_ctx == NULL)
{
TSRMLS_SET_CTX(sw_thread_ctx);
}
#endif
zval *cb = NULL;
zval *filename;
char *fcnt;
int fcnt_len;
#ifdef HAVE_LINUX_NATIVE_AIO
int open_flag = O_CREAT | O_WRONLY | O_DIRECT;
#else
int open_flag = O_CREAT | O_WRONLY;
#endif
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "zs|z", &filename, &fcnt, &fcnt_len, &cb) == FAILURE)
{
return;
}
if (fcnt_len <= 0)
{
php_error_docref(NULL TSRMLS_CC, E_WARNING, "file is empty.");
RETURN_FALSE;
}
if (fcnt_len > SW_AIO_MAX_FILESIZE)
{
php_error_docref(NULL TSRMLS_CC, E_WARNING, "file_size[size=%d|max_size=%d] is too big. Please use swoole_async_read.",
fcnt_len, SW_AIO_MAX_FILESIZE);
RETURN_FALSE;
}
convert_to_string(filename);
int fd = open(Z_STRVAL_P(filename), open_flag, 0644);
if (fd < 0)
{
php_error_docref(NULL TSRMLS_CC, E_WARNING, "open file failed. Error: %s[%d]", strerror(errno), errno);
RETURN_FALSE;
}
char *wt_cnt;
#ifdef SW_AIO_LINUX_NATIVE
fcnt_len = fcnt_len + (sysconf(_SC_PAGESIZE) - (fcnt_len % sysconf(_SC_PAGESIZE)));
if (posix_memalign((void **)&wt_cnt, sysconf(_SC_PAGESIZE), fcnt_len))
{
php_error_docref(NULL TSRMLS_CC, E_WARNING, "posix_memalign failed. Error: %s[%d]", strerror(errno), errno);
RETURN_FALSE;
}
#else
wt_cnt = emalloc(fcnt_len);
#endif
memcpy(wt_cnt, fcnt, fcnt_len);
swoole_async_file_request req;
req.fd = fd;
req.filename = filename;
req.callback = cb;
req.type = SW_AIO_WRITE;
req.file_content = wt_cnt;
req.once = 1;
req.content_length = fcnt_len;
req.offset = 0;
Z_ADDREF_PP(&filename);
if (req.callback != NULL)
{
Z_ADDREF_PP(&req.callback);
}
if (zend_hash_update(&php_sw_aio_callback, (char *)&fd, sizeof(fd), &req, sizeof(swoole_async_file_request), NULL) == FAILURE)
{
php_error_docref(NULL TSRMLS_CC, E_WARNING, "add to hashtable failed");
RETURN_FALSE;
}
memcpy(wt_cnt, fcnt, fcnt_len);
php_swoole_check_aio();
SW_CHECK_RETURN(SwooleAIO.write(fd, wt_cnt, fcnt_len, 0));
}
PHP_FUNCTION(swoole_async_set)
{
#ifdef ZTS
if (sw_thread_ctx == NULL)
{
TSRMLS_SET_CTX(sw_thread_ctx);
}
#endif
zval *zset;
HashTable *vht;
zval **v;
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "a", &zset) == FAILURE)
{
return;
}
vht = Z_ARRVAL_P(zset);
if (zend_hash_find(vht, ZEND_STRS("aio_mode"), (void **)&v) == SUCCESS)
{
convert_to_long(*v);
SwooleAIO.mode = (uint8_t) Z_LVAL_PP(v);
}
if (zend_hash_find(vht, ZEND_STRS("thread_num"), (void **)&v) == SUCCESS)
{
convert_to_long(*v);
SwooleAIO.thread_num = (uint8_t) Z_LVAL_PP(v);
}
}
PHP_FUNCTION(swoole_async_dns_lookup)
{
#ifdef ZTS
if (sw_thread_ctx == NULL)
{
TSRMLS_SET_CTX(sw_thread_ctx);
}
#endif
zval *domain;
zval *cb;
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "zz", &domain, &cb) == FAILURE)
{
return;
}
#ifdef ZTS
if(sw_thread_ctx == NULL)
{
TSRMLS_SET_CTX(sw_thread_ctx);
}
#endif
if (Z_STRLEN_P(domain) == 0)
{
php_error_docref(NULL TSRMLS_CC, E_WARNING, "domain name empty.");
RETURN_FALSE;
}
swoole_async_dns_request *req = emalloc(sizeof(swoole_async_dns_request));
req->callback = cb;
req->domain = domain;
Z_ADDREF_PP(&req->callback);
Z_ADDREF_PP(&req->domain);
int buf_size;
if(Z_STRLEN_P(domain) < SW_IP_MAX_LENGTH)
{
buf_size = SW_IP_MAX_LENGTH+1;
}
else
{
buf_size = Z_STRLEN_P(domain)+1;
}
void *buf = emalloc(buf_size);
bzero(buf, buf_size);
memcpy(buf, Z_STRVAL_P(domain), Z_STRLEN_P(domain));
php_swoole_check_aio();
SW_CHECK_RETURN(swAio_dns_lookup(req, buf, buf_size));
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化