/** * セマフォを用いたパイプ処理を行うプログラム * $ java SemaphorePipe で実行 */ import java.util.concurrent.Semaphore; import java.util.Random; /** * セマフォを用いたパイプ処理の送信側および受信側実行クラスの * インスタンスを生成し実行するクラス * SemaphorePipeSenderAndReceive クラスのインスタンスを2つ作り実行する */ class SemaphorePipe { public static void main (String[] args) { int bufferSize = 4; // バッファサイズ /* セマフォを生成する */ SemaphorePipeSenderAndReceiver.newSemaphore (bufferSize); /* スレッドを生成する */ Thread th_s = new SemaphorePipeSenderAndReceiver (true); // 送信側スレッド Thread th_r = new SemaphorePipeSenderAndReceiver (false); // 受信側スレッド /* スレッドの実行開始 */ th_s.start(); th_r.start(); } } /** * セマフォを用いたパイプ処理の送信側および受信側実行クラス */ class SemaphorePipeSenderAndReceiver extends Thread { static int bufferSize; // バッファサイズ static Semaphore s; // バッファの空き領域数 static Semaphore m; // バッファ中のメッセージ数 static Random random = new Random(); // ランダム遅延時間発生用 static char buffer[]; // メッセージバッファ boolean isSender; // 送信側か受信側が (true なら送信側) /** * セマフォを生成する * @param bf バッファサイズ */ static void newSemaphore (int bf) { bufferSize = bf; // バッファサイズ buffer = new char[bf]; s = new Semaphore (bufferSize); // バッファの空き領域数 m = new Semaphore (0); // バッファ中のメッセージ数 } /** * コンストラクタ * @param isSender 送信側か受信側が (true なら送信側) */ SemaphorePipeSenderAndReceiver (boolean isSender) { /* 各フィールドに値設定 */ this.isSender = isSender; } /** * 送信側, 受信側それぞれのパイプ処理の実行 */ public void run () { if (isSender) { send(); } else { receive(); } } /** * 送信側スレッドのパイプ処理の実行 */ void send () { int nextWriteIndex=0; // 次に書き込むバッファの位置 while (true) { char mes = createMessage(); // メッセージ作成 wait (s); // バッファの空き領域数が1以上になるまで待つ write (nextWriteIndex, mes); // バッファへの書き込み signal (m); // バッファ内のメッセージ数を1増やす nextWriteIndex = (nextWriteIndex+1) % bufferSize; // 書き込み位置を1つ後ろ(末尾まで行けば先頭)に ncs(); } } /** * 受信側スレッドのパイプ処理の実行 */ void receive () { int nextReadIndex=0; // 次に読み出すバッファの位置 while (true) { wait (m); // バッファ内のメッセージ数が1以上になるまで待つ read (nextReadIndex); // バッファからの読み出し signal (s); // バッファの空き領域数を1増やす nextReadIndex = (nextReadIndex+1) % bufferSize; // 読み込み位置を1つ後ろ(末尾まで行けば先頭)に ncs(); } } /** * バッファへの書き込みを行ったことを表示する * @param i バッファへの書き込み位置 * @param char mes バッファに書き込むメッセージ */ void write (int i, char mes) { buffer[i] = mes; System.out.println("b[" + i + "] <- '" + mes + "'"); cs(); } /** * バッファからの読み出しを行ったことを表示する * @param j バッファからの読み出し位置 * @return バッファから読み出したメッセージ */ char read (int j) { char mes = buffer[j]; System.out.println("\t\tb[" + j + "] -> '" + mes + "'"); cs(); return mes; } /** * 'a'-'z'のランダムな文字を1つ返す * @return ランダムな文字 */ private char createMessage() { return (char) ('a'+random.nextInt(26)); } /** * セマフォに対して wait 命令を出す * @param semaphore セマフォ */ void wait (Semaphore semaphore) { try { semaphore.acquire(); // セマフォへの wait 命令 } catch (InterruptedException error_report) { System.out.println (error_report); System.exit (1); } } /** * セマフォに対して signal 命令を出す * @param semaphore セマフォ */ void signal (Semaphore semaphore) { semaphore.release(); // セマフォへの signal 命令 } /** * 臨界領域 * latencyCS() で指定した時間待機する */ void cs() { pass (latencyCS()); } /** * 非臨界領域 * latencyNCS() で指定した時間待機する */ void ncs() { pass (latencyNCS()); } /** * 臨界領域の遅延時間をランダムに設定する * @return 1000未満のランダムな値 */ long latencyCS() { return (long) random.nextInt(1000); } /** * 非臨界領域の遅延時間をランダムに設定する * @return 5000未満のランダムな値 */ long latencyNCS() { return (long) random.nextInt(2000); } /** * 指定した時間待機する * @param latency 待機する時間(ミリ秒) */ void pass (long latency) { try { sleep (latency); // Threadのメソッド 指定したミリ秒の間一時停止する } catch (InterruptedException error_report) { System.out.println (error_report); System.exit (1); } } }