利用BlockingQueue进行批量操作

    科技2024-07-06  67

    1 背景

    我曾接触一个项目,业务数据用Hbase存储,为了方便查询统计,采用elasticsearch作为查询库,但直接操作elasticsearch对性能有致命的影响。于是引用了rabbitmq + spring-stream来实现削峰操作。

    一开始,接收到一条消息后便将其插入到elasticsearch,但消费太慢,于是改用调用ES批量操作的接口,且用异步多线程的方式插入。

    2 方案

    用生产者消费者的方式实现,接收到消息后,立即将消息插入到本地消息队列。另启几个线程消费线程消费消息,消费线程在消息达到一定数量,或等待一定时间后,会执行插入操作。

    3 实现

    java.util.concurrent.BlockingQueue,因为其自身是线程安全类,所以是实现生产者消费者方案的首选,而且除了阻塞功能外,还自带有timeout功能。两种功能相互配合,可以实现很好的批量操作功能。

    以下是api文档中对BlockingQueue的描述。你可以看到三种类型的操作:insert,remove, examine。且每种都有在达到条件时采取的四种策略。

    Summary of BlockingQueue methods Throws exceptionSpecial valueBlocksTimes outInsertadd add(e)offer offer(e)put put(e)offer(Object, long, TimeUnit) offer(e, time, unit)Removeremove remove()poll poll()take take()poll(long, TimeUnit) poll(time, unit)Examineelement element()peek peek()not applicablenot applicable

    我在里需要用到的是 Remove 的Block与Timeout

    4 代码展示

    import jdk.nashorn.internal.ir.Block; import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; public class TestBolckQueue { public static class Message{ private int id; private String name; public Message(int id, String name) { this.id = id; this.name = name; } public int getId() { return id; } public void setId(int id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } @Override public String toString() { return "Message{" + "id=" + id + ", name='" + name + '\'' + '}'; } } /** * 消息队列 */ public final static BlockingQueue<Message> queue = new LinkedBlockingDeque(100); /*** * 消息生产者 */ public static Runnable product = ()->{ int i =0; Random random = new Random(); while(true){ try { long sleep = random.nextInt(1000); System.out.println("睡眠:"+sleep+"豪秒"); Thread.sleep(sleep); queue.put(new Message(i++,"msg"+i)); } catch (InterruptedException e) { e.printStackTrace(); } } }; /** * 消费者 */ public static class Consumer implements Runnable{ /*** * 批量操作的最大条数 */ private static int MAX_BATCH_SIZE = 10; /** * 等徒时长 */ private static long MAX_WAIT_TIME = 700; private BlockingQueue<Message> queue; public Consumer(BlockingQueue queue) { this.queue = queue; } @Override public void run() { try { loop(); } catch (InterruptedException e) { e.printStackTrace(); } } private void loop() throws InterruptedException { Message message ; List<Message> willSend ; while(true){ willSend = new ArrayList<>(10); message = this.queue.take(); //take() 是个阻塞操作,当队列无消息时,线程在此阻塞 willSend.add(message); int i = 0; while( ++i < MAX_BATCH_SIZE && (message = queue.poll(MAX_WAIT_TIME, TimeUnit.MILLISECONDS)) != null){ // poll是一个等待的操作,当等 willSend.add(message); // 待 MAX_WAIT_TIME 后未能获 } // 取消息,返回 null , 停止等待 send(willSend); } } protected void send(List<Message> willSend){ System.out.println("发送"+willSend.size()+"条消息"); willSend.forEach(System.out::println); System.out.println("=============发送完成========"); } } public static void main(String[] args) { new Thread(product).start(); new Thread(new Consumer(queue)).start(); } }

    Processed: 0.010, SQL: 9