(1) The buffers in this experiment are not circular and need not be accessed sequentially. A producer may place its product in any buffer that is currently empty.
(2) A consumer consumes products only from its designated producers.
(3) All production and consumption requests are specified in the test case file. A shared buffer can be treated as free space for a new producer only after the data in it has satisfied every consumption request made on it.
(4) Producers must be mutually exclusive while buffers are being allocated to them; after allocation, the individual production activities can run concurrently. Consumers need mutual exclusion only when they consume the same product. At the end of each consumption they must also check whether all consumption of that product has finished and, if so, clear the product.
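Requirements (2) to (4) can be sketched as a single buffer slot that remembers which producer filled it and which consumers still have to read it. This is only an illustration under assumptions: the class `Slot`, its methods, and the use of integer ids are hypothetical and do not come from the experiment's code or test case file.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical slot type, used only to illustrate requirements (2)-(4).
class Slot {
    private int producerId = -1;                                  // -1 means the slot is empty
    private final Set<Integer> pendingConsumers = new HashSet<>();

    // Requirement (4): allocation is mutually exclusive (the method is synchronized).
    synchronized boolean tryAllocate(int producer, Set<Integer> consumers) {
        if (producerId != -1 || consumers.isEmpty()) {
            return false;                                         // slot in use, or nobody would ever free it
        }
        producerId = producer;
        pendingConsumers.addAll(consumers);                       // copy, so the caller's set is untouched
        return true;
    }

    // Requirement (2): a consumer only takes a product from its designated producer.
    // Requirement (3): the slot becomes free only after every pending consumer has consumed.
    synchronized boolean consume(int consumer, int wantedProducer) {
        if (producerId != wantedProducer || !pendingConsumers.remove(consumer)) {
            return false;
        }
        if (pendingConsumers.isEmpty()) {
            producerId = -1;                                      // last consumer clears the product
        }
        return true;
    }

    synchronized boolean isFree() {
        return producerId == -1;
    }
}
```

In this sketch a slot filled by producer 1 for consumers 7 and 8 stays occupied after consumer 7 has read it and is released only when consumer 8 has read it as well.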
Synchronization objects are the entities Windows uses to achieve synchronization and mutual exclusion. The common synchronization objects in Windows are Semaphore, Mutex, CriticalSection and Event; the first three are used in this program. Using one of these objects takes three steps: first create or initialize it; then request it and enter the critical section, which corresponds to locking a mutex; finally release it, which corresponds to unlocking the mutex. A synchronization object created in one thread can be used in other threads, which is how synchronization and mutual exclusion between threads are achieved. Synchronization between processes with these objects works in a similar way.
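The three steps (create, request, release) look much the same in Java as in the Windows API. The sketch below uses `java.util.concurrent.locks.ReentrantLock` as a stand-in for a Windows Mutex; the class names `SharedCounter` and `ThreeStepDemo` are made up for this illustration.

```java
import java.util.concurrent.locks.ReentrantLock;

class SharedCounter {
    // Step 1: create the synchronization object once; all threads share it.
    private final ReentrantLock lock = new ReentrantLock();
    private int count = 0;

    void increment() {
        lock.lock();              // Step 2: request the object and enter the critical section
        try {
            count++;
        } finally {
            lock.unlock();        // Step 3: release the object, even if the body throws
        }
    }

    int get() {
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }
}

class ThreeStepDemo {
    public static void main(String[] args) throws InterruptedException {
        SharedCounter counter = new SharedCounter();
        Thread[] threads = new Thread[4];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                for (int k = 0; k < 1000; k++) {
                    counter.increment();
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        System.out.println(counter.get());   // prints 4000: no increment is lost
    }
}
```

Releasing in a `finally` block is the usual Java idiom, so that the lock is not left closed when the critical section fails.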
1. Using lock primitives to achieve mutual exclusion
To solve the problem of mutually exclusive entry into critical sections, a lock can be set up for each class of critical section. The lock has two states, open and closed, and a process executes a critical-section program in the following steps:
① Close the lock. First check the state of the lock. If it is closed, wait until it is opened. If it is open, close it and continue with step ②.
② Execute the critical-section program.
③ Open the lock and exit the critical section.
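Steps ① to ③ can be sketched in Java as a simple spin lock. The check and the close in step ① have to happen as one indivisible operation, otherwise two processes could both see the lock open and both enter; the sketch gets this from `AtomicBoolean.compareAndSet`. The class `SimpleLock` and its method names are assumptions made for this example.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical two-state lock: false = open, true = closed.
class SimpleLock {
    private final AtomicBoolean closed = new AtomicBoolean(false);

    // Step 1: wait while the lock is closed, then close it.
    // compareAndSet tests "is it open?" and closes it in one atomic step.
    void close() {
        while (!closed.compareAndSet(false, true)) {
            Thread.yield();       // lock is closed by someone else: let other threads run, then retry
        }
    }

    // Non-blocking variant: returns true only if this call closed the lock.
    boolean tryClose() {
        return closed.compareAndSet(false, true);
    }

    // Step 3: open the lock and leave the critical section.
    void open() {
        closed.set(false);
    }
}

class SimpleLockDemo {
    public static void main(String[] args) {
        SimpleLock lock = new SimpleLock();
        lock.close();                          // step 1
        System.out.println("in critical section");   // step 2
        lock.open();                           // step 3
    }
}
```

A waiting thread in this sketch keeps retrying instead of being suspended, which is acceptable only for very short critical sections.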
2. Semaphores and the WAIT and SIGNAL primitives
The initial value of a semaphore is set by the system according to the available resources and how they are to be used. Initially the pointer field of the semaphore is set to 0, indicating that its queue is empty. The value of a semaphore can change while it is in use, but only through the WAIT and SIGNAL operations. Let the semaphore be S; the WAIT operation on S is written WAIT(S), and the SIGNAL operation on S is written SIGNAL(S).
WAIT(S): perform the following two actions in sequence:
① Decrease the value of the semaphore by 1, that is, S = S - 1;
② If S ≥ 0, the process continues;
if S < 0, set the state of the process to blocked, append its PCB to the end of the semaphore queue, and give up the processor to wait (until another process performs a SIGNAL operation on S and releases it).
SIGNAL(S): perform the following two actions in sequence:
① Increase the value of S by 1, that is, S = S + 1;
② If S > 0, the process continues to run;
if S ≤ 0, release the process whose PCB is first on the semaphore queue (the PCB pointed to by the semaphore's pointer field), changing it from the blocked state to the ready state; the process that executed the SIGNAL operation then continues to run.
In a concrete implementation, each WAIT and SIGNAL operation must be executed as an indivisible whole; the two actions of one operation may not be split apart or interleaved with another operation. In other words, WAIT and SIGNAL each behave like a single instruction that runs to completion without interruption; otherwise the semaphore value and its queue become inconsistent.
In physical terms, when S > 0 the value of S is the number of available resources. Performing a WAIT operation requests one unit of the resource, so the value of S is decreased by 1. If S < 0 afterwards, no resource is available, and the requester must join the queue and wait until another process releases a unit. Performing a SIGNAL operation releases one unit of the resource, so the value of S is increased by 1. If S ≤ 0 afterwards, some processes are still waiting for the resource, so the process at the head of the queue is woken up; the process that released the resource can always continue to run.
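The bookkeeping in WAIT(S) and SIGNAL(S) can be traced without real threads. The sketch below only records what the definitions say should happen: the value of S and a FIFO queue of blocked process names standing in for PCBs. `TracedSemaphore`, `waitOp` and `signalOp` are names invented for this illustration; nothing in it actually blocks.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical single-threaded model of the WAIT/SIGNAL definitions.
class TracedSemaphore {
    private int s;
    private final Deque<String> queue = new ArrayDeque<>();   // names of blocked processes, FIFO

    TracedSemaphore(int initial) {
        s = initial;
    }

    // WAIT(S): returns true if the caller may continue, false if it is queued as blocked.
    boolean waitOp(String process) {
        s = s - 1;
        if (s >= 0) {
            return true;
        }
        queue.addLast(process);
        return false;
    }

    // SIGNAL(S): returns the name of the process that becomes ready, or null if nobody was waiting.
    String signalOp() {
        s = s + 1;
        if (s > 0) {
            return null;
        }
        return queue.pollFirst();
    }

    int value() {
        return s;
    }

    int waiting() {
        return queue.size();
    }
}

class TracedSemaphoreDemo {
    public static void main(String[] args) {
        TracedSemaphore sem = new TracedSemaphore(2);   // two units of the resource
        System.out.println(sem.waitOp("A"));   // true,  S = 1
        System.out.println(sem.waitOp("B"));   // true,  S = 0
        System.out.println(sem.waitOp("C"));   // false, S = -1, C is queued
        System.out.println(sem.signalOp());    // C,     S = 0
        System.out.println(sem.signalOp());    // null,  S = 1
    }
}
```

The trace also shows a property that follows from the definitions: whenever S is negative, its absolute value equals the number of processes in the queue.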
-
/*
 * Producer
 */
public class Producer implements Runnable {
    private final Semaphore mutex, full, empty;
    private final Buffer buf;
    private final String name;

    public Producer(String name, Semaphore mutex, Semaphore full, Semaphore empty, Buffer buf) {
        this.mutex = mutex;
        this.full = full;
        this.empty = empty;
        this.buf = buf;
        this.name = name;
    }

    @Override
    public void run() {
        while (true) {
            empty.p();   // wait until at least one slot is empty
            mutex.p();   // enter the critical section
            System.out.println(name + " inserts a new product into " + buf.nextEmptyIndex);
            buf.nextEmptyIndex = (buf.nextEmptyIndex + 1) % buf.size;
            mutex.v();   // leave the critical section
            full.v();    // one more slot is full
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
-------
/*
 * Consumer
 */
public class Customer implements Runnable {
    private final Semaphore mutex, full, empty;
    private final Buffer buf;
    private final String name;

    public Customer(String name, Semaphore mutex, Semaphore full, Semaphore empty, Buffer buf) {
        this.mutex = mutex;
        this.full = full;
        this.empty = empty;
        this.buf = buf;
        this.name = name;
    }

    @Override
    public void run() {
        while (true) {
            full.p();    // wait until at least one slot is full
            mutex.p();   // enter the critical section
            System.out.println(name + " gets a product from " + buf.nextFullIndex);
            buf.nextFullIndex = (buf.nextFullIndex + 1) % buf.size;
            mutex.v();   // leave the critical section
            empty.v();   // one more slot is empty
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
-----------
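/*
 * Buffer
 */
public class Buffer {
    public int size;             // number of slots
    public int nextEmptyIndex;   // slot the next producer writes to
    public int nextFullIndex;    // slot the next consumer reads from

    public Buffer(int size, int nextEmpty, int nextFull) {
        this.nextEmptyIndex = nextEmpty;
        this.nextFullIndex = nextFull;
        this.size = size;
    }
}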
----------
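/*
 * This class is used to simulate semaphores
 */
public class Semaphore {
    private int semValue;   // when negative, its absolute value is the number of waiting threads

    public Semaphore(int semValue) {
        this.semValue = semValue;
    }

    // WAIT (P): take one unit; block if none is left.
    public synchronized void p() {
        semValue--;
        if (semValue < 0) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    // SIGNAL (V): give back one unit; wake one waiting thread if any is blocked.
    public synchronized void v() {
        semValue++;
        if (semValue <= 0) {
            this.notify();
        }
    }
}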
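The p() method above guards wait() with a single if, which follows the WAIT definition literally. Java permits a thread to return from wait() without having been notified (a spurious wakeup), so a more defensive variant re-checks its condition in a loop. The sketch below is one possible way to do that, not the class used in this experiment: it keeps a non-negative permit count instead of a negative waiter count, and the name `GuardedSemaphore` is made up.

```java
// Hypothetical variant of the simulated semaphore that tolerates spurious wakeups.
class GuardedSemaphore {
    private int permits;   // never negative in this variant

    GuardedSemaphore(int initial) {
        permits = initial;
    }

    // WAIT (P): block until a permit is available, then take it.
    synchronized void p() throws InterruptedException {
        while (permits == 0) {
            wait();        // re-check the condition after every wakeup
        }
        permits--;
    }

    // SIGNAL (V): return a permit and wake one waiting thread, if any.
    synchronized void v() {
        permits++;
        notify();
    }

    synchronized int available() {
        return permits;
    }
}
```

This variant also lets InterruptedException propagate to the caller, so an interrupted thread cannot fall through into the critical section without holding a permit.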
------------------------
public class Test {
    public static void main(String[] args) {
        Buffer bf = new Buffer(1, 0, 0);
        Semaphore mutex = new Semaphore(1);   // mutual exclusion on the buffer indexes
        Semaphore full = new Semaphore(0);    // number of full slots
        Semaphore empty = new Semaphore(1);   // number of empty slots, equal to the buffer size
        //new Thread(new Producer("p1",mutex,full,empty,bf)).start();
        Producer p = new Producer("p1", mutex, full, empty, bf);   // p1 is created but never started
        new Thread(new Producer("p2", mutex, full, empty, bf)).start();
        new Thread(new Producer("p3", mutex, full, empty, bf)).start();
        new Thread(new Producer("p4", mutex, full, empty, bf)).start();
        new Thread(new Producer("p5", mutex, full, empty, bf)).start();
        try {
            Thread.sleep(3);   // let the producers start before the consumers
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
        new Thread(new Customer("c1", mutex, full, empty, bf)).start();
        new Thread(new Customer("c2", mutex, full, empty, bf)).start();
        new Thread(new Customer("c3", mutex, full, empty, bf)).start();
        new Thread(new Customer("c4", mutex, full, empty, bf)).start();
        new Thread(new Customer("c5", mutex, full, empty, bf)).start();
    }
}
--------------------------------------------
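The program above runs forever, which makes it hard to check. A terminating variant is sketched below under some assumptions: it swaps the hand-written semaphore for the standard `java.util.concurrent.Semaphore`, stores real integers in the buffer, and gives every producer and consumer a fixed number of items. The class `BoundedBufferDemo` and its `run` parameters are invented for this example.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;

class BoundedBufferDemo {
    // Starts `workers` producers and `workers` consumers; each handles `itemsEach` items.
    // Returns the sum of all consumed items.
    static long run(int slots, int workers, int itemsEach) throws InterruptedException {
        final int[] buffer = new int[slots];
        final int[] index = {0, 0};              // index[0] = next empty slot, index[1] = next full slot
        final Semaphore mutex = new Semaphore(1);
        final Semaphore full = new Semaphore(0);
        final Semaphore empty = new Semaphore(slots);
        final AtomicLong consumedSum = new AtomicLong();

        Thread[] threads = new Thread[2 * workers];
        for (int w = 0; w < workers; w++) {
            threads[2 * w] = new Thread(() -> {              // producer: writes 1..itemsEach
                try {
                    for (int i = 1; i <= itemsEach; i++) {
                        empty.acquire();
                        mutex.acquire();
                        buffer[index[0]] = i;
                        index[0] = (index[0] + 1) % slots;
                        mutex.release();
                        full.release();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[2 * w + 1] = new Thread(() -> {          // consumer: reads itemsEach values
                try {
                    for (int i = 0; i < itemsEach; i++) {
                        full.acquire();
                        mutex.acquire();
                        int item = buffer[index[1]];
                        index[1] = (index[1] + 1) % slots;
                        mutex.release();
                        empty.release();
                        consumedSum.addAndGet(item);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        for (Thread t : threads) {
            t.start();
        }
        for (Thread t : threads) {
            t.join();
        }
        return consumedSum.get();
    }

    public static void main(String[] args) throws InterruptedException {
        // 4 producers each write 1..100, so the consumers must read 4 * 5050 in total.
        System.out.println(run(3, 4, 100));   // prints 20200
    }
}
```

Because every produced value is eventually consumed exactly once, the sum is the same on every run even though the interleaving of threads is not.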