打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
实现基于netty的web框架,了解一下

上一篇写了,基于netty实现的rpc的微框架,其中详细介绍netty的原理及组件,这篇就不过多介绍

这篇实现基于netty的web框架,你说netty强不强,文中有不对的地方,欢迎大牛指正

先普及几个知识点

@sharable

标注一个channel handler可以被多个channel安全地共享。ChannelHandlerAdapter还提供了实用方法isSharable()。如果其对应的实现被标注为Sharable,那么这个方法将返回true,
表示它可以被添加到多个ChannelPipeline中。因为一个ChannelHandler可以从属于多个ChannelPipeline,所以它也可以绑定到多个ChannelHandlerContext实例。
用于这种用法的ChannelHandler必须要使用@Sharable注解标注;否则,试图将它添加到多个ChannelPipeline时将会触发异常。
显而易见,为了安全地被用于多个并发的Channel(即连接),这样的ChannelHandler必须是线程安全的。

AtomicInteger:这个类的存在是为了满足在高并发的情况下,原生的整形数值自增线程不安全的问题,在Java语言中,++i和i++操作并不是线程安全的,在使用的时候,不可避免的会用到synchronized关键字。而AtomicInteger则通过一种线程安全的加减操作接口。AtomicInteger为什么能够达到多而不乱,处理高并发应付自如呢?

  这是由硬件提供原子操作指令实现的,这里面用到了一种并发技术:CAS。在非激烈竞争的情况下,开销更小,速度更快

TimeUnit: 

TimeUnit是Java.util.concurrent包下面的一个类。它提供了两大功能:

1)提供了可读性更好的线程暂停操作,通常用来替换Thread.sleep(); 

2)提供了便捷方法用于把时间转换成不同单位,如把秒转换成毫秒;

TimeUnit.MINUTES.sleep(4);  // sleeping for 4 minutesThread.sleep(4*60*1000);

项目的目录结构

 

上代码,分享一些关键的代码,后续的giuhub上的demo的注释很详细

//Netty 事件回调类@Sharablepublic class MessageCollector extends ChannelInboundHandlerAdapter {    private final static Logger LOG = LoggerFactory.getLogger(MessageCollector.class);    //业务线程池    private ThreadPoolExecutor[] executors;    private RequestDispatch requestDispatch;    //业务队列最大值    private int requestsMaxInflight=1000;    public MessageCollector(int workerThreads,RequestDispatch dispatch){        //给业务线程命名        ThreadFactory factory =new ThreadFactory() {            AtomicInteger seq=new AtomicInteger();            @Override            public Thread newThread(Runnable r) {                Thread thread =new Thread(r);                thread.setName("http-"+seq.getAndIncrement());                return thread;            }        };        this.executors=new ThreadPoolExecutor[workerThreads];        for(int i=0;i<workerThreads;i++){            ArrayBlockingQueue queue=new ArrayBlockingQueue<Runnable>(requestsMaxInflight);            ////闲置时间超过30秒的线程就自动销毁            this.executors[i]=new ThreadPoolExecutor(1,1,                    30, TimeUnit.SECONDS, queue,factory,new CallerRunsPolicy());        }        this.requestDispatch=dispatch;    }    public  void  closeGracefully(){        //优雅一点关闭,先通知,再等待,最后强制关闭        for (int i=0;i<executors.length;i++){            ThreadPoolExecutor executor=executors[i];            try {                executor.awaitTermination(10,TimeUnit.SECONDS);            } catch (InterruptedException e) {                e.printStackTrace();            }            executor.shutdownNow();        }    }    @Override    public void channelActive(ChannelHandlerContext ctx) throws Exception {        //客户端来了一个新的连接       LOG.info("connection comes {}",ctx.channel().remoteAddress());    }    @Override    public void channelInactive(ChannelHandlerContext ctx) throws Exception {        //客户端走了一个        LOG.info("connection leaves {}",ctx.channel().remoteAddress());    }    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        if (msg instanceof FullHttpRequest){            FullHttpRequest req= (FullHttpRequest) msg;            CRC32 crc32=new CRC32();            crc32.update(ctx.hashCode());            int idx =(int) (crc32.getValue()%executors.length);            //用业务线程处理消息            this.executors[idx].execute(() ->{                requestDispatch.dispatch(ctx,req);            });        }    }    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {        //此处可能因为客户机器突发重启        //也可能客户端连接时间超时,后面的REadTimeoutHandle抛出异常        //也可能消息协议错误,序列化异常        ctx.close();    }}
HttpServer
public class HttpServer {    private final static Logger LOG= LoggerFactory.getLogger(HttpServer.class);    private String ip;    private int port;  //端口    private int ioThreads;  //IO线程数,用于处理套接字读写,由Netty内部管理    private int workerThreads;  //业务线程数,专门处理http请求,由我们本省框架管理    private RequestDispatch requestDispatch;//请求配发器对象    public HttpServer() {    }    public HttpServer(String ip, int port, int ioThreads,                      int workerThreads, RequestDispatch requestDispatch) {        this.ip = ip;        this.port = port;        this.ioThreads = ioThreads;        this.workerThreads = workerThreads;        this.requestDispatch = requestDispatch;    }    //用于服务端,使用一个ServerChannel接收客户端的连接,    // 并创建对应的子Channel    private ServerBootstrap bootstrap;    //包含多个EventLoop    private EventLoopGroup group;    //代表一个Socket连接    private Channel serverChannel;    //    private MessageCollector collector;    public  void start(){        bootstrap=new ServerBootstrap();        group=new NioEventLoopGroup(ioThreads);        bootstrap.group(group);        collector=new MessageCollector(workerThreads,requestDispatch);        bootstrap.channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {            @Override            protected void initChannel(SocketChannel socketChannel) throws Exception {                ChannelPipeline pipeline=socketChannel.pipeline();                //如果客户端60秒没有任何请求,就关闭客户端连接                pipeline.addLast(new ReadTimeoutHandler(10));                //客户端和服务器简单的编解码器:HttpClientCodec和HttpServerCodec。                //ChannelPipelien中有解码器和编码器(或编解码器)后就可以操作不同的HttpObject消息了;但是HTTP请求和响应可以有很多消息数据,                // 你需要处理不同的部分,可能也需要聚合这些消息数据                pipeline.addLast(new HttpServerCodec());                //通过HttpObjectAggregator,Netty可以聚合HTTP消息,                // 使用FullHttpResponse和FullHttpRequest到ChannelPipeline中的下一个ChannelHandler,这就消除了断裂消息,保证了消息的完整                pipeline.addLast(new HttpObjectAggregator(1 << 30)); // max_size = 1g                //允许通过处理ChunkedInput来写大的数据块                pipeline.addLast(new ChunkedWriteHandler());                //将业务处理器放到最后                pipeline.addLast(collector);            }        });    }    public void stop() {        // 先关闭服务端套件字        serverChannel.close();        // 再斩断消息来源,停止io线程池        group.shutdownGracefully();        // 最后停止业务线程        collector.closeGracefully();    }}

 

RequestDispatcherImpl 是请求派发器,用于将收到的HTTP请求对象扔给响应的RequestHandler进行处理。
public class RequestDispatcherImpl implements RequestDispatch {    private final static Logger LOG = LoggerFactory.getLogger(RequestDispatcherImpl.class);    private String contextRoot;    private Router router;    private Map<Integer, WebExceptionHandler> exceptionHandlers = new HashMap<>();    private WebExceptionHandler defaultExceptionHandler = new DefaultExceptionHandler();    private WebTemplateEngine templateEngine = new WebTemplateEngine() {    };    static class DefaultExceptionHandler implements WebExceptionHandler {        @Override        public void handle(ApplicationContext ctx, AbortException e) {            if (e.getStatus().code() == 500) {                LOG.error("Internal Server Error", e);            }            ctx.error(e.getContent(), e.getStatus().code());        }    }    public RequestDispatcherImpl(Router router) {        this("/", router);    }    public RequestDispatcherImpl(String contextRoot, Router router) {        this.contextRoot = CurrentUtil.normalize(contextRoot);        this.router = router;    }    public RequestDispatcherImpl templateRoot(String templateRoot) {        this.templateEngine = new FreemarkerEngine(templateRoot);        return this;    }    public String root() {        return contextRoot;    }    public RequestDispatcherImpl exception(int code, WebExceptionHandler handler) {        this.exceptionHandlers.put(code, handler);        return this;    }    public RequestDispatcherImpl exception(WebExceptionHandler handler) {        this.defaultExceptionHandler = handler;        return this;    }    @Override    public void dispatch(ChannelHandlerContext channelCtx, FullHttpRequest req) {        ApplicationContext ctx = new ApplicationContext(channelCtx, contextRoot, templateEngine);        try {            this.handleImpl(ctx, new Request(req));        } catch (AbortException e) {            this.handleException(ctx, e);        } catch (Exception e) {            this.handleException(ctx, new AbortException(HttpResponseStatus.INTERNAL_SERVER_ERROR, e));        } finally {            req.release();        }    }    private void handleException(ApplicationContext ctx, AbortException e) {        WebExceptionHandler handler = this.exceptionHandlers.getOrDefault(e.getStatus().code(), defaultExceptionHandler);        try {            handler.handle(ctx, e);        } catch (Exception ex) {            this.defaultExceptionHandler.handle(ctx, new AbortException(HttpResponseStatus.INTERNAL_SERVER_ERROR, ex));        }    }    private void handleImpl(ApplicationContext ctx, Request req) throws Exception {        if (req.decoderResult().isFailure()) {            ctx.abort(400, "http protocol decode failed");        }        if (req.relativeUri().contains("./") || req.relativeUri().contains(".\\")) {            ctx.abort(400, "unsecure url not allowed");        }        if (!req.relativeUri().startsWith(contextRoot)) {            throw new AbortException(HttpResponseStatus.NOT_FOUND);        }        req.popRootUri(contextRoot);        router.handle(ctx, req);    }}

 

本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
【RPC 专栏】深入理解 RPC 之传输篇
Netty
netty+mqtt
Netty(一) SpringBoot 整合长连接心跳机制
netty客户端同步发送接收结果
使用netty进行服务端网络编程及数据高效分发功能实现
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服