1 背景
我曾接触一个项目,业务数据用Hbase存储,为了方便查询统计,采用elasticsearch作为查询库,但直接操作elasticsearch对性能有致命的影响。于是引用了rabbitmq + spring-stream来实现削峰操作。
一开始,接收到一条消息后便将其插入到elasticsearch,但消费太慢,于是改用调用ES批量操作的接口,且用异步多线程的方式插入。
2 方案
用生产者消费者的方式实现,接收到消息后,立即将消息插入到本地消息队列。另启几个线程消费线程消费消息,消费线程在消息达到一定数量,或等待一定时间后,会执行插入操作。
3 实现
java.util.concurrent.BlockingQueue,因为其自身是线程安全类,所以是实现生产者消费者方案的首选,而且除了阻塞功能外,还自带有timeout功能。两种功能相互配合,可以实现很好的批量操作功能。
以下是api文档中对BlockingQueue的描述。你可以看到三种类型的操作:insert,remove, examine。且每种都有在达到条件时采取的四种策略。
Summary of BlockingQueue methods
Throws exceptionSpecial valueBlocksTimes outInsertadd add(e)offer offer(e)put put(e)offer(Object, long, TimeUnit) offer(e, time, unit)Removeremove remove()poll poll()take take()poll(long, TimeUnit) poll(time, unit)Examineelement element()peek peek()not applicablenot applicable
我在里需要用到的是 Remove 的Block与Timeout
4 代码展示
import jdk
.nashorn
.internal
.ir
.Block
;
import java
.util
.ArrayList
;
import java
.util
.List
;
import java
.util
.Random
;
import java
.util
.concurrent
.BlockingQueue
;
import java
.util
.concurrent
.LinkedBlockingDeque
;
import java
.util
.concurrent
.TimeUnit
;
public class TestBolckQueue {
public static class Message{
private int id
;
private String name
;
public Message(int id
, String name
) {
this.id
= id
;
this.name
= name
;
}
public int getId() {
return id
;
}
public void setId(int id
) {
this.id
= id
;
}
public String
getName() {
return name
;
}
public void setName(String name
) {
this.name
= name
;
}
@Override
public String
toString() {
return "Message{" +
"id=" + id
+
", name='" + name
+ '\'' +
'}';
}
}
public final static BlockingQueue
<Message> queue
= new LinkedBlockingDeque(100);
public static Runnable product
= ()->{
int i
=0;
Random random
= new Random();
while(true){
try {
long sleep
= random
.nextInt(1000);
System
.out
.println("睡眠:"+sleep
+"豪秒");
Thread
.sleep(sleep
);
queue
.put(new Message(i
++,"msg"+i
));
} catch (InterruptedException e
) {
e
.printStackTrace();
}
}
};
public static class Consumer implements Runnable{
private static int MAX_BATCH_SIZE
= 10;
private static long MAX_WAIT_TIME
= 700;
private BlockingQueue
<Message> queue
;
public Consumer(BlockingQueue queue
) {
this.queue
= queue
;
}
@Override
public void run() {
try {
loop();
} catch (InterruptedException e
) {
e
.printStackTrace();
}
}
private void loop() throws InterruptedException
{
Message message
;
List
<Message> willSend
;
while(true){
willSend
= new ArrayList<>(10);
message
= this.queue
.take();
willSend
.add(message
);
int i
= 0;
while( ++i
< MAX_BATCH_SIZE
&&
(message
= queue
.poll(MAX_WAIT_TIME
, TimeUnit
.MILLISECONDS
)) != null
){
willSend
.add(message
);
}
send(willSend
);
}
}
protected void send(List
<Message> willSend
){
System
.out
.println("发送"+willSend
.size()+"条消息");
willSend
.forEach(System
.out
::println
);
System
.out
.println("=============发送完成========");
}
}
public static void main(String
[] args
) {
new Thread(product
).start();
new Thread(new Consumer(queue
)).start();
}
}