Category: node.js

Connection Pooling with node-postgres

Update on June 24, 2012: I submitted a patch to node-pool that exposes this pattern through a method called pooled on generic connection pools. I’ve updated the code samples below to include the more robust pattern. Look for “Pooled function decoration” in node-pool’s README.md for more information.

Documentation on node-postgres’ connection pooling is a little sparse. It is built-in but usage is somewhat unclear. Below is a minimally intrusive way to introduce it into an app.

node-postgres’ connection pooling works through the connect function on the module object:

# Ask node-postgres for a client; the callback receives (err, pgClient).
# NOTE: connectionString is not defined in this fragment.
pg = require 'pg'
pg.connect connectionString, (err, pgClient) ->
  # Log the failure and stop; pgClient is not touched on this path.
  return console.log "Error! #{err}" if err?
  # use pgClient

pgClient above is a pooled client. When a drain event occurs on the client, it is automatically returned to the pool: drain events generally occur after a query has been executed on the client, unless you have suppressed drain events during a transaction. For full information, see the node-postgres documentation.

Let’s assume you have a data access module with methods like this:

# Each data access method is exported and also bound to a local name of the
# same spelling; the bodies are elided here.
exports.findWidget = findWidget = (id, callback) -> ...
exports.transactedWidget = transactedWidget = (widgetSauce, callback) -> ...

The obvious way to incorporate connection pooling is something like:

# Naive pooling: every exported method asks pg.connect for its own client.
pg = require 'pg'
connectionString = "tcp://postgres:postgres@localhost/dummy_db"
exports.findWidget = findWidget = (id, callback) ->
  pg.connect connectionString, (err, pgClient) ->
    # Hand a connection failure straight to the caller.
    return callback(err) if err?
    ... #Use pgClient to find

exports.transactedWidget = transactedWidget = (widgetSauce, callback) ->
  # Same connect-and-check preamble as findWidget, repeated verbatim.
  pg.connect connectionString, (err, pgClient) ->
    return callback(err) if err?
    ... #Use pgClient and do some transaction stuff

Frown town: three lines of useless boilerplate added to every exported method in the data layer. Furthermore, trying to re-use methods in a transaction context is impossible. We can do better – create a new file called pg_pool.coffee with the following:

pg = require 'pg'

module.exports = pooler =
  # Get a connection from the pool. `callback` is handed to pg.connect
  # unchanged, so it receives (err, pgClient).
  acquire: (callback) -> pg.connect "tcp://postgres:postgres@localhost/dummy_db", callback

  # Decorate `fn` so that a de-pooled connection is supplied as its first
  # argument.
  #
  # The returned function takes the same arguments as `fn` minus the leading
  # client. If its last argument is a function it is treated as the caller's
  # callback: it is replaced by a wrapper that resumes drain events on the
  # client before forwarding whatever `fn` reports. Drain events are paused
  # for the whole time in between.
  pooled: (fn) -> ->
    callerCallback = arguments[arguments.length - 1]
    callerHasCallback = typeof callerCallback == 'function'
    callerArgs = Array::slice.call(arguments, 0, if callerHasCallback then -1 else undefined)

    pooler.acquire (err, pgClient) ->
      if err?
        # Without a caller callback the last argument is ordinary data;
        # invoking it would throw a TypeError, so only report when we can.
        callerCallback err if callerHasCallback
        return
      pgClient.pauseDrain()
      # `fn` always receives a completion callback, even if the caller gave
      # none, so that drain events are resumed once `fn` is done.
      callerArgs.push ->
        pgClient.resumeDrain()
        callerCallback.apply(null, arguments) if callerHasCallback
      fn pgClient, callerArgs...

Also available as gist. The pooled method creates a python-style decorator that will wrap data access methods in a connection pool. Furthermore, the client is kept out of the pool until the callback is executed (this is what pgClient.pauseDrain() and pgClient.resumeDrain() do in the above example). Using this, we can replace the data layer code with:

# Each method is written to take pgClient first; the exported version is the
# pooled-wrapped one, while the local name keeps the undecorated function.
{pooled} = require './pg_pool'
exports.findWidget = pooled findWidget = (pgClient, id, callback) ->
 ... #Use pgClient to find widget
exports.transactedWidget = pooled transactedWidget = (pgClient, widgetSauce, callback) -> 
 ... #Use pgClient to do some jazz

In addition to brevity, this enables nested method calls in a transaction context. The non-exported versions of the methods accept a pgClient parameter. For example, if transactedWidget needed to call findWidget in a transaction:

# pg_pool is a sibling file, so it must be required by relative path.
{pooled} = require './pg_pool'
exports.findWidget = pooled findWidget = (pgClient, id, callback) ->
 ... #Use pgClient to find widget
exports.transactedWidget = pooled transactedWidget = (pgClient, widgetSauce, callback) ->
 # Once BEGIN has succeeded, a failure must roll back before reporting;
 # otherwise the client is released with the transaction still open.
 rollback = (cause) ->
   pgClient.query "ROLLBACK", -> callback(cause)
 pgClient.query "BEGIN", (err) ->
   return callback(err) if err?
   # Call the undecorated findWidget so it runs on this same client.
   findWidget pgClient, "123", (err, widget) ->
     return rollback(err) if err?
     pgClient.query "COMMIT", (err) ->
       callback(err, widget)

Not the prettiest code, but it beats having to write two versions of each method. Since we are using the pooled decorator, we can guarantee that the pgClient is stable for the duration of this transaction.

A Saner S3 PUT for Node.js

The state of node.js libraries is hit and miss. I have been using Knox to do my s3 uploads and recently came across this gem of a stack trace:

assert.js:93
throw new assert.AssertionError({

AssertionError: true == false
at IncomingMessage. (http.js:1341:9)
at IncomingMessage.emit (events.js:61:17)
at HTTPParser.onMessageComplete (http.js:133:23)
at Socket.ondata (http.js:1231:22)
at Socket._onReadable (net.js:683:27)
at IOWatcher.onReadable [as callback] (net.js:177:10)

Sure enough, there is an outstanding issue for Knox that calls to PUT actually crash the node process when Amazon returns a non-200 (https://github.com/LearnBoost/knox/issues/41). Digging deeper into the source code I noticed this comment:

/**
* PUT the file at `src` to `filename`, with callback `fn`
* receiving a possible exception, and the response object.
*
* NOTE: this method reads the _entire_ file into memory using
* fs.readFile(), and is not recommended or large files.
* ...

Yarg! A S3 PUT is not a complicated operation. All I want is a solution that

  • Method signature that takes in a file path and throws it into s3 (i.e. no mucking with request objects)
  • Supports timeouts, HTTP continue (i.e. fails fast)
  • Uses callbacks and passes useful error objects (i.e. the text from amazon)
  • Doesn’t read entire files (!) into memory (i.e. uses pipe from node.js)

Here is what I came up with (in CoffeeScript):

fs = require 'fs'
http = require 'http'
https = require 'https'
crypto = require 'crypto'

mime = require 'mime'
xml2js = require 'xml2js'

# setTimeout with the delay first, so a multi-line callback can trail the call.
# Returns whatever setTimeout returns (the timer handle).
delayTimeout = (ms, func) ->
  setTimeout(func, ms)
# Uploads a local file to Amazon S3 with one signed PUT request, streaming the
# file body instead of buffering it in memory.
class @S3Put
  # awsKey, awsSecret: credentials used to build the Authorization header.
  # bucket:  bucket name, prefixed to every resource path.
  # secure:  use https when true (the default), http otherwise.
  # timeout: milliseconds before an unfinished request is aborted.
  constructor: (@awsKey, @awsSecret, @bucket, @secure=true, @timeout=60*1000) ->

  # PUT the file at `filePath` to `resource` inside the bucket.
  #
  # amzHeaders: extra headers (e.g. x-amz-acl); they are signed and sent.
  #             May be null/undefined.
  # callback:   invoked exactly once, with
  #               (null, {headers, code}) on a 200 response,
  #               (parsedXmlError)        on any other response,
  #               (err)                   on file, network, parse or timeout
  #                                       failures.
  put: (filePath, resource, amzHeaders, callback) ->
    amzHeaders ?= {}
    resource = @canonicalResource(resource)
    mimeType = mime.lookup(filePath)

    timeout = null
    finished = false
    # Single exit point: several independent events (response, timeout,
    # stream or socket errors) can fire for one upload, but the caller must
    # hear back only once, and the timer must never outlive the request.
    finish = (err, result) ->
      return if finished
      finished = true
      clearTimeout(timeout) if timeout?
      callback(err, result)

    fs.stat filePath, (err, stats) =>
      return finish(err) if err?

      contentLength = stats.size
      md5Hash = crypto.createHash 'md5'

      # First pass over the file: compute Content-MD5, which has to be known
      # before the request headers can be signed.
      rs = fs.createReadStream(filePath)
      rs.on 'error', finish
      rs.on 'data', (d) -> md5Hash.update(d)
      rs.on 'end',  =>
        md5 = md5Hash.digest('base64')
        date = new Date()
        httpOptions =
          host: "s3.amazonaws.com"
          path: "/#{@bucket}#{resource}"
          headers:
            "Authorization": "AWS #{@awsKey}:#{@sign(resource, md5, mimeType, date, amzHeaders)}"
            "Date": date.toUTCString()
            "Content-Length": contentLength
            "Content-Type": mimeType
            "Content-MD5": md5
            "Expect": "100-continue"
          method: "PUT"

        (httpOptions.headers[k] = v for k,v of amzHeaders)

        req = (if @secure then https else http).request httpOptions, (res) =>
          if res.statusCode == 200
            # Nothing in the body is needed; let it flow so the socket is freed.
            res.resume?()
            headers = JSON.stringify(res.headers)
            return finish(null, {headers: headers, code: res.statusCode})

          # Any other status: collect the XML error document and hand the
          # parsed form to the caller as the error.
          responseBody = ""
          res.setEncoding("utf8")
          res.on "data", (chunk) ->
            responseBody += chunk

          res.on "end", ->
            parser = new xml2js.Parser()
            parser.parseString responseBody, (err, result) ->
              return finish(err) if err?
              return finish(result)

        # Without a listener a socket-level failure is thrown as an
        # uncaught exception.
        req.on "error", finish

        timeout = delayTimeout @timeout, =>
          # Report first so the timeout message wins over any error the
          # abort itself triggers.
          finish({message: "Timed out after #{@timeout}ms"})
          req.abort()

        # Second pass over the file: only send the body once S3 has accepted
        # the headers.
        req.on "continue", ->
          rs2 = fs.createReadStream(filePath)
          rs2.on 'error', (err) ->
            finish(err)
            req.abort()
          rs2.pipe(req)

  # Build the base64 HMAC-SHA1 signature for a PUT of `resource`.
  # Each canonical amz header occupies its own line between the date and the
  # resource; when there are none, the resource directly follows the date
  # line (no blank line in between).
  sign: (resource, md5, contentType, date, amzHeaders) ->
    lines = ["PUT", md5, contentType, date.toUTCString()]
    lines = lines.concat @canonicalHeaders(amzHeaders)
    lines.push "/#{@bucket}#{@canonicalResource(resource)}"
    crypto.createHmac('sha1', @awsSecret).update(lines.join("\n")).digest('base64')

  # Sorted list of "lowercased-name:value" strings, one per header.
  canonicalHeaders: (headers) ->
    ("#{k.toLowerCase()}:#{v}" for k,v of headers).sort()

  # Ensure the resource starts with "/", so that "key" and "/key" address the
  # same object rather than being glued onto the bucket name.
  canonicalResource: (resource) ->
    resource = String(resource ? "")
    if resource.charAt(0) is "/" then resource else "/#{resource}"

Use like

S3Put = require('s3put').S3Put
s3Put = new S3Put("awsKey", "awsSecret", "s3Bucket")
# The resource is appended directly to "/<bucket>", so it needs its own
# leading slash: "/key" is requested as /s3Bucket/key.
s3Put.put "/path/to/file", "/key", {"x-amz-acl": "public-read"}, (err, res) ->
   # err will be the error object given from Amazon (converted from xml)
   # res will contain res.headers and res.code
   console.log "Hurrah"

I’ve also put a gist up here: https://gist.github.com/1347203