注册 登录  
 加关注
   显示下一条  |  关闭
温馨提示!由于新浪微博认证机制调整,您的新浪微博帐号绑定已过期,请重新绑定!立即重新绑定新浪微博》  |  关闭

千鸟

本blog所有日志均系原创 转载请注明出处

 
 
 

日志

 
 

java 网络1.为什么要非阻塞式?  

2007-12-03 22:20:45|  分类: J2SE |  标签: |举报 |字号 订阅

  下载LOFTER 我的照片书  |
 

先看看多种java通信方式的特点

1.阻塞式

   服务器收到了一个连接,当通信完毕后释放连接,接收新的连接

2.阻塞式+线程池

   可利用java库类中现成的线程池,做起来比较简单

为每个用户分配一个线程

缺点:虽然可以通过线程池限制线程数防止服务器效率过低,但是线程调度毕竟要消耗资源

3.非阻塞式 用A,B方式

   A 用1个线程

   当用一个线程时,采用轮询的方式。看连接,可读,可写3个事件有谁就绪了,就处理谁。例如:如果一个线程连接就绪了,但是可读没有就绪,此时是接受新的连接任务的。

    对比1(阻塞式)

    不用像阻塞式一样把一个连接的3个事件(连接,可读,可写就绪)处理完了,才能处理下一个连接

    对比2(阻塞式+线程池)

    少了线程调度的开销,但是效率不一定比2(阻塞式+线程池)高。所以有了下面的连接方式

B 用2个线程

采用一个线程接收客户连接就绪,一个处理(可读,可写就绪)

    对比 A (用1个线程的阻塞式)

    当数据处理的时间较大时,现在可以一边处理数据,一边接受新的连接。 

    少了一次对连接就绪的轮询

    对比2(阻塞式+线程池)

    B只用了两个线程,减小了线程调度开销

以下代码出自 孙卫琴《java 网络编程精解》

方式一 阻塞式

package block;

import java.io.*;

import java.nio.*;

import java.nio.channels.*;

import java.nio.charset.*;

import java.net.*;

import java.util.*;

import java.util.concurrent.*;

public class EchoServer {

  private int port=8000;

  private ServerSocketChannel serverSocketChannel = null;

  private ExecutorService executorService;

  private static final int POOL_MULTIPLE = 4;

  public EchoServer() throws IOException {

    executorService= Executors.newFixedThreadPool(

     Runtime.getRuntime().availableProcessors() * POOL_MULTIPLE);

    serverSocketChannel= ServerSocketChannel.open();

    serverSocketChannel.socket().setReuseAddress(true);

    serverSocketChannel.socket().bind(new InetSocketAddress(port));

    System.out.println("服务器启动");

  }

  public void service() {

    while (true) {

      SocketChannel socketChannel=null;

      try {

        socketChannel = serverSocketChannel.accept();

        executorService.execute(new Handler(socketChannel));

      }catch (IOException e) {

         e.printStackTrace();

      }

    }

  }

  public static void main(String args[])throws IOException {

    new EchoServer().service();

  }

}

class Handler implements Runnable{

  private SocketChannel socketChannel;

  public Handler(SocketChannel socketChannel){

    this.socketChannel=socketChannel;

  }

  public void run(){

    handle(socketChannel);

  }

  public void handle(SocketChannel socketChannel){

    try {

        Socket socket=socketChannel.socket();

        System.out.println("接收到客户连接,来自: " +

        socket.getInetAddress() + ":" +socket.getPort());

        BufferedReader br =getReader(socket);

        PrintWriter pw = getWriter(socket);

        String msg = null;

        while ((msg = br.readLine()) != null) {

          System.out.println(msg);

          pw.println(echo(msg));

          if (msg.equals("bye"))

            break;

        }

      }catch (IOException e) {

         e.printStackTrace();

      }finally {

         try{

           if(socketChannel!=null)socketChannel.close();

         }catch (IOException e) {e.printStackTrace();}

      }

  }

  private PrintWriter getWriter(Socket socket)throws IOException{

    OutputStream socketOut = socket.getOutputStream();

    return new PrintWriter(socketOut,true);

  }

  private BufferedReader getReader(Socket socket)throws IOException{

    InputStream socketIn = socket.getInputStream();

    return new BufferedReader(new InputStreamReader(socketIn));

  }

  public String echo(String msg) {

    return "echo:" + msg;

  }

}

2.阻塞式+线程池

package multithread4;

import java.io.*;

import java.net.*;

import java.util.concurrent.*;

public class EchoServer {

  private int port=8000;

  private ServerSocket serverSocket;

  private ExecutorService executorService; //线程池

  private final int POOL_SIZE=4;  //单个CPU时线程池中工作线程的数目

  

  private int portForShutdown=8001;  //用于监听关闭服务器命令的端口

  private ServerSocket serverSocketForShutdown;

  private boolean isShutdown=false; //服务器是否已经关闭

  private Thread shutdownThread=new Thread(){   //负责关闭服务器的线程

    public void start(){

      this.setDaemon(true);  //设置为守护线程(也称为后台线程)

      super.start();

    }

    public void run(){

      while (!isShutdown) {

        Socket socketForShutdown=null;

        try {

          socketForShutdown= serverSocketForShutdown.accept();

          BufferedReader br = new BufferedReader(

                            new InputStreamReader(socketForShutdown.getInputStream()));

          String command=br.readLine();

         if(command.equals("shutdown")){

            long beginTime=System.currentTimeMillis(); 

            socketForShutdown.getOutputStream().write("服务器正在关闭\r\n".getBytes());

            isShutdown=true;

            //请求关闭线程池

//线程池不再接收新的任务,但是会继续执行完工作队列中现有的任务

            executorService.shutdown();  

            

            //等待关闭线程池,每次等待的超时时间为30秒

            while(!executorService.isTerminated())

              executorService.awaitTermination(30,TimeUnit.SECONDS); 

            

            serverSocket.close(); //关闭与EchoClient客户通信的ServerSocket 

            long endTime=System.currentTimeMillis(); 

            socketForShutdown.getOutputStream().write(("服务器已经关闭,"+

                "关闭服务器用了"+(endTime-beginTime)+"毫秒\r\n").getBytes());

            socketForShutdown.close();

            serverSocketForShutdown.close();

            

          }else{

            socketForShutdown.getOutputStream().write("错误的命令\r\n".getBytes());

            socketForShutdown.close();

          }  

        }catch (Exception e) {

           e.printStackTrace();

        } 

      } 

    }

  };

  public EchoServer() throws IOException {

    serverSocket = new ServerSocket(port);

    serverSocket.setSoTimeout(60000); //设定等待客户连接的超过时间为60秒

    serverSocketForShutdown = new ServerSocket(portForShutdown);

    //创建线程池

    executorService= Executors.newFixedThreadPool( 

     Runtime.getRuntime().availableProcessors() * POOL_SIZE);

    

    shutdownThread.start(); //启动负责关闭服务器的线程

    System.out.println("服务器启动");

  }

  

  public void service() {

    while (!isShutdown) {

      Socket socket=null;

      try {

        socket = serverSocket.accept();  //可能会抛出SocketTimeoutException和SocketException

        socket.setSoTimeout(60000);  //把等待客户发送数据的超时时间设为60秒          

        executorService.execute(new Handler(socket));  //可能会抛出RejectedExecutionException

      }catch(SocketTimeoutException e){

         //不必处理等待客户连接时出现的超时异常

      }catch(RejectedExecutionException e){

         try{

           if(socket!=null)socket.close();

         }catch(IOException x){}

         return;

      }catch(SocketException e) {

         //如果是由于在执行serverSocket.accept()方法时,

         //ServerSocket被ShutdownThread线程关闭而导致的异常,就退出service()方法

         if(e.getMessage().indexOf("socket closed")!=-1)return;

       }catch(IOException e) {

         e.printStackTrace();

      }

    }

  }

  public static void main(String args[])throws IOException {

    new EchoServer().service();

  }

}

class Handler implements Runnable{

  private Socket socket;

  public Handler(Socket socket){

    this.socket=socket;

  }

  private PrintWriter getWriter(Socket socket)throws IOException{

    OutputStream socketOut = socket.getOutputStream();

    return new PrintWriter(socketOut,true);

  }

  private BufferedReader getReader(Socket socket)throws IOException{

    InputStream socketIn = socket.getInputStream();

    return new BufferedReader(new InputStreamReader(socketIn));

  }

  public String echo(String msg) {

    return "echo:" + msg;

  }

  public void run(){

    try {

      System.out.println("New connection accepted " +

      socket.getInetAddress() + ":" +socket.getPort());

      BufferedReader br =getReader(socket);

      PrintWriter pw = getWriter(socket);

      String msg = null;

      while ((msg = br.readLine()) != null) {

        System.out.println(msg);

        pw.println(echo(msg));

        if (msg.equals("bye"))

          break;

      }

    }catch (IOException e) {

       e.printStackTrace();

    }finally {

       try{

         if(socket!=null)socket.close();

       }catch (IOException e) {e.printStackTrace();}

    }

  }

}

3. A 非阻塞式用1个线程

package nonblock;

import java.io.*;

import java.nio.*;

import java.nio.channels.*;

import java.nio.charset.*;

import java.net.*;

import java.util.*;

public class EchoServer{

  private Selector selector = null;

  private ServerSocketChannel serverSocketChannel = null;

  private int port = 8000;

  private Charset charset=Charset.forName("GBK");

  public EchoServer()throws IOException{

    selector = Selector.open();

    serverSocketChannel= ServerSocketChannel.open();

    serverSocketChannel.socket().setReuseAddress(true);

    serverSocketChannel.configureBlocking(false);

    serverSocketChannel.socket().bind(new InetSocketAddress(port));

    System.out.println("服务器启动");

  }

  public void service() throws IOException{

    serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT );

    while (selector.select() > 0 ){

      Set readyKeys = selector.selectedKeys();

      Iterator it = readyKeys.iterator();

      while (it.hasNext()){

         SelectionKey key=null;

         try{

            key = (SelectionKey) it.next();

            it.remove();

            if (key.isAcceptable()) {

              ServerSocketChannel ssc = (ServerSocketChannel) key.channel();

              SocketChannel socketChannel = (SocketChannel) ssc.accept();

              System.out.println("接收到客户连接,来自:" +

                                 socketChannel.socket().getInetAddress() +

                                 ":" + socketChannel.socket().getPort());

              socketChannel.configureBlocking(false);

              ByteBuffer buffer = ByteBuffer.allocate(1024);

              socketChannel.register(selector,

                                     SelectionKey.OP_READ |

                                     SelectionKey.OP_WRITE, buffer);

            }

            if (key.isReadable()) {

                receive(key);

            }

            if (key.isWritable()) {

                send(key);

            }

        }catch(IOException e){

           e.printStackTrace();

           try{

               if(key!=null){

                   key.cancel();

                   key.channel().close();

               }

           }catch(Exception ex){e.printStackTrace();}

        }

      }//#while

    }//#while

  }

  public void send(SelectionKey key)throws IOException{

    ByteBuffer buffer=(ByteBuffer)key.attachment();

    SocketChannel socketChannel=(SocketChannel)key.channel();

    buffer.flip();  //把极限设为位置,把位置设为0

    String data=decode(buffer);

    if(data.indexOf("\r\n")==-1)return;

    String outputData=data.substring(0,data.indexOf("\n")+1);

    System.out.print(outputData);

    ByteBuffer outputBuffer=encode("echo:"+outputData);

    while(outputBuffer.hasRemaining())

      socketChannel.write(outputBuffer);

    ByteBuffer temp=encode(outputData);

    buffer.position(temp.limit());

    buffer.compact();

    if(outputData.equals("bye\r\n")){

      key.cancel();

      socketChannel.close();

      System.out.println("关闭与客户的连接");

    }

  }

  public void receive(SelectionKey key)throws IOException{

    ByteBuffer buffer=(ByteBuffer)key.attachment();

    SocketChannel socketChannel=(SocketChannel)key.channel();

    ByteBuffer readBuff= ByteBuffer.allocate(32);

    socketChannel.read(readBuff);

    readBuff.flip();

    buffer.limit(buffer.capacity());

    buffer.put(readBuff);

  }

  public String decode(ByteBuffer buffer){  //解码

    CharBuffer charBuffer= charset.decode(buffer);

    return charBuffer.toString();

  }

  public ByteBuffer encode(String str){  //编码

    return charset.encode(str);

  }

  public static void main(String args[])throws Exception{

    EchoServer server = new EchoServer();

    server.service();

  }

}

3. B 非阻塞式用2个线程

package thread2;

import java.io.*;

import java.nio.*;

import java.nio.channels.*;

import java.nio.charset.*;

import java.net.*;

import java.util.*;

public class EchoServer{

  private Selector selector = null;

  private ServerSocketChannel serverSocketChannel = null;

  private int port = 8000;

  private Charset charset=Charset.forName("GBK");

  public EchoServer()throws IOException{

    selector = Selector.open();

    serverSocketChannel= ServerSocketChannel.open();

    serverSocketChannel.socket().setReuseAddress(true);

    serverSocketChannel.socket().bind(new InetSocketAddress(port));

    System.out.println("服务器启动");

  }

  public void accept(){

      for(;;){

        try{

            SocketChannel socketChannel = serverSocketChannel.accept();

            System.out.println("接收到客户连接,来自:" +

                               socketChannel.socket().getInetAddress() +

                               ":" + socketChannel.socket().getPort());

            socketChannel.configureBlocking(false);

            ByteBuffer buffer = ByteBuffer.allocate(1024);

            synchronized(gate){

                selector.wakeup();

                socketChannel.register(selector,

                                       SelectionKey.OP_READ |

                                       SelectionKey.OP_WRITE, buffer);

            }

        }catch(IOException e){e.printStackTrace();}

      }

  }

  private Object gate=new Object();

  public void service() throws IOException{

    for(;;){

      synchronized(gate){}

      int n = selector.select();

      if(n==0)continue;

      Set readyKeys = selector.selectedKeys();

      Iterator it = readyKeys.iterator();

      while (it.hasNext()){

        SelectionKey key=null;

        try{

            key = (SelectionKey) it.next();

            it.remove();

            if (key.isReadable()) {

                receive(key);

            }

            if (key.isWritable()) {

                send(key);

            }

        }catch(IOException e){

           e.printStackTrace();

           try{

               if(key!=null){

                   key.cancel();

                   key.channel().close();

               }

           }catch(Exception ex){e.printStackTrace();}

        }

      }//#while

    }//#while

  }

  public void send(SelectionKey key)throws IOException{

    ByteBuffer buffer=(ByteBuffer)key.attachment();

    SocketChannel socketChannel=(SocketChannel)key.channel();

    buffer.flip();  //把极限设为位置

    String data=decode(buffer);

    if(data.indexOf("\n")==-1)return;

    String outputData=data.substring(0,data.indexOf("\n")+1);

    System.out.print(outputData);

    ByteBuffer outputBuffer=encode("echo:"+outputData);

    while(outputBuffer.hasRemaining())

      socketChannel.write(outputBuffer);

    ByteBuffer temp=encode(outputData);

    buffer.position(temp.limit());

    buffer.compact();

    if(outputData.equals("bye\r\n")){

      key.cancel();

      socketChannel.close();

      System.out.println("关闭与客户的连接");

    }

  }

  public void receive(SelectionKey key)throws IOException{

    ByteBuffer buffer=(ByteBuffer)key.attachment();

    SocketChannel socketChannel=(SocketChannel)key.channel();

    ByteBuffer readBuff= ByteBuffer.allocate(32);

    socketChannel.read(readBuff);

    readBuff.flip();

    buffer.limit(buffer.capacity());

    buffer.put(readBuff);

  }

  public String decode(ByteBuffer buffer){  //解码

    CharBuffer charBuffer= charset.decode(buffer);

    return charBuffer.toString();

  }

  public ByteBuffer encode(String str){  //编码

    return charset.encode(str);

  }

  public static void main(String args[])throws Exception{

    final EchoServer server = new EchoServer();

    Thread accept=new Thread(){

        public void run(){

            server.accept();

        }

    };

    accept.start();

    server.service();

  }

}

  评论这张
 
阅读(1917)| 评论(0)
推荐 转载

历史上的今天

评论

<#--最新日志,群博日志--> <#--推荐日志--> <#--引用记录--> <#--博主推荐--> <#--随机阅读--> <#--首页推荐--> <#--历史上的今天--> <#--被推荐日志--> <#--上一篇,下一篇--> <#-- 热度 --> <#-- 网易新闻广告 --> <#--右边模块结构--> <#--评论模块结构--> <#--引用模块结构--> <#--博主发起的投票-->
 
 
 
 
 
 
 
 
 
 
 
 
 
 

页脚

网易公司版权所有 ©1997-2017