Thursday, October 31, 2013

Using Node.js streams to massage data into the format you want

Google provides some pretty cool flu data in CSV format, and I wanted to display that in a chart at Dash. However, the raw data isn't quite right for my needs:
  1. It has a bunch of intro/header text (copyright stuff, description of the data, etc), and Dash needs just the raw data.
  2. It shows dozens of states/regions/cities, and I just want to show overall U.S. data and my home state.
Fortunately, Dash can read data from any publicly accessible endpoint, so I decided to throw together a quick Node.js app to massage the data into what I needed. The most straightforward solution was probably to load the whole file, read through it line by line, build up an array of data, then write it out. And since the data feed is currently just under 400KB, maybe that would have been alright. But a better pattern (and more fun, IMO) is to take advantage of Node Streams. As long as we use streams throughout the entire process, we can make sure that only a small buffer is kept in memory at any given time.

If you just want to see the full app, it's on GitHub. Otherwise, read on to see my thought process.

Filter out the intro/header text

First, we'll write a stream that filters out the copyright/overview stuff and passes on the rest:

var stream = require('stream')
  , util = require('util')

function CleanIntro(options) {
  stream.Transform.call(this, options)
}

util.inherits(CleanIntro, stream.Transform)

CleanIntro.prototype._transform = function (chunk, enc, cb) {
  if (this.readingData) {
    this.push(chunk, enc)
  } else {
    // Ignore all text until we find a line beginning with 'Date,'
    var start = chunk.toString().search(/^Date,/m)
    if (start !== -1) {
      this.readingData = true
      this.push(chunk.slice(start), enc)
    }
  }
  cb()
}

module.exports = CleanIntro

A Transform stream simply takes data that was piped in from another stream, does whatever it wants to it, then pushes whatever it wants back out. In our case, we're just ignoring anything before the actual data begins, then pushing the rest of the data back out. Easy.
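One caveat with the filter above: chunks can be cut anywhere, so the 'Date,' marker could in principle be split across two chunks and missed. Here is a minimal sketch of one way to guard against that. The name BufferedCleanIntro and its pending field are my own illustrations, not part of the app, and the sketch assumes the feed is plain ASCII/UTF-8 text:

```javascript
const { Transform } = require('stream')

// Hypothetical variant of CleanIntro (the name is illustrative). It keeps the
// unfinished last line of the intro around so that a 'Date,' marker split
// across two chunks is still found. Assumes plain ASCII/UTF-8 input.
class BufferedCleanIntro extends Transform {
  constructor(options) {
    super(options)
    this.readingData = false
    this.pending = ''
  }

  _transform(chunk, enc, cb) {
    if (this.readingData) {
      // Past the intro: pass everything through untouched
      this.push(chunk)
      return cb()
    }

    this.pending += chunk.toString()
    const start = this.pending.search(/^Date,/m)
    if (start !== -1) {
      this.readingData = true
      this.push(this.pending.slice(start))
      this.pending = ''
    } else {
      // Drop complete intro lines and keep only the trailing partial line,
      // since that is the only place a split marker could be hiding
      this.pending = this.pending.slice(this.pending.lastIndexOf('\n') + 1)
    }
    cb()
  }
}
```

It pipes exactly like the original filter. The only extra memory it holds is the last partial line of the intro.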

Parse the CSV data

Now that we have a filter to get just the raw CSV data, we can start parsing it. There are lots of CSV parsing libraries out there; I like csv-stream because, well, it's a stream. So our basic process is to make the HTTP request, pipe it to our header-cleaning filter, then pipe it to csv-stream and start working with the data:
var request = require('request')
  , csv = require('csv-stream')
  , util = require('util')
  , _ = require('lodash')
  , moment = require('moment')
  , OutStream = require('./out-stream')
  , CleanIntroFilter = require('./clean-intro-filter')

// Returns a Stream that emits CSV records from Google Flu Trends.
// options:
//   - regions: an array of regions for which data should be generated.
//     See http://www.google.org/flutrends/us/data.txt for possible values
module.exports = function (options) {
  options = _.extend({
    regions: ['United States']
  }, options)

  var earliest = moment().subtract(1, 'years')

  request('http://www.google.org/flutrends/us/data.txt')
    .pipe(new CleanIntroFilter())
    .pipe(csv.createStream({}))
    .on('error',function(err){
        // Oops, got an error
    })
    .on('data',function(data) {
      var date = moment(data.Date)

      // Only return data from the past year
      if (date.isAfter(earliest) || date.isSame(earliest)) {
        // Let's build the output String...
        console.log(data.Date + ',' + _.map(options.regions, function (region) {
          return data[region]
        }).join())
      }
    })
    .on('end', function () {
      // Okay we're done, now what?
    })
}

Alright, now we're getting close. We've built the CSV output, but now what do we do with it? Push it all into an array and return that? NO! Remember, we'll lose the slim memory benefits of streams if we don't keep using them the whole way through.

Write out to another Stream

Instead, let's just make our own writable stream. It extends Transform, so whatever is written to it can be read back out the other side:
var stream = require('stream')

var OutStream = function() {
  stream.Transform.call(this,{objectMode: false})
}

OutStream.prototype = Object.create(
  stream.Transform.prototype, {constructor: {value: OutStream}} )

OutStream.prototype._transform = function(chunk, encoding, callback) {
  this.push(chunk, encoding)
  callback && callback()
}

OutStream.prototype.write = function () {
  this._transform.apply(this, arguments)
}

OutStream.prototype.end = function () {
  this._transform.apply(this, arguments)
  this.emit('end')
}

module.exports = OutStream
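As an aside, Node's built-in stream.PassThrough offers similar write-here, read-there behavior without overriding write and end. The sketch below is not the code this app uses, and the helper name createOutput is hypothetical. Each line carries its own newline, because a byte stream does not promise to preserve chunk boundaries:

```javascript
const { PassThrough } = require('stream')

// Hypothetical helper: returns a PassThrough stream that already contains
// the CSV header line. PassThrough forwards every chunk unchanged, so
// anything written to it can be read or piped out the other side.
function createOutput(regions) {
  const out = new PassThrough()
  out.write('Date,' + regions.join() + '\n')
  return out
}

// Example: the caller keeps writing rows, then ends the stream
const out = createOutput(['United States'])
out.pipe(process.stdout)
out.write('2012-11-04,2492\n')
out.end()
```

The rest of this post sticks with the hand-rolled OutStream.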

And now our parsing function can return that stream and write to it:
module.exports = function (options) {
  options = _.extend({
    regions: ['United States']
  }, options)

  var out = new OutStream()
  out.write('Date,' + options.regions.join())

  var earliest = moment().subtract(1, 'years')

  request('http://www.google.org/flutrends/us/data.txt')
    .pipe(new CleanIntroFilter())
    .pipe(csv.createStream({}))
    .on('error',function(err){
        out.emit('error', err)
    })
    .on('data',function(data) {
      var date = moment(data.Date)

      // Only return data from the past year
      if (date.isAfter(earliest) || date.isSame(earliest)) {
        out.write(data.Date + ',' + _.map(options.regions, function (region) {
          return data[region]
        }).join())
      }
    })
    .on('end', function () {
      out.end()
    })

  return out
}
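The same filtering could also be written as a Transform of its own, so that .pipe() handles the flow control and ends the output for us. This is only a sketch under a few assumptions: the class name RegionRows and its options are hypothetical, it expects records shaped like the ones csv-stream emits above, and it compares plain Date objects rather than using moment:

```javascript
const { Transform } = require('stream')

// Hypothetical Transform (not part of the app). It accepts parsed CSV records
// such as { Date: '2012-11-04', 'United States': '2492' } on its writable
// side and emits one CSV line per record on its readable side.
class RegionRows extends Transform {
  constructor(options) {
    // Objects in, plain strings/buffers out
    super({ writableObjectMode: true })
    this.regions = options.regions
    this.earliest = options.earliest // a Date; older records are skipped
    this.push('Date,' + this.regions.join() + '\n')
  }

  _transform(record, enc, cb) {
    if (new Date(record.Date) >= this.earliest) {
      const values = this.regions.map((region) => record[region])
      this.push(record.Date + ',' + values.join() + '\n')
    }
    cb()
  }
}

// Possible wiring, reusing names from the post (untested against the live feed):
//   request(url)
//     .pipe(new CleanIntroFilter())
//     .pipe(csv.createStream({}))
//     .pipe(new RegionRows({ regions: ['United States'], earliest: someDate }))
//     .pipe(res)
```

With this shape, a slow client slows down the whole chain instead of letting rows pile up in memory, which is the point of using streams throughout.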

Serve it up

Finally, we'll use Express to expose our data as a web endpoint:
var express = require('express')
  , data = require('./lib/data')
  , _ = require('lodash')

var app = express()

app.get('/', function(req, res){
  var options = {}

  if (req.query.region) {
    options.regions = _.isArray(req.query.region) ? req.query.region : [req.query.region]
  }

  res.setHeader('Content-Type', 'text/csv')

  data(options)
    .on('data', function (data) {
      res.write(data)
      res.write('\n')
    })
    .on('end', function (data) {
      res.end()
    })
    .on('error', function (err) {
      console.log('error: ', err)
    })
})

var port = process.env.PORT || 5000
app.listen(port)
console.log('Listening on port ' + port)

Once again, note that as soon as we get data from our stream, we manipulate and write it out to another stream (the HTTP response, in this case). This keeps us from holding a lot of data in memory unnecessarily.

Now if we fire up the server, we can use curl to see it in action:

$ curl 'http://localhost:5000'
Date,United States
2012-11-04,2492
2012-11-11,2913
2012-11-18,3040
2012-11-25,3641
2012-12-02,4427
[and so on]

$ curl 'http://localhost:5000?region=United%20States&region=Pennsylvania'
Date,United States,Pennsylvania
2012-11-04,2492,2579
2012-11-11,2913,2889
2012-11-18,3040,2785
2012-11-25,3641,3248
2012-12-02,4427,3679
[and so on]

As long as the server is running someplace that is accessible to the public, we can head on over to Dash and plug it into a Custom Chart widget, giving us something like this:
Hey, looks like December and January are big months for the flu in the U.S. Who knew?

Want to try this yourself? The full source for this app is on GitHub, along with step-by-step instructions for running the project and creating a widget in Dash. Enjoy!
