天天看點

Java NIO 異步操作(三)

異步channel api

主要引入三個異步類:AsynchronousFileChannel、AsynchronousSocketChannel 和 AsynchronousServerSocketChannel。

AsynchronousFileChannel 跟 FileChannel 的差別:不儲存全局的 position 和 offset,每次讀寫時可以指定訪問位置,也支援並發訪問檔案的不同部分。

AsynchronousServerSocketChannel、AsynchronousSocketChannel:能夠綁定到一個指定線程池的組(AsynchronousChannelGroup)中,並且能夠用 Future 或者 CompletionHandler 來對執行結果進行處理。

AsynchronousChannelGroup:執行異步 IO 的 Java 線程池的組類。

AsynchronousChannelGroup.java:

public static AsynchronousChannelGroup withFixedThreadPool(int nThreads, ThreadFactory threadFactory)

public static AsynchronousChannelGroup withCachedThreadPool(ExecutorService executor, int initialSize)

public static AsynchronousChannelGroup withThreadPool(ExecutorService executor)

我們看使用示例

package com.mime;  

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.channels.FileLock;
import java.nio.charset.Charset;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

/**
 * Walk-through of the NIO.2 asynchronous channel API.
 *
 * <p>Three demos are run in sequence by {@link #main(String[])}:
 * <ol>
 *   <li>{@link #asyfile()} - reading, writing and locking a file through
 *       {@code AsynchronousFileChannel}, using both {@code Future} and
 *       {@code CompletionHandler};</li>
 *   <li>{@link #asyfilechannel2()} - many concurrent positional reads on one channel that
 *       was opened with an explicit {@code ExecutorService};</li>
 *   <li>{@link #asyserversocketchannel()} - an echo server built on
 *       {@code AsynchronousServerSocketChannel}.</li>
 * </ol>
 *
 * <p>All file demos operate on {@code /tmp/store.txt}. The class and method names are kept
 * exactly as published so that existing callers keep working.
 */
public class nio2asynchronousfilechannel {

    /**
     * Runs the three demos one after another.
     *
     * <p>The last demo is a server loop, so this method only returns once that loop ends.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        asyfile();
        asyfilechannel2();
        asyserversocketchannel();
    }

    /**
     * Asynchronous file read / write / lock example.
     *
     * <p>Reads {@code /tmp/store.txt} twice (first polling a {@code Future}, then through a
     * {@code CompletionHandler}), prints what was read, then writes to the same file and
     * finally repeats a write while holding a file lock. Any failure is printed to
     * {@code System.err}; nothing is thrown to the caller.
     */
    public static void asyfile() {
        ByteBuffer buffer = ByteBuffer.allocate(100);
        String encoding = System.getProperty("file.encoding");
        Path path = Paths.get("/tmp", "store.txt");

        try (AsynchronousFileChannel channel = AsynchronousFileChannel
                .open(path, StandardOpenOption.READ)) {
            Future<Integer> result = channel.read(buffer, 0);
            // Read timeout control:
            // int count = result.get(100, TimeUnit.NANOSECONDS);
            while (!result.isDone()) {
                System.out.println("do something else while reading ...");
            }
            System.out.println("read done: " + result.isDone());
            System.out.println("bytes read: " + result.get());

            // Read the file again, this time through the CompletionHandler callback.
            // The latch keeps the channel open until the callback has fired: leaving the
            // try block earlier closes the channel under the pending read, which then
            // fails with AsynchronousCloseException. It also replaces interrupting the
            // calling thread from the handler, which left a stray interrupt behind.
            final CountDownLatch callbackDone = new CountDownLatch(1);
            channel.read(buffer, 0,
                    "read operation status ...",
                    new CompletionHandler<Integer, Object>() {
                        @Override
                        public void completed(Integer bytesRead, Object attachment) {
                            System.out.println(attachment);
                            System.out.print("read bytes: " + bytesRead);
                            callbackDone.countDown();
                        }

                        @Override
                        public void failed(Throwable exc, Object attachment) {
                            System.out.println(attachment);
                            System.out.println("error:" + exc);
                            callbackDone.countDown();
                        }
                    });
            callbackDone.await();
        } catch (Exception ex) {
            report(ex);
        }

        buffer.flip();
        System.out.print(Charset.forName(encoding).decode(buffer));
        buffer.clear();

        // Asynchronous file write example.
        ByteBuffer buffer1 = ByteBuffer
                .wrap("the win keeps nadal at the top of the heap in men's"
                        .getBytes());
        Path path1 = Paths.get("/tmp", "store.txt");
        try (AsynchronousFileChannel channel = AsynchronousFileChannel
                .open(path1, StandardOpenOption.WRITE)) {
            Future<Integer> result = channel.write(buffer1, 100);
            while (!result.isDone()) {
                System.out.println("do something else while writing ...");
            }
            System.out.println("written done: " + result.isDone());
            System.out.println("bytes written: " + result.get());

            // File lock.
            Future<FileLock> featureLock = channel.lock();
            System.out.println("waiting for the file to be locked ...");
            FileLock lock = featureLock.get();
            if (lock.isValid()) {
                try {
                    // NOTE: buffer was clear()ed above, so this writes its whole 100-byte
                    // capacity (whatever the reads left in it) at offset 0.
                    Future<Integer> featureWrite = channel.write(buffer, 0);
                    System.out.println("waiting for the bytes to be written ...");
                    int written = featureWrite.get();
                    // or, use shortcut
                    // int written = channel.write(buffer, 0).get();
                    System.out.println("i’ve written " + written + " bytes into "
                            + path.getFileName() + " locked file!");
                } finally {
                    // Release even when the write fails.
                    lock.release();
                }
            }
            // Callback variant of locking:
            // channel.lock("lock operation status:", new
            // CompletionHandler<FileLock, Object>() { ... });
        } catch (Exception ex) {
            report(ex);
        }
    }

    // public static AsynchronousFileChannel open(Path file, Set<? extends
    // OpenOption> options, ExecutorService executor, FileAttribute<?>... attrs)
    // throws IOException

    /**
     * Builds the option set passed to the {@code open(Path, Set, ExecutorService, ...)}
     * overload.
     *
     * @return a mutable set containing only {@code StandardOpenOption.READ}
     */
    private static Set<StandardOpenOption> withoptions() {
        final Set<StandardOpenOption> options = new TreeSet<>();
        options.add(StandardOpenOption.READ);
        return options;
    }

    /**
     * Asynchronous file I/O through
     * {@code AsynchronousFileChannel.open(path, withoptions(), executor)}.
     *
     * <p>Submits 50 workers; each allocates a direct buffer of random size (100-199 bytes),
     * reads into it from a random file offset (0-99) and returns it once the read has
     * completed. After all workers have finished, every buffer is printed and decoded.
     * Any failure is printed to {@code System.err}.
     */
    public static void asyfilechannel2() {
        final int threads = 5;
        ExecutorService taskExecutor = Executors.newFixedThreadPool(threads);
        // The channel gets its own pool. Workers now wait for their read to complete, so
        // letting the channel use the workers' pool could leave every pool thread blocked
        // on a read that has no thread left to run on; that pool is also shut down while
        // workers are still issuing reads.
        ExecutorService channelExecutor = Executors.newFixedThreadPool(threads);
        String encoding = System.getProperty("file.encoding");
        List<Future<ByteBuffer>> list = new ArrayList<>();
        int sheeps = 0;
        Path path = Paths.get("/tmp",
                "store.txt");

        try (AsynchronousFileChannel channel = AsynchronousFileChannel
                .open(path, withoptions(), channelExecutor)) {
            for (int i = 0; i < 50; i++) {
                Callable<ByteBuffer> worker = new Callable<ByteBuffer>() {
                    @Override
                    public ByteBuffer call() throws Exception {
                        ByteBuffer buffer = ByteBuffer
                                .allocateDirect(ThreadLocalRandom.current()
                                        .nextInt(100, 200));
                        // Wait for the read; otherwise the buffer may be handed back
                        // before any byte has been stored in it.
                        channel.read(buffer, ThreadLocalRandom
                                .current().nextInt(0, 100)).get();
                        return buffer;
                    }
                };
                Future<ByteBuffer> future = taskExecutor.submit(worker);
                list.add(future);
            }

            // The executor accepts no new tasks from here on
            // and finishes the ones already queued.
            taskExecutor.shutdown();

            // Wait until all workers are finished.
            while (!taskExecutor.isTerminated()) {
                // do something else while the buffers are prepared
                System.out
                        .println("counting sheep while filling up some buffers!so far i counted: "
                                + (sheeps += 1));
            }

            System.out.println("\ndone! here are the buffers:\n");
            for (Future<ByteBuffer> future : list) {
                ByteBuffer buffer = future.get();
                System.out.println("\n\n" + buffer);
                System.out
                        .println("______________________________________________________");
                buffer.flip();
                System.out.print(Charset.forName(encoding).decode(buffer));
                buffer.clear();
            }
        } catch (Exception ex) {
            report(ex);
        } finally {
            // Covers the early-failure paths too; both calls are harmless on a pool that
            // has already been shut down.
            taskExecutor.shutdownNow();
            channelExecutor.shutdown();
        }
    }

    /**
     * Asynchronous server-socket channel example: an echo server on 127.0.0.1:5555.
     *
     * <p>Each accepted connection is served on a cached thread pool: everything the client
     * sends is written back until the client closes the connection. The accept loop ends
     * when waiting for a connection fails or the calling thread is interrupted; the method
     * then waits for the connections in progress before returning.
     */
    public static void asyserversocketchannel() {
        // Variant bound to an explicit channel group instead of the default one:
        //
        // AsynchronousChannelGroup threadGroup = null;
        // ExecutorService executorService = Executors
        //         .newCachedThreadPool(Executors.defaultThreadFactory());
        // try {
        //     threadGroup = AsynchronousChannelGroup.withCachedThreadPool(executorService, 1);
        // } catch (IOException ex) {
        //     System.err.println(ex);
        // }
        // AsynchronousServerSocketChannel server =
        //         AsynchronousServerSocketChannel.open(threadGroup);

        final int default_port = 5555;
        final String ip = "127.0.0.1";
        ExecutorService taskExecutor = Executors.newCachedThreadPool(Executors
                .defaultThreadFactory());

        // Create an asynchronous server socket channel bound to the default group.
        try (AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel
                .open()) {
            if (server.isOpen()) {
                // set some options
                server.setOption(
                        StandardSocketOptions.SO_RCVBUF, 4 * 1024);
                server.setOption(
                        StandardSocketOptions.SO_REUSEADDR, true);

                // bind the server socket channel to the local address
                server.bind(new InetSocketAddress(ip,
                        default_port));

                System.out.println("waiting for connections ...");
                while (true) {
                    Future<AsynchronousSocketChannel> pending = server.accept();
                    // Callback variant on the server side:
                    // server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() { ... });
                    // Callback variant on the client side:
                    // client.connect(new InetSocketAddress(ip, default_port), null,
                    //         new CompletionHandler<Void, Void>() { ... });
                    try {
                        final AsynchronousSocketChannel accepted = pending.get();
                        Callable<String> worker = new Callable<String>() {
                            @Override
                            public String call() throws Exception {
                                // Closes the connection even when the echo loop throws.
                                try (AsynchronousSocketChannel connection = accepted) {
                                    String host = connection
                                            .getRemoteAddress().toString();
                                    System.out.println("incoming connection from: "
                                            + host);
                                    final ByteBuffer buffer = ByteBuffer
                                            .allocateDirect(1024);

                                    // Echo: send back each chunk before reading the next.
                                    while (connection.read(buffer).get() != -1) {
                                        buffer.flip();
                                        connection.write(buffer).get();
                                        if (buffer.hasRemaining()) {
                                            // partial write: keep the unsent bytes
                                            buffer.compact();
                                        } else {
                                            buffer.clear();
                                        }
                                    }

                                    System.out.println(host
                                            + " was successfully served!");
                                    return host;
                                }
                            }
                        };
                        taskExecutor.submit(worker);
                    } catch (InterruptedException | ExecutionException ex) {
                        System.err.println(ex);
                        System.err.println("\n server is shutting down ...");
                        shutdownAndAwait(taskExecutor);
                        if (ex instanceof InterruptedException) {
                            // Let the caller see that this thread was interrupted.
                            Thread.currentThread().interrupt();
                        }
                        break;
                    }
                }
            } else {
                System.out
                        .println("the asynchronous server-socket channel cannot be opened!");
            }
        } catch (IOException ex) {
            System.err.println(ex);
        } finally {
            // Also covers the paths that never reach the accept loop.
            taskExecutor.shutdown();
        }
    }

    /**
     * Stops the executor from accepting new tasks and blocks until the submitted ones have
     * finished. If the waiting thread is interrupted, the remaining tasks are cancelled and
     * the interrupt flag is restored.
     *
     * @param executor the executor to shut down
     */
    private static void shutdownAndAwait(ExecutorService executor) {
        executor.shutdown();
        try {
            while (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
                // Still running: block again rather than spinning on isTerminated().
            }
        } catch (InterruptedException ex) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Prints a demo failure to {@code System.err}, restoring the interrupt flag when the
     * failure was an interruption so that it is not silently swallowed.
     *
     * @param ex the exception caught by a demo
     */
    private static void report(Exception ex) {
        if (ex instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
        System.err.println(ex);
    }
}

輸出:

do something else while reading ...  

read done: true  

bytes read: 18  

hello,filechannel  

read operation status ...  

error:java.nio.channels.asynchronouscloseexception  

do something else while writing ...  

written done: true  

bytes written: 51  

waiting for the file to be locked ...  

waiting for the bytes to be written ...  

i’ve written 100 bytes into store.txt locked file!  

counting sheep while filling up some buffers!so far i counted: 1  

counting sheep while filling up some buffers!so far i counted: 2  

counting sheep while filling up some buffers!so far i counted: 3  

counting sheep while filling up some buffers!so far i counted: 4  

counting sheep while filling up some buffers!so far i counted: 5  

counting sheep while filling up some buffers!so far i counted: 6  

counting sheep while filling up some buffers!so far i counted: 7  

counting sheep while filling up some buffers!so far i counted: 8  

counting sheep while filling up some buffers!so far i counted: 9  

counting sheep while filling up some buffers!so far i counted: 10  

counting sheep while filling up some buffers!so far i counted: 11  

done! here are the buffers:  

java.lang.interruptedexception  

waiting for connections ...