Thursday, October 31, 2013

Using Node.js streams to massage data into the format you want

Google provides some pretty cool flu data in CSV format, and I wanted to display that in a chart at Dash. However, the raw data isn't quite right for my needs:
  1. It has a bunch of intro/header text (copyright stuff, description of the data, etc), and Dash needs just the raw data.
  2. It shows dozens of states/regions/cities, and I just want to show overall U.S. data and my home state.
Fortunately, Dash can read data from any publicly accessible endpoint, so I decided to throw together a quick Node.js app to massage the data into what I needed. The most straightforward solution was probably to load the whole file, read through it line by line, build up an array of data, then write it out. And since the data feed is currently just under 400KB, maybe that would have been alright. But a better pattern (and more fun, IMO) is to take advantage of Node Streams. As long as we use streams throughout the entire process, we can make sure that only a small buffer is kept in memory at any given time.

If you just want to see the full app, it's on GitHub. Otherwise, read on to see my thought process.

Filter out the intro/header text

First, we'll write a stream that filters out the copyright/overview stuff and passes on the rest:

var stream = require('stream')
  , util = require('util')

// Byte sequence that opens the first line of real CSV data.
var DATA_MARKER = Buffer.from('Date,')

/**
 * Transform stream that discards everything before the first line beginning
 * with 'Date,' and passes that line and all later input through unchanged.
 *
 * @param {Object} [options] - forwarded as-is to stream.Transform.
 */
function CleanIntro(options) {
  stream.Transform.call(this, options)
  // Becomes true once the 'Date,' line has been found.
  this.readingData = false
  // Bytes of the unfinished last intro line, kept so that a marker split
  // across two chunks is still recognised.
  this._partialLine = null
}

util.inherits(CleanIntro, stream.Transform)

/**
 * Finds the byte offset of the first 'Date,' that sits at the start of a line.
 *
 * @param {Buffer} buf - bytes to scan; must itself begin at a line start.
 * @returns {number} byte offset of the marker, or -1 if there is none.
 */
function findDataStart(buf) {
  var from = 0
  var idx
  while ((idx = buf.indexOf(DATA_MARKER, from)) !== -1) {
    // Only accept the marker at the very start or right after a line break.
    if (idx === 0 || buf[idx - 1] === 0x0a || buf[idx - 1] === 0x0d) {
      return idx
    }
    from = idx + 1
  }
  return -1
}

/**
 * Drops intro text until the 'Date,' line shows up, then forwards everything.
 *
 * The search is done on bytes rather than on the decoded string, because a
 * string index is not a valid Buffer offset once the intro contains any
 * multi-byte character.
 *
 * @param {Buffer|string} chunk - next piece of input.
 * @param {string} enc - encoding of `chunk` when it is a string.
 * @param {Function} cb - called once the chunk has been handled.
 */
CleanIntro.prototype._transform = function (chunk, enc, cb) {
  if (this.readingData) {
    this.push(chunk, enc)
    return cb()
  }

  var bytes = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, enc)
  var buf = this._partialLine ? Buffer.concat([this._partialLine, bytes]) : bytes
  var start = findDataStart(buf)

  if (start !== -1) {
    this.readingData = true
    this._partialLine = null
    this.push(buf.subarray(start))
  } else {
    // Ignore all complete intro lines; remember only the unfinished last one
    // so the next chunk is scanned from a true line start.
    var lastBreak = Math.max(buf.lastIndexOf(0x0a), buf.lastIndexOf(0x0d))
    this._partialLine = lastBreak + 1 < buf.length ? buf.subarray(lastBreak + 1) : null
  }
  cb()
}

A Transform stream simply takes data that was piped in from another stream, does whatever it wants to it, then pushes whatever it wants back out. In our case, we're just ignoring anything before the actual data begins, then pushing the rest of the data back out. Easy.

Parse the CSV data

Now that we have a filter to get just the raw CSV data, we can start parsing it. There are lots of CSV parsing libraries out there; I like csv-stream because, well, it's a stream. So our basic process is to make the HTTP request, pipe it to our header-cleaning filter, then pipe it to csv-stream and start working with the data:
var request = require('request')
  , csv = require('csv-stream')
  , util = require('util')
  , _ = require('lodash')
  , moment = require('moment')
  , OutStream = require('./out-stream')
  , CleanIntroFilter = require('./clean-intro-filter')

// Returns a Stream that emits CSV records from Google Flu Trends.
// options:
//   - regions: an array of regions for which data should be generated.
//     See http://www.google.org/flutrends/us/data.txt for possible values
module.exports = function (options) {
  options = _.extend({
    regions: ['United States']
  }, options)

  var earliest = moment().subtract('years', 1)

  request('http://www.google.org/flutrends/us/data.txt')
    .pipe(new CleanIntroFilter())
    .pipe(csv.createStream({}))
    .on('error',function(err){
        // Oops, got an error
    })
    .on('data',function(data) {
      var date = moment(data.Date)

      // Only return data from the past year
      if (date.isAfter(earliest) || date.isSame(earliest)) {
        // Let's build the output String...
        console.log(data.Date + ',' + _.map(options.regions, function (region) {
          return data[region]
        }).join())
      }
    })
    .on('end', function () {
      // Okay we're done, now what?
    })
}

Alright, now we're getting close. We've built the CSV output, but now what do we do with it? Push it all into an array and return that? NO! Remember, we'll lose the slim memory benefits of streams if we don't keep using them the whole way through.

Write out to another Stream

Instead, let's just make our own writable stream:
var stream = require('stream')

/**
 * Stream that hands every chunk written to it straight to its readers.
 *
 * It is a plain PassThrough: write() and end() are the inherited stream
 * methods, so write() reports backpressure through its return value, and
 * 'end' is emitted by the stream itself once end() has been called and all
 * buffered data has been read.
 */
var OutStream = function() {
  stream.PassThrough.call(this,{objectMode: false})
}

OutStream.prototype = Object.create(
  stream.PassThrough.prototype, {constructor: {value: OutStream}} )

And now our parsing function can return that stream and write to it:
module.exports = function (options) {
  options = _.extend({
    regions: ['United States']
  }, options)

  var out = new OutStream()
  out.write('Date,' + options.regions.join())

  var earliest = moment().subtract('years', 1)

  request('http://www.google.org/flutrends/us/data.txt')
    .pipe(new CleanIntroFilter())
    .pipe(csv.createStream({}))
    .on('error',function(err){
        out.emit('error', err)
    })
    .on('data',function(data) {
      var date = moment(data.Date)

      // Only return data from the past year
      if (date.isAfter(earliest) || date.isSame(earliest)) {
        out.write(data.Date + ',' + _.map(options.regions, function (region) {
          return data[region]
        }).join())
      }
    })
    .on('end', function () {
      out.end()
    })

  return out
}

Serve it up

Finally, we'll use Express to expose our data as a web endpoint:
var express = require('express')
  , data = require('./lib/data')
  , _ = require('lodash')

var app = express()

// GET / streams the flu data as CSV, one row per line.
// Query string: `region` may be given once or several times to choose the
// columns; when it is absent the data module's default regions are used.
app.get('/', function(req, res){
  var options = {}

  if (req.query.region) {
    options.regions = _.isArray(req.query.region) ? req.query.region : [req.query.region]
  }

  res.setHeader('Content-Type', 'text/csv')

  // Set once the stream has failed so that late 'data'/'end' events do not
  // touch a response that has already been closed.
  var failed = false

  data(options)
    .on('data', function (row) {
      if (failed) return
      res.write(row)
      res.write('\n')
    })
    .on('end', function () {
      if (!failed) res.end()
    })
    .on('error', function (err) {
      failed = true
      console.error('error: ', err)
      // A 500 is only possible while no row has been written yet; either
      // way, close the response so the client is not left hanging.
      if (!res.headersSent) {
        res.statusCode = 500
      }
      res.end()
    })
})

var port = process.env.PORT || 5000
app.listen(port)
console.log('Listening on port ' + port)

Once again, note that as soon as we get data from our stream, we manipulate and write it out to another stream (the HTTP response, in this case). This keeps us from holding a lot of data in memory unnecessarily.

Now if we fire up the server, we can use curl to see it in action:

$ curl 'http://localhost:5000'
Date,United States
2012-11-04,2492
2012-11-11,2913
2012-11-18,3040
2012-11-25,3641
2012-12-02,4427
[and so on]

$ curl 'http://localhost:5000?region=United%20States&region=Pennsylvania'
Date,United States,Pennsylvania
2012-11-04,2492,2579
2012-11-11,2913,2889
2012-11-18,3040,2785
2012-11-25,3641,3248
2012-12-02,4427,3679
[and so on]

As long as the server is running someplace that is accessible to the public, we can head on over to Dash and plug it into a Custom Chart widget, giving us something like this:
Hey, looks like December and January are big months for the flu in the U.S. Who knew?

Want to try this yourself? The full source for this app is on GitHub, along with step-by-step instructions for running the project and creating a widget in Dash. Enjoy!

No comments:

Post a Comment