methaneのブログ

このブログに乗せているサンプルコードはすべてNYSLです。

マルチスレッドプログラミング

なんと、MIDPにはVectorとStackがあるのに、Queueが無い。マルチスレッドプログラミングでは良く使うコマンドキューを作るために、↓をJavaでスレッドセーフに実装してみた。
http://d.hatena.ne.jp/methane/20060419/1145460834

組み込みのマルチスレッド以外でマルチスレッドプログラミングって経験が少ないんだけど、同期の仕方やテストの仕方はこんな感じで良いのかな?スレッドセーフのテストやり方が判らん。
notifyAll()は、notify()にするとpushしたときにpop待ちスレッドじゃなくてpush待ちスレッドを起動してしまう可能性があるからダメなんだけど、待ちスレッドがたくさんいるときには無駄だよな。内部でロック用のオブジェクトを2つ作って別々にnotify()する?うー、popがpush用のロックオブジェクトにnotify()しないといけないんだけど、notify()するにはロックしないといけなくて、そーするとpushがpopオブジェクトをロックしようとしたときにデッドロックの危険ガガガ。
なんでnotify()するのにそのオブジェクトのロック取得が必要なんだろう?Javaのスレッドの概念がまだ理解できてない。

/** スレッドセーフな固定長キュー.
 * スレッド間でコマンドキューとして使うことを想定した。
 */
final class CommandQueue {
	/** コンストラクタ.
	 * @param maxSize	この数だけQueueに入れられる。超えたらpushが待たされる.
	 */
	CommandQueue(int maxSize) {
		queue = new Object[maxSize];
	}
	
	int getSize() { return size; }
	int getMaxSize() { return queue.length; }

	/** Queueに突っ込む.
	 * 
	 * Queueがいっぱいだったら、空くまで待つ.
	 * @param o
	 * @return 突っ込めたかどうか。スレッドに割り込みが来ない限りはTrue.
	 */
	synchronized boolean push(Object o) {
		try {
			while (!(size < queue.length)) wait();
			queue[(head + size) % queue.length] = o;
			size++;
			notifyAll(); // pop()待ちがいたらpop()させる.
			return true;
		} catch (InterruptedException e) {
			return false;
		}
	}

	/** Queueが空だったらwaitするpop.
	 * @return popしたもの。pop待ちで割り込みがきた場合はnull.
	 */
	synchronized Object pop() {
		try {
			while (!(size > 0)) wait();
			return popInner();
		} catch (InterruptedException e) {
			return null;
		}
	}
	
	/** Queueが空だったらnullを返すpop.
	 * @return popしたもの。空だったらnull.
	 */
	synchronized Object popNoWait() {
		if (size <= 0) return null;
		return popInner();
	}
	
	/** Queueに物が入ってて、ロックを取得しているという前提で、Queueからpopする動作.
	 * #pop() と #pop_nowait() で共通の内部動作.
	 */
	private Object popInner() {
		Object o = queue[head];
		queue[head] = null;
		head = (head + 1) % queue.length;
		size--;
		notifyAll(); //push待ちがいたらpushさせる.
		return o;
	}

	/** 内部リングバッファ */
	private Object[] queue;
	/** 次にpopする位置 head < maxSize */
	private int head = 0;
	/** キューされている数. size < maxSize */
	private int size = 0;
}

んで、JUnit4使ったテストがコレ

import static org.junit.Assert.*;
import static java.lang.Thread.*;

import org.junit.Before;
import org.junit.Test;

public class CommandQueueTest {

	CommandQueue q;
	
	@Before
	public void setUp() {
		q = new CommandQueue(5);
	}

	@Test
	public void testGetSize() {
		assertEquals(0, q.getSize());
		assertEquals(5, q.getMaxSize());
		q.push(new Integer(1));
		assertEquals(1, q.getSize());
		q.pop();
		assertEquals(0, q.getSize());
	}

	@Test
	public void testPush() throws InterruptedException {
		q.push(Integer.valueOf(1));
		q.push(Integer.valueOf(2));
		q.push(Integer.valueOf(3));
		q.push(Integer.valueOf(4));
		q.push(Integer.valueOf(5));
		assertEquals(5, q.getSize());

		Pussher p = new Pussher(1);
		Thread th = new Thread(p);
		th.start();
		waitThreadState(th, State.WAITING);
		assertEquals(5, q.getSize());
		assertEquals(Integer.valueOf(1), q.pop());
		waitThreadState(th, State.TERMINATED);
		assertEquals(5, q.getSize());
	}
	
	/** PushスレッドとPopスレッドを使って、非同期にPush/Popを行うテスト.
	 */
	@Test
	public void testPushPop() throws InterruptedException {
		final int NUM_LOOP = 100000;
		q = new CommandQueue(3);
		Pussher pusher = new Pussher(NUM_LOOP);
		Popper popper = new Popper(NUM_LOOP);
		
		Thread pushThread = new Thread(pusher);
		Thread popThread = new Thread(popper);
		
		pushThread.start();
		popThread.start();
		
		waitThreadState(popThread, State.TERMINATED);
		
		for (int i = 0; i < NUM_LOOP; ++i) {
			assertEquals(Integer.valueOf(i), popper.values[i]);
		}
	}

	@Test
	public void testPop() throws InterruptedException {
		q = new CommandQueue(5);
		q.push(Integer.valueOf(7));
		assertEquals(Integer.valueOf(7), q.pop());

		Popper p = new Popper(1);
		Thread t = new Thread(p);
		t.start();

		// Test fails for time out when thread didn't go to expected state.
		waitThreadState(t, State.WAITING);

		q.push(Integer.valueOf(13));

		waitThreadState(t, State.TERMINATED);

		assertEquals(Integer.valueOf(13), p.values[0]);
	}

	@Test
	public void testPopNoWait() {
		assertEquals(null, q.popNoWait());
		
		q.push(Integer.valueOf(17));
		assertEquals(Integer.valueOf(17), q.popNoWait());
	}

	private static void waitThreadState(Thread th, Thread.State waitFor) throws InterruptedException {
		while (th.getState() != waitFor) {
			Thread.sleep(1);
		}
	}

	class Popper implements Runnable {
		Popper(int count) {
			this.count = count;
			values = new Object[count];
		}

		public void run() {
			for (int i = 0; i < count; ++i) {
				values[i] = q.pop();
			}
			done = true;
		}

		private final int count;
		public Object[] values;
		public volatile boolean done = false;
	}

	class Pussher implements Runnable {
		public Pussher(int count) {
			this.count = count;
		}

		public void run() {
			for (int i = 0; i < count; ++i) {
				q.push(Integer.valueOf(i));
			}
			done = true;
		}

		private int count;
		public volatile boolean done = false;
	}
}

Generics使ってないのは、このCommandQueueを携帯に入れて、携帯でマルチスレッドプログラムがマトモに動くか調べるのに使うから。