消息通知系统详解4---整合Netty和WebSocket-灵析社区

提笔写架构

修改配置

修改消息通知微服务模块tensquare_notice的pom文件,添加下面的dependency依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>       
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.5.Final</version>
</dependency>

2. 修改application.yml文件,添加下面的配置

  rabbitmq:
    host: 192.168.200.128

这样消息通知微服务就引入了netty框架,并且具有了和Rabbitmq交互的能力

实现Netty的整合

整合分析

现在的通讯模式如下:

因为使用到了WebSocket和Netty,整合方式和以前有所不同,整合步骤:

  1. 编写NettyServer,启动Netty服务。
  2. 使用配置Bean创建Netty服务。编写NettyConfig。
  3. 编写和WebSocket进行通讯处理类MyWebSocketHandler,进行MQ和WebSocket的消息处理。
  4. 使用配置Bean创建Rabbit监听器容器,使用监听器。编写RabbitConfig。
  5. 编写Rabbit监听器SysNoticeListener,用来获取MQ消息并进行处理。

五个类的关系如下图:

实现整合

1. 编写ApplicationContextProvider.java,这个类是工具类,作用是获取Spring容器中的实例

@Component
public class ApplicationContextProvider implements ApplicationContextAware {
    /**
     * 上下文对象实例
     */
    private static ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    /**
     * 获取applicationContext
     *
     * @return
     */
    public static ApplicationContext getApplicationContext() {
        return applicationContext;
    }

    /**
     * 通过name获取 Bean.
     *
     * @param name
     * @return
     */
    public Object getBean(String name) {
        return getApplicationContext().getBean(name);
    }

    /**
     * 通过class获取Bean.
     *
     * @param clazz
     * @param <T>
     * @return
     */
    public <T> T getBean(Class<T> clazz) {
        return getApplicationContext().getBean(clazz);
    }

    /**
     * 通过name,以及Clazz返回指定的Bean
     *
     * @param name
     * @param clazz
     * @param <T>
     * @return
     */
    public <T> T getBean(String name, Class<T> clazz) {
        return getApplicationContext().getBean(name, clazz);
    }
}

2. 编写NettyServer

public class NettyServer {
​
    public void start(int port) {
        System.out.println("准备启动Netty。。。");
        ServerBootstrap serverBootstrap = new ServerBootstrap();
​
        //用来处理新连接的
        EventLoopGroup boos = new NioEventLoopGroup();
        //用来处理业务逻辑的,读写。。。
        EventLoopGroup worker = new NioEventLoopGroup();
​
        serverBootstrap.group(boos, worker)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer() {
                    @Override
                    protected void initChannel(Channel ch) throws Exception {
                        //请求消息解码器
                        ch.pipeline().addLast(new HttpServerCodec());
                        // 将多个消息转换为单一的request或者response对象
                        ch.pipeline().addLast(new HttpObjectAggregator(65536));
                        //处理WebSocket的消息事件
                        ch.pipeline().addLast(new WebSocketServerProtocolHandler("/ws"));
​
                        //创建自己的webSocket处理器,就是用来编写业务逻辑的
                        MyWebSocketHandler myWebSocketHandler = new MyWebSocketHandler();
                        ch.pipeline().addLast(myWebSocketHandler);
                    }
                }).bind(port);
    }
}

3. 编写NettyConfig

@Configuration
public class NettyConfig {
​
    @Bean
    public NettyServer createNettyServer() {
        NettyServer nettyServer = new NettyServer();
​
        //启动Netty服务,使用新的线程启动
        new Thread(){
            @Override
            public void run() {
               nettyServer.start(1234);
            }
        }.start();
​
        return nettyServer;
    }
}

4. 编写MyWebSocketHandler

public class MyWebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
​
    private static ObjectMapper MAPPER = new ObjectMapper();
​
    // 送Spring容器中获取消息监听器容器,处理订阅消息sysNotice
    SimpleMessageListenerContainer sysNoticeContainer = (SimpleMessageListenerContainer) ApplicationContextProvider.getApplicationContext()
            .getBean("sysNoticeContainer");
​
    //从Spring容器中获取RabbitTemplate
    RabbitTemplate rabbitTemplate = ApplicationContextProvider.getApplicationContext()
            .getBean(RabbitTemplate.class);
​
    //存放WebSocket连接Map,根据用户id存放
    public static ConcurrentHashMap<String, Channel> userChannelMap = new ConcurrentHashMap();
​
    //用户请求WebSocket服务端,执行的方法
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        //约定用户第一次请求携带的数据:{"userId":"1"}
        //获取用户请求数据并解析
        String json = msg.text();
        //解析json数据,获取用户id
        String userId = MAPPER.readTree(json).get("userId").asText();
​
​
        //第一次请求的时候,需要建立WebSocket连接
        Channel channel = userChannelMap.get(userId);
        if (channel == null) {
            //获取WebSocket的连接
            channel = ctx.channel();
​
            //把连接放到容器中
            userChannelMap.put(userId, channel);
        }
​
        //只用完成新消息的提醒即可,只需要获取消息的数量
        //获取RabbitMQ的消息内容,并发送给用户
        RabbitAdmin rabbitAdmin = new RabbitAdmin(rabbitTemplate);
        //拼接获取队列名称
        String queueName = "article_subscribe_" + userId;
        //获取Rabbit的Properties容器
        Properties queueProperties = rabbitAdmin.getQueueProperties(queueName);
​
        //获取消息数量
        int noticeCount = 0;
        //判断Properties是否不为空
        if (queueProperties != null) {
            // 如果不为空,获取消息的数量
            noticeCount = (int) queueProperties.get("QUEUE_MESSAGE_COUNT");
        }
​
        //封装返回的数据
        HashMap countMap = new HashMap();
        countMap.put("sysNoticeCount", noticeCount);
        Result result = new Result(true, StatusCode.OK, "查询成功", countMap);
​
        //把数据发送给用户
        channel.writeAndFlush(new TextWebSocketFrame(MAPPER.writeValueAsString(result)));
​
        //把消息从队列里面清空,否则MQ消息监听器会再次消费一次
        if (noticeCount > 0) {
            rabbitAdmin.purgeQueue(queueName, true);
        }
​
        //为用户的消息通知队列注册监听器,便于用户在线的时候,
        //一旦有消息,可以主动推送给用户,不需要用户请求服务器获取数据
        sysNoticeContainer.addQueueNames(queueName);
    }
}

5. 编写RabbitConfig

@Configuration
public class RabbitConfig {
​
    @Bean("sysNoticeContainer")
    public SimpleMessageListenerContainer create(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container =
                new SimpleMessageListenerContainer(connectionFactory);
        //使用Channel
        container.setExposeListenerChannel(true);
        //设置自己编写的监听器
        container.setMessageListener(new SysNoticeListener());
​
        return container;
    }
}

6. 编写SysNoticeListener

public class SysNoticeListener implements ChannelAwareMessageListener {
​
    private static ObjectMapper MAPPER = new ObjectMapper();
​
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        //获取用户id,可以通过队列名称获取
        String queueName = message.getMessageProperties().getConsumerQueue();
        String userId = queueName.substring(queueName.lastIndexOf("_") + 1);
​
        io.netty.channel.Channel wsChannel = MyWebSocketHandler.userChannelMap.get(userId);
​
        //判断用户是否在线
        if (wsChannel != null) {
            //如果连接不为空,表示用户在线
            //封装返回数据
            HashMap countMap = new HashMap();
            countMap.put("sysNoticeCount", 1);
            Result result = new Result(true, StatusCode.OK, "查询成功", countMap);
​
            // 把数据通过WebSocket连接主动推送用户
            wsChannel.writeAndFlush(new TextWebSocketFrame(MAPPER.writeValueAsString(result)));
        }
    }
}

7. 修改启动类,添加Netty服务的启动

public static void main(String[] args) {
    SpringApplication.run(NoticeApplication.class, args);
​
    NettyServer server = ApplicationContextProvider.getApplicationContext().getBean(NettyServer.class);
    try {
        server.start(12345);
    } catch (Exception e) {
        e.printStackTrace();
    }
}

至此,基本代码已经完成,可以搞个前端页面测试下。

阅读量:2025

点赞量:0

收藏量:0