打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
Twitter Kestrel如何使用Netty以及Netty scala压测代码

Twitter的核心队列Kestrel使用Netty作为通信模块,从另一个角度证明了Netty的性能和健壮。

Netty是否比MINA强?从底层实现,两者几乎差不多,但Netty的优势是从架构上采用事件通知机制,真正的将异步模式引入来解决各种场景。响应时间可能会加长,但优势在于系统之间的依赖减弱,自身处理能力的决定因素自封闭(瓶颈可以直接根据自身业务处理资源消耗情况估计出来)

 

我们看看Twitter是怎么用Netty。Twitter很多项目都是用scala写的,scala是很简洁的语言,直接运行在jvm上。可以直接调用Java类。下边的代码都是来自Twitter的核心队列项目Kestrel。这个项目很有意思,可能以后还会讨论,这里先说说怎么用Netty。


NettyHandler.scala是处理Netty网络事件的基类,其他具体协议实现类,MemcacheHandler和TextHandler都继承NettyHandler。NettyHandler应用Netty的ChannelUpStreamHandler接口,这个接口处理上行请求。同时继承KestrelHandler。KestrelHandler处理Kestrel消息队列的行为,包括getItem、setItem等等。



 

NettyHandler主要方法是handleUpstream。处理上行请求:MessageEvent,ChannelStatEvent,等等。这些实现基本上参照Netty官网给的sample很容易实现。方法不长,才40多行,用scala写出来,有点小清新:)

Scala代码  
  1. def handleUpstream(context: ChannelHandlerContext, event: ChannelEvent) {  
  2.     event match {  
  3.       case m: MessageEvent =>  
  4.         // 具体实现由协议实现类MemcacheHandler等实现  
  5.         handle(m.getMessage().asInstanceOf[M])  
  6.       case e: ExceptionEvent =>  
  7.         // 异常处理  
  8.         e.getCause() match {  
  9.           case _: ProtocolError =>  
  10.             handleProtocolError()  
  11.           case e: ClosedChannelException =>  
  12.             finish()  
  13.           case e: IOException =>  
  14.             log.debug("I/O Exception on session %d: %s", sessionId, e.toString)  
  15.           case e =>  
  16.             log.error(e, "Exception caught on session %d: %s", sessionId, e.toString)  
  17.             handleException(e)  
  18.         }  
  19.         e.getChannel().close()  
  20.       case s: ChannelStateEvent =>  
  21.         // 目前状态为connected但statevent.getValue is null,中断连接  
  22.         if ((s.getState() == ChannelState.CONNECTED) && (s.getValue() eq null)) {  
  23.           finish()  
  24.         } else if ((s.getState() == ChannelState.OPEN) && (s.getValue() == true)) {  
  25.           // 创建连接  
  26.           channel = s.getChannel()  
  27.           remoteAddress = channel.getRemoteAddress.asInstanceOf[InetSocketAddress]  
  28.           if (clientTimeout.isDefined) {  
  29.             channel.getPipeline.addFirst("idle", new IdleStateHandler(Kestrel.kestrel.timer, 0, 0, clientTimeout.get.inSeconds.toInt))  
  30.           }  
  31.           channelGroup.add(channel)  
  32.           // don't use `remoteAddress.getHostName` because it may do a DNS lookup.  
  33.           log.debug("New session %d from %s:%d", sessionId, remoteAddress.getAddress.getHostAddress, remoteAddress.getPort)  
  34.         }  
  35.       case i: IdleStateEvent =>  
  36.         // 增加idel监控  
  37.         log.debug("Idle timeout on session %s", channel)  
  38.         channel.close()  
  39.       case e =>  
  40.         // 其他消息继续发出upstream事件  
  41.         context.sendUpstream(e)  
  42.     }  
  43.   }  

 

MemcacheHandler和TextHandler是协议具体的实现。继承NettyHandler。因为Memcached协议比较简单,所以协议实现类就不多说了。阅读这些代码主要的障碍还是在于Java程序员对于某些scala的语法不习惯。我这里介绍个简单但是常用的:Scala的泛型。Scala创始人Martin Odersky曾说过,泛型正是他想要创建Scala语言的最重要因素之一。当然Java1.5以后已经引入了泛型,我们对这个东东已经很熟悉了。看看Twitter怎么使用Scala泛型。比教科书上生动很多。和Java使用<>指定泛型类似,NettyHandler中Scala的泛型M,放在[]里。

 

abstract class NettyHandler[M](

  val channelGroup: ChannelGroup,

  queueCollection: QueueCollection,

  maxOpenTransactions: Int,

  clientTimeout: Option[Duration])

extends KestrelHandler(queueCollection, maxOpenTransactions) with 

 

ChannelUpstreamHandler {

...

  def handleUpstream(context: ChannelHandlerContext, event: ChannelEvent) {

    event match {

      case m: MessageEvent =>

        handle(m.getMessage().asInstanceOf[M])

  }

...

}

 

在NettyHandler中,任何MessageEvent都被转换为泛型M,并交给子类处理。TextHandler和MemcacheHandler是这样给自己的泛型定义的。

class TextHandler( ...) extends NettyHandler[TextRequest](...) 

class MemcacheHandler(...) extends NettyHandler[MemcacheRequest](...) 

 

接下来我们自己写一个Scala程序。

Netty服务器压测代码网上有不少版本,基本思路就是实现一个简单的echo handler。还可以添加了一个server主动push的部分。代码用scala实现,可以作为朋友们学习scala的例子。

 

 

Scala代码  
  1. import org.jboss.netty.channel._  
  2. import org.jboss.netty.buffer._  
  3. import org.jboss.netty.bootstrap.ServerBootstrap  
  4. import java.util._  
  5. import java.util.concurrent._  
  6. import java.io._  
  7. import java.net._  
  8. import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;   
  9. import scala.collection.mutable  
  10.   
  11. object NettyLoadServer {  
  12.     def main(args: Array[String]): Unit = {  
  13.         val testServer = new NettyLoadServer();  
  14.         testServer.loadTest();  
  15.     }  
  16. }  
  17.   
  18. class NettyLoadServer {  
  19.     var channel: Channel = null  
  20.     private var remoteAddress: InetSocketAddress = null  
  21.     val channels = new mutable.ListBuffer[Channel];  
  22.     var number = 0;  
  23.       
  24.     class LoadTestHandler extends SimpleChannelHandler with ChannelUpstreamHandler {  
  25.         override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent)  
  26.         {  
  27.             e.getCause().printStackTrace();  
  28.             channels -= e.getChannel()  
  29.             e.getChannel().close();  
  30.         }  
  31.   
  32.         override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent) {  
  33.             e.getChannel().write(e.getMessage());  
  34.         }  
  35.           
  36.         override def handleUpstream(ctx: ChannelHandlerContext, e: ChannelEvent) {  
  37.             e match {  
  38.                 case s: ChannelStateEvent =>  
  39.                     if ((s.getState() == ChannelState.OPEN) && (s.getValue() == true)) {  
  40.                         channel = s.getChannel()  
  41.                         remoteAddress = channel.getRemoteAddress.asInstanceOf[InetSocketAddress]  
  42.                         channels += channel                                    
  43.                         System.out.println("New session from " + remoteAddress.getAddress.getHostAddress +  
  44.                             ":" + remoteAddress.getPort)  
  45.                         }  
  46.                 case e =>  
  47.                     // ignore  
  48.             }  
  49.   
  50.             super.handleUpstream(ctx, e);  
  51.         }  
  52.     }  
  53.     
  54.     class ChannelManagerThread extends Thread {   
  55.         override def run() {   
  56.             while (true) {   
  57.                 try {  
  58.                     System.out.println("channels.size() = " + channels.count(c => c.isInstanceOf[Channel]));  
  59.                       
  60.                     for(s <- channels) {  
  61.                         var cb = new DynamicChannelBuffer(256);   
  62.                         cb.writeBytes("abcd1234".getBytes());   
  63.                         s.write(cb);   
  64.                     }  
  65.                   
  66.                     Thread.sleep(500);   
  67.                 }  
  68.                 catch {   
  69.                     case e => e.printStackTrace();  
  70.                 }   
  71.             }   
  72.         }   
  73.     }   
  74.   
  75.       
  76.     def loadTest() {  
  77.         try {  
  78.             val factory = new NioServerSocketChannelFactory(Executors   
  79.               .newCachedThreadPool(), Executors.newCachedThreadPool());   
  80.             val bootstrap = new ServerBootstrap(factory);   
  81.             val handler = new LoadTestHandler();   
  82.             val pipeline = bootstrap.getPipeline();   
  83.             pipeline.addLast("loadtest", handler);   
  84.             bootstrap.setOption("child.tcpNoDelay", true);   
  85.             bootstrap.setOption("child.keepAlive", true);   
  86.             bootstrap.bind(new InetSocketAddress(8007));   
  87.               
  88.             val cmt = new ChannelManagerThread();   
  89.             cmt.start();   
  90.         }   
  91.         catch {  
  92.             case e => e.printStackTrace();  
  93.         }  
  94.     }  
  95. }  

附件里是我的scala sbt工程。

 

压测client推荐使用Jboss自己的Benchmark:

http://anonsvn.jboss.org/repos/netty/subproject/benchmark/

 

用ab也可以:

ab -n 20000 -c 20000 -k -t 999999999 -r http://192.168.1.2:8007/

 

补充:Twitter还有很多很有意思的项目,希望有兴趣的朋友一起来研究学习。

 

本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
浅谈长连接保活机制
从Twitter架构变迁看Web2.0的架构技术-ChinaUnix技术开发频道
scala
Java NIO框架Netty教程(八)
Netty代码分析 | 淘宝网综合业务平台团队博客
一切从ServerBootstrap开始
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服