【分布式】 09 消息队列 简单实现

    科技2022-08-23  100

    一、处理中心类

    package com.haoxiansheng.demo01.mqTest;

    import lombok.extern.slf4j.Slf4j;

    import java.util.Objects;
    import java.util.concurrent.ArrayBlockingQueue;

    /**
     * Message processing center: a bounded, in-memory FIFO queue of string messages.
     *
     * <p>All state is static, so there is exactly one queue per class loader. Both operations are
     * non-blocking: {@link #produce(String)} drops the message when the queue is full and
     * {@link #consume()} returns {@code null} when it is empty.
     *
     * @author flame
     */
    @Slf4j
    public class Broker {

        /** Maximum number of messages the queue can hold. */
        private final static int MAX_SIZE = 5;

        /** Container holding the pending messages. */
        private static final ArrayBlockingQueue<String> messageQueue = new ArrayBlockingQueue<>(MAX_SIZE);

        /**
         * Tries to enqueue a message without blocking.
         *
         * <p>If the queue already holds {@code MAX_SIZE} messages the message is discarded and a
         * warning naming the discarded message is logged; no exception is thrown in that case.
         *
         * @param msg the message to enqueue; must not be {@code null}
         * @throws NullPointerException if {@code msg} is {@code null}
         */
        public static void produce(String msg) {
            // ArrayBlockingQueue rejects null elements; fail here with a named argument instead of
            // from inside offer().
            Objects.requireNonNull(msg, "msg");
            if (messageQueue.offer(msg)) {
                log.info("成功向消息中心投递消息=>{}, 当前暂存的消息数量是=>{}", msg, messageQueue.size());
            } else {
                // Log the message that is being dropped; the queue size is always MAX_SIZE here.
                log.warn("消息中心暂存的消息达到最大负荷,不能继续放入消息=>{}", msg);
            }
            log.info("可以将消息做一些操作消息持久化等");
        }

        /**
         * Removes and returns the oldest message without blocking.
         *
         * @return the head of the queue, or {@code null} if the queue is empty
         */
        public static String consume() {
            String msg = messageQueue.poll();
            // poll() signals "empty queue" with null only; an empty string is a real message that
            // has been dequeued and must be reported as consumed.
            if (msg == null) {
                log.info("消息中心没有消息可供消费");
            } else {
                log.info("已经消费消息=>{}, 当前暂存消息=>{}", msg, messageQueue.size());
            }
            log.info("可以相对应的讲消息的状态标记为发送等等。。。。。");
            return msg;
        }
    }

    二、服务端Server

    package com.haoxiansheng.demo01.mqTest;

    import lombok.extern.slf4j.Slf4j;

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.io.PrintStream;
    import java.net.ServerSocket;
    import java.net.Socket;

    /**
     * Exposes the {@link Broker} over a line-based TCP protocol.
     *
     * <p>Each accepted connection is served by one instance on its own thread. Every non-empty line
     * received is either the literal {@code CONSUMER}, which is answered with one line containing
     * the result of {@link Broker#consume()}, or any other text, which is handed to
     * {@link Broker#produce(String)}.
     *
     * @author flame
     */
    @Slf4j
    public class BrokerServer implements Runnable {

        /** TCP port the server listens on; also read by the client. */
        public static int SERVICE_PORT = 9999;

        /** Request line that asks the server for the next queued message. */
        private final static String CONSUMER = "CONSUMER";

        /** Connection served by this handler. */
        private final Socket socket;

        /**
         * @param socket the accepted client connection; it is closed when {@link #run()} returns
         */
        public BrokerServer(Socket socket) {
            this.socket = socket;
        }

        /**
         * Serves the connection until the client closes it or an I/O error occurs, then closes the
         * socket.
         */
        @Override
        public void run() {
            try (
                    // Declared first so it is closed last, even if opening a stream below fails.
                    Socket client = socket;
                    BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream()));
                    PrintStream out = new PrintStream(client.getOutputStream())
            ) {
                String data;
                // readLine() returns null at end of stream, i.e. when the client has disconnected.
                // Stop then; otherwise this thread would spin forever on a dead connection.
                while ((data = in.readLine()) != null) {
                    if (data.isEmpty()) {
                        // Blank lines carry no request; skip them.
                        continue;
                    }
                    log.info("接收到原始数据:data=>{}", data);
                    if (data.equals(CONSUMER)) {
                        // Consume request: reply with the head of the queue. When the queue is
                        // empty consume() returns null and println writes the text "null".
                        String msg = Broker.consume();
                        out.println(msg);
                        out.flush();
                    } else {
                        // Anything else is treated as a message to enqueue.
                        Broker.produce(data);
                    }
                }
            } catch (IOException e) {
                log.error("I/O error while serving client connection", e);
            }
        }

        /**
         * Listens on {@link #SERVICE_PORT} and starts one handler thread per accepted connection.
         *
         * @param args ignored
         * @throws IOException if the server socket cannot be opened or accepting a connection fails
         */
        public static void main(String[] args) throws IOException {
            try (ServerSocket server = new ServerSocket(SERVICE_PORT)) {
                while (true) {
                    BrokerServer brokerServer = new BrokerServer(server.accept());
                    new Thread(brokerServer).start();
                }
            }
        }
    }

    三、客户端Client

    package com.haoxiansheng.demo01.mqTest;

    import lombok.extern.slf4j.Slf4j;

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.io.PrintWriter;
    import java.net.InetAddress;
    import java.net.Socket;

    /**
     * Client side of the broker protocol. Every call opens a fresh connection to the local host on
     * {@link BrokerServer#SERVICE_PORT} and closes it before returning.
     *
     * @author flame
     */
    @Slf4j
    public class MqClient {

        /** Request line that asks the server for the next queued message. */
        private final static String CONSUMER = "CONSUMER";

        /**
         * Sends one message line to the broker.
         *
         * @param msg the text to send
         * @throws IOException if the connection cannot be established or its stream cannot be opened
         */
        public static void produce(String msg) throws IOException {
            try (
                    // The socket is a managed resource so it is released even when opening the
                    // output stream fails.
                    Socket socket = new Socket(InetAddress.getLocalHost(), BrokerServer.SERVICE_PORT);
                    PrintWriter out = new PrintWriter(socket.getOutputStream())
            ) {
                out.println(msg);
                out.flush();
            }
        }

        /**
         * Requests one message from the broker and returns the reply line.
         *
         * @return the line sent back by the server, or {@code null} if the connection reached end
         *         of stream before a line was received
         * @throws IOException if the connection cannot be established or reading the reply fails
         */
        public static String consume() throws IOException {
            try (
                    Socket socket = new Socket(InetAddress.getLocalHost(), BrokerServer.SERVICE_PORT);
                    BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                    PrintWriter out = new PrintWriter(socket.getOutputStream())
            ) {
                // First announce that this connection wants to consume...
                out.println(CONSUMER);
                out.flush();
                // ...then read the single reply line.
                String msg = in.readLine();
                log.info("消费消息=>{}", msg);
                return msg;
            }
        }
    }

    四、生产者

    package com.haoxiansheng.demo01.mqTest;

    import lombok.extern.slf4j.Slf4j;

    import java.io.IOException;

    /**
     * Demo producer: sends a burst of messages through {@link MqClient}, pauses, then sends one more.
     *
     * @author flame
     */
    @Slf4j
    public class ProducerClient {

        /**
         * Sends six messages back to back, sleeps for five seconds, then sends a seventh.
         *
         * @param args ignored
         * @throws IOException if any send fails
         * @throws InterruptedException if the pause is interrupted
         */
        public static void main(String[] args) throws IOException, InterruptedException {
            int remaining = 6;
            while (remaining > 0) {
                MqClient.produce("Hello world");
                remaining--;
            }

            // Pause before the final send.
            Thread.sleep(5000);

            MqClient.produce("Hello world");
        }
    }

    五、消费者

    package com.haoxiansheng.demo01.mqTest;

    import lombok.extern.slf4j.Slf4j;

    import java.io.IOException;

    /**
     * Demo consumer: fetches a single message through {@link MqClient} and logs it.
     *
     * @author flame
     */
    @Slf4j
    public class ConsumeClient {

        /**
         * Requests one message and writes it to the log.
         *
         * @param args ignored
         * @throws IOException if the request fails
         */
        public static void main(String[] args) throws IOException {
            final String received = MqClient.consume();
            log.info("获取的消息为 msg=>{}", received);
        }
    }

    Processed: 0.017, SQL: 9