오늘은 어디로 갈까...

멀티 쓰레드(Multi Thread) 프로그래밍 본문

낙서

멀티 쓰레드(Multi Thread) 프로그래밍

剛宇 2009. 3. 24. 07:33


멀티 쓰레드(Multi Thread)란 프로세스(Process)안에서 여러개의 쓰레드가 동시에 동작하는것을 말한다.
자바 1.5부터 java.util.concurrent 패키지가 추가됨으로서 멀티 쓰레드 프로그래밍을 아주 손쉽게 할 수 있다.
(아득히 먼(?) 옛날에는 멀티 쓰레드를 잘(?) 사용하기 위해서 ThreadPool, BlockingQueue도 직접 만들어 사용하는등 많은 삽질이 필요했는데, 지금은 너무나 간단하게 구현이 가능하다. 물론 Thread에 대한 기본 지식은 가지고 있어야한다.)
 자바를 실행하면 main 쓰레드라 불리우는 놈이 해당 객체의 main(String[]) 메소드를 실행해준다. 즉, 프로그램이 실행되면, 최소한 한 개 이상의 쓰레드 동작중인것이다.
 
1. Thread 생성 및 실행
  - Thread 생생은 java.lang.Runnable 인터페이스를 구현하거나, java.lang.Thread를 상속받는 방법이 있다.
  - 이 생성한 Thread를 start() 메소드를 이용해서 작동시킨다. 이 메소드가 호출되면 자동으로 run()메소드가 실행이 되는것이다. 그래서 어떤 작업을 하고 싶으면, run() 메소드 안에 해당 작업을 구현해주어야하는것이다.

package test.thread;

public class SimpleThread extends Thread {

	public void run() {
		for (int i = 0; i < 10; i++) {
			System.out.printf("[%s] %d번째 실행입니다.%n", Thread.currentThread().getName(), i);
		}
	}
	
	public static void main(String[] args) throws InterruptedException {
		Thread thread0 = new SimpleThread();
		Thread thread1 = new SimpleThread();
		thread0.start();
		thread1.start();
	}
}

package test.thread;

public class SimpleRunnable implements Runnable {

	public void run() {
		for (int i = 0; i < 10; i++) {
			System.out.printf("[%s] %d번째 실행입니다.%n", Thread.currentThread().getName(), i);
		}
	}
	
	public static void main(String[] args) throws InterruptedException {
		Thread thread0 = new Thread(new SimpleRunnable());
		Thread thread1 = new Thread(new SimpleRunnable());
		thread0.start();
		thread1.start();
	}
}


2. 동기화
 - 기본적인 개념은 "volatile 에 대한 단상"(http://blog.kangwoo.kr/43)을 참고 바란다.
 - java.util.concurrent.Semaphore 클래스를 이용하면 자원 사용을 제어할 수있다. 즉, 사용할 수 있는 자원이 최대 N개인데, N개보다 많은 수의 쓰레드가 그 자원을 필요로 할 경우 제어할 수 있는것이다.
   (http://java.sun.com/javase/6/docs/api/java/util/concurrent/Semaphore.html)
 - java.util.concurrent.locks.ReentrantLock : 읽기/쓰기 락을 제어할 수 있다.
   (http://java.sun.com/javase/6/docs/api/java/util/concurrent/locks/ReentrantReadWriteLock.html)



3. Executor를 이용한 Thread 관리
  - 보통 자원의 효율적 이용을 위해서 Thread Pool과 Queue를 만들어서 사용하는데, 자바 1.5에서는 기본적으로 지원한다. java.util.concurrent.Executors을 이용하면 너무나 쉽게 사용할 수 있다.
   + java.util.concurrent.TimeUnit : 시간 단위를 나타낸다. DAYS, HOURS, MICROSECONDS, MILLISECONDS, MINUTES, NANOSECONDS, SECONDS 가 있다.
   + java.util.concurrent.Executor : 처리할 작업을 실행할 수 있는 기능을 정의하고 있다.
     + execute(Runnable) : 해당 작업을 실행한다. 쓰레드 풀이 구현된경우에는 전달받은 작업을 큐(Queue)에 넣은 후, 가용 쓰레드가 존재할 경우 작업을 실행한다.
   + java.util.concurrent.ExecutorService : Executor의 하위 인터페이스로서 생명주기(shutdown(), shutdownNow()를 관리할 수 있는 기능과 Callable을 사용할수 있는 기능을 정의하고 있다.
     + shutdown() : 중지(shutdown) 명령을 내린다. 대기중인 작업은 실행되지만, 새로운 작업은 추가되지 않는다.
     + List<Runnable> shutdownNo() : 현재 실행중인 모든 작업 및 대기중인 작업 모두를 중지시킨다. 대기중인 작업 목록을 반환한다.
     + boolean isShutdown() : Executor가 중지(shutdown)가 되었는지 여부를 판단한다.
     + boolean isTerminated() : 모든 작업이 종료되었는지 여부를 판단한다.
     + boolean awaitTermination(long, TimeUnit) : 중지(shutdown) 명령을 내린후, 지정한 시간 동안 모든 작업이 종료될때까지 대기한다. 지정한 시간이내에 작업이 종료되면 true, 아니면 false를 반환한다.
     + <T> Future<T> submit(Callable<T> task) : 결과값을 반환할수 있는 Callable 작업을 실행한다. (Callable에 대해서는 뒤에서 다루겠다.)
   + java.util.concurrent.ScheduledExecutorService : ExecutorService의 하위 인터페이스로서 스케줄에 따라 작업을 실행할 수 있는 기능을 정의하고 있다.
     + ScheduledFuture<?> schedule(Runnable, long, TimeUnit)  : 지정한 시간 이후에 작업을 실행한다.
  
   + java.util.concurrent.Executors : 자바 1.5에서 기본적으로 제공하는 Executor 구현체를 생성할 수 있는 메소드를 제공하는 유틸리티 클래스이다.
     + ExecutorService newFixedThreadPool(int) : 지정한 갯수만큼 쓰레드를 가질 수 있는 쓰레들 풀을 생성한다. (ThreadPoolExecutor)
     + ExecutorService newCachedThreadPool() : 재사용이 가능한 쓰레드 풀을 생성한다. (ThreadPoolExecutor)
     + ExecutorService newSingleThreadExecutor() : 단일 쓰레드만을 사용하는 ExecutorService 를 생성한다.
     + ScheduledExecutorService newScheduledThreadPool(int) : 스케줄 가능한 쓰레드 풀을 생성한다. (ScheduledThreadPoolExecutor)
     + ScheduledExecutorService newSingleThreadScheduledExecutor() : 단일 쓰레드만을 사용하는 스케줄 가능한 ExecutorService 를 생성한다.

   + java.util.concurrent.ThreadPoolExecutor
   + java.util.concurrent.ScheduledThreadPoolExecutor


 - Executors 클래스를 가지고 간단한 멀티 쓰레드 프로그램을 만들어보자.
package test.thread;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MultiThread1 implements Runnable {
	
	private int total;
	

	public void run() {
		for (int i = 1; i <= 20000; i++) {
			total += i;
		}
		System.out.printf("[%s] 1에서 20000까지의 총 합은 %d입니다.%n", Thread.currentThread().getName(), total);
	}
	
	public static void main(String[] args) throws InterruptedException {
		ExecutorService executorService = Executors.newFixedThreadPool(3);
		executorService.execute(new MultiThread1());
		executorService.execute(new MultiThread1());
		executorService.execute(new MultiThread1());
		executorService.execute(new MultiThread1());
		executorService.execute(new MultiThread1());
		executorService.execute(new MultiThread1());
		executorService.shutdown();
	}
}

* 출력 결과
[pool-1-thread-1] 1에서 20000까지의 총 합은 200010000입니다.
[pool-1-thread-2] 1에서 20000까지의 총 합은 200010000입니다.
[pool-1-thread-3] 1에서 20000까지의 총 합은 200010000입니다.
[pool-1-thread-1] 1에서 20000까지의 총 합은 200010000입니다.
[pool-1-thread-2] 1에서 20000까지의 총 합은 200010000입니다.
[pool-1-thread-3] 1에서 20000까지의 총 합은 200010000입니다.


 - 뭐 이상없이 잘 돌아 갈것이다. ^^;(이 얼마나 심플한가~ 예술이다. T_T)

 - 참고로, 해당 데몬(daemon)을 종료하게 되면, 더 이상을 요청을 받아서는 안된다. 그리고 기존에 받은 요청은 지정한 시간내에 처리해야할것이다. 이럴경우 아래처럼 처리할 수 있다. (출처:JDK API Doc)

 // From JDK API Document
 void shutdownAndAwaitTermination(ExecutorService pool) {
   pool.shutdown(); // Disable new tasks from being submitted
   try {
     // Wait a while for existing tasks to terminate
     if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
       pool.shutdownNow(); // Cancel currently executing tasks
       // Wait a while for tasks to respond to being cancelled
       if (!pool.awaitTermination(60, TimeUnit.SECONDS))
           System.err.println("Pool did not terminate");
     }
   } catch (InterruptedException ie) {
     // (Re-)Cancel if current thread also interrupted
     pool.shutdownNow();
     // Preserve interrupt status
     Thread.currentThread().interrupt();
   }
 }

 - 위의 멀티 쓰레드 소스를 아래처럼 바꿔보자. Runnable 인스턴스를 하나만 생성한다음, 다중 쓰레드로 실행하는것이다.
package test.thread;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MultiThread2 implements Runnable {
	
	private int total;
	
	public void run() {
		for (int i = 1; i <= 20000; i++) {
			total += i;
		}
		System.out.printf("[%s] 1에서 20000까지의 총 합은 %d입니다.%n", Thread.currentThread().getName(), total);
	}
	
	public static void main(String[] args) throws InterruptedException {
		ExecutorService executorService = Executors.newFixedThreadPool(3);
		MultiThread2 runnable = new MultiThread2();
		executorService.execute(runnable);
		executorService.execute(runnable);
		executorService.execute(runnable);
		executorService.execute(runnable);
		executorService.execute(runnable);
		executorService.execute(runnable);
		executorService.shutdown();
	}
}

실행할때마다 엉뚱한결과가 나올것이다.(컴퓨터 성능이 너무 좋으면 정상적인 결과가 나올수도 있다 --;)
그 이유는 total 이라는 int 변수가 각 쓰레드에서 공유(?)되어 사용되기 때문이다. 이럴경우 조심해서 사용해야한다.
(많이 사용하는 웹어플리케이션의 Servlet이 실행되는 구조와 동일하다. 즉, 다중의 쓰레드들이 Servlet 인터페이스의 doService() 메소드를 호출하여 실행하는것이다. 그래서 해당 Servlet에 멤버변수를 선언해서 잘못(?) 사용할 경우 엉뚱한 결과가 나올 수도 있는것이다. 혼자서 테스트 할때는 잘 되는데, 사용자가 동시접속하면 엉뚱한 결과가 나온다던지 하는 재밌는 상황이 연출되는것이다.)



4. Callable, Future
 - Runnable 인퍼페이스는 처리 결과 값을 반환해줄 수가 없다. (뭐 show~를 하면 받을 수 있긴하다.) 이런 단점을 보완하기 위해서 추가된것은 java.util.concurrent.Callable 인터페이스 이다.

public interface Callable {
    // 작업을 실행시킨후 결과 값을 반환한다.
    V call() throws Exception;
}

 - Callable 인터페이스의 call() 메소드는 결과값을 반환하도록 되어있다.  그런데 call() 메소드를 이용해서 결과값을 반환받을때까지 기다린다면, 그건 있으나마나하다. 그래서 존재하는 것이 Future 인터페이스이다.


public interface Future {
    // 작업을 취소한다.
    boolean cancel(boolean mayInterruptIfRunning);

    // 작업이 취소되었는지 여부를 판단한다.
    boolean isCancelled();

    // 작업이 정상적으로 완료되었는지 여부를 판단한다.
    boolean isDone();

    // 작업 결과를 반환한다. 작업이 실행중이면 완료될때까지 블럭킹된다.
    V get() throws InterruptedException, ExecutionException;

    // 지정한 시간동안 작업 결과를 기다린다. 지정시간내에 작업이 완료되면 결과를 반환하고, 시간이 초과하면 TimeoutException을 발생시킨다.
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

- Callabe로 한번 구현해보자
package test.thread;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class MultiThread3 implements Callable {
	
	private int total;
	
	public Integer call() throws Exception {
		for (int i = 1; i <= 20000; i++) {
			total += i;
		}
		
		return total;
	}

	public static void main(String[] args) throws InterruptedException, ExecutionException {
		ExecutorService executorService = Executors.newFixedThreadPool(3);
		Future f1 = executorService.submit(new MultiThread3());
		Future f2 = executorService.submit(new MultiThread3());
		Future f3 = executorService.submit(new MultiThread3());
		Future f4 = executorService.submit(new MultiThread3());
		Future f5 = executorService.submit(new MultiThread3());
		Future f6 = executorService.submit(new MultiThread3());
		System.out.printf("[%s] 1에서 20000까지의 총 합은 %d입니다.%n", "f1", f1.get());
		System.out.printf("[%s] 1에서 20000까지의 총 합은 %d입니다.%n", "f2", f2.get());
		System.out.printf("[%s] 1에서 20000까지의 총 합은 %d입니다.%n", "f3", f3.get());
		System.out.printf("[%s] 1에서 20000까지의 총 합은 %d입니다.%n", "f4", f4.get());
		System.out.printf("[%s] 1에서 20000까지의 총 합은 %d입니다.%n", "f5", f5.get());
		System.out.printf("[%s] 1에서 20000까지의 총 합은 %d입니다.%n", "f6", f6.get());
		executorService.shutdown();
	}
}

정말 구현하기 쉬워졌다.. 역시 세상은 오래살고 볼일인가.. ^^; 하지만, 기반기술에서 멀어저가는 나의 모습을 볼때마다 한숨만 느는것은 무엇때문일까...

* 참고 :
  http://java.sun.com/docs/books/tutorial/essential/concurrency/
  http://java.sun.com/javase/6/docs/api/java/util/concurrent/Executor.html

-------------------------------------------------------------
시간 단위

yoctoseconds : 1/1,000,000,000,000,000,000,000,000 
zeptoseconds : 1/1,000,000,000,000,000,000,000 
attoseconds  : 1/1,000,000,000,000,000,000 
femtoseconds : 1/1,000,000,000,000,000 
picoseconds  : 1/1,000,000,000,000 
nanoseconds  : 1/1,000,000,000 
microseconds : 1/1,000,000 
milliseconds : 1/1,000 
centiseconds : 1/100 
second       : 1 
killoseconds : 1,000 
megaseconds  : 1,000,000 
gigaseconds  : 1,000,000,000 
teraseconds  : 1,000,000,000,000 
petaseconds  : 1,000,000,000,000,000
exaseconds   : 1,000,000,000,000,000,000