
Node.js Postgres Pooling Revisited (with transactions!)

This is a follow-up to the last connection pooling blog entry. If it seems like I am spending an undue amount of time on this topic, then you feel the same way as I do.

I’ve grown wary of node-postgres’ built-in pooling and have decided to implement my own connection pooling on top of it. In production, I was seeing that connections would “leak” from the pool, causing the pool to fill and time out all further requests. The only fix I had was to have a periodic health check that rebooted the process: obviously this wouldn’t work for launch. The issue is documented here: https://github.com/brianc/node-postgres/issues/137 but most of the discussed solutions won’t work with my pooled decorator pattern.

Luckily, new versions of node-pool have the pattern built in, and rolling your own pool takes only a couple of lines. Let’s create a pool:

pg = require('pg').native # Or non-native, if you prefer
poolModule = require 'generic-pool'

connectionString = "tcp://user:pass@localhost/your_database"
pgPool = poolModule.Pool
  name: 'postgres'
  create: (cb) ->
    client = new pg.Client connectionString
    client.connect (err) ->
      return cb(err) if err?
      client.on "error", (err) ->
        console.log "Error in postgres client, removing from pool"
        pgPool.destroy(client)
      client.pauseDrain() # Make sure node-postgres' internal pooling stays disabled
      cb null, client

  destroy: (client) -> client.end()
  max: 10
  idleTimeoutMillis: 30 * 1000
  log: true # Remove me if you aren't debugging

pgPool is a pool of up to ten postgres clients. Calling pgPool.acquire will give you a native postgres client to work with, while calling pgPool.release will return it to the pool. Lastly, calling pgPool.pooled and passing in a function will “decorate” your function so that it auto-acquires on call and auto-releases on callback. See the pooled function decoration documentation.
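To make that concrete, here is a minimal sketch of both styles. The SELECT NOW() query and the getNow name are placeholders, and the decorated form assumes the pooled decoration behaves as described in node-pool’s README:

# Manual acquire/release:
pgPool.acquire (err, client) ->
  return console.error "Could not acquire client: #{err}" if err?
  client.query "SELECT NOW() AS now", (err, result) ->
    pgPool.release client # Always release, on success and on error
    console.log result.rows[0].now unless err?

# Or let the pooled decorator do the acquire/release bookkeeping:
getNow = pgPool.pooled (client, cb) ->
  client.query "SELECT NOW() AS now", (err, result) ->
    cb err, result?.rows[0].now

getNow (err, now) -> console.log now unless err?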

Note that in create we call the pg.Client constructor directly instead of using pg.connect because pg.connect returns a proxy object that does internal connection pooling.

Okay! That was pretty basic. To spice this post up a bit, here’s a little class that helps out with transactions:

class Transaction
    #Unmanaged client (not auto-released)
    @startInClient: (pgClient, releaseOnCompletion, callback) ->
      [callback, releaseOnCompletion] = [releaseOnCompletion, false] if typeof releaseOnCompletion == 'function'
      (new Transaction(pgClient, releaseOnCompletion)).startTransaction callback

    #Managed client (auto-released on commit / rollback)
    @start: (callback) ->
      pgPool.acquire (err, pgClient) ->
        return callback(err) if err?
        Transaction.startInClient pgClient, true, callback

    constructor: (@pgClient, @releaseOnComplete) ->

    startTransaction: (cb) ->
      @pgClient.query "BEGIN", (err) => cb(err, @)

    rollback: (cb) ->
      @pgClient.query "ROLLBACK", (err) =>
        pgPool.release @pgClient if @releaseOnComplete
        cb(err, @) if cb?

    commit: (cb) ->
      @pgClient.query "COMMIT", (err) =>
        pgPool.release @pgClient if @releaseOnComplete
        cb(err, @) if cb?

    wrapCallback: (cb) -> (err) =>
      callerArguments = arguments
      if err?
        @rollback()
        return cb callerArguments...
      else
        @commit (commitErr) ->
          return cb(commitErr) if commitErr?
          cb callerArguments...

Transaction.start acquires a new connection from the pool, begins a transaction and returns an object that can be used to manage it. You can either rollback or commit the transaction. The more sophisticated piece is the wrapCallback function. This is best understood with an example:

# Start a transaction and do something with it
Transaction.start (err, t) =>
  return console.log "Couldn't start transaction" if err?
  callback = t.wrapCallback(callback)
  pgClient = t.pgClient
  pgClient.query "INSERT INTO fun(id) VALUES(1)", (err) ->
    return callback(err) if err?
    pgClient.query "INSERT INTO people(id,fun_id) VALUES (1,1)", callback

The above code transactionally inserts a row into the fun table and another into the people table. Beyond Transaction.start it never mentions the transaction again. The magic here is the t.wrapCallback function: it produces a wrapped callback that rolls back when called with an error (the first parameter isn’t undefined/null) and commits otherwise.
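For comparison, here is roughly the same flow written with explicit rollback/commit calls instead of wrapCallback (it assumes the same enclosing callback as the example above):

Transaction.start (err, t) ->
  return console.log "Couldn't start transaction" if err?
  pgClient = t.pgClient
  pgClient.query "INSERT INTO fun(id) VALUES(1)", (err) ->
    if err?
      return t.rollback -> callback(err)
    pgClient.query "INSERT INTO people(id,fun_id) VALUES (1,1)", (err) ->
      if err?
        return t.rollback -> callback(err)
      t.commit (commitErr) -> callback(commitErr)

Every branch has to remember to finish the transaction; wrapCallback folds that bookkeeping into a single line.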

In the languages I know, the “gold standard” for transactions is when they are as transparent as possible. Sometimes (e.g. Spring with JDBC in Java) this means resorting to thread-local contexts. I think the above transaction class comes close to that standard, even if wrapCallback is a little tricky to understand.

And with that, we’re back to coding. If you like postgres and silly faces, try out the Silly Face Society on the iPhone.

Hardening node.js for production part 2: using nginx to avoid node.js load

This is part 2 of a quasi-series on hardening node.js for production systems (e.g. the Silly Face Society). The previous article covered a process supervisor that creates multiple node.js processes, listening on different ports for load balancing. This article will focus on HTTP: how to lighten the incoming load on node.js processes. Update: I’ve also posted a part 3 on zero downtime deployments in this setup.

Our stack consists of nginx serving external traffic by proxying to upstream node.js processes running express.js. As I’ll explain, nginx is used for almost everything: gzip encoding, static file serving, HTTP caching, SSL handling, load balancing and spoon-feeding clients. The idea is to use nginx to keep unnecessary traffic from ever hitting our node.js processes, and to remove as much overhead as possible for the traffic that does have to hit node.js.

Too much talk. Here is our nginx config:

 http {
    proxy_cache_path  /var/cache/nginx levels=1:2 keys_zone=one:8m max_size=3000m inactive=600m;
    proxy_temp_path /var/tmp;
    include       mime.types;
    default_type  application/octet-stream;
    sendfile        on;
    keepalive_timeout  65;

    gzip on;
    gzip_comp_level 6;
    gzip_vary on;
    gzip_min_length  1000;
    gzip_proxied any;
    gzip_types text/plain text/html text/css application/json application/x-javascript text/xml application/xml application/xml+rss text/javascript;
    gzip_buffers 16 8k;
 
    upstream silly_face_society_upstream {
      server 127.0.0.1:61337;
      server 127.0.0.1:61338;
      keepalive 64;
    }

    server {
        listen 80;
        listen 443 ssl;

        ssl_certificate /some/location/sillyfacesociety.com.bundle.crt;
        ssl_certificate_key /some/location/sillyfacesociety.com.key;
        ssl_protocols        SSLv3 TLSv1;
        ssl_ciphers HIGH:!aNULL:!MD5;

        server_name sillyfacesociety.com www.sillyfacesociety.com;

        if ($host = 'sillyfacesociety.com' ) {
                rewrite  ^/(.*)$  http://www.sillyfacesociety.com/$1  permanent;
        }

        error_page 502  /errors/502.html;

        location ~ ^/(images/|img/|javascript/|js/|css/|stylesheets/|flash/|media/|static/|robots.txt|humans.txt|favicon.ico) {
          root /usr/local/silly_face_society/node/public;
          access_log off;
          expires max;
        }

        location /errors {
          internal;
          alias /usr/local/silly_face_society/node/public/errors;
        }

        location / {
          proxy_redirect off;
          proxy_set_header   X-Real-IP            $remote_addr;
          proxy_set_header   X-Forwarded-For  $proxy_add_x_forwarded_for;
          proxy_set_header   X-Forwarded-Proto $scheme;
          proxy_set_header   Host                   $http_host;
          proxy_set_header   X-NginX-Proxy    true;
          proxy_set_header   Connection "";
          proxy_http_version 1.1;
          proxy_cache one;
          proxy_cache_key sfs$request_uri$scheme;
          proxy_pass         http://silly_face_society_upstream;
        }
    }
}

Also available as a gist.

Perhaps this code dump isn’t particularly enlightening: I’ll step through the config and give pointers on how it takes load off the express.js code.

The nginx <-> node.js link
First things first: how can we get nginx to proxy / load balance traffic to our node.js instances? We’ll assume that we are running two instances of express.js on ports 61337 and 61338. Take a look at the upstream section:

http {
    ...
    upstream silly_face_society_upstream {
      server 127.0.0.1:61337;
      server 127.0.0.1:61338;
      keepalive 64;
    }
    ...
}

The upstream directive specifies that these two instances work in tandem as an upstream server for nginx. keepalive 64; tells nginx to keep up to 64 idle HTTP/1.1 connections to the upstream open in each worker process so they can be reused for later requests. It is not a cap on traffic: if there are more concurrent requests, nginx will open additional connections to the upstream as needed.

upstream alone is not sufficient – nginx needs to know how and when to route traffic to node. The magic happens within our server section. Scrolling to the bottom, we have a location / section like:

http {
    ...
    server {
        ...
        location / {
          proxy_redirect off;
          proxy_set_header   X-Real-IP            $remote_addr;
          proxy_set_header   X-Forwarded-For  $proxy_add_x_forwarded_for;
          proxy_set_header   Host                   $http_host;
          proxy_set_header   X-NginX-Proxy    true;
          ...
          proxy_set_header   Connection "";
          proxy_http_version 1.1;
          proxy_pass         http://silly_face_society_upstream;
        }
        ...
    }
}

This section is a fall-through for traffic that hasn’t matched any other rules: node.js handles the request and nginx proxies the response. The most important part of the section is proxy_pass – this tells nginx to use the upstream server that we defined higher up in the config. Next in line is proxy_http_version, which tells nginx to use HTTP/1.1 for connections to the upstream; together with the cleared Connection header (proxy_set_header Connection "";), this lets nginx reuse keepalive connections instead of establishing a new one for every proxied request, which has a significant impact on response latency. Finally, we have a couple of proxy_set_header directives to tell our express.js processes that this is a proxied request and not a direct one. Full explanations can be found in the HttpProxyModule docs.

This part of the config is the minimum amount needed to get nginx serving port 80 and proxying our node.js processes underneath. The rest of this article will cover how to use nginx features to lighten the traffic load on node.js.

Static file intercept
Although express.js has built-in static file handling through some connect middleware, you should never use it. Nginx can do a much better job of handling static files and can prevent requests for non-dynamic content from clogging our node processes. The location directive in question is:

http {
    ...
    server {
        ...
        location ~ ^/(images/|img/|javascript/|js/|css/|stylesheets/|flash/|media/|static/|robots.txt|humans.txt|favicon.ico) {
          root /usr/local/silly_face_society/node/public;
          access_log off;
          expires max;
        }
        ...
    }
}
 

Any request with a URI starting with images/, img/, javascript/, js/, css/ and so on will be matched by this location. In my express.js directory structure, the public/ directory holds all of my static assets – CSS, javascript and the like. Using root, I instruct nginx to serve these files without ever talking to the underlying servers. The expires max; directive is a caching hint that these assets are immutable; for other sites it may be more appropriate to use a quicker cache expiry through something like expires 1h;. Full information can be found in nginx’s HttpHeadersModule.
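On the express.js side this just means not mounting the static middleware in production. A small sketch of how that might look (Express 3 style; the NODE_ENV check is an assumption about how you distinguish environments):

express = require 'express'
app = express()

# Serve ./public from express only during local development; in production
# nginx intercepts these paths before they ever reach node.js.
if process.env.NODE_ENV isnt 'production'
  app.use express.static "#{__dirname}/public"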

Caching
In my opinion, any caching is better than no caching. Sites with extremely heavy traffic will use all kinds of caching solutions including varnish for HTTP acceleration and memcached for fragment caching and query caching. Our site isn’t so high-traffic but caching is still going to save us a fortune in server costs. For simplicity of configuration I decided to use nginx’s built-in caching.

Nginx’s built-in caching is crude: when an upstream server provides HTTP header hints like Cache-Control, nginx caches the response with an expiry time matching the hint. Within the expiry time, subsequent requests are served a cached file from disk instead of hitting the underlying node.js process. To set up caching, I have set two directives in the http section of the nginx config:

http {
    ...
    proxy_cache_path  /var/cache/nginx levels=1:2 keys_zone=one:8m max_size=3000m inactive=600m;
    proxy_temp_path /var/tmp;
    ...
}

These two lines tell nginx that we are going to use it in caching mode. proxy_cache_path specifies the root directory for our cache, the directory depth (levels), the max_size of the cache and the inactive expiry time. More importantly, it sizes the in-memory key zone for the cached files through keys_zone. When nginx receives a request, it computes an MD5 hash of the cache key and looks it up in this zone to find the corresponding file on disk; if it is not there, the request hits our underlying node.js processes. Finally, to make our proxied requests use this cache, we have to change the location / section to include some caching information:

http {
  server {
     ...
     location / {
          ...
          proxy_cache one;
          proxy_cache_key sfs$request_uri$scheme;
          ...
     }
     ...
  }
}

This instructs nginx that it can use the one key zone to cache incoming requests; the MD5 hashes are computed from the proxy_cache_key value.

We have one missing piece: express.js will not be serving the proper HTTP cache hint headers. I wrote a quick piece of middleware that provides this functionality:

cacheMiddleware = (seconds) -> (req, res, next) ->
    res.setHeader "Cache-Control", "public, max-age=#{seconds}"
    next()

It is not appropriate to apply this middleware globally – certain requests (e.g. post requests that affect server state) should never be cached. As such, I use it on a per-route basis in my express.js app:

...
app.get "/recent", cacheMiddleware(5 * 60), (req, res, next) ->
  #When someone hits /recent, nginx will cache it for 5 minutes!
...

GZIP
GZIP is a no-brainer for HTTP. By compressing responses, clients spend less time hogging your server and everyone saves money on bandwidth. You could use some express.js middleware to gzip outgoing responses, but nginx will do a better job and leave express.js with more resources. To enable gzipped responses in nginx, add the following lines to the http section:

http {
    ...
    gzip on;
    gzip_comp_level 6;
    gzip_vary on;
    gzip_min_length  1000;
    gzip_proxied any;
    gzip_types text/plain text/html text/css application/json application/x-javascript text/xml application/xml application/xml+rss text/javascript;
    gzip_buffers 16 8k;
    ...
}

I won’t go into detail on what these directives do. Like caching, any gzip is better than no gzip. For more control, there are a thousand micro-optimizations you can perform, all of them well documented under nginx’s HttpGzipModule.

SSL
Continuing our theme of leaving node.js to handle only basic HTTP, we arrive at SSL. As long as upstream servers are within a trusted network, it doesn’t make sense to encrypt traffic further than nginx – node.js can serve HTTP traffic for nginx to encrypt. This setup is easy to configure; in the server directive you can tell nginx how to configure SSL:

http {
   ...
   server {
        ...
        listen 443 ssl;
        ssl_certificate /some/location/sillyfacesociety.com.bundle.crt;
        ssl_certificate_key /some/location/sillyfacesociety.com.key;
        ssl_protocols        SSLv3 TLSv1;
        ssl_ciphers HIGH:!aNULL:!MD5;
        ...
   }
}

listen 443 ssl; tells nginx to accept SSL traffic on port 443. ssl_certificate and ssl_certificate_key tell nginx where to find the certificates for your server. The ssl_protocols and ssl_ciphers lines instruct nginx on how to serve the encrypted traffic. These are details: full configuration options are available in the nginx HttpSslModule.

We are almost there. The above configuration will get nginx decrypting traffic and proxying unencrypted requests to our upstream server. However, the upstream server may need to know whether it is in a secure context or not – for example, to serve SSL-enabled assets from CDNs like Cloudfront, or to reject requests that arrive unencrypted. Add the following line to the location / section:

http {
   ...
   server {
       ...
       location / {
          ...
          proxy_set_header   X-Forwarded-Proto $scheme;
          ...
       }
       ...
   }
}

This will send an HTTP header hint down to your node.js processes. Again, I whipped up a bit of middleware that makes SSL detection a little bit easier:

app.use (req, res, next) ->
  req.forwardedSecure = (req.headers["x-forwarded-proto"] == "https")
  next()

Within your routes, req.forwardedSecure will be true iff nginx is handling HTTPS traffic. For reference, the silly face society uses SSL for Facebook authentication token exchange when a user logs in using SSO (single sign on) on their phone. As an extension of implementing this, I also threw up a secure version of the site here.
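For example, a route that should only ever be reached over SSL (like that token exchange) can use the flag to refuse plain HTTP. A small illustration – the route path and response are made up:

app.post "/auth/facebook", (req, res, next) ->
  # Hypothetical route: refuse token exchanges that didn't arrive over HTTPS
  unless req.forwardedSecure
    res.statusCode = 403
    return res.end "HTTPS required"
  # ... exchange the Facebook access token over the verified-HTTPS channel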

Wrapping up
Phew. We covered how to (1) set up node.js as an upstream server for nginx and (2) how to lighten the load on the upstream server by letting nginx handle load balancing, static files, SSL, GZIP and caching. Caveat emptor: the silly face society hasn’t launched yet. The above configuration is based on personal testing and research: we haven’t reached production-level traffic yet. If anyone is still reading, I welcome suggestions for improvements in the comments.

Incidentally, our use of express.js for HTTP traffic is an accident. We started using node.js to provide a socket server for the real time “party mode” of the silly face society. As we got close to launching, we decided to add a Draw Something-esque passive mode so that the Silly Face Society could have the launch momentum to be successful. Instead of rewriting our technology stack, we reused as much of the old code as possible and exposed an HTTP interface. If I had to do it all from scratch, I would re-evaluate our choices: node.js is a hard beast to tame for CRUD apps.

Does your interest in nginx align with your interest in silly faces? Try out the Silly Face Society on the iPhone.

This article has also been translated to the Serbo-Croatian language by Anja Skrba from Webhostinggeeks.com.

Hardening node.js for production: a process supervisor

This post outlines a basic process supervisor for node.js that can be used to manage multiple PIDs underneath the same application. It contains sample code for our child_monitor with a deep health check for socket servers.

We started using node.js in the Silly Face Society because it seemed like the quickest way to get a (true, not web) socket server up for our “party mode”. The party mode is a stateful chat+game server where a set of processes check into redis to advertise themselves as available to host games (in another post I’ll expand on this architecture). For development, node.js was a wonderful way to get a prototype out the door, but as our launch approaches we are getting nervous about the robustness of node.js in production. This is especially noticeable when you tread off the beaten path of HTTP.

To begin: keeping our servers up is hard. Whenever a node.js error is unhandled, it unwinds the stack and leaves v8 in an unrecoverable state. The only recovery is to restart the process. There are a few modules that help, and forever seems to be the most popular. However, using forever won’t allow us to easily manage a set of child processes that fail independently, nor will it give us deep health checks. In the Silly Face Society’s party mode, each process hosts a set of game rooms, and users in the same room must connect to the same process. For multi-core utilization we run separate processes (i.e. distinct PIDs) on the same host that listen on different ports for socket connections. To supervise these processes we have a script called child_monitor that handles logging, health checks and restarts across multiple PIDs.

Before we get to code, a couple of things to note:

  • forever could be used to manage the child_monitor, although I prefer using Ubuntu’s upstart for this purpose.
  • The cluster API isn’t applicable. Our traffic is routed to specific processes based on the game and thus can’t be load balanced.
  • By capturing exits of child processes, you can perform some recovery tasks for unhandled exceptions in a safe v8 process.

Here is child_monitor.coffee and a detailed walkthrough:

{_} = require 'underscore'
child_process = require 'child_process'

healthCheckInterval = 60 * 1000

delayTimeout = (ms, func) -> setTimeout func, ms #make setTimeout drink coffee
exports.spawnMonitoredChild = (script, port, healthCheck, environmentVariables) ->
  respawn = ->
    child = child_process.spawn process.execPath, [script],
      env: _.extend(environmentVariables, process.env)

    console.log "Started child, port=#{port}, pid=#{child.pid}"
    child.stdout.pipe process.stdout
    child.stderr.pipe process.stderr

    healthCheckTimeout = null

    delayedHealthCheck = ->
      healthCheckTimeout = delayTimeout healthCheckInterval, ->
        start = new Date()
        healthCheck port, (healthy) ->
          if healthy
            console.log "#{port} is healthy - ping time #{new Date() - start}ms"
            delayedHealthCheck()
          else
            console.error "#{port} did not respond in time - killing it"
            child.kill()

    child.on 'exit', (code, signal) ->
      clearTimeout healthCheckTimeout
      console.error "Child exited with code #{code}, signal #{signal}, respawning"
      respawn()

    delayedHealthCheck()
  respawn()

Also available as part of this gist on github.

First, the method signature:

spawnMonitoredChild = (script, port, healthCheck, environmentVariables) ->

spawnMonitoredChild accepts a script path (i.e. the node.js script you want to spawn), the port of the child (for disambiguation in logging statements and for the healthcheck), a deep healthCheck function and a set of environment variables. environmentVariables can be used to provide variables downwards, e.g. the environment (prod/development) or the child’s port.

The implementation of spawnMonitoredChild declares the respawn function, and then calls respawn() to bootstrap the respawn cycle. respawn is where the magic happens – it starts up the subprocess and performs periodic health checks:

    child = child_process.spawn process.execPath, [script],
      env: _.extend(environmentVariables, process.env)

This uses the spawn function of node’s standard child_process module to bring up another instance of v8 (process.execPath), pointing at our child script ([script] is the argument list handed to the node binary, i.e. the path of the .js or .coffee child script).

spawn is used instead of fork because spawn allows capture of the child process’ standard error / standard out.

respawn then proceeds to child process management. First, it takes care of its stderr and stdout:

child.stdout.pipe process.stdout
child.stderr.pipe process.stderr

The above code redirects the child stdout/stderr to that of the supervisor. In our actual production deployment, I instead capture the output streams and log to winston using a statement like this:

child.stderr.on 'data', ((data) -> winston.fatal "ERROR from child #{port}: #{data}")

winston is configured to send me an e-mail whenever a fatal occurs. In general, winston’s e-mail mechanism can’t be relied upon in the child process after an uncaught exception has occurred; logging from underneath the child_monitor supervisor skirts around this issue.

Moving on, we get to delayedHealthCheck. In a loop (i.e. recursively on callback) it calls the provided healthCheck function. If the check ever fails, it kills the child process and bails on further health checks.

Finally, there is the exit handler for the subprocess:

child.on 'exit', (code, signal) -> 

The code here explains itself: whenever the child process exits, the handler clears any pending health check, logs the exit and calls respawn to bring up a replacement.

That’s it! However, it wouldn’t be much of an example without sample use. Here is the healthChecker / spawn code for our “party mode” socket server:

net = require 'net'
child_monitor = require './child_monitor'

healthCheck = (port, cb) ->
  c = net.connect port, 'localhost'
  c.setEncoding "utf8"

  gotAuth = false
  c.on 'data', (data) ->
    d = null
    try
      d = JSON.parse(data)
    catch error
      c.end()
      console.error "Health check failed: bad initial response, #{data}"
      return cb(false)

    if !gotAuth
      if d.cmd == "PLSAUTH"
        gotAuth = true
        c.write JSON.stringify({cmd:"RING"}) + "\r\n"
      else
        c.end()
        console.error "Health check failed: bad initial response, #{data}"
        return cb(false)
    else
      c.end()
      console.info "Health check response", {res: d}
      return cb(true)

  c.on 'error', (e) ->
    console.error "Health check failed: error connecting #{e}"
    cb(false)

  c.setTimeout config.healthCheckTimeout, -> c.destroy()

numWorkers = 2
startPort = 31337
for i in [0..numWorkers-1]
  port = startPort + i
  child_monitor.spawnMonitoredChild './lib/sfs_socket', port, healthCheck,
    SFS_SOCKET_PORT: port
    SFS_SOCKET_HOST: socketHost

The details aren’t too important. The healthCheck just connects to the local server and sends a RING command. The socket server is supposed to send AHOY-HOY as a response. Don’t ask.

Beyond sockets, we also use child_monitor to manage a set of express.js instances that serve our HTTP actions. By listening on different ports, we are able to throw nginx in front as a load balancer, HTTP cache and static file host. child_monitor ensures that our backend servers are always available for nginx to call down to. I’ll try to make a follow-up post with the full details; in the meantime, the sketch below shows the rough shape.
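This is only a sketch – the './lib/http_server' path and the /status route are stand-ins, not our real ones – but the express.js workers can be supervised with a plain HTTP health check in exactly the same way:

http = require 'http'
child_monitor = require './child_monitor'

# Consider a worker healthy if it answers a trivial (hypothetical) status route
httpHealthCheck = (port, cb) ->
  req = http.get { host: 'localhost', port: port, path: '/status' }, (res) ->
    cb res.statusCode == 200
  req.on 'error', -> cb false

for port in [61337, 61338] # the same ports nginx proxies to
  child_monitor.spawnMonitoredChild './lib/http_server', port, httpHealthCheck,
    SFS_HTTP_PORT: port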

One final tip: killing the child_monitor supervisor should also take down its child processes. A Linux-only prctl system call in the child process will handle this for you. Add this to the top of your child script:

FFI = require('node-ffi')
current = new FFI.Library(null, {"prctl": ["int32", ["int32", "uint32"]]})
# prctl(PR_SET_PDEATHSIG = 1, SIGTERM = 15): the kernel delivers SIGTERM to
# this process when its parent (the supervisor) dies
current.prctl(1, 15)

Naturally, I stole the prctl bit from this stackoverflow post.

For the latest version of this code, see the gist on github.

Connection Pooling with node-postgres

Update on June 24, 2012: I submitted a patch to node-pool that exposes this pattern through a method called pooled on generic connection pools. I’ve updated the code samples below to include the more robust pattern. Take a look for Pooled function decoration in node-pool’s README.md for more information.

Documentation on node-postgres’ connection pooling is a little sparse. It is built-in but usage is somewhat unclear. Below is a minimally intrusive way to introduce it into an app.

node-postgres’ connection pooling works through the connect function on the module object:

pg = require 'pg'
pg.connect connectionString, (err, pgClient) ->
  return console.log "Error! #{err}" if err?
  # use pgClient

pgClient above is a pooled client. When a drain event occurs on the client, it is automatically returned to the pool; drain events generally occur after a query has been executed, unless you have suppressed them during a transaction. For full information, see the node-postgres documentation.
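Suppressing drain works through pauseDrain and resumeDrain on the client. A rough sketch (the widgets table is made up, and error handling is omitted – a real version would ROLLBACK on failure):

pg.connect connectionString, (err, pgClient) ->
  return console.log "Error! #{err}" if err?
  pgClient.pauseDrain() # keep the client checked out of the pool
  pgClient.query "BEGIN", ->
    pgClient.query "UPDATE widgets SET count = count + 1", ->
      pgClient.query "COMMIT", ->
        pgClient.resumeDrain() # let it drain back into the pool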

Let’s assume you have a data access module with methods like this:

exports.findWidget = findWidget = (id, callback) -> ...
exports.transactedWidget = transactedWidget = (widgetSauce, callback) -> ...

The obvious way to incorporate connection pooling is something like:

pg = require 'pg'
connectionString = "tcp://postgres:postgres@localhost/dummy_db"
exports.findWidget = findWidget = (id, callback) ->
  pg.connect connectionString, (err, pgClient) ->
    return callback(err) if err?
    ... #Use pgClient to find

exports.transactedWidget = transactedWidget = (widgetSauce, callback) ->
  pg.connect connectionString, (err, pgClient) ->
    return callback(err) if err?
    ... #Use pgClient and do some transaction stuff

Frown town: three lines of useless boilerplate added to every exported method in the data layer. Furthermore, re-using methods in a transaction context becomes impossible. We can do better – create a new file called pg_pool.coffee with the following:

pg = require 'pg'

module.exports = pooler =
   #Get a connection from the pool
  acquire: (callback) -> pg.connect "tcp://postgres:postgres@localhost/dummy_db", callback

  #Decorate a function to use the de-pooled connection as a first argument
  pooled: (fn) -> ->
    callerCallback = arguments[arguments.length - 1]
    callerHasCallback = typeof callerCallback == 'function'
    callerArgs = Array::slice.call(arguments, 0, if callerHasCallback then -1 else undefined)

    pooler.acquire (err, pgClient) ->
      return (callerCallback err if callerHasCallback) if err?
      pgClient.pauseDrain()
      callerArgs.push ->
        pgClient.resumeDrain()
        callerCallback.apply(null, arguments) if callerHasCallback
      fn pgClient, callerArgs...

Also available as a gist. The pooled method creates a Python-style decorator that wraps data access methods in a connection-pool acquire/release. Furthermore, the client is kept out of the pool until the callback executes (this is what pgClient.pauseDrain() and pgClient.resumeDrain() do in the above example). Using this, we can replace the data layer code with:

{pooled} = require './pg_pool'
exports.findWidget = pooled findWidget = (pgClient, id, callback) ->
 ... #Use pgClient to find widget
exports.transactedWidget = pooled transactedWidget = (pgClient, widgetSauce, callback) -> 
 ... #Use pgClient to do some jazz

In addition to brevity, this enables nested method calls in a transaction context. The non-exported versions of the methods accept a pgClient parameter. For example, if transactedWidget needed to call findWidget inside a transaction:

{pooled} = require 'pg_pool'
exports.findWidget = pooled findWidget = (pgClient, id, callback) ->
 ... #Use pgClient to find widget
exports.transactedWidget = pooled transactedWidget = (pgClient, widgetSauce, callback) -> 
 pgClient.query "BEGIN", (err) ->
   return callback(err) if err?  
   findWidget pgClient, "123", (err, widget) ->
     return callback(err) if err?
     pgClient.query "COMMIT", (err) ->
       callback(err, widget)

Not the prettiest code, but it beats having to write two versions of each method. Since we are using the pooled decorator, we can guarantee that the pgClient stays checked out for the duration of the transaction.