import
java.util.Random;
import
java.util.concurrent.BlockingQueue;
import
java.util.concurrent.TimeUnit;
/**
* 消费者线程
*
* @author jackyuj
*/
public
class
Consumer
implements
Runnable {
public
Consumer(BlockingQueue<String> queue) {
this
.queue = queue;
}
public
void
run() {
System.out.println(
"启动消费者线程!"
);
Random r =
new
Random();
boolean
isRunning =
true
;
try
{
while
(isRunning) {
System.out.println(
"正从队列获取数据..."
);
String data = queue.poll(
2
, TimeUnit.SECONDS);
if
(
null
!= data) {
System.out.println(
"拿到数据:"
+ data);
System.out.println(
"正在消费数据:"
+ data);
Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));
}
else
{
// 超过2s还没数据,认为所有生产线程都已经退出,自动退出消费线程。
isRunning =
false
;
}
}
}
catch
(InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
finally
{
System.out.println(
"退出消费者线程!"
);
}
}
private
BlockingQueue<String> queue;
private
static
final
int
DEFAULT_RANGE_FOR_SLEEP =
1000
;
}
import
java.util.Random;
import
java.util.concurrent.BlockingQueue;
import
java.util.concurrent.TimeUnit;
import
java.util.concurrent.atomic.AtomicInteger;
/**
* 生产者线程
*
* @author jackyuj
*/
public
class
Producer
implements
Runnable {
public
Producer(BlockingQueue queue) {
this
.queue = queue;
}
public
void
run() {
String data =
null
;
Random r =
new
Random();
System.out.println(
"启动生产者线程!"
);
try
{
while
(isRunning) {
System.out.println(
"正在生产数据..."
);
Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));
data =
"data:"
+ count.incrementAndGet();
System.out.println(
"将数据:"
+ data +
"放入队列..."
);
if
(!queue.offer(data,
2
, TimeUnit.SECONDS)) {
System.out.println(
"放入数据失败:"
+ data);
}
}
}
catch
(InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
finally
{
System.out.println(
"退出生产者线程!"
);
}
}
public
void
stop() {
isRunning =
false
;
}
private
volatile
boolean
isRunning =
true
;
private
BlockingQueue queue;
private
static
AtomicInteger count =
new
AtomicInteger();
private
static
final
int
DEFAULT_RANGE_FOR_SLEEP =
1000
;
}
联系客服