(精华)2020年10月7日 高并发高可用 Redis实现异步架构

    科技2025-01-30  6

    前言

    在后端编程时,对需要立即返回的数据我们应当立刻返回,而对于可以慢慢处理而业务复杂的我们可以选择延迟返回。这个实现使用到了异步消息队列。

    异步消息队列

    主要用于实现生产者-消费者模式。也就是说,这个队列应当是可以阻塞的,否者会带来大量的性能浪费。

    生产者-消费者模式

    实现 1.定义事件类型 – 定义Enum类 – EnumType 用于表示该事件的类型

    public enum EventType { //这里列举了四种类型 LIKE(0), COMMENT(1), LOGIN(2), MAIL(3); private int value; EventType(int value) { this.value = value; } public int getValue() { return value; } }

    2.定义事件的实体 – EventModel 这里说明一下entityOwnerId的必要性。举个例子,当我们给一个人点赞时,系统要给那个人(也就是entityOwnerId)发送一个站内信,通知那个人他被点赞了。当然,我们也可以把entityOwnerId包装在exts里,但因为几乎每一个事件都需要这个字段,所以这里我们开一个字段给他。

    public class EventModel { //之前定义的事件类型 private EventType type; //触发者的id private int actorId; //entityId和entityType共同组成了所触发的事件 private int entityId; private int entityType; //该事件的拥有者 private int entityOwnerId; //需要传输的额外信息 private Map<String, String> exts = new HashMap<>(); public Map<String, String> getExts() { return exts; } public EventModel() { } public EventModel(EventType type) { this.type = type; } public String getExt(String name) { return exts.get(name); } public EventModel setExt(String name, String value) { exts.put(name, value); return this; } public EventType getType() { return type; } public EventModel setType(EventType type) { this.type = type; return this; } public int getActorId() { return actorId; } public EventModel setActorId(int actorId) { this.actorId = actorId; return this; } public int getEntityId() { return entityId; } public EventModel setEntityId(int entityId) { this.entityId = entityId; return this; } public int getEntityType() { return entityType; } public EventModel setEntityType(int entityType) { this.entityType = entityType; return this; } public int getEntityOwnerId() { return entityOwnerId; } public EventModel setEntityOwnerId(int entityOwnerId) { this.entityOwnerId = entityOwnerId; return this; } }

    3.生产者的实现 – EventProducer 这里的队列我们使用Redis的阻塞双向队列list来实现。 a) 我们先用JSON把事件序列化 b) 再通过lpush把事件推进队列里 EventPr oducer

    @Service public class EventProducer { @Autowired JedisAdapter jedisAdapter; public boolean fireEvent(EventModel eventModel) { try { String json = JSONObject.toJSONString(eventModel); String key = RedisKeyUtil.getEventQueueKey(); jedisAdapter.lpush(key, json); return true; } catch (Exception e) { return false; } } }

    RedisKeyUtil – 用于统一的管理Redis的Key

    public class RedisKeyUtil { private static String BIZ_EVENT = "EVENT"; public static String getEventQueueKey() { return BIZ_EVENT; } }

    JedisAdapter – 对Jedis的函数进行一层封装

    @Service public class JedisAdapter implements InitializingBean { private static final Logger logger = LoggerFactory.getLogger(JedisAdapter.class); private Jedis jedis = null; private JedisPool pool = null; @Override public void afterPropertiesSet() throws Exception { pool = new JedisPool("localhost", 6379); } private Jedis getJedis() { return pool.getResource(); } public long lpush(String key, String value) { Jedis jedis = null; try { jedis = getJedis(); return jedis.lpush(key, value); } catch (Exception e) { logger.error("发生异常" + e.getMessage()); return 0; } finally { if (jedis != null) { jedis.close(); } } } public List<String> brpop(int timeout, String key) { Jedis jedis = null; try { jedis = pool.getResource(); return jedis.brpop(timeout, key); } catch (Exception e) { logger.error("发生异常" + e.getMessage()); return null; } finally { if (jedis != null) { jedis.close(); } } } }

    4.定义一个事件处理器的接口 – EventHandler

    public interface EventHandler { //事件处理函数 void doHandle(EventModel model); //获取该事件处理器所支持的事件类型 List<EventType> getSupportEventTypes(); }

    5.消费者的实现 – EventConsumer a)创建一个类型为Map<EventType, List>的map,用于存放所有的Handler。 b)在afterPropertiesSet函数中(这个函数在sping在初始化完该Bean后会执行),我们通过applicationContext获取实现了EventHandler接口的全部Handler。 b.1)通过for循环,分门别类的把各个Handler放到map中。 b.2)启动线程去消化事件 b.2.1)该线程使用死循环让其不间断的运行。 b.2.2)用brpop把事件拉出来 b.2.3)过滤掉key之后,剩下value,把value用JSON的api转化为EventModel b.2.4)在map中寻找是否有能处理EventModel的Handler,判断方法是看EventType是否支持。 b.2.5)过滤掉不支持的EventType之后,调用每一个支持该EventType的doHandle方法。 具体代码实现

    @Service public class EventConsumer implements InitializingBean, ApplicationContextAware { private static final Logger logger = LoggerFactory.getLogger(EventConsumer.class); private Map<EventType, List<EventHandler>> config = new HashMap<>(); private ApplicationContext applicationContext; @Autowired private JedisAdapter jedisAdapter; @Override public void afterPropertiesSet() throws Exception { Map<String, EventHandler> beans = applicationContext.getBeansOfType(EventHandler.class); if (beans != null) { for (Map.Entry<String, EventHandler> entry : beans.entrySet()) { List<EventType> eventTypes = entry.getValue().getSupportEventTypes(); for (EventType type : eventTypes) { if (!config.containsKey(type)) { config.put(type, new ArrayList<EventHandler>()); } // 注册每个事件的处理函数 config.get(type).add(entry.getValue()); } } } // 启动线程去消费事件 Thread thread = new Thread(new Runnable() { @Override public void run() { // 从队列一直消费 while (true) { String key = RedisKeyUtil.getEventQueueKey(); List<String> messages = jedisAdapter.brpop(0, key); // 第一个元素是队列名字 for (String message : messages) { if (message.equals(key)) { continue; } EventModel eventModel = JSON.parseObject(message, EventModel.class); // 找到这个事件的处理handler列表 if (!config.containsKey(eventModel.getType())) { logger.error("不能识别的事件"); continue; } for (EventHandler handler : config.get(eventModel.getType())) { handler.doHandle(eventModel); } } } } }); thread.start(); } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; } }

    6.写一个实现了EventHandler接口的实现

    @Component public class LikeHandler implements EventHandler { @Autowired MessageService messageService; @Autowired UserService userService; @Override public void doHandle(EventModel model) { Message message = new Message(); User user = userService.getUser(model.getActorId()); message.setToId(model.getEntityOwnerId()); message.setContent("用户" + user.getName() + " 赞了你的资讯,http://127.0.0.1:8080/news/" + String.valueOf(model.getEntityId())); // SYSTEM ACCOUNT message.setFromId(3); message.setCreatedDate(new Date()); messageService.addMessage(message); } @Override public List<EventType> getSupportEventTypes() { return Arrays.asList(EventType.LIKE); } }

    7.在Controller中调用Producer的fireEvent – 用于产生一个事件

    @Controller public class LikeController { @Autowired LikeService likeService; @Autowired HostHolder hostHolder; @Autowired NewsService newsService; @Autowired EventProducer eventProducer; @RequestMapping(path = {"/like"}, method = {RequestMethod.GET, RequestMethod.POST}) @ResponseBody public String like(@Param("newId") int newsId) { long likeCount = likeService.like(hostHolder.getUser().getId(), EntityType.ENTITY_NEWS, newsId); // 更新喜欢数 News news = newsService.getById(newsId); newsService.updateLikeCount(newsId, (int) likeCount); eventProducer.fireEvent(new EventModel(EventType.LIKE) .setEntityOwnerId(news.getUserId()) .setActorId(hostHolder.getUser().getId()).setEntityId(newsId)); return ToutiaoUtil.getJSONString(0, String.valueOf(likeCount)); } }
    Processed: 0.009, SQL: 8