diff --git a/README.md b/README.md index bd931f20..bc151752 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -StatsD [![Build Status](https://secure.travis-ci.org/etsy/statsd.png)](http://travis-ci.org/etsy/statsd) +StatsD [![Build Status](https://travis-ci.org/etsy/statsd.png?branch=backends-as-packages)](https://travis-ci.org/etsy/statsd) ====== A network daemon that runs on the [Node.js][node] platform and diff --git a/backends/graphite.js b/backends/graphite.js index 8dab60a2..1ce92c17 100644 --- a/backends/graphite.js +++ b/backends/graphite.js @@ -13,7 +13,10 @@ */ var net = require('net'), - util = require('util'); + logger = require('../lib/logger'); + +// this will be instantiated to the logger +var l; var debug; var flushInterval; @@ -46,12 +49,12 @@ var post_stats = function graphite_post_stats(statString) { var graphite = net.createConnection(graphitePort, graphiteHost); graphite.addListener('error', function(connectionException){ if (debug) { - util.log(connectionException); + l.log(connectionException); } }); graphite.on('connect', function() { var ts = Math.round(new Date().getTime() / 1000); - var namespace = globalNamespace.concat('statsd'); + var namespace = globalNamespace.concat(prefixStats); statString += namespace.join(".") + '.graphiteStats.last_exception ' + last_exception + ' ' + ts + "\n"; statString += namespace.join(".") + '.graphiteStats.last_flush ' + last_flush + ' ' + ts + "\n"; this.write(statString); @@ -60,7 +63,7 @@ var post_stats = function graphite_post_stats(statString) { }); } catch(e){ if (debug) { - util.log(e); + l.log(e); } graphiteStats.last_exception = Math.round(new Date().getTime() / 1000); } @@ -122,12 +125,12 @@ var flush_stats = function graphite_flush(ts, metrics) { numStats += 1; } - var namespace = globalNamespace.concat('statsd'); + var namespace = globalNamespace.concat(prefixStats); if (legacyNamespace === true) { - statString += 'statsd.numStats ' + numStats + ts_suffix; - statString += 'stats.statsd.graphiteStats.calculationtime ' + (Date.now() - starttime) + ts_suffix; + statString += prefixStats + '.numStats ' + numStats + ts_suffix; + statString += 'stats.' + prefixStats + '.graphiteStats.calculationtime ' + (Date.now() - starttime) + ts_suffix; for (key in statsd_metrics) { - statString += 'stats.statsd.' + key + ' ' + statsd_metrics[key] + ts_suffix; + statString += 'stats.' + prefixStats + '.' + key + ' ' + statsd_metrics[key] + ts_suffix; } } else { statString += namespace.join(".") + '.numStats ' + numStats + ts_suffix; @@ -145,12 +148,13 @@ var flush_stats = function graphite_flush(ts, metrics) { }; var backend_status = function graphite_status(writeCb) { - for (stat in graphiteStats) { + for (var stat in graphiteStats) { writeCb(null, 'graphite', stat, graphiteStats[stat]); } }; exports.init = function graphite_init(startup_time, config, events) { + l = new logger.Logger(config.log || {}); debug = config.debug; graphiteHost = config.graphiteHost; graphitePort = config.graphitePort; diff --git a/exampleConfig.js b/exampleConfig.js index 90584ae8..c0b0316d 100644 --- a/exampleConfig.js +++ b/exampleConfig.js @@ -35,6 +35,8 @@ Optional Variables: percent: percentage of frequent keys to log [%, default: 100] log: location of log file for frequent keys [default: STDOUT] deleteCounters: don't send values to graphite for inactive counters, as opposed to sending 0 [default: false] + prefixStats: prefix to use for the statsd statistics data for this running instance of statsd [default: statsd] + applies to both legacy and new namespacing console: prettyprint: whether to prettyprint the console backend @@ -59,14 +61,12 @@ Optional Variables: e.g. [ { host: '10.10.10.10', port: 8125 }, { host: 'observer', port: 88125 } ] - repeaterProtocol: whether to use udp4 or udp4 for repeaters. + repeaterProtocol: whether to use udp4 or udp6 for repeaters. ["udp4" or "udp6", default: "udp4"] */ { graphitePort: 2003 -, graphiteHost: "graphite.host.com" +, graphiteHost: "graphite.example.com" , port: 8125 , backends: [ "./backends/graphite" ] -, repeater: [ { host: "10.8.3.214", port: 8125 } ] -, repeaterProtocol: "udp4" } diff --git a/stats.js b/stats.js index 081a4f8a..721a634d 100644 --- a/stats.js +++ b/stats.js @@ -11,10 +11,7 @@ var dgram = require('dgram') // initialize data structures with defaults for statsd stats var keyCounter = {}; -var counters = { - "statsd.packets_received": 0, - "statsd.bad_lines_seen": 0 -}; +var counters = {}; var timers = {}; var gauges = {}; var sets = {}; @@ -61,7 +58,7 @@ function flushMetrics() { backendEvents.once('flush', function clear_metrics(ts, metrics) { // Clear the counters conf.deleteCounters = conf.deleteCounters || false; - for (key in metrics.counters) { + for (var key in metrics.counters) { if (conf.deleteCounters) { delete(metrics.counters[key]); } else { @@ -70,12 +67,12 @@ function flushMetrics() { } // Clear the timers - for (key in metrics.timers) { + for (var key in metrics.timers) { metrics.timers[key] = []; } // Clear the sets - for (key in metrics.sets) { + for (var key in metrics.sets) { metrics.sets[key] = new set.Set(); } }); @@ -117,6 +114,17 @@ config.configFile(process.argv[2], function (config, oldConfig) { }, config.debugInterval || 10000); } + // setup config for stats prefix + prefixStats = config.prefixStats; + prefixStats = prefixStats !== undefined ? prefixStats : "statsd"; + //setup the names for the stats stored in counters{} + bad_lines_seen = prefixStats + ".bad_lines_seen"; + packets_received = prefixStats + ".packets_received"; + + //now set to zero so we can increment them + counters[bad_lines_seen] = 0; + counters[packets_received] = 0; + if (server === undefined) { // key counting @@ -124,10 +132,10 @@ config.configFile(process.argv[2], function (config, oldConfig) { server = dgram.createSocket('udp4', function (msg, rinfo) { backendEvents.emit('packet', msg, rinfo); - counters["statsd.packets_received"]++; + counters[packets_received]++; var metrics = msg.toString().split("\n"); - for (midx in metrics) { + for (var midx in metrics) { if (config.dumpMessages) { l.log(metrics[midx].toString()); } @@ -153,7 +161,7 @@ config.configFile(process.argv[2], function (config, oldConfig) { var fields = bits[i].split("|"); if (fields[1] === undefined) { l.log('Bad line: ' + fields + ' in msg "' + metrics[midx] +'"'); - counters["statsd.bad_lines_seen"]++; + counters[bad_lines_seen]++; stats['messages']['bad_lines_seen']++; continue; } @@ -175,7 +183,7 @@ config.configFile(process.argv[2], function (config, oldConfig) { sampleRate = Number(fields[2].match(/^@([\d\.]+)/)[1]); } else { l.log('Bad line: ' + fields + ' in msg "' + metrics[midx] +'"; has invalid sample rate'); - counters["statsd.bad_lines_seen"]++; + counters[bad_lines_seen]++; stats['messages']['bad_lines_seen']++; continue; } @@ -227,8 +235,8 @@ config.configFile(process.argv[2], function (config, oldConfig) { }; // Loop through the base stats - for (group in stats) { - for (metric in stats[group]) { + for (var group in stats) { + for (var metric in stats[group]) { stat_writer(group, metric, stats[group][metric]); } } @@ -265,7 +273,7 @@ config.configFile(process.argv[2], function (config, oldConfig) { break; case "delcounters": - for (index in cmdline) { + for (var index in cmdline) { delete counters[cmdline[index]]; stream.write("deleted: " + cmdline[index] + "\n"); } @@ -273,7 +281,7 @@ config.configFile(process.argv[2], function (config, oldConfig) { break; case "deltimers": - for (index in cmdline) { + for (var index in cmdline) { delete timers[cmdline[index]]; stream.write("deleted: " + cmdline[index] + "\n"); } @@ -281,7 +289,7 @@ config.configFile(process.argv[2], function (config, oldConfig) { break; case "delgauges": - for (index in cmdline) { + for (var index in cmdline) { delete gauges[cmdline[index]]; stream.write("deleted: " + cmdline[index] + "\n"); } @@ -333,7 +341,7 @@ config.configFile(process.argv[2], function (config, oldConfig) { var key; var sortedKeys = []; - for (key in keyCounter) { + for (var key in keyCounter) { sortedKeys.push([key, keyCounter[key]]); } diff --git a/test/graphite_legacy_tests_statsprefix.js b/test/graphite_legacy_tests_statsprefix.js new file mode 100644 index 00000000..4dcf1986 --- /dev/null +++ b/test/graphite_legacy_tests_statsprefix.js @@ -0,0 +1,230 @@ +var fs = require('fs'), + net = require('net'), + temp = require('temp'), + spawn = require('child_process').spawn, + util = require('util'), + urlparse = require('url').parse, + _ = require('underscore'), + dgram = require('dgram'), + qsparse = require('querystring').parse, + http = require('http'); + + +var writeconfig = function(text,worker,cb,obj){ + temp.open({suffix: '-statsdconf.js'}, function(err, info) { + if (err) throw err; + fs.writeSync(info.fd, text); + fs.close(info.fd, function(err) { + if (err) throw err; + worker(info.path,cb,obj); + }); + }); +} + +var array_contents_are_equal = function(first,second){ + var intlen = _.intersection(first,second).length; + var unlen = _.union(first,second).length; + return (intlen == unlen) && (intlen == first.length); +} + +var statsd_send = function(data,sock,host,port,cb){ + send_data = new Buffer(data); + sock.send(send_data,0,send_data.length,port,host,function(err,bytes){ + if (err) { + throw err; + } + cb(); + }); +} + +// keep collecting data until a specified timeout period has elapsed +// this will let us capture all data chunks so we don't miss one +var collect_for = function(server,timeout,cb){ + var received = []; + var in_flight = 0; + var timed_out = false; + var collector = function(req,res){ + in_flight += 1; + var body = ''; + req.on('data',function(data){ body += data; }); + req.on('end',function(){ + received = received.concat(body.split("\n")); + in_flight -= 1; + if((in_flight < 1) && timed_out){ + server.removeListener('request',collector); + cb(received); + } + }); + } + + setTimeout(function (){ + timed_out = true; + if((in_flight < 1)) { + server.removeListener('connection',collector); + cb(received); + } + },timeout); + + server.on('connection',collector); +} + +module.exports = { + setUp: function (callback) { + this.testport = 31337; + this.myflush = 200; + var configfile = "{graphService: \"graphite\"\n\ + , batch: 200 \n\ + , flushInterval: " + this.myflush + " \n\ + , percentThreshold: 90\n\ + , port: 8125\n\ + , dumpMessages: false \n\ + , debug: false\n\ + , prefixStats: \"statsprefix\"\n\ + , graphitePort: " + this.testport + "\n\ + , graphiteHost: \"127.0.0.1\"}"; + + this.acceptor = net.createServer(); + this.acceptor.listen(this.testport); + this.sock = dgram.createSocket('udp4'); + + this.server_up = true; + this.ok_to_die = false; + this.exit_callback_callback = process.exit; + + writeconfig(configfile,function(path,cb,obj){ + obj.path = path; + obj.server = spawn('node',['stats.js', path]); + obj.exit_callback = function (code) { + obj.server_up = false; + if(!obj.ok_to_die){ + console.log('node server unexpectedly quit with code: ' + code); + process.exit(1); + } + else { + obj.exit_callback_callback(); + } + }; + obj.server.on('exit', obj.exit_callback); + obj.server.stderr.on('data', function (data) { + console.log('stderr: ' + data.toString().replace(/\n$/,'')); + }); + /* + obj.server.stdout.on('data', function (data) { + console.log('stdout: ' + data.toString().replace(/\n$/,'')); + }); + */ + obj.server.stdout.on('data', function (data) { + // wait until server is up before we finish setUp + if (data.toString().match(/server is up/)) { + cb(); + } + }); + + },callback,this); + }, + tearDown: function (callback) { + this.sock.close(); + this.acceptor.close(); + this.ok_to_die = true; + if(this.server_up){ + this.exit_callback_callback = callback; + this.server.kill(); + } else { + callback(); + } + }, + + send_well_formed_posts: function (test) { + test.expect(2); + + // we should integrate a timeout into this + this.acceptor.once('connection',function(c){ + var body = ''; + c.on('data',function(d){ body += d; }); + c.on('end',function(){ + var rows = body.split("\n"); + var entries = _.map(rows, function(x) { + var chunks = x.split(' '); + var data = {}; + data[chunks[0]] = chunks[1]; + return data; + }); + test.ok(_.include(_.map(entries,function(x) { return _.keys(x)[0] }),'statsprefix.numStats'),'graphite output includes numStats'); + test.equal(_.find(entries, function(x) { return _.keys(x)[0] == 'statsprefix.numStats' })['statsprefix.numStats'],2); + test.done(); + }); + }); + }, + + timers_are_valid: function (test) { + test.expect(3); + + var testvalue = 100; + var me = this; + this.acceptor.once('connection',function(c){ + statsd_send('a_test_value:' + testvalue + '|ms',me.sock,'127.0.0.1',8125,function(){ + collect_for(me.acceptor,me.myflush*2,function(strings){ + test.ok(strings.length > 0,'should receive some data'); + var hashes = _.map(strings, function(x) { + var chunks = x.split(' '); + var data = {}; + data[chunks[0]] = chunks[1]; + return data; + }); + var numstat_test = function(post){ + var mykey = 'statsprefix.numStats'; + return _.include(_.keys(post),mykey) && (post[mykey] == 3); + }; + test.ok(_.any(hashes,numstat_test), 'statsprefix.numStats should be 1'); + + var testtimervalue_test = function(post){ + var mykey = 'stats.timers.a_test_value.mean_90'; + return _.include(_.keys(post),mykey) && (post[mykey] == testvalue); + }; + test.ok(_.any(hashes,testtimervalue_test), 'stats.timers.a_test_value.mean should be ' + testvalue); + + test.done(); + }); + }); + }); + }, + + counts_are_valid: function (test) { + test.expect(4); + + var testvalue = 100; + var me = this; + this.acceptor.once('connection',function(c){ + statsd_send('a_test_value:' + testvalue + '|c',me.sock,'127.0.0.1',8125,function(){ + collect_for(me.acceptor,me.myflush*2,function(strings){ + test.ok(strings.length > 0,'should receive some data'); + var hashes = _.map(strings, function(x) { + var chunks = x.split(' '); + var data = {}; + data[chunks[0]] = chunks[1]; + return data; + }); + var numstat_test = function(post){ + var mykey = 'statsprefix.numStats'; + return _.include(_.keys(post),mykey) && (post[mykey] == 3); + }; + test.ok(_.any(hashes,numstat_test), 'statsprefix.numStats should be 1'); + + var testavgvalue_test = function(post){ + var mykey = 'stats.a_test_value'; + return _.include(_.keys(post),mykey) && (post[mykey] == (testvalue/(me.myflush / 1000))); + }; + test.ok(_.any(hashes,testavgvalue_test), 'stats.a_test_value should be ' + (testvalue/(me.myflush / 1000))); + + var testcountvalue_test = function(post){ + var mykey = 'stats_counts.a_test_value'; + return _.include(_.keys(post),mykey) && (post[mykey] == testvalue); + }; + test.ok(_.any(hashes,testcountvalue_test), 'stats_counts.a_test_value should be ' + testvalue); + + test.done(); + }); + }); + }); + } +} diff --git a/test/graphite_tests_statsprefix.js b/test/graphite_tests_statsprefix.js new file mode 100644 index 00000000..762bc44a --- /dev/null +++ b/test/graphite_tests_statsprefix.js @@ -0,0 +1,264 @@ +var fs = require('fs'), + net = require('net'), + temp = require('temp'), + spawn = require('child_process').spawn, + util = require('util'), + urlparse = require('url').parse, + _ = require('underscore'), + dgram = require('dgram'), + qsparse = require('querystring').parse, + http = require('http'); + + +var writeconfig = function(text,worker,cb,obj){ + temp.open({suffix: '-statsdconf.js'}, function(err, info) { + if (err) throw err; + fs.writeSync(info.fd, text); + fs.close(info.fd, function(err) { + if (err) throw err; + worker(info.path,cb,obj); + }); + }); +} + +var array_contents_are_equal = function(first,second){ + var intlen = _.intersection(first,second).length; + var unlen = _.union(first,second).length; + return (intlen == unlen) && (intlen == first.length); +} + +var statsd_send = function(data,sock,host,port,cb){ + send_data = new Buffer(data); + sock.send(send_data,0,send_data.length,port,host,function(err,bytes){ + if (err) { + throw err; + } + cb(); + }); +} + +// keep collecting data until a specified timeout period has elapsed +// this will let us capture all data chunks so we don't miss one +var collect_for = function(server,timeout,cb){ + var received = []; + var in_flight = 0; + var timed_out = false; + var collector = function(req,res){ + in_flight += 1; + var body = ''; + req.on('data',function(data){ body += data; }); + req.on('end',function(){ + received = received.concat(body.split("\n")); + in_flight -= 1; + if((in_flight < 1) && timed_out){ + server.removeListener('request',collector); + cb(received); + } + }); + } + + setTimeout(function (){ + timed_out = true; + if((in_flight < 1)) { + server.removeListener('connection',collector); + cb(received); + } + },timeout); + + server.on('connection',collector); +} + +module.exports = { + setUp: function (callback) { + this.testport = 31337; + this.myflush = 200; + var configfile = "{graphService: \"graphite\"\n\ + , batch: 200 \n\ + , flushInterval: " + this.myflush + " \n\ + , percentThreshold: 90\n\ + , port: 8125\n\ + , dumpMessages: false \n\ + , debug: false\n\ + , prefixStats: \"statsprefix\"\n\ + , graphite: { legacyNamespace: false }\n\ + , graphitePort: " + this.testport + "\n\ + , graphiteHost: \"127.0.0.1\"}"; + + this.acceptor = net.createServer(); + this.acceptor.listen(this.testport); + this.sock = dgram.createSocket('udp4'); + + this.server_up = true; + this.ok_to_die = false; + this.exit_callback_callback = process.exit; + + writeconfig(configfile,function(path,cb,obj){ + obj.path = path; + obj.server = spawn('node',['stats.js', path]); + obj.exit_callback = function (code) { + obj.server_up = false; + if(!obj.ok_to_die){ + console.log('node server unexpectedly quit with code: ' + code); + process.exit(1); + } + else { + obj.exit_callback_callback(); + } + }; + obj.server.on('exit', obj.exit_callback); + obj.server.stderr.on('data', function (data) { + console.log('stderr: ' + data.toString().replace(/\n$/,'')); + }); + /* + obj.server.stdout.on('data', function (data) { + console.log('stdout: ' + data.toString().replace(/\n$/,'')); + }); + */ + obj.server.stdout.on('data', function (data) { + // wait until server is up before we finish setUp + if (data.toString().match(/server is up/)) { + cb(); + } + }); + + },callback,this); + }, + tearDown: function (callback) { + this.sock.close(); + this.acceptor.close(); + this.ok_to_die = true; + if(this.server_up){ + this.exit_callback_callback = callback; + this.server.kill(); + } else { + callback(); + } + }, + + send_well_formed_posts: function (test) { + test.expect(2); + + // we should integrate a timeout into this + this.acceptor.once('connection',function(c){ + var body = ''; + c.on('data',function(d){ body += d; }); + c.on('end',function(){ + var rows = body.split("\n"); + var entries = _.map(rows, function(x) { + var chunks = x.split(' '); + var data = {}; + data[chunks[0]] = chunks[1]; + return data; + }); + test.ok(_.include(_.map(entries,function(x) { return _.keys(x)[0] }),'stats.statsprefix.numStats'),'graphite output includes numStats'); + test.equal(_.find(entries, function(x) { return _.keys(x)[0] == 'stats.statsprefix.numStats' })['stats.statsprefix.numStats'],2); + test.done(); + }); + }); + }, + + send_malformed_post: function (test) { + test.expect(3); + + var testvalue = 1; + var me = this; + this.acceptor.once('connection',function(c){ + statsd_send('a_bad_test_value|z',me.sock,'127.0.0.1',8125,function(){ + collect_for(me.acceptor,me.myflush*2,function(strings){ + test.ok(strings.length > 0,'should receive some data'); + var hashes = _.map(strings, function(x) { + var chunks = x.split(' '); + var data = {}; + data[chunks[0]] = chunks[1]; + return data; + }); + var numstat_test = function(post){ + var mykey = 'stats.statsprefix.numStats'; + return _.include(_.keys(post),mykey) && (post[mykey] == 2); + }; + test.ok(_.any(hashes,numstat_test), 'statsprefix.numStats should be 0'); + + var bad_lines_seen_value_test = function(post){ + var mykey = 'stats.counters.statsprefix.bad_lines_seen.count'; + return _.include(_.keys(post),mykey) && (post[mykey] == testvalue); + }; + test.ok(_.any(hashes,bad_lines_seen_value_test), 'stats.counters.statsprefix.bad_lines_seen.count should be ' + testvalue); + + test.done(); + }); + }); + }); + }, + + timers_are_valid: function (test) { + test.expect(3); + + var testvalue = 100; + var me = this; + this.acceptor.once('connection',function(c){ + statsd_send('a_test_value:' + testvalue + '|ms',me.sock,'127.0.0.1',8125,function(){ + collect_for(me.acceptor,me.myflush*2,function(strings){ + test.ok(strings.length > 0,'should receive some data'); + var hashes = _.map(strings, function(x) { + var chunks = x.split(' '); + var data = {}; + data[chunks[0]] = chunks[1]; + return data; + }); + var numstat_test = function(post){ + var mykey = 'stats.statsprefix.numStats'; + return _.include(_.keys(post),mykey) && (post[mykey] == 3); + }; + test.ok(_.any(hashes,numstat_test), 'stats.statsprefix.numStats should be 1'); + + var testtimervalue_test = function(post){ + var mykey = 'stats.timers.a_test_value.mean_90'; + return _.include(_.keys(post),mykey) && (post[mykey] == testvalue); + }; + test.ok(_.any(hashes,testtimervalue_test), 'stats.timers.a_test_value.mean should be ' + testvalue); + + test.done(); + }); + }); + }); + }, + + counts_are_valid: function (test) { + test.expect(4); + + var testvalue = 100; + var me = this; + this.acceptor.once('connection',function(c){ + statsd_send('a_test_value:' + testvalue + '|c',me.sock,'127.0.0.1',8125,function(){ + collect_for(me.acceptor,me.myflush*2,function(strings){ + test.ok(strings.length > 0,'should receive some data'); + var hashes = _.map(strings, function(x) { + var chunks = x.split(' '); + var data = {}; + data[chunks[0]] = chunks[1]; + return data; + }); + var numstat_test = function(post){ + var mykey = 'stats.statsprefix.numStats'; + return _.include(_.keys(post),mykey) && (post[mykey] == 3); + }; + test.ok(_.any(hashes,numstat_test), 'statsprefix.numStats should be 3'); + + var testavgvalue_test = function(post){ + var mykey = 'stats.counters.a_test_value.rate'; + return _.include(_.keys(post),mykey) && (post[mykey] == (testvalue/(me.myflush / 1000))); + }; + test.ok(_.any(hashes,testavgvalue_test), 'a_test_value.rate should be ' + (testvalue/(me.myflush / 1000))); + + var testcountvalue_test = function(post){ + var mykey = 'stats.counters.a_test_value.count'; + return _.include(_.keys(post),mykey) && (post[mykey] == testvalue); + }; + test.ok(_.any(hashes,testcountvalue_test), 'a_test_value.count should be ' + testvalue); + + test.done(); + }); + }); + }); + } +}