Thursday, October 31, 2013

Using Node.js streams to massage data into the format you want

Google provides some pretty cool flu data in CSV format, and I wanted to display that in a chart at Dash. However, the raw data isn't quite right for my needs:
  1. It has a bunch of intro/header text (copyright stuff, description of the data, etc), and Dash needs just the raw data.
  2. It shows dozens of states/regions/cities, and I just want to show overall U.S. data and my home state.
Fortunately, Dash can read data from any publicly accessible endpoint, so I decided to throw together a quick Node.js app to massage the data into what I needed. The most straightforward solution was probably to load the whole file, read through it line by line, build up an array of data, then write it out. And since the data feed is currently just under 400KB, maybe that would have been alright. But a better pattern (and more fun, IMO) is to take advantage of Node Streams. As long as we use streams throughout the entire process, we can make sure that only a small buffer is kept in memory at any given time.

If you just want to see the full app, it's on GitHub. Otherwise, read on to see my thought process.

Filter out the intro/header text

First, we'll write a stream that filters out the copyright/overview stuff and passes on the rest:

var stream = require('stream')
  , util = require('util')

function CleanIntro(options) {
  stream.Transform.call(this, options)
}

util.inherits(CleanIntro, stream.Transform)

CleanIntro.prototype._transform = function (chunk, enc, cb) {
  if (this.readingData) {
    this.push(chunk, enc)
  } else {
    // Ignore all text until we find a line beginning with 'Date,''
    var start = chunk.toString().search(/^Date,/m)
    if (start !== -1) {
      this.readingData = true
      this.push(chunk.slice(start), enc)
    }
  }
  cb()
}

A Transform stream simply takes data that was piped in from another stream, does whatever it wants to it, then pushes whatever it wants back out. In our case, we're just ignoring anything before the actual data begins, then pushing the rest of the data back out. Easy.

Parse the CSV data

Now that we have a filter to get just the raw CSV data, we can start parsing it. There are lots of CSV parsing libraries out there; I like csv-stream because, well, it's a stream. So our basic process is to make the HTTP request, pipe it to our header-cleaning filter, then pipe it to csv-stream and start working with the data:
var request = require('request')
  , csv = require('csv-stream')
  , util = require('util')
  , _ = require('lodash')
  , moment = require('moment')
  , OutStream = require('./out-stream')
  , CleanIntroFilter = require('./clean-intro-filter')

// Returns a Stream that emits CSV records from Google Flu Trends.
// options:
//   - regions: an array of regions for which data should be generated.
//     See http://www.google.org/flutrends/us/data.txt for possible values
module.exports = function (options) {
  options = _.extend({
    regions: ['United States']
  }, options)

  var earliest = moment().subtract('years', 1)

  request('http://www.google.org/flutrends/us/data.txt')
    .pipe(new CleanIntroFilter())
    .pipe(csv.createStream({}))
    .on('error',function(err){
        // Oops, got an error
    })
    .on('data',function(data) {
      var date = moment(data.Date)

      // Only return data from the past year
      if (date.isAfter(earliest) || date.isSame(earliest)) {
        // Let's build the output String...
        console.log(data.Date + ',' + _.map(options.regions, function (region) {
          return data[region]
        }).join())
      }
    })
    .on('end', function () {
      // Okay we're done, now what?
    })
}

Alright, now we're getting close. We've built the CSV output, but now what do we do with it? Push it all into an array and return that? NO! Remember, we'll lose the slim memory benefits of streams if we don't keep using them the whole way through.

Write out to another Stream

Instead, let's just make our own writeable stream:
var stream = require('stream')

var OutStream = function() {
  stream.Transform.call(this,{objectMode: false})
}

OutStream.prototype = Object.create(
  stream.Transform.prototype, {constructor: {value: OutStream}} )

OutStream.prototype._transform = function(chunk, encoding, callback) {
  this.push(chunk, encoding)
  callback && callback()
}

OutStream.prototype.write = function () {
  this._transform.apply(this, arguments)
}

OutStream.prototype.end = function () {
  this._transform.apply(this, arguments)
  this.emit('end')
}

And now our parsing function can return that stream and write to it:
module.exports = function (options) {
  options = _.extend({
    regions: ['United States']
  }, options)

  var out = new OutStream()
  out.write('Date,' + options.regions.join())

  var earliest = moment().subtract('years', 1)

  request('http://www.google.org/flutrends/us/data.txt')
    .pipe(new CleanIntroFilter())
    .pipe(csv.createStream({}))
    .on('error',function(err){
        out.emit('error', err)
    })
    .on('data',function(data) {
      var date = moment(data.Date)

      // Only return data from the past year
      if (date.isAfter(earliest) || date.isSame(earliest)) {
        out.write(data.Date + ',' + _.map(options.regions, function (region) {
          return data[region]
        }).join())
      }
    })
    .on('end', function () {
      out.end()
    })

  return out
}

Serve it up

Finally, we'll use Express to expose our data as a web endpoint:
var express = require('express')
  , data = require('./lib/data')
  , _ = require('lodash')

var app = express()

app.get('/', function(req, res){
  var options = {}

  if (req.query.region) {
    options.regions = _.isArray(req.query.region) ? req.query.region : [req.query.region]
  }

  res.setHeader('Content-Type', 'text/csv')

  data(options)
    .on('data', function (data) {
      res.write(data)
      res.write('\n')
    })
    .on('end', function (data) {
      res.end()
    })
    .on('error', function (err) {
      console.log('error: ', error)
    })
})

var port = process.env.PORT || 5000
app.listen(port)
console.log('Listening on port ' + port)

Once again, note that as soon as we get data from our stream, we manipulate and write it out to another stream (the HTTP response, in this case). This keeps us from holding a lot of data in memory unnecessarily.

Now if we fire up the server, we can use curl to see it in action:

$ curl 'http://localhost:5000'
Date,United States
2012-11-04,2492
2012-11-11,2913
2012-11-18,3040
2012-11-25,3641
2012-12-02,4427
[and so on]

$ curl 'http://localhost:5000?region=United%20States&region=Pennsylvania'
Date,United States,Pennsylvania
2012-11-04,2492,2579
2012-11-11,2913,2889
2012-11-18,3040,2785
2012-11-25,3641,3248
2012-12-02,4427,3679
[and so on]

As long the server is running someplace that is accessible to the public, we can head on over to Dash and plug it into a Custom Chart widget, giving us something like this:
Hey, looks like December and January are big months for the flu in the U.S. Who knew?

Want to try this yourself? The full source for this app is on GitHub, along with step-by-step instructions for running the project and creating a widget in Dash. Enjoy!

21 comments:

  1. The information you provided is very useful, thank you very much for sharing useful information with us. You can apply for a Turkey visa online. You can get your Turkey Visa in just 1 hour by selecting the express processing type. It only takes 5 minutes to apply for an electronic visa Turkey. Apply Online.

    ReplyDelete
  2. This comment has been removed by the author.

    ReplyDelete
  3. The content you shared with us is great. Thanks for sharing it. You can apply for an urgent visa application India by clicking the link we just provided. Thanks for adding value to the post.

    ReplyDelete
  4. It is a good site,Thank you.. Obtain the Indian e-Medical visa online via Indian visa website. Indian Medical Visa is a travel permit approved by the Government of India for persons who wish to come to India for medical treatment. Indian e-Medical visa cost depends on your nationality.

    ReplyDelete

  5. I see that there is a good discussion about this paragraph at this place
    on this website. Vietnam visa united states. USA Citizens must have an
    eVisa to visit Vietnam both for pleasure and business reasons. Without
    They cannot enter Turkey.

    ReplyDelete
  6. I am truly thankful to the owner of this web site who has shared this grateful post here. What are Azerbaijan sticker visa requirements ? The Azerbaijan sticker visa requirements Depending upon the length of stay, the purpose of visit, and the numbers of entries, the visitors can select and some other factors.

    ReplyDelete
  7. Thanks for sharing this article with us... I hope you will continue in the future... The question is how much Indian visa fees are? So The Indian visa fees depend on your visa type, your nationality, and processing time. If you have any kind of doubts check about them and clear your doubts.

    ReplyDelete
  8. I read your blog and found many interesting metrics in this material. Thanks for sharing it on the Internet. People who are willing to visit Turkey and want a Turkish evisa can apply for it which is a totally online process. Fill the application form, make payment & receive it in email. As simple as that.

    ReplyDelete
  9. Wow you have nice content on your page. Your audience will enjoy it while they are read. Turkey visitors can apply for a Turkish Visit Visa online from anywhere in the world. The process is easy and convenient which saves your time & money.

    ReplyDelete
  10. When it comes to choosing the best city to visit in Turkey, there are numerous captivating destinations to explore. One city that stands out is Istanbul. Visitors can immerse themselves in the grandeur of the iconic Hagia Sophia, marvel at the intricate designs of the Blue Mosque, and wander through the bustling Grand Bazaar.

    ReplyDelete
  11. Kenya business visa for Indian individuals provides an opportunity for Indian entrepreneurs and professionals to explore the vibrant business landscape of Kenya. This visa enables Indian nationals to engage in various business activities.

    ReplyDelete
  12. We express our heartfelt gratitude for your invaluable contribution, which has greatly enriched the collective experience of all involved. Visa Arrival Policy To Be Made Free For Tourists In Sri Lanka. Sri Lanka has declared that it will not charge visitors a fee to get a visa upon arrival. As a result, travelers to Sri Lanka won't need to pay for their visas when they arrive. The action aims to facilitate admission into the nation and draw in more visitors. Tourists must still fulfill the admission criteria, which include having a current passport, a return ticket, and enough money to cover their stay.

    ReplyDelete
  13. I appreciate your thoughtful content. It's engaging and offers valuable insights. Keep up the great work! If you're a frequent traveler, consider the convenience of a one year multiple entry visa Saudi Arabia, providing flexibility for your frequent visits to this captivating nation.

    ReplyDelete
  14. VFS Global is a global visa and passport processing service provider. Visit www vfsglobal com Saudi arabia for information on services, appointments, and requirements related to Saudi Arabia's visa and consular services.

    ReplyDelete
  15. Hello! Your ongoing support is truly invaluable, driving us to craft increasingly captivating content. We appreciate you being our muse and motivating force, inspiring us to persist in our creative endeavors. Thank you! Experience the convenience of the evisa to moscow Simplify your travel with easy online application, swift processing, and explore the vibrant Russian capital hassle-free.

    ReplyDelete
  16. You're absolutely amazing! 😃 I don't think I've come across anything quite like this before. 🤩 It's fantastic to discover someone with such unique ideas on this subject. 🌟 Thank you so much for initiating this, and your website is truly a valuable addition to the internet. 🌐 We need more individuals like you who bring originality to the table! 🙌👏 Many people are also inquiring about the Indian visa cost for US citizens. You can find more information about it on our website.

    ReplyDelete
  17. Hi! Your blog caught my attention on the internet, and I'm thoroughly impressed by this well-crafted article. I've saved it for future reading and can't wait to explore more of your informative content. Also Stay informed with the latest Nigeria Travel Updates to make your journey hassle-free.

    ReplyDelete