注册 登录  
 加关注
   显示下一条  |  关闭
温馨提示!由于新浪微博认证机制调整,您的新浪微博帐号绑定已过期,请重新绑定!立即重新绑定新浪微博》  |  关闭

千鸟

本blog所有日志均系原创 转载请注明出处

 
 
 

日志

 
 

java 网络3.非阻塞式http服务器  

2008-01-08 11:17:09|  分类: J2EE |  标签: |举报 |字号 订阅

  下载LOFTER 我的照片书  |

 

预备知识:

Selector

ServerSocketChannel 监控 :接收 ,连接 就绪事件

   SocketChannel,监控: 连接就绪,读写就绪事件

SelectionKey

   Selector 被注册一个事件,就会产生一个SelectionKey 句柄。所有句柄放在Selector 的all_keys 集合中。当所注册的事件发生时,该SelectionKey会被添加到Selector selected_keys 集合中

   判断一个有选择事件(selected)发生并移到消键集( canceled_keys)中

     Set readyKeys = selector.selectedKeys(); //取得selected_keys 

      Iterator it = readyKeys.iterator();

      while (it.hasNext()){   //selected_keys 中有就绪的事件

        SelectionKey key=null;

        key = (SelectionKey) it.next();

        it.remove();  //移到canceled_key

A.事件业务处理设计:

Handler<------+------RequestHandler  //接受http请求和发送http 响应

            |-------AcceptHandler  //处理接受连接就绪事件

public interface Handler {

    public void handle(SelectionKey key) throws IOException;

}

B.服务器与客户端的连接核心流程

1. 给服务器注册连接就绪事件,

serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT,new AcceptHandler());

//注册  AcceptHandle

有连接就绪可以读出上面注册的 自定处理对象,做自己的进一步处理

Handler handler = (Handler)key.attachment();

3. 用在第2步获得的SelectionKey,注册读就绪事件

      RequestHandler rh = new RequestHandler(cio);

     socketChannel.register(key.selector(), SelectionKey.OP_READ, rh);//相当于注册到了serverSocketChannel中

//注册  RequestHandle

C.接流程中处理事件业务

final Handler handler = (Handler)key.attachment();

handler.handle(key);

因为在B.中RequestHandler和AcceptHandler 都被注册了,所以可以有相同的处理流程

D.分别设计RequestHandler和AcceptHandler

import java.io.*;

import java.nio.*;

import java.nio.channels.*;

public class RequestHandler implements Handler {

  private ChannelIO channelIO;

  private ByteBuffer requestByteBuffer = null;  //存放HTTP请求的缓冲区

  private boolean requestReceived = false;  //是否已经接收到了所有的HTTP请求

  private Request request = null;  //表示HTTP请求

  private Response response = null;  //表示HTTP响应

  RequestHandler(ChannelIO channelIO) {

    this.channelIO = channelIO;

  }

  /* 

   * 接收HTTP请求,如果已经接收到了所有的HTTP请求数据,就返回true,否则返回false

   */

  private boolean receive(SelectionKey sk) throws IOException {

    ByteBuffer tmp = null;

    if (requestReceived)return true;  //如果已经接收到所有HTTP请求数据,返回true

    

    //如果已经读到通道的末尾,或者已经读到HTTP请求数据的末尾标志“\r\n”,就返回true

    if ((channelIO.read() < 0) || Request.isComplete(channelIO.getReadBuf())) {

      requestByteBuffer = channelIO.getReadBuf();

      return (requestReceived = true);

    }

    return false;

  }

  /*

   * 通过Request类的parse()方法,解析requestByteBuffer中的HTTP请求数据,构造相应的Request对象 

   */

  private boolean parse() throws IOException {

    try {

      request = Request.parse(requestByteBuffer);

      return true;

    } catch (MalformedRequestException x) {  

      //如果HTTP请求的格式不正确,就发送错误信息

      response = new Response(Response.Code.BAD_REQUEST,

                          new StringContent(x));

    }

    return false;

  }

  /*

   * 创建HTTP响应 

   */

  private void build() throws IOException {

    Request.Action action = request.action();

    //仅仅支持GET和HEAD请求方式

    if ((action != Request.Action.GET) &&

            (action != Request.Action.HEAD)){

       response = new Response(Response.Code.METHOD_NOT_ALLOWED,

                          new StringContent("Method Not Allowed"));

    }else{

       response = new Response(Response.Code.OK,

                      new FileContent(request.uri()), action);

    }

  }

  

  /*  接收HTTP请求,发送HTTP响应 */

  public void handle(SelectionKey sk) throws IOException {

    try {

        if (request == null) { //如果还没有接收到HTTP请求的所有数据

            //接收HTTP请求      

            if (!receive(sk))return;

            requestByteBuffer.flip();

     

            //如果成功解析了HTTP请求,就创建一个Response对象

            if (parse())build();

     

            try {

                response.prepare();  //准备HTTP响应的内容

            } catch (IOException x) {

                response.release();  

                response = new Response(Response.Code.NOT_FOUND,

                                  new StringContent(x));

                response.prepare();

            }

            if (send()) {  

               //如果HTTP响应没有发送完毕,则需要注册写就绪事件,

               //以便在写就绪事件发生时继续发送数据

               sk.interestOps(SelectionKey.OP_WRITE);

            } else {

               //如果HTTP响应发送完毕,就断开底层的连接,并且释放Response占用的资源

               channelIO.close();

               response.release();

            }

        } else {  //如果已经接收到HTTP请求的所有数据

            if (!send()) {  //如果HTTP响应发送完毕

              channelIO.close();

              response.release();

            }

        }

    } catch (IOException x) {

        x.printStackTrace();

        channelIO.close();

        if (response !=  null) {

            response.release();

        }

    }

  }

  /* 发送HTTP响应,如果全部发送完毕,就返回false,否则返回true */  

  private boolean send() throws IOException {

    return response.send(channelIO);

  }

}

import java.io.*;

import java.nio.*;

import java.nio.channels.*;

public class AcceptHandler implements Handler {

  public void handle(SelectionKey key) throws IOException {

    ServerSocketChannel serverSocketChannel=(ServerSocketChannel)key.channel();

    SocketChannel socketChannel = serverSocketChannel.accept();

    if (socketChannel== null)return;

    System.out.println("接收到客户连接,来自:" +

                   socketChannel.socket().getInetAddress() +

                   ":" + socketChannel.socket().getPort());

    ChannelIO cio =new ChannelIO(socketChannel, false/*非阻塞模式*/);

    RequestHandler rh = new RequestHandler(cio);

    socketChannel.register(key.selector(), SelectionKey.OP_READ, rh);

  }

}

E.可自动扩大2倍的缓冲区类设计

import java.io.*;

import java.nio.*;

import java.nio.channels.*;

public class ChannelIO {

  protected SocketChannel socketChannel;

  protected ByteBuffer requestBuffer;

  private static int requestBufferSize = 4096;

  public ChannelIO(SocketChannel socketChannel, boolean blocking)

    throws IOException {

    this.socketChannel = socketChannel;

    socketChannel.configureBlocking(blocking); //设置阻塞模式

    requestBuffer = ByteBuffer.allocate(requestBufferSize);

  }

  public SocketChannel getSocketChannel() {

    return socketChannel;

  }

  /*

   * 如果原缓冲区的剩余容量不够,就创建一个新的缓冲区,容量为原来的两倍,

   * 把原来缓冲区的数据拷贝到新缓冲区

   */

  protected void resizeRequestBuffer(int remaining) {

    if (requestBuffer.remaining() < remaining) {

      // 把容量增大到原来的两倍

      ByteBuffer bb = ByteBuffer.allocate(requestBuffer.capacity() * 2);

      requestBuffer.flip();

      bb.put(requestBuffer);  //把原来缓冲区中的数据拷贝到新的缓冲区

      requestBuffer = bb;

    }

  }

  /*

   * 接收数据,把它们存放到requestBuffer中,如果requsetBuffer的剩余容量不足5%,

   * 就调用resizeRequestBuffer()方法扩充容量

   */

  public int read() throws IOException {

    resizeRequestBuffer(requestBufferSize/20);

    return socketChannel.read(requestBuffer);

  }

  /*

   * 返回requestBuffer,它存放了所有的请求数据

   */

  public ByteBuffer getReadBuf() {

      return requestBuffer;

  }

  /*

   * 发送参数指定的ByteBuffer中的数据

   */

  public int write(ByteBuffer src) throws IOException {

    return socketChannel.write(src);

  }

  /*

   * 把FileChannel中的数据写到SocketChannel中

   */

  public long transferTo(FileChannel fc, long pos, long len) throws IOException {

    return fc.transferTo(pos, len, socketChannel);

  }

  /*

   * 关闭SocketChannel

   */

  public void close() throws IOException {

    socketChannel.close();

  }

}

 这个例子在你的jdk的demo中找到:

 {JAVA_HOME}\sample\nio\server

完整代码下载:http://soulnew.googlepages.com/chapter05.rar

里面有孙卫琴对sun 的nio例子的中文翻译

  评论这张
 
阅读(1225)| 评论(0)
推荐 转载

历史上的今天

评论

<#--最新日志,群博日志--> <#--推荐日志--> <#--引用记录--> <#--博主推荐--> <#--随机阅读--> <#--首页推荐--> <#--历史上的今天--> <#--被推荐日志--> <#--上一篇,下一篇--> <#-- 热度 --> <#-- 网易新闻广告 --> <#--右边模块结构--> <#--评论模块结构--> <#--引用模块结构--> <#--博主发起的投票-->
 
 
 
 
 
 
 
 
 
 
 
 
 
 

页脚

网易公司版权所有 ©1997-2017