// 不知道符不符合你的要求 (Not sure whether this meets your requirements.)
/**
 * Entry point that wires a producer, two intermediate workers and a final collector together
 * through two blocking queues and runs them on a fixed pool of four threads.
 */
public class Test {
    /**
     * Starts the four pipeline tasks and blocks until all of them have finished.
     *
     * <p>If the calling thread is interrupted while waiting, the pool is asked to stop its
     * workers via {@code shutdownNow()} and the interrupt status is restored for the caller.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        BlockingQueue<Integer> que1 = new LinkedBlockingQueue<>();
        BlockingQueue<MyNumber> que2 = new LinkedBlockingQueue<>();

        // One thread per task: every task blocks on a queue, so none may wait for a free thread.
        ExecutorService executor = Executors.newFixedThreadPool(4);
        executor.execute(new ProducerTask(que1));
        executor.execute(new ConsumerIntegerProduceMyNumber(que1, que2));
        executor.execute(new ConsumerIntegerProduceMyNumber(que1, que2));
        executor.execute(new ProduceMyNumber(que2));

        // No further tasks will be submitted; already-submitted tasks keep running.
        executor.shutdown();
        try {
            executor.awaitTermination(Long.MAX_VALUE, TimeUnit.HOURS);
        } catch (InterruptedException e) {
            // Do not leave worker threads running behind an interrupted main thread.
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}
class MyNumber{
public Integer num;
public boolean isBig;
public MyNumber(Integer num,boolean isBig){
this.num = num;
this.isBig = isBig;
}
}
/**
 * First pipeline stage: puts a random number (100..199) of random non-negative integers on the
 * queue, pausing up to 19 ms before each one, and then puts one negative sentinel per consumer
 * so that the consumers know the stream has ended.
 */
class ProducerTask implements Runnable {
    /** Negative marker meaning "no more data"; real payloads are always non-negative. */
    private static final int END_OF_STREAM = -1;

    /** Number of sentinels to emit; one for each consumer reading from the queue. */
    private static final int CONSUMER_COUNT = 2;

    private final BlockingQueue<Integer> que1;

    /**
     * @param que1 the queue the generated integers (and the trailing sentinels) are put on
     */
    public ProducerTask(BlockingQueue<Integer> que1) {
        this.que1 = que1;
    }

    /**
     * Produces the values and the trailing sentinels.
     *
     * <p>If the thread is interrupted, production stops immediately, the interrupt status is
     * restored, and the sentinels that have not been delivered yet are offered without blocking
     * so that consumers are still told to stop whenever the queue has room.
     */
    @Override
    public void run() {
        Random r = new Random();
        int k = r.nextInt(100) + 100;
        System.out.println(k);

        int sentinelsSent = 0;
        try {
            while (k-- > 0) {
                Thread.sleep(r.nextInt(20));
                que1.put(r.nextInt(Integer.MAX_VALUE));
            }
            while (sentinelsSent < CONSUMER_COUNT) {
                que1.put(END_OF_STREAM);
                sentinelsSent++;
            }
        } catch (InterruptedException e) {
            // Restore the flag for whoever owns this thread instead of swallowing the request.
            Thread.currentThread().interrupt();
            // put() would throw again while the flag is set, so fall back to non-blocking offer().
            while (sentinelsSent < CONSUMER_COUNT) {
                que1.offer(END_OF_STREAM);
                sentinelsSent++;
            }
        }
    }
}
/**
 * Middle pipeline stage: takes integers from the first queue and, for every non-negative value
 * {@code x}, puts two {@code MyNumber} objects on the second queue: {@code (x, false)} and
 * {@code (x + 1, true)}. Each of the two is published after its own random delay (0..99 ms),
 * so their relative order on the second queue is random. A negative input ends the loop, after
 * which a single {@code MyNumber(-1, false)} sentinel is put on the second queue.
 */
class ConsumerIntegerProduceMyNumber implements Runnable {
    private final BlockingQueue<Integer> que1;
    private final BlockingQueue<MyNumber> que2;

    /**
     * @param que1 source queue of integers; a negative value means "stop"
     * @param que2 destination queue for the generated {@code MyNumber} pairs and the sentinel
     */
    public ConsumerIntegerProduceMyNumber(BlockingQueue<Integer> que1, BlockingQueue<MyNumber> que2) {
        this.que1 = que1;
        this.que2 = que2;
    }

    /**
     * Runs the take/publish loop until a negative value is taken or the thread is interrupted.
     *
     * <p>On interruption the loop stops immediately (a pair that was in progress may be left
     * half published), the interrupt status is restored, and the sentinel is offered without
     * blocking so the downstream stage is still notified whenever the queue has room.
     */
    @Override
    public void run() {
        Random r = new Random();
        try {
            while (true) {
                Thread.sleep(r.nextInt(50));
                int x = que1.take();
                if (x < 0) {
                    break;
                }

                // Independent delays, each measured from "now", for the two halves of the pair.
                int smallDelay = r.nextInt(100);
                int bigDelay = r.nextInt(100);
                MyNumber small = new MyNumber(x, false);
                MyNumber big = new MyNumber(x + 1, true);

                if (bigDelay < smallDelay) {
                    Thread.sleep(bigDelay);
                    que2.put(big);
                    Thread.sleep(smallDelay - bigDelay);
                    que2.put(small);
                } else {
                    // Also covers equal delays: the second sleep is then zero-length.
                    Thread.sleep(smallDelay);
                    que2.put(small);
                    Thread.sleep(bigDelay - smallDelay);
                    que2.put(big);
                }
            }
            que2.put(new MyNumber(-1, false));
        } catch (InterruptedException e) {
            // Restore the flag for whoever owns this thread instead of swallowing the request.
            Thread.currentThread().interrupt();
            // put() would throw again while the flag is set, so fall back to non-blocking offer().
            que2.offer(new MyNumber(-1, false));
        }
    }
}
/**
 * Final pipeline stage: takes {@code MyNumber} objects from the queue and re-unites the two
 * halves of each pair ({@code (x, false)} and {@code (x + 1, true)}). When the second half of a
 * pair arrives, both values are printed in ascending order. A {@code MyNumber} with a negative
 * value is a sentinel; the loop ends after two sentinels have been seen, and the number of
 * printed values is reported.
 */
class ProduceMyNumber implements Runnable {
    private final BlockingQueue<MyNumber> que2;

    /**
     * @param que2 queue delivering the pair halves and the terminating sentinels
     */
    public ProduceMyNumber(BlockingQueue<MyNumber> que2) {
        this.que2 = que2;
    }

    /**
     * Pairs incoming numbers until two sentinels arrive or the thread is interrupted, then
     * prints the count of values printed so far.
     *
     * <p>Smaller and bigger halves that are still waiting for their partner are kept in separate
     * sets. Keeping them together would let a waiting bigger half {@code n} (from the pair
     * {@code (n - 1, n)}) be mistaken for the smaller half of {@code (n, n + 1)}.
     */
    @Override
    public void run() {
        Set<Integer> waitingSmall = new HashSet<>();
        Set<Integer> waitingBig = new HashSet<>();
        int count = 0;
        int nThread = 2; // one sentinel is expected from each upstream worker

        try {
            while (nThread > 0) {
                MyNumber item = que2.take();
                int value = item.num;
                if (value < 0) {
                    nThread--;
                    continue;
                }

                if (item.isBig) {
                    // Partner of a bigger half is the smaller half with value - 1.
                    if (waitingSmall.contains(value - 1)) {
                        System.out.println(value - 1);
                        System.out.println(value);
                        count += 2;
                    } else {
                        waitingBig.add(value);
                    }
                } else {
                    // Partner of a smaller half is the bigger half with value + 1.
                    if (waitingBig.contains(value + 1)) {
                        System.out.println(value);
                        System.out.println(value + 1);
                        count += 2;
                    } else {
                        waitingSmall.add(value);
                    }
                }
            }
        } catch (InterruptedException e) {
            // Stop waiting and restore the flag instead of looping back into take().
            Thread.currentThread().interrupt();
        }
        System.out.println("总计:" + count);
    }
}