Thursday, October 31, 2013

Using Node.js streams to massage data into the format you want

Google provides some pretty cool flu data in CSV format, and I wanted to display that in a chart at Dash. However, the raw data isn't quite right for my needs:
  1. It has a bunch of intro/header text (copyright stuff, description of the data, etc), and Dash needs just the raw data.
  2. It shows dozens of states/regions/cities, and I just want to show overall U.S. data and my home state.
Fortunately, Dash can read data from any publicly accessible endpoint, so I decided to throw together a quick Node.js app to massage the data into what I needed. The most straightforward solution was probably to load the whole file, read through it line by line, build up an array of data, then write it out. And since the data feed is currently just under 400KB, maybe that would have been alright. But a better pattern (and more fun, IMO) is to take advantage of Node Streams. As long as we use streams throughout the entire process, we can make sure that only a small buffer is kept in memory at any given time.

If you just want to see the full app, it's on GitHub. Otherwise, read on to see my thought process.

Filter out the intro/header text

First, we'll write a stream that filters out the copyright/overview stuff and passes on the rest:

var stream = require('stream')
  , util = require('util')

function CleanIntro(options) {
  stream.Transform.call(this, options)
}

util.inherits(CleanIntro, stream.Transform)

CleanIntro.prototype._transform = function (chunk, enc, cb) {
  if (this.readingData) {
    this.push(chunk, enc)
  } else {
    // Ignore all text until we find a line beginning with 'Date,''
    var start = chunk.toString().search(/^Date,/m)
    if (start !== -1) {
      this.readingData = true
      this.push(chunk.slice(start), enc)
    }
  }
  cb()
}

A Transform stream simply takes data that was piped in from another stream, does whatever it wants to it, then pushes whatever it wants back out. In our case, we're just ignoring anything before the actual data begins, then pushing the rest of the data back out. Easy.

Parse the CSV data

Now that we have a filter to get just the raw CSV data, we can start parsing it. There are lots of CSV parsing libraries out there; I like csv-stream because, well, it's a stream. So our basic process is to make the HTTP request, pipe it to our header-cleaning filter, then pipe it to csv-stream and start working with the data:
var request = require('request')
  , csv = require('csv-stream')
  , util = require('util')
  , _ = require('lodash')
  , moment = require('moment')
  , OutStream = require('./out-stream')
  , CleanIntroFilter = require('./clean-intro-filter')

// Returns a Stream that emits CSV records from Google Flu Trends.
// options:
//   - regions: an array of regions for which data should be generated.
//     See http://www.google.org/flutrends/us/data.txt for possible values
module.exports = function (options) {
  options = _.extend({
    regions: ['United States']
  }, options)

  var earliest = moment().subtract('years', 1)

  request('http://www.google.org/flutrends/us/data.txt')
    .pipe(new CleanIntroFilter())
    .pipe(csv.createStream({}))
    .on('error',function(err){
        // Oops, got an error
    })
    .on('data',function(data) {
      var date = moment(data.Date)

      // Only return data from the past year
      if (date.isAfter(earliest) || date.isSame(earliest)) {
        // Let's build the output String...
        console.log(data.Date + ',' + _.map(options.regions, function (region) {
          return data[region]
        }).join())
      }
    })
    .on('end', function () {
      // Okay we're done, now what?
    })
}

Alright, now we're getting close. We've built the CSV output, but now what do we do with it? Push it all into an array and return that? NO! Remember, we'll lose the slim memory benefits of streams if we don't keep using them the whole way through.

Write out to another Stream

Instead, let's just make our own writeable stream:
var stream = require('stream')

var OutStream = function() {
  stream.Transform.call(this,{objectMode: false})
}

OutStream.prototype = Object.create(
  stream.Transform.prototype, {constructor: {value: OutStream}} )

OutStream.prototype._transform = function(chunk, encoding, callback) {
  this.push(chunk, encoding)
  callback && callback()
}

OutStream.prototype.write = function () {
  this._transform.apply(this, arguments)
}

OutStream.prototype.end = function () {
  this._transform.apply(this, arguments)
  this.emit('end')
}

And now our parsing function can return that stream and write to it:
module.exports = function (options) {
  options = _.extend({
    regions: ['United States']
  }, options)

  var out = new OutStream()
  out.write('Date,' + options.regions.join())

  var earliest = moment().subtract('years', 1)

  request('http://www.google.org/flutrends/us/data.txt')
    .pipe(new CleanIntroFilter())
    .pipe(csv.createStream({}))
    .on('error',function(err){
        out.emit('error', err)
    })
    .on('data',function(data) {
      var date = moment(data.Date)

      // Only return data from the past year
      if (date.isAfter(earliest) || date.isSame(earliest)) {
        out.write(data.Date + ',' + _.map(options.regions, function (region) {
          return data[region]
        }).join())
      }
    })
    .on('end', function () {
      out.end()
    })

  return out
}

Serve it up

Finally, we'll use Express to expose our data as a web endpoint:
var express = require('express')
  , data = require('./lib/data')
  , _ = require('lodash')

var app = express()

app.get('/', function(req, res){
  var options = {}

  if (req.query.region) {
    options.regions = _.isArray(req.query.region) ? req.query.region : [req.query.region]
  }

  res.setHeader('Content-Type', 'text/csv')

  data(options)
    .on('data', function (data) {
      res.write(data)
      res.write('\n')
    })
    .on('end', function (data) {
      res.end()
    })
    .on('error', function (err) {
      console.log('error: ', error)
    })
})

var port = process.env.PORT || 5000
app.listen(port)
console.log('Listening on port ' + port)

Once again, note that as soon as we get data from our stream, we manipulate and write it out to another stream (the HTTP response, in this case). This keeps us from holding a lot of data in memory unnecessarily.

Now if we fire up the server, we can use curl to see it in action:

$ curl 'http://localhost:5000'
Date,United States
2012-11-04,2492
2012-11-11,2913
2012-11-18,3040
2012-11-25,3641
2012-12-02,4427
[and so on]

$ curl 'http://localhost:5000?region=United%20States&region=Pennsylvania'
Date,United States,Pennsylvania
2012-11-04,2492,2579
2012-11-11,2913,2889
2012-11-18,3040,2785
2012-11-25,3641,3248
2012-12-02,4427,3679
[and so on]

As long the server is running someplace that is accessible to the public, we can head on over to Dash and plug it into a Custom Chart widget, giving us something like this:
Hey, looks like December and January are big months for the flu in the U.S. Who knew?

Want to try this yourself? The full source for this app is on GitHub, along with step-by-step instructions for running the project and creating a widget in Dash. Enjoy!

Wednesday, October 16, 2013

Cut the HealthCare.gov people some slack

On October 1, 2013, HealthCare.gov was opened to the nation. As one of the more tangible aspects of the Affordable Care Act (aka ACA, Obamacare), the glitches it has experienced are getting lots of negative attention. It's been described as an "inexcusable mess" and a "disaster". I'm not going to discuss the merits of the ACA here. But as someone who spends every day building software, I think the criticisms of the HealthCare.gov application have been unfair. Here are a few reasons why we should cut them some slack.

Most webapps aren't immediately released to millions of users

When Google or Facebook are releasing a new app, do you think they just flip a switch and release it to the world? Absolutely not! They slowly release it to beta testers, or add the feature to a subset of users. They encounter bugs, fix them, slowly scale up their infrastructure, and wait until they've seen it succeed before opening it up to the general public.

HealthCare.gov, on the other hand, launched to the entire nation at once. It didn't just have to deal with traffic from thousands (hundreds of thousands? millions?) of people looking for health plans for themselves; I'm sure a lot of curious people (like myself) went there just to check it out. There was no chance to work out the bugs, and no chance to gradually scale the infrastructure. This was almost guaranteed to be a problem.

Now, perhaps they should have slowly rolled out the site. I suspect, however, that the reasons were political. How do you decide who gets to sign up for healthcare first? By state? By last name? Random drawing? People who signed up ahead of time? And how do you explain to everyone else that they have to wait? People expect this from Google, but a highly charged political issue like the ACA makes it dicey.

Much of what happens in the exchange is out of their control

HealthCare.gov is an extremely distributed application. John McDonough, a health policy professor at the Harvard School of Public Health, described it as:
"an enormously complex task. The number of systems that have to work together – federal, state, insurance companies, the Internal Revenue System – the number of systems that have to align here is pretty daunting."
If there's a bug in the IRS endpoints, there's going to be a problem. If one of the hundreds of private insurance companies' systems can't handle the increased workload, there's going to be a problem. And as someone who has built plenty of distributed systems, I know that end users don't say "oh, it must be a problem with system X that this app depends on". They think it's a problem with whatever webapp they're using, and don't care what's going on in the backend.

Nor should they care. But professional IT people should know better.

Most webapps aren't serving as a referendum on a heated national controversy

Frankly, there are a lot of people that want HealthCare.gov to fail primarily because they want the ACA to fail. And many people who oppose the ACA will still have a need to use HealthCare.gov. New Google apps are not typically tied to such a heated political issue, and are generally not used by people who don't want to use them. 

So let's have some empathy for the people who build HealthCare.gov

This application was an enormous undertaking, with many challenges that few (if any) systems out there need to deal with. My expectation and hope is that the bugs will be worked out in the coming weeks, as happens with any new system. Whether the Affordable Care Act will be a success is yet to be seen; but the healthcare exchanges that support it were built by regular IT professionals that are just trying to do their jobs. Let's cut them some slack.