개발자는 기록이 답이다

패스트캠퍼스 환급챌린지 4일차 미션 (2월 4일) : Spring Webflux 완전 정복 : 코루틴부터 리액티브 MSA 프로젝트까지 강의 후기 본문

패스트캠퍼스

패스트캠퍼스 환급챌린지 4일차 미션 (2월 4일) : Spring Webflux 완전 정복 : 코루틴부터 리액티브 MSA 프로젝트까지 강의 후기

slow-walker 2024. 2. 4. 21:36

 

Java AIO

  • Java 1.7부터 NIO2를 지원
  • AsynchronousChannel을 지원
    • AsynchronousSocketChannel
    • AsynchronousServerSocketChannel
    • AsynchronousFileChannel 등
  • callback과 future 지원

  • 내부적으로 Thread pool과 epoll, kqueue 등의 이벤트 알림 system call을 이용해서 IO를 비동기 적으로 처리
  • I/O가 준비되었을때, Future 혹은 callback으로 비동기적인 로직 처리 가능

 

  1. read나 write처럼 IO관련된 요청이 들어오면 비동기 채널은 바로 반환한다.
  2. Future을 반환하거나 아무것도 반환하지 않고 즉시 제어권은 caller한테 돌려준다.
    • caller는 본인의 일을 하면 되기 때문에 넌블로킹이다.
  3. 들어온 IO요청에 대해 전부 task Queue에 쌓는다.
    • task Queue는 Thread Pool안에 있는 task Queue이고, 비동기 채널의 요청들이 한곳에 쌓인다
  4. task Queue에 있는걸 Thread Pool안에 있는 Thread들이 받아서 처리하게 된다
  5. Thread Pool에서 해당 IO가 준비가 됬다고 판단되면 callback을 실행하면서 IO에 준비가 됬다고 알린다
  6. callback을 실행하기 때문에 caller는 이 사실을 모른채로 처리가 될 수도 있고, 혹은 Future를 반환했다면 Future를 통해서 결과를 볼 수 있다.

 

2. AsynchronousFileChannel - callback

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

import java.io.File;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Executors;

@Slf4j
public class AsyncFileChannelReadCallbackExample {
    @SneakyThrows
    public static void main(String[] args) {
        log.info("start main");
        var file = new File(AsyncFileChannelReadCallbackExample.class
                .getClassLoader()
                .getResource("hello.txt")
                .getFile());

        var channel = AsynchronousFileChannel.open(file.toPath());
        ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
        channel.read(buffer, 0, null, new CompletionHandler<>() {
            @SneakyThrows
            @Override
            public void completed(Integer result, Object attachment) {
                buffer.flip();
                var resultString = StandardCharsets.UTF_8.decode(buffer);
                log.info("result: {}", resultString);
                channel.close();
            }

            @Override
            public void failed(Throwable ex, Object attachment) { /* do nothing */ }
        });

        while (channel.isOpen()) {
            log.info("Reading...");
        }
        log.info("end main");
    }
}

 

 

  1. 특정 파일을 오픈하고, 바이트 버퍼도 1kb만큼 미리 할당을 받는다.
  2. read할때 Java IO랑 JavaNIO와 다르게 attachment와 completionHandler를 보내게 된다.
    • attachment와 completionHandler는 callback을 같이 넘겨주는 것
  3.  Java IO랑 JavaNIO는 buffer만 보냄

3. AsynchronousFileChannel - funture

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

import java.io.File;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Future;

@Slf4j
public class AsyncFileChannelReadFutureExample {
    @SneakyThrows
    public static void main(String[] args) {
        log.info("start main");
        var file = new File(AsyncFileChannelReadFutureExample.class
                .getClassLoader()
                .getResource("hello.txt")
                .getFile());

        try(var channel = AsynchronousFileChannel.open(file.toPath())) {
            var buffer = ByteBuffer.allocateDirect(1024);
            Future<Integer> channelRead = channel.read(buffer, 0);
            while (!channelRead.isDone()) {
                log.info("Reading...");
            }
            buffer.flip();
            var result = StandardCharsets.UTF_8.decode(buffer);
            log.info("result: {}", result);
        }
        log.info("end main");
    }
}
  1. 특정 파일을 오픈하고
  2. read할때 그 결과를 Future로 받는다
  3. Future를 받는 시점에는 아직 완료되지 않았기 때문에 while문을 이용해서 read가 됬는지 확인하면서 대기한다.
  4. 코드는 callback보다 깔끔해졌지만, main함수인 caller가 결과에 관심이 있기 때문에 주기적으로 체크를해야 한다
  5. 끝나는 시점에 값을 구해서 출력하는 코드이다
  6. 코드상으로는 더 간결하지만, isDone으로 체크하는 부분에서 CPU낭비가 생길 수 있다.

 

4. AsynchronousServerSocektChannel - callback

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;

@Slf4j
public class AsyncServerSocketCallbackExample {
    @SneakyThrows
    public static void main(String[] args) {
        log.info("start main");
        var serverSocketChannel = AsynchronousServerSocketChannel.open();
        var address = new InetSocketAddress("localhost", 8080);
        serverSocketChannel.bind(address);

        serverSocketChannel.accept(null, new CompletionHandler<>() {
            @Override
            public void completed(AsynchronousSocketChannel clientSocket, Object attachment) {
                log.info("accepted");
                var requestBuffer = ByteBuffer.allocateDirect(1024);

                clientSocket.read(requestBuffer, null, new CompletionHandler<>() {
                    @SneakyThrows
                    @Override
                    public void completed(Integer a, Object attachment) {
                        requestBuffer.flip();
                        var request = StandardCharsets.UTF_8.decode(requestBuffer);
                        log.info("request: {}", request);

                        var response = "This is server.";
                        var responseBuffer = ByteBuffer.wrap(response.getBytes());
                        clientSocket.write(responseBuffer);
                        clientSocket.close();
                        log.info("end client");
                    }

                    @Override
                    public void failed(Throwable ex, Object attachment) { /* do nothing */ }
                });
            }

            @Override
            public void failed(Throwable ex, Object attachment) { /* do nothing */ }
        });

        Thread.sleep(100_000);
        log.info("end main");
}
}

5. AsynchronousServerSocektChannel - future

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Future;

@Slf4j
public class AsyncServerSocketFutureExample {
    @SneakyThrows
    public static void main(String[] args) {
        log.info("start main");
        var serverSocketChannel = AsynchronousServerSocketChannel.open();
        var address = new InetSocketAddress("localhost", 8080);
        serverSocketChannel.bind(address);

        Future<AsynchronousSocketChannel> clientSocketFuture = serverSocketChannel.accept();
        while (!clientSocketFuture.isDone()) {
            Thread.sleep(100);
            log.info("Waiting...");
        }
        var clientSocket = clientSocketFuture.get();

        var requestBuffer = ByteBuffer.allocateDirect(1024);
        Future<Integer> channelRead = clientSocket.read(requestBuffer);
        while (!channelRead.isDone()) {
            log.info("Reading...");
        }

        requestBuffer.flip();
        var request = StandardCharsets.UTF_8.decode(requestBuffer);
        log.info("request: {}", request);

        var response = "This is server.";
        var responseBuffer = ByteBuffer.wrap(response.getBytes());
        clientSocket.write(responseBuffer);
        clientSocket.close();
        log.info("end client");
}
}

 


※ 본 포스팅은 패스트캠퍼스 환급 챌린지 참여를 위해 작성하였습니다. https://bit.ly/48sS29N