加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
file_controller.js 5.45 KB
一键复制 编辑 原始数据 按行查看 历史
jufan 提交于 2018-04-20 15:47 . 替换node作废的api fs.exsits
const fs = require('fs');
const pro = require('process');
const nodeUtil = require('util');
const config = require('./config')
const util = require('./util')
const classifier = require('./classifier')
/**
* 文件控制器
*
* 关于 request 与 response 文件如何对应的问题。
* 问题描述:请求生成的 request 经过光闸传输后,与返回的 response 被分隔到了两个事件中,如何确认哪个请求对应的哪个响应是一个难题
* 解决方案一:在 file_controller 中增加一个 map 对象,保存请求时的序号与其它内容,使用 response 文件的序号可找到对应的请求
* 解决方案二:不关心请求与响应的对应关系,全部使用无状态请求,把请求中的 back_url 一并写入到 response 文件中,收到响应后根据 back_url 通知相应系统
*/
const readFile = nodeUtil.promisify(fs.readFile);
const rename = nodeUtil.promisify(fs.rename);
const logger = util.logger
const REQUEST = util.REQUEST;
const RESPONSE = util.RESPONSE;
const FILE_BROKEN_ERR_CODE = 'file_broken';
// 文件是否存在
const exists = util.exists
pro.on('uncaughtException', (e)=> {
logger.error('domain catch error : ' , e);
});
/**
* 检验文件是否完成
* @param file 文件路径
* @return 文件内容的 json 对象
*/
const checkFile = async (filepath, filename)=>{
let fix = filepath.split('.').pop().toLowerCase(); //扩展名
if (fix !== REQUEST && fix !== RESPONSE){
logger.warn('文件不是被监听的类型');
return undefined;
}
let sign = filepath.split('-').pop();
sign = sign.split('.').shift();
if (sign && sign.length === 32){
try{
//在读取文件时,有可能会出现 EBUSY 异常导致系统崩溃,产生原因不明
let fileContent = await readFile(filepath, {encoding : 'utf-8'});
if (checkSign(fileContent, sign)){
try{
return JSON.parse(fileContent);
}catch(e){
logger.error(`文件 ${filename} 格式错误:`, e);
}
}else{
logger.error(`文件 ${filename} 签名验证失败,文件有可能被损坏,尝试重传。`);
await fileBroken(filename); //生成文件损坏的 response 文件
}
}catch(e){
logger.error(`文件 ${filename}读取异常,文件有可能被损坏,尝试重传:`, e);
await fileBroken(filename); //生成文件损坏的 response 文件
}
}else{
logger.warn('文件名不规范,删除文件。');
}
await util.removeFile(filepath);
};
/**
* 文件损坏响应,发起端收到该响应后会重发请求
* 该函数可异步执行,不需要观察返回结果
*/
const fileBroken = async (filename)=>{
//request 与 response 文件的处理逻辑是一样的,两种文件都可以支持。
let fileJson = {
header: util.getErrMsg(FILE_BROKEN_ERR_CODE), //文件被损坏
content: {
filename
}
}
await util.createResponseFile(fileJson);
}
/**
* 从已发送的目录中查找文件,重新发送
*/
const resendFile = async (filename)=>{
//从已发送与发送异常目录中寻找文件
for(dir of config.sent_dir){
let path = `${config.request_path}/${dir}/${filename}`;
logger.debug(`检查 ${path}`);
if (await exists(path)){
//找到文件
logger.debug(`开始把文件 ${path} 转移到共享目录`);
await rename(path, `${config.request_path}/${filename}`);
return;
}
}
logger.warn(`未找到需要重发的文件 ${filename} `);
}
/**
* 检验文件内容
* @return true|false
*/
const checkSign = (content, sign) => {
let md5Str = util.hexMd5(content);
logger.debug(`提交的签名:${sign},验证的签名:${md5Str}`);
return md5Str === sign;
}
logger.info(`打开目录监听:${config.response_path}`);
// 打开文件监听器
fs.watch(config.response_path, {persistent : false}, async (eventType, filename) =>{
logger.debug(`监听到文件变化:${eventType},文件名:${filename}`);
if (eventType === 'rename'){
let filePath = config.response_path + '/' + filename;
//只监听文件创建事件,创建和删除都会触发rename事件,需要判断文件是否存在
if (await exists(filePath)){
//文件创建事件
//1.校验文件完整性
let fileJson = await checkFile(filePath, filename);
if (fileJson){
logger.debug('文件内容:', fileJson);
//判断是否是重传请求
if (fileJson.header && fileJson.header.code === FILE_BROKEN_ERR_CODE){
//寻找原始文件,重新发送
resendFile(fileJson.content.filename);
util.removeFile(filePath); //删除收到的文件
}else{
//正常请求,分发请求
classifier.send(filePath, fileJson);
}
}
//2.判断是请求文件还是响应文件
//2.1如果是请求文件,分发到相应的系统服务
//2.2如果是响应文件,根据回调地址发送响应
}else{
logger.debug('文件删除事件,忽略');
}
}
});
//已发送目录文件清理
setInterval(async () => {
logger.info(`开始清理已发送目录 ${config.sent_dir}`);
for(dir of config.sent_dir){
try{
await util.emptyDir(`${config.request_path}/${dir}`);
}catch(e){
logger.error(`清理目录 ${dir} 异常:`, e);
}
}
//删除失败的文件缓存队列
await util.cleanQueue();
}, config.clean_cycle);
module.exports = {};
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化