一、处理中心类
package com
.haoxiansheng
.demo01
.mqTest
;
import lombok
.extern
.slf4j
.Slf4j
;
import org
.springframework
.util
.StringUtils
;
import java
.util
.concurrent
.ArrayBlockingQueue
;
/**
 * In-memory message hub backed by a single bounded queue shared by all callers.
 *
 * <p>Messages are plain strings. At most {@code MAX_SIZE} messages are held at a time;
 * further messages are rejected (not blocked on) until a consumer removes one.
 */
@Slf4j
public class Broker {

    /** Capacity of the queue: the maximum number of messages held at once. */
    private static final int MAX_SIZE = 5;

    /** Bounded FIFO store for pending messages; never reassigned. */
    private static final ArrayBlockingQueue<String> messageQueue = new ArrayBlockingQueue<>(MAX_SIZE);

    /**
     * Tries to enqueue a message without blocking.
     *
     * <p>When the queue is full the message is dropped and the rejected message is logged.
     *
     * @param msg the message to enqueue; must not be {@code null}
     * @throws NullPointerException if {@code msg} is {@code null} (raised by the queue's {@code offer})
     */
    public static void produce(String msg) {
        if (messageQueue.offer(msg)) {
            log.info("成功向消息中心投递消息=>{}, 当前暂存的消息数量是=>{}", msg, messageQueue.size());
        } else {
            // Log the message that was rejected: the placeholder is labelled as the message,
            // and the queue size is always MAX_SIZE on this branch.
            log.info("消息中心暂存的消息达到最大负荷,不能继续放入消息=>{}", msg);
        }
        log.info("可以将消息做一些操作消息持久化等");
    }

    /**
     * Removes and returns the oldest queued message without blocking.
     *
     * @return the dequeued message, or {@code null} when the queue is empty
     */
    public static String consume() {
        String msg = messageQueue.poll();
        // poll() signals "queue empty" with null only; an empty string is a real message
        // that has been removed from the queue and must be reported as consumed.
        if (msg == null) {
            log.info("消息中心没有消息可供消费");
        } else {
            log.info("已经消费消息=>{}, 当前暂存消息=>{}", msg, messageQueue.size());
        }
        log.info("可以相对应的讲消息的状态标记为发送等等。。。。。");
        return msg;
    }
}
二、服务端Server
package com
.haoxiansheng
.demo01
.mqTest
;
import lombok
.extern
.slf4j
.Slf4j
;
import org
.springframework
.util
.StringUtils
;
import java
.io
.BufferedReader
;
import java
.io
.IOException
;
import java
.io
.InputStreamReader
;
import java
.io
.PrintStream
;
import java
.net
.ServerSocket
;
import java
.net
.Socket
;
@Slf4j
public class BrokerServer implements Runnable {
public static int SERVICE_PORT
= 9999;
private final static String CONSUMER
= "CONSUMER";
private final Socket socket
;
public BrokerServer(Socket socket
) {
this.socket
= socket
;
}
@Override
public void run() {
try (
BufferedReader in
= new BufferedReader(new InputStreamReader(socket
.getInputStream()));
PrintStream out
= new PrintStream(socket
.getOutputStream());
) {
while (true) {
String data
= in
.readLine();
if (StringUtils
.isEmpty(data
)) {
continue;
}
log
.info("接收到原始数据:data=>{}", data
);
if (data
.equals(CONSUMER
)) {
String msg
= Broker
.consume();
out
.println(msg
);
out
.flush();
} else {
Broker
.produce(data
);
}
}
} catch (IOException e
) {
e
.printStackTrace();
}
}
public static void main(String
[] args
) throws IOException
{
ServerSocket server
= new ServerSocket(SERVICE_PORT
);
while (true) {
BrokerServer brokerServer
= new BrokerServer(server
.accept());
new Thread(brokerServer
).start();
}
}
}
三、客户端Client
package com
.haoxiansheng
.demo01
.mqTest
;
import com
.haoxiansheng
.demo01
.mqTest
.BrokerServer
;
import lombok
.extern
.slf4j
.Slf4j
;
import java
.io
.BufferedReader
;
import java
.io
.IOException
;
import java
.io
.InputStreamReader
;
import java
.io
.PrintWriter
;
import java
.net
.InetAddress
;
import java
.net
.Socket
;
/**
 * Client-side helper for talking to {@link BrokerServer} on the local host.
 *
 * <p>Every call opens a fresh connection to {@code BrokerServer.SERVICE_PORT}, exchanges
 * one line, and closes the connection.
 */
@Slf4j
public class MqClient {

    /** Request line that asks the server for a message instead of publishing one. */
    private final static String CONSUMER = "CONSUMER";

    /**
     * Sends one message to the server as a single line.
     *
     * @param msg the message text to publish
     * @throws IOException if the connection cannot be opened
     */
    public static void produce(String msg) throws IOException {
        // The socket is a managed resource too, so it is closed even when obtaining the
        // output stream fails.
        try (
                Socket socket = new Socket(InetAddress.getLocalHost(), BrokerServer.SERVICE_PORT);
                PrintWriter out = new PrintWriter(socket.getOutputStream())
        ) {
            out.println(msg);
            out.flush();
        }
    }

    /**
     * Requests one message from the server and returns the reply line.
     *
     * @return the line sent back by the server, or {@code null} if the connection ended
     *         before a line was received
     * @throws IOException if the connection cannot be opened or reading the reply fails
     */
    public static String consume() throws IOException {
        try (
                Socket socket = new Socket(InetAddress.getLocalHost(), BrokerServer.SERVICE_PORT);
                BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                PrintWriter out = new PrintWriter(socket.getOutputStream())
        ) {
            out.println(CONSUMER);
            // The writer is not auto-flushing; push the request out before waiting for the reply.
            out.flush();
            String msg = in.readLine();
            log.info("消费消息=>{}", msg);
            return msg;
        }
    }
}
四、生产者
package com
.haoxiansheng
.demo01
.mqTest
;
import lombok
.extern
.slf4j
.Slf4j
;
import java
.io
.IOException
;
/**
 * Demo producer: publishes a burst of messages through {@link MqClient}, waits, then
 * publishes one more.
 */
@Slf4j
public class ProducerClient {

    /** Payload sent on every publish. */
    private static final String PAYLOAD = "Hello world";

    /** Number of messages sent back to back before the pause. */
    private static final int BURST_SIZE = 6;

    /** Pause between the burst and the final message, in milliseconds. */
    private static final long PAUSE_MILLIS = 5000;

    /**
     * Runs the demo sequence.
     *
     * @param args unused
     * @throws IOException if a message cannot be sent
     * @throws InterruptedException if the pause is interrupted
     */
    public static void main(String[] args) throws IOException, InterruptedException {
        int sent = 0;
        while (sent < BURST_SIZE) {
            MqClient.produce(PAYLOAD);
            sent++;
        }

        Thread.sleep(PAUSE_MILLIS);

        MqClient.produce(PAYLOAD);
    }
}
五、消费者
package com
.haoxiansheng
.demo01
.mqTest
;
import lombok
.extern
.slf4j
.Slf4j
;
import java
.io
.IOException
;
/**
 * Demo consumer: fetches a single message through {@link MqClient} and logs it.
 */
@Slf4j
public class ConsumeClient {

    /**
     * Requests one message and logs whatever comes back.
     *
     * @param args unused
     * @throws IOException if the request to the server fails
     */
    public static void main(String[] args) throws IOException {
        log.info("获取的消息为 msg=>{}", MqClient.consume());
    }
}
转载请注明原文地址:https://blackberry.8miu.com/read-16537.html