blog podcast

The Pipeline Pattern

I’ve been fascinated by design patterns for a while. I’ve found that the patterns that are outlined in for example Heads First Design Patterns in theory seem really nice, but in reality don’t really match the kind of problems that I encounter in my every day corporate coding. I’ve attributed this to the fact that the classic design patterns were developed at a different time, where a different type of applications were developed.

Then suddenly I started coding on a different project alas not at work, but at home. The project was a simple proxy service that would proxy data streams. I got the privilege to go a bit more low level compared to the typical spring boot application. I used Netty for the application. I still wouldn’t consider Netty as being very low level, but at least I got to encounter a new problem domain. Now I got the raw byte stream of a request coming in, and now it was up to me what to do with it..

Now, Netty resolves around the idea of pipelines. A bytestream comes in. It gets handled by a handler and then the results gets passed on to the next handler etc. It’s a nice way of separating different concerns into separate units of code, and for the first time in a long while I had a feeling I was looking at a design pattern. Finally!

Code of the Day

As you can see from the code, the ChannelPipeline has many separate steps. In here it is mixed with handlers for outbound as well as inbound request handling. You can see for example the HttpRequestDecoder, which converts the bytes into a HttpRequest, the ensuing handlers can then operate on a HttpRequest instead. Pretty neat.

val b = ServerBootstrap()
b.group(bossGroup, workerGroup)
    .channel(NioServerSocketChannel::class.java)
    .option(ChannelOption.SO_BACKLOG, 100)
    .childHandler(object:ChannelInitializer<SocketChannel>() {
        override fun initChannel(ch: SocketChannel?) {
            val p: ChannelPipeline = ch!!.pipeline()
            if(sslCtx != null) {
                p.addLast(sslCtx!!.newHandler(ch.alloc()))
            }
            val byteStatisticsHandler = SinkStatisticsHandler()
            p.addLast(SinkStatisticsOutboundHandler(byteStatisticsHandler))
            p.addLast(LagLimitOutboundHandler())
            p.addLast(HttpRequestDecoder())
            p.addLast(StreamSubscriptionInboundHandler(subscriptionManager))
            p.addLast(SinkStatisticsInboundHandler(byteStatisticsHandler))
            p.addLast(statisticsRequestHandler)
            p.addLast(notFoundHandler)
        }
    })