Programming/Process & Multi Threading

[Java] Executors로 스레드 관리하기

dev.pudding 2024. 2. 4. 12:51
728x90

Thread Pool 과 Executor Framework를 사용하는 이유?

- 자바에서 Executors는 task를 간단하게 비동기로 처리할 수 있게 해주는 thread-pool과 API를 제공하는 framework이다. 각각의 프로세스에 스레드를 새로 생성하고 관리하는 작업은 메모리를 소비하고, CPU에서는 컨텍스트 스위칭(Context Switching)이 일어나 그 만큼의 시간을 소비하게 된다.  이 때 Thread Pool을 사용한다면 스레드를 재사용하여 효율적으로 스레드를 관리할 수 있다. Thread Pool은 Queue 자료구조로 관리된다.

 

Executor는 크게 4 타입으로 구분된다.

 

1.) SingleThreadExecutor

- task를 순차적으로(sequential)로 실행하기 위해 단일 스레드를 생성한다. 모든 프로세스는 단일 스레드에 의해 실행되며 동시성이나 병렬성이 필요하지 않는 상황에서 사용된다.

2.) FixedThreadPool(n)

- n개의 스레드로 Thread pool을 생성한다. 보통 n은 CPU의 코어의 갯수로 지정된다. n개 이상의 task가 들어온다면 여분의 task들은 LinkedBlockingQueue 자료구조에 저장된다.

3.) CachedThreadPool

- 스레드 갯수에 제한이 없는 Thread Pool을 생성한다. 새로운 task를 실행할 수 있는 스레드가 없다면 Thread Pool에서 새로운 스레드를 생성하여 해당 task를 처리한다. 반대로 스레드가 60초 동안 idle(어떤 작업도 수행하지 않는 대기상태) 상태에 있다면 해당 스레드는 삭제된다. 스레드가 갯수 제한없이 생성되기 때문에, 스레드가 많아지면 CPU의 context switching으로 인해 오버헤드가 발생한다. 따라서 CachedThreadPool은 짧은 시간의 병렬처리에서 주로 이용한다.

4.) ScheduledExecutor

주어진 간격에 따라 작업을 실행한다. 

 

SingleThreadExecutor

class Task implements Runnable{
    //Task의 id
    private int id;

    public Task(int id){
      this.id = id;
    }
    
    @Override
    public void run(){
    
          System.out.println("Task id :" + id + ",Thread id : " + Thread.currentThread().getName());
          long duration = (long)(Math.random * 5);
          
          try{
             //0~4초 사이 스레드 일시중단
             TimeUnit.SECONDS.sleep(duration);
          }catch(InterruptedException e){
             e.printStackTrace();
          }
    }
}

Task의 id와 Thread의 이름이 출력되도록 run()메소드를 오버라이드했다. 

 

 public class SingleThreadExecutor{
 
    public static void main(String[] args){
    
        //ExecutorService 구현하여 SingleThreadExecutor 사용 
        ExecutorService executor = Executors.newSingleThreadExecutor();
        
        //5번의 작업을 스레드풀에게 지시
        for(int i=0;i<5;i++){
           executor.execute(new Task(i));
        }
    }
 }

Executor 클래스는 팩토리 메소드 패턴으로 구현되어있는데, newSingleThreadExecutor()는 ExecutorService의 구현체 중 SingleThreadExecutor를 반환한다. 해당 구현체는 큐에 있는 작업을 순차적으로 처리하며 하나의 스레드에서 작업을 실행한다.

 

for문으로 executor를 사용하여 5번의 작업을 실행하도록 하였다. 각각의 반복에서 execute 메소드를 호출하여 Task클래스의 인스턴스를 새로운 작업으로 제출한다.  이 때, 스레드 풀이 몇 개의 스레드로 구성되어있는지에 따라 작업이 동시에 실행될 수도 있고 순차적으로 실행될 수도 있는데 SingleThreadExecutor는 하나의 스레드만 생성하기 때문에 순차적으로 실행된다.

 

결과

-> Task는 순차적으로 여러번 실행됬지만 스레드 id는 동일하다. 즉 하나의 스레드에서 task가 실행되었다.

 

 

FixedThreadPool(n)

public class FixedThreadPool{
       public static void main(String[] args){
          
           // ExecutorService구현하여 FixedThreadPool사용 
           // 작업 실행시 2개의 스레드를 사용한다.
           ExecutorService executor = Executors.newFixedThreadPool(2);
           
           for(int i=0;i<5;i++){
               executor.execute(new Task(i));
           }
       
       }
}

newFixedThreadPool(n)은 FixedThreadPool을 반환하는데, n의 인자값으로 2가 들어갔기 때문에, Thread Pool에는 2개의 스레드가 생성되며, task 실행시 두개의 스레드로 실행된다.

 

결과

-> thread 15,16이 번갈아가면서 실행되었다.

 

 

 

ScheduledThreadPool

class StockUpdator implements Runnable{

    @Override
    public void run(){
       System.out.println("update stock data");
    }
}


public class ScheduledThreadPool{

    public static void main(String[] args){
       
       //1개의 스레드로 ScheduledThreadPool 생성
       ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
    
       // 지정된 간격으로 스레드 실행 
       executor.scheduledAtFixedRate(new StockUpdator(),1000,2000,TimeUnit.MILISECONDS);
       
    }

}

 

ScheduledExecutorService 인터페이스의 scheduledAtFixedFate 메소드는 주기적인 작업을 실행하기 위해 사용되는 메소드이다. 

scheduledAtFixedRate(Runnable command, long initDelay, long period, TimeUnit unit);

 

command : 주기적으로 실행될 작업을 나타내며 Runnable 인터페이스 구현 객체가 사용된다. 코드 예시에서는 Runnable 인터페이스를 구현한 StockUpdator 클래스가 주기적으로 실행되도록 예약되었다.

initDelay : 첫번쨰 작업이 시작되기 전까지의 지연시간을 나타낸다. 코드 예시는 밀리세컨즈 단위로 지정했기 때문에 1초가 지나서 첫번쨰 작업이 실행된다.

period  : 연속적인 작업 간의 실행 간격을 나타낸다. 코드 예시에서는 2초마다 실행되도록 하였다.

unit : initDelay 및 period 파라메터의 시간 단위를 지정한다. 

 

결과

1초가 지나서 StockUpdator가 실행되어 2초 간격마다 update stock data가 출력되었다. 

 

Executors 중지하기

class Work implements Runnable{
   private int id;
   
   public Work(int id){
       this.id = id;
   }
   
   
   @Override
   public void run(){
   
       System.out.println("Task with Id " + id + ",Thread id : " + Thread.currentThread().getId());
       long duration = (long)(Math.random()*5);
       
       try{
          TimeUnit.SECONDS.sleep(duration);
       }catch(InterruptedExeption e){
       	 //0~4초 랜덤대기 후 스레드 interrupt
          Thread.currentThread().interrupt();
       }
   }
}

 

0~4초 후 , 스레드가 interrupt 되도록 설정하였다.

 

 

 

public class ThreadPool{
     public static void main(String[] args){
      
         ExecutorService executor = Executors.newFixedThreadPool(2);
         
         //100번 실행
         for(int i=0;i<100;i++){
              executor.execute(new Work(i));
         }
         
         //ExecutorService 종료 
         executor.shutdown();
         
         try{
             //큐에 task가 있다면
             if(!executor.awaitTermination(1000,TimeUnit.MILISECONDS)){
                   //대기중인 task종료 
                   executor.shutdownNow();
             }
         }catch(InterruptedException e){
              executor.shutdownNow();
         }
     }
}

executor.shutdown() : ExecutorService를 종료하여 추가적인 작업 수행을 막는다. 하지만 이미 큐에 있는 작업들은 계속해서 실행된다. 

executor.awaitTermination(long timeout, TimeUnit unit) : ExecutorService가 종료될 떄까지 대기한다. timeout은 최대 기다릴 시간, unit은 timeout의 시간단위이다. 코드예시에서는 1초동안 기다리도록 되있으며, 만약 ExecutorService가 지정된 시간(1초) 내에 종료되면 true를 반환하고, 타임아웃이 발생하면 false를 반환한다.

if 조건문은 ExecutorService가 아직 실행중인 경우(큐에 task가 남아있는 경우) executor.shutdownNow()를 실행하여 실행중인 모든 작업들을 취소한다. 즉 task 100개가 다 실행되기 전에 ExecutorService가 종료된다. 

 

 run메소드 오버라이드시 스레드 대기상태가 지나면 스레드가 interrupt되도록 설정하였기 때문에 만약 task 100개를 모두 실행시키고 싶다면 if조건문을 실행하지 않고 catch문의 executor.shutdownNow()만 실행되도록 설정한다. 

 try {
       //1초 기다린 후에도 큐에 task가 있으면 false반환
       if (!executor.awaitTermination(1000, TimeUnit.MILLISECONDS)) {
            // 주석처리하여 실행되지 않도록 함.
            // executor.shutdownNow();
         }
} catch (InterruptedException e) {
            //큐에 있는 task 100개 모두 실행후 ExecutorService 중단
           executor.shutdownNow();
 }

 

 

 

결론 

  • Executors 클래스를 사용하여 Thread Pool을 생성하고 스레드를 효율적으로 관리할 수 있다.
  • shutdown() 메소드는 Queue에 대기하고 있는 task까지는 중단시키지 않기 때문에 shutdownNow() 메소드를 사용해야 대기하고 있는 task까지 종료된다.

 

출처 

https://veneas.tistory.com/entry/Java-Executors-Thread-%EC%82%AC%EC%9A%A9%EB%B2%95

https://www.geeksforgeeks.org/what-is-java-executor-framework/

https://engkimbs.tistory.com/entry/Java-Library-Executor-Framework