/** * モニタを用いたパイプ処理を行うプログラム * $ java MonitorPipe で実行 */ import java.util.Random; /** * モニタを用いたパイプ処理の送信側および受信側実行クラスの * インスタンスを生成し実行するクラス * MonitorPipeSenderAndReceive クラスのインスタンスを2つ作り実行する */ class MonitorPipe { public static void main (String[] args) { int bufferSize = 4; // バッファサイズ /* モニタを生成する */ MonitorPipeSenderAndReceiver.newMonitor (bufferSize); /* スレッドを生成する */ Thread th_s = new MonitorPipeSenderAndReceiver (true); // 送信側スレッド Thread th_r = new MonitorPipeSenderAndReceiver (false); // 受信側スレッド /* スレッドの実行開始 */ th_s.start(); th_r.start(); } } /** * バッファを管理するモニタ */ class BufferMonitor extends Thread { private int nextReadIndex; // 次の読み出し位置 private int nextWriteIndex; // 次の書き込み位置 private int messageNum; // バッファ内のメッセージ数 private int bufferSize; // バッファサイズ private char[] buffer; // メッセージバッファ BufferMonitor (int s) { nextReadIndex = 0; nextWriteIndex = 0; messageNum = 0; bufferSize = s; buffer = new char[s]; } /** * バッファにメッセージを書き込む */ public void putMessage (char mes) { boolean waitResource = true; while (true) { synchronized (BufferMonitor.class) { /* この中には同時に1スレッドしか入れない */ if (messageNum < bufferSize) { ++messageNum; /* メッセージ数を増やす */ buffer[nextWriteIndex] = mes; /* バッファにメッセージを書き込む */ nextWriteIndex = (nextWriteIndex+1) % bufferSize; /* 次の書き込み位置を1つずらす */ return; /* 書き込めれば帰る */ } } pass (1000L); /* 書き込めなければ待つ */ } } /** * バッファからメッセージを読み出す */ public char getMessage() { boolean waitResource = true; while (true) { synchronized (BufferMonitor.class) { /* この中には同時に1スレッドしか入れない */ if (messageNum > 0) { --messageNum; /* メッセージ数を増やす */ char mes = buffer[nextReadIndex]; /* バッファからメッセージを読み出す */ nextReadIndex = (nextReadIndex+1) % bufferSize; /* 次の読み出し位置を1つずらす */ return mes; /* 読み出せれば帰る */ } } pass (1000L); /* 読み込めなければ待つ */ } } /** * 次の書き込み位置を得る * @return 次の書き込み位置 */ public int getNextWriteIndex () { return nextWriteIndex; } /** * 次の読み出し位置を得る * @return 次の読み出し位置 */ public int getNextReadIndex () { return nextReadIndex; } /** * 指定した時間待機する * @param latency 待機する時間(ミリ秒) */ void pass (long latency) { try { sleep (latency); // Thread のメソッド 指定したミリ秒の間一時停止する } catch (InterruptedException error_report) { System.out.println (error_report); System.exit (1); } } } /** * モニタを用いたパイプ処理の送信側および受信側実行クラス */ class MonitorPipeSenderAndReceiver extends Thread { static int bufferSize; // バッファサイズ static Random random = new Random(); // ランダム遅延時間発生用 static BufferMonitor monitor; // バッファモニタ boolean isSender; // 送信側か受信側が (true なら送信側) /** * モニタを生成する * @param bf バッファサイズ */ static void newMonitor (int bf) { bufferSize = bf; // バッファサイズ monitor = new BufferMonitor (bf); } /** * コンストラクタ * @param isSender 送信側か */ MonitorPipeSenderAndReceiver (boolean isSender) { /* 各フィールドに値設定 */ this.isSender = isSender; } /** * 送信側, 受信側それぞれのパイプ処理の実行 */ public void run () { if (isSender) { send(); } else { receive(); } } /** * 送信側スレッドのパイプ処理の実行 */ void send () { while (true) { char mes = createMessage(); // メッセージ作成 write (mes); // バッファに書き込み ncs(); } } /** * 受信側スレッドのパイプ処理の実行 */ void receive () { while (true) { read (); // バッファからの読み出し ncs(); } } /** * バッファへの書き込みを行ったことを表示する * @param char mes バッファに書き込むメッセージ */ void write (char mes) { int i = monitor.getNextWriteIndex(); monitor.putMessage (mes); System.out.println("b[" + i + "] <- '" + mes + "'"); cs(); } /** * バッファからの読み出しを行ったことを表示する * @return バッファから読み出したメッセージ */ char read() { int j = monitor.getNextReadIndex(); char mes = monitor.getMessage (); System.out.println("\t\tb[" + j + "] -> '" + mes + "'"); cs(); return mes; } /** * 'a'-'z'のランダムな文字を1つ返す * @return ランダムな文字 */ private char createMessage() { return (char) ('a'+random.nextInt(26)); } /** * 臨界領域 * latencyCS() で指定した時間待機する */ void cs() { pass (latencyCS()); } /** * 非臨界領域 * latencyNCS() で指定した時間待機する */ void ncs() { pass (latencyNCS()); } /** * 臨界領域の遅延時間をランダムに設定する * @return 1000未満のランダムな値 */ long latencyCS() { return (long) random.nextInt(1000); } /** * 非臨界領域の遅延時間をランダムに設定する * @return 5000未満のランダムな値 */ long latencyNCS() { return (long) random.nextInt(2000); } /** * 指定した時間待機する * @param latency 待機する時間(ミリ秒) */ void pass (long latency) { try { sleep (latency); // Threadのメソッド 指定したミリ秒の間一時停止する } catch (InterruptedException error_report) { System.out.println (error_report); System.exit (1); } } }