// Code pull complete; the page will refresh automatically.
/*jshint node:true, laxcomma:true */
const util = require('util');
const config = require('./lib/config');
const helpers = require('./lib/helpers');
const fs = require('fs');
const events = require('events');
const logger = require('./lib/logger');
const set = require('./lib/set');
const pm = require('./lib/process_metrics');
const process_mgmt = require('./lib/process_mgmt');
const mgmt_server = require('./lib/mgmt_server');
const mgmt = require('./lib/mgmt_console');
// initialize data structures with defaults for statsd stats
// keyCounter: how many times each key was seen since the last key-flush log;
// only filled in when the keyFlush interval is greater than zero.
let keyCounter = {};
// counters: running totals per key, reset or deleted after every flush.
let counters = {};
// timers: per-key arrays of raw "ms" values received since the last flush.
let timers = {};
// timer_counters: per-key count of timer samples, scaled by 1 / sampleRate.
let timer_counters = {};
// gauges: latest value per key; kept across flushes unless gauge deletion is on.
let gauges = {};
// gaugesTTL: remaining flush cycles before an idle gauge is deleted.
let gaugesTTL = {};
// sets: per-key set of values received since the last flush.
let sets = {};
// counter_rates / timer_data: handed to the metrics processor at flush time;
// nothing in this file writes to them directly.
let counter_rates = {};
let timer_data = {};
// pctThreshold: list of percentiles, taken from config.percentThreshold (default 90).
let pctThreshold = null;
let flushInterval, keyFlushInt, serversLoaded, mgmtServer;
// Process start time in whole seconds since the epoch.
let startup_time = Math.round(new Date().getTime() / 1000);
// Event bus shared with the backends ('packet', 'flush' and 'status' events).
let backendEvents = new events.EventEmitter();
// Health state reported by (and changeable through) the management "health" command.
let healthStatus = config.healthStatus || 'up';
// Timestamp (seconds) of the previous flush; 0 until the first flush has run.
let old_timestamp = 0;
// Gauge name under which the flush lag is reported; set once the config is loaded.
let timestamp_lag_namespace;
// Whether incoming key names are sanitized; can be overridden by config.keyNameSanitize.
let keyNameSanitize = true;
/**
 * Require a backend module and initialise it.
 *
 * The module's init() receives the process start time, the configuration,
 * the shared backend event emitter and the logger. A falsy return value from
 * init() is treated as fatal: the failure is logged and the process exits
 * with status 1.
 *
 * @param {Object} config - Loaded configuration; `config.debug` enables a debug log line.
 * @param {string} name - Module path or name handed to require().
 */
function loadBackend(config, name) {
  const backendModule = require(name);

  if (config.debug) {
    l.log("Loading backend: " + name, 'DEBUG');
  }

  const initialised = backendModule.init(startup_time, config, backendEvents, l);
  if (initialised) {
    return;
  }

  l.log("Failed to load backend: " + name, "ERROR");
  process.exit(1);
}
/**
 * Require a server module and start it.
 *
 * The callback mimics the dgram 'message' event parameters (msg, rinfo):
 *   msg   - the message received by the server; may contain more than one metric
 *   rinfo - remote address information and message length
 *           (attributes are .address, .port, .family, .size)
 *
 * A falsy return value from the module's start() is treated as fatal: the
 * failure is logged and the process exits with status 1.
 *
 * @param {Object} config - Configuration for this server; `config.debug` enables a debug log line.
 * @param {string} name - Module path or name handed to require().
 * @param {Function} callback - Invoked by the server for every received message.
 */
function startServer(config, name, callback) {
  const serverModule = require(name);

  if (config.debug) {
    l.log("Loading server: " + name, 'DEBUG');
  }

  const started = serverModule.start(config, callback);
  if (started) {
    return;
  }

  l.log("Failed to load server: " + name, "ERROR");
  process.exit(1);
}
// global for conf
// Assigned in the config.configFile callback and read by flushMetrics.
let conf;
/**
 * Flush metrics to each backend.
 *
 * Records the flush lag gauge, hands the current metric stores to the metrics
 * processor, emits 'flush' on the backend event bus with the processed
 * metrics, and finally schedules the next flush. A one-shot 'flush' listener
 * registered here resets (or deletes, depending on the delete* config flags)
 * the stores after the listeners registered before it have run.
 */
function flushMetrics() {
  const time_stamp = Math.round(Date.now() / 1000);

  // From the second flush onwards, report how far the gap between flushes
  // deviates from the configured interval (in seconds).
  if (old_timestamp > 0) {
    const expectedGap = Number(conf.flushInterval) / 1000;
    gauges[timestamp_lag_namespace] = (time_stamp - old_timestamp - expectedGap);
  }
  old_timestamp = time_stamp;

  const metrics_hash = {
    counters,
    gauges,
    timers,
    timer_counters,
    sets,
    counter_rates,
    timer_data,
    pctThreshold,
    histogram: conf.histogram
  };

  // After all listeners, reset the stats
  backendEvents.once('flush', function clear_metrics(ts, metrics) {
    // Counters whose name contains one of these fragments are zeroed, never deleted.
    const keptCounterFragments = ["packets_received", "metrics_received", "bad_lines_seen"];
    const isKeptCounter = function (name) {
      return keptCounterFragments.some(function (fragment) {
        return name.indexOf(fragment) != -1;
      });
    };

    // Counters: delete idle ones when configured, otherwise reset to zero.
    conf.deleteCounters = conf.deleteCounters || false;
    for (const name in metrics.counters) {
      if (conf.deleteCounters && !isKeptCounter(name)) {
        delete metrics.counters[name];
      } else {
        metrics.counters[name] = 0;
      }
    }

    // Timers: delete or empty both the sample list and its sample counter.
    conf.deleteTimers = conf.deleteTimers || false;
    for (const name in metrics.timers) {
      if (!conf.deleteTimers) {
        metrics.timers[name] = [];
        metrics.timer_counters[name] = 0;
        continue;
      }
      delete metrics.timers[name];
      delete metrics.timer_counters[name];
    }

    // Sets: delete or replace with a fresh empty set.
    conf.deleteSets = conf.deleteSets || false;
    for (const name in metrics.sets) {
      if (!conf.deleteSets) {
        metrics.sets[name] = new set.Set();
        continue;
      }
      delete metrics.sets[name];
    }

    // Gauges normally persist their previous value; they are only touched
    // when gauge deletion is enabled.
    conf.deleteGauges = conf.deleteGauges || false;
    if (!conf.deleteGauges) {
      return;
    }
    for (const name in gaugesTTL) {
      gaugesTTL[name]--;
      // Drop a gauge once it has been idle for its allowed number of flush cycles.
      if (gaugesTTL[name] < 1) {
        delete metrics.gauges[name];
        delete gaugesTTL[name];
      }
    }
  });

  pm.process_metrics(metrics_hash, conf.calculatedTimerMetrics, flushInterval, time_stamp, function emitFlush(metrics) {
    backendEvents.emit('flush', time_stamp, metrics);
  });

  // Performing this setTimeout at the end of this method rather than the beginning
  // helps ensure we adapt to negative clock skew by letting the method's latency
  // introduce a short delay that should more than compensate.
  setTimeout(flushMetrics, getFlushTimeout(flushInterval));
}
// Base statistics reported by the management "stats" command.
const stats = {
messages: {
// Time (seconds since the epoch) a packet was last handled; starts at process start time.
last_msg_seen: startup_time,
// Number of metric lines rejected as invalid since startup.
bad_lines_seen: 0
}
};
/**
 * Normalise a metric key when key sanitisation is enabled.
 *
 * Runs of whitespace become a single "_", each "/" becomes "-", and any
 * character other than letters, digits, "_", "-", ".", ";" and "=" is removed.
 * When sanitisation is disabled the key is returned untouched.
 *
 * @param {string} key - Raw metric key.
 * @returns {string} The sanitised key, or the original key if sanitisation is off.
 */
function sanitizeKeyName(key) {
  if (!keyNameSanitize) {
    return key;
  }

  const withoutWhitespace = key.replace(/\s+/g, '_');
  const withoutSlashes = withoutWhitespace.replace(/\//g, '-');
  return withoutSlashes.replace(/[^a-zA-Z_\-0-9\.;=]/g, '');
}
/**
 * Compute the delay until the next flush so that flushes stay aligned to
 * multiples of `interval` counted from the process start time.
 *
 * The elapsed time is rounded to the nearest whole number of intervals and
 * the flush is scheduled one interval after that point.
 *
 * @param {number} interval - Flush interval in milliseconds.
 * @returns {number} Milliseconds to wait before the next flush.
 */
function getFlushTimeout(interval) {
  const startMs = startup_time * 1000;
  const nowMs = Date.now();
  const nextSlot = Math.round((nowMs - startMs) / interval) + 1;
  return (startMs + nextSlot * interval) - nowMs;
}
// Global for the logger
let l;

// Names of the internal bookkeeping counters (built from the configured stats
// prefix). They are assigned in the config callback below and read by the
// packet handler; declaring them here keeps them module-scoped instead of
// leaking as implicit globals.
let bad_lines_seen;
let packets_received;
let metrics_received;

// Load the configuration file named on the command line and (re)apply it.
// Everything inside the `!serversLoaded` guard runs only the first time the
// callback fires: starting the servers, the management server, the backends
// and the flush timers.
config.configFile(process.argv[2], function (config) {
  conf = config;

  process_mgmt.init(config);

  l = new logger.Logger(config.log || {});

  // force conf.gaugesMaxTTL to 1 if it is not an integer greater than 1
  if (!(helpers.isInteger(conf.gaugesMaxTTL) && conf.gaugesMaxTTL > 1)) {
    conf.gaugesMaxTTL = 1;
  }

  // allows us to flag all of these on with a single config but still override them individually
  conf.deleteIdleStats = conf.deleteIdleStats !== undefined ? conf.deleteIdleStats : false;
  if (conf.deleteIdleStats) {
    conf.deleteCounters = conf.deleteCounters !== undefined ? conf.deleteCounters : true;
    conf.deleteTimers = conf.deleteTimers !== undefined ? conf.deleteTimers : true;
    conf.deleteSets = conf.deleteSets !== undefined ? conf.deleteSets : true;
    conf.deleteGauges = conf.deleteGauges !== undefined ? conf.deleteGauges : true;
  }

  // if gauges are not being deleted, clear gaugesTTL counters to save memory
  if (!conf.deleteGauges) {
    gaugesTTL = {};
  }

  // setup config for stats prefix
  let prefixStats = config.prefixStats;
  prefixStats = prefixStats !== undefined ? prefixStats : "statsd";

  // setup the names for the stats stored in counters{}
  bad_lines_seen = prefixStats + ".bad_lines_seen";
  packets_received = prefixStats + ".packets_received";
  metrics_received = prefixStats + ".metrics_received";
  timestamp_lag_namespace = prefixStats + ".timestamp_lag";

  // now set to zero so we can increment them
  counters[bad_lines_seen] = 0;
  counters[packets_received] = 0;
  counters[metrics_received] = 0;

  if (config.keyNameSanitize !== undefined) {
    keyNameSanitize = config.keyNameSanitize;
  }

  if (!serversLoaded) {
    // key counting
    const keyFlushInterval = Number((config.keyFlush && config.keyFlush.interval) || 0);

    // Parse one received packet and fold every metric line it contains into
    // the in-memory stores. Line format, as parsed below:
    //   key:value|type[|@sampleRate][:value|type...][|#tag1,tag2]
    const handlePacket = function (msg, rinfo) {
      backendEvents.emit('packet', msg, rinfo);
      counters[packets_received]++;

      // A packet may carry several newline-separated metric lines.
      const metrics = msg.toString().split("\n");

      for (const line of metrics) {
        if (line.length === 0) {
          continue;
        }

        counters[metrics_received]++;
        if (config.dumpMessages) {
          l.log(line);
        }

        // Everything after "|#" is a comma-separated tag list.
        let bits = line.split('|#');
        let tags = [];
        if (bits.length > 1 && bits[1].length > 0) {
          tags = bits[1].split(',');
        }

        // The part before the first ":" is the key; the rest are value fields.
        bits = bits[0].split(':');
        let key = bits.shift();
        if (tags.length > 0) {
          // Tags are appended to the key as ";name=value" pairs.
          key += ';' + tags.map(function (tag) {
            return tag.replace(';', '_').replace(':', '=');
          }).join(';');
        }
        key = sanitizeKeyName(key);

        if (keyFlushInterval > 0) {
          if (!keyCounter[key]) {
            keyCounter[key] = 0;
          }
          keyCounter[key] += 1;
        }

        // A bare key with no value is treated as a value of "1".
        if (bits.length === 0) {
          bits.push("1");
        }

        for (let i = 0; i < bits.length; i++) {
          let sampleRate = 1;
          const fields = bits[i].split("|");
          if (!helpers.is_valid_packet(fields)) {
            l.log('Bad line: ' + fields + ' in msg "' + line + '"');
            counters[bad_lines_seen]++;
            stats.messages.bad_lines_seen++;
            continue;
          }
          if (fields[2]) {
            // NOTE(review): relies on is_valid_packet having rejected sample
            // rates that do not match this pattern — confirm in lib/helpers.
            sampleRate = Number(fields[2].match(/^@([\d\.]+)/)[1]);
          }

          const metric_type = fields[1].trim();
          if (metric_type === "ms") {
            if (!timers[key]) {
              timers[key] = [];
              timer_counters[key] = 0;
            }
            timers[key].push(Number(fields[0] || 0));
            timer_counters[key] += (1 / sampleRate);
          } else if (metric_type === "g") {
            // if deleteGauges is true reset the max TTL to its initial value
            if (conf.deleteGauges) {
              gaugesTTL[key] = conf.gaugesMaxTTL;
            }
            // A leading sign on an existing gauge means "adjust by", not "set to".
            if (gauges[key] && fields[0].match(/^[-+]/)) {
              gauges[key] += Number(fields[0] || 0);
            } else {
              gauges[key] = Number(fields[0] || 0);
            }
          } else if (metric_type === "s") {
            if (!sets[key]) {
              sets[key] = new set.Set();
            }
            sets[key].insert(fields[0] || '0');
          } else {
            // Any other type is handled as a counter, scaled by the sample rate.
            if (!counters[key]) {
              counters[key] = 0;
            }
            counters[key] += Number(fields[0] || 1) * (1 / sampleRate);
          }
        }
      }

      stats.messages.last_msg_seen = Math.round(new Date().getTime() / 1000);
    };

    // If config.servers isn't specified, use the top-level config for backwards-compatibility
    const server_config = config.servers || [config];
    for (let i = 0; i < server_config.length; i++) {
      // The default server is UDP
      const server = server_config[i].server || './servers/udp';
      startServer(server_config[i], server, handlePacket);
    }

    mgmt_server.start(
      config,
      // Handler for one management command read from `stream`.
      function (cmd, parameters, stream) {
        switch (cmd) {
          case "help":
            stream.write("Commands: stats, counters, timers, gauges, delcounters, deltimers, delgauges, health, config, quit\n\n");
            break;

          case "config":
            helpers.writeConfig(config, stream);
            break;

          case "health":
            // An optional "up"/"down" argument changes the state before it is reported.
            if (parameters.length > 0) {
              const cmdaction = parameters[0].toLowerCase();
              if (cmdaction === 'up') {
                healthStatus = 'up';
              } else if (cmdaction === 'down') {
                healthStatus = 'down';
              }
            }
            stream.write("health: " + healthStatus + "\n");
            break;

          case "stats": {
            const now = Math.round(new Date().getTime() / 1000);
            const uptime = now - startup_time;

            stream.write("uptime: " + uptime + "\n");

            // Metrics named "last_*" hold timestamps and are reported as seconds elapsed.
            const stat_writer = function (group, metric, val) {
              let delta;
              if (metric.match("^last_")) {
                delta = now - val;
              } else {
                delta = val;
              }
              stream.write(group + "." + metric + ": " + delta + "\n");
            };

            // Loop through the base stats
            for (const group in stats) {
              for (const metric in stats[group]) {
                stat_writer(group, metric, stats[group][metric]);
              }
            }

            // Registered just before the emit below, so it runs after the
            // 'status' listeners that were already attached.
            backendEvents.once('status', function () {
              stream.write("END\n\n");
            });

            // Let each backend contribute its status
            backendEvents.emit('status', function (err, name, stat, val) {
              if (err) {
                l.log("Failed to read stats for backend " +
                  name + ": " + err);
              } else {
                stat_writer(name, stat, val);
              }
            });
            break;
          }

          case "counters":
            stream.write(util.inspect(counters) + "\n");
            stream.write("END\n\n");
            break;

          case "timers":
            stream.write(util.inspect(timers) + "\n");
            stream.write("END\n\n");
            break;

          case "gauges":
            stream.write(util.inspect(gauges) + "\n");
            stream.write("END\n\n");
            break;

          case "delcounters":
            mgmt.delete_stats(counters, parameters, stream);
            break;

          case "deltimers":
            mgmt.delete_stats(timers, parameters, stream);
            break;

          case "delgauges":
            mgmt.delete_stats(gauges, parameters, stream);
            break;

          case "quit":
            stream.end();
            break;

          default:
            stream.write("ERROR\n");
            break;
        }
      },
      // Management errors are logged and otherwise ignored.
      function (err, stream) {
        l.log('MGMT: Caught ' + err + ', Moving on', 'WARNING');
      }
    );

    serversLoaded = true;
    // NOTE(review): util.log is deprecated in Node.js (DEP0059); left unchanged
    // here because replacing it would change where this message is written.
    util.log("server is up", "INFO");

    pctThreshold = config.percentThreshold || 90;
    if (!Array.isArray(pctThreshold)) {
      pctThreshold = [ pctThreshold ]; // listify percentiles so single values work the same
    }

    flushInterval = Number(config.flushInterval || 10000);
    config.flushInterval = flushInterval;

    if (config.backends) {
      for (let j = 0; j < config.backends.length; j++) {
        loadBackend(config, config.backends[j]);
      }
    } else {
      // The default backend is graphite
      loadBackend(config, './backends/graphite');
    }

    // Setup the flush timer
    setTimeout(flushMetrics, getFlushTimeout(flushInterval));

    if (keyFlushInterval > 0) {
      const keyFlushPercent = Number((config.keyFlush && config.keyFlush.percent) || 100);
      const keyFlushLog = config.keyFlush && config.keyFlush.log;

      // Periodically log the most frequently seen keys, then start counting afresh.
      keyFlushInt = setInterval(function () {
        const sortedKeys = [];

        for (const key in keyCounter) {
          sortedKeys.push([key, keyCounter[key]]);
        }

        sortedKeys.sort(function (a, b) { return b[1] - a[1]; });

        let logMessage = "";
        const timeString = (new Date()) + "";

        // only show the top "keyFlushPercent" keys; capped at the number of
        // keys so a percentage above 100 cannot index past the end of the list
        const limit = Math.min(sortedKeys.length, sortedKeys.length * (keyFlushPercent / 100));
        for (let i = 0; i < limit; i++) {
          logMessage += timeString + " count=" + sortedKeys[i][1] + " key=" + sortedKeys[i][0] + "\n";
        }

        if (keyFlushLog) {
          const logFile = fs.createWriteStream(keyFlushLog, {flags: 'a+'});
          logFile.write(logMessage);
          logFile.end();
        } else {
          process.stdout.write(logMessage);
        }

        // clear the counter
        keyCounter = {};
      }, keyFlushInterval);
    }
  }
});
process.on('exit', function () {
flushMetrics();
});
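// This section may contain content unsuitable for display, so the page does not show it. You can review and modify it yourself using the relevant editing features.
// If you confirm the content does not involve inappropriate language, pure advertising or traffic diversion, violence, vulgar or pornographic material, infringement, piracy, false or worthless content, or content that violates relevant national laws and regulations, you can click submit to appeal and we will process it for you as soon as possible.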