用户获取新的消息通知有两种模式
设想下,用户的通知消息和新通知提醒数据都放在数据库中,数据库的读写操作频繁。如果消息量大,DB压力较大,可能出现数据瓶颈。
这边按照上述两种模式,拆分下设计:
此模式是接受者请求系统,系统将新的消息通知返回给接收者的模式,流程如下:
此模式是系统将新的消息通知返回给接收者的模式,流程如下:
在虚拟机中启动RabbitMQ
docker run -id --name=tensquare_rabbit -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 15672:15672 -p 25672:25672 rabbitmq:management
访问地址:http://192.168.200.128:15672
登录账号: guest
登录密码: guest
上面提到了Netty,在开始了解Netty之前,先来实现一个客户端与服务端通信的程序,使用传统的IO编程和使用NIO编程有什么不一样。
每个客户端连接过来后,服务端都会启动一个线程去处理该客户端的请求。阻塞I/O的通信模型示意图如下:
业务场景:客户端每隔两秒发送字符串给服务端,服务端收到之后打印到控制台。
public class IOServer {
public static void main(String[] args) throws Exception {
ServerSocket serverSocket = new ServerSocket(8000);
while (true) {
// (1) 阻塞方法获取新的连接
Socket socket = serverSocket.accept();
new Thread() {
@Override
public void run() {
String name = Thread.currentThread().getName();
try {
// (2) 每一个新的连接都创建一个线程,负责读取数据
byte[] data = new byte[1024];
InputStream inputStream = socket.getInputStream();
while (true) {
int len;
// (3) 按字节流方式读取数据
while ((len = inputStream.read(data)) != -1) {
System.out.println("线程" + name + ":" + new String(data, 0, len));
}
}
} catch (Exception e) {
}
}
}.start();
}
}
}
客户端实现:
public class MyClient {
public static void main(String[] args) {
//测试使用不同的线程数进行访问
for (int i = 0; i < 5; i++) {
new ClientDemo().start();
}
}
static class ClientDemo extends Thread {
@Override
public void run() {
try {
Socket socket = new Socket("127.0.0.1", 8000);
while (true) {
socket.getOutputStream().write(("测试数据").getBytes());
socket.getOutputStream().flush();
Thread.sleep(2000);
}
} catch (Exception e) {
}
}
}
}
从服务端代码中我们可以看到,在传统的IO模型中,每个连接创建成功之后都需要一个线程来维护,每个线程包含一个while死循环。
如果在用户数量较少的情况下运行是没有问题的,但是对于用户数量比较多的业务来说,服务端可能需要支撑成千上万的连接,IO模型可能就不太合适了。
如果有1万个连接就对应1万个线程,继而1万个while死循环,这种模型存在以下问题:
NIO,也叫做new-IO或者non-blocking-IO,可理解为非阻塞IO。NIO编程模型中,新来一个连接不再创建一个新的线程,而是可以把这条连接直接绑定到某个固定的线程,然后这条连接所有的读写都由这个线程来负责,我们用一幅图来对比一下IO与NIO:
如上图所示,IO模型中,一个连接都会创建一个线程,对应一个while死循环,死循环的目的就是不断监测这条连接上是否有数据可以读。但是在大多数情况下,1万个连接里面同一时刻只有少量的连接有数据可读,因此,很多个while死循环都白白浪费掉了,因为没有数据。
而在NIO模型中,可以把这么多的while死循环变成一个死循环,这个死循环由一个线程控制。这就是NIO模型中选择器(Selector)的作用,一条连接来了之后,现在不创建一个while死循环去监听是否有数据可读了,而是直接把这条连接注册到选择器上,通过检查这个选择器,就可以批量监测出有数据可读的连接,进而读取数据。
NIO的三大核心组件:通道(Channel)、缓冲(Buffer)、选择器(Selector)
通道(Channel)
是传统IO中的Stream(流)的升级版。Stream是单向的、读写分离(inputstream和outputstream),Channel是双向的,既可以进行读操作,又可以进行写操作。
缓冲(Buffer)
Buffer可以理解为一块内存区域,可以写入数据,并且在之后读取它。
选择器(Selector)
选择器(Selector)可以实现一个单独的线程来监控多个注册在她上面的信道(Channel),通过一定的选择机制,实现多路复用的效果。
NIO相对于IO的优势:
IO是面向流的,每次都是从操作系统底层一个字节一个字节地读取数据,并且数据只能从一端读取到另一端,不能前后移动流中的数据。NIO则是面向缓冲区的,每次可以从这个缓冲区里面读取一块的数据,并且可以在需要时在缓冲区中前后移动。
IO是阻塞的,这意味着,当一个线程读取数据或写数据时,该线程被阻塞,直到有一些数据被读取,或数据完全写入,在此期间该线程不能干其他任何事情。而NIO是非阻塞的,不需要一直等待操作完成才能干其他事情,而是在等待的过程中可以同时去做别的事情,所以能最大限度地使用服务器的资源。
NIO引入了IO多路复用器selector。selector是一个提供channel注册服务的线程,可以同时对接多个Channel,并在线程池中为channel适配、选择合适的线程来处理channel。由于NIO模型中线程数量大大降低,线程切换效率因此也大幅度提高。
和前面一样的场景,使用NIO实现(复制代码演示效果即可):
public class NIOServer {
public static void main(String[] args) throws IOException {
// 负责轮询是否有新的连接
Selector serverSelector = Selector.open();
// 负责轮询处理连接中的数据
Selector clientSelector = Selector.open();
new Thread() {
@Override
public void run() {
try {
// 对应IO编程中服务端启动
ServerSocketChannel listenerChannel = ServerSocketChannel.open();
listenerChannel.socket().bind(new InetSocketAddress(8000));
listenerChannel.configureBlocking(false);
// OP_ACCEPT表示服务器监听到了客户连接,服务器可以接收这个连接了
listenerChannel.register(serverSelector, SelectionKey.OP_ACCEPT);
while (true) {
// 监测是否有新的连接,这里的1指的是阻塞的时间为1ms
if (serverSelector.select(1) > 0) {
Set<SelectionKey> set = serverSelector.selectedKeys();
Iterator<SelectionKey> keyIterator = set.iterator();
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
if (key.isAcceptable()) {
try {
// (1) 每来一个新连接,不需要创建一个线程,而是直接注册到clientSelector
SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept();
clientChannel.configureBlocking(false);
// OP_READ表示通道中已经有了可读的数据,可以执行读操作了(通道目前有数据,可以进行读操作了)
clientChannel.register(clientSelector, SelectionKey.OP_READ);
} finally {
keyIterator.remove();
}
}
}
}
}
} catch (IOException ignored) {
}
}
}.start();
new Thread() {
@Override
public void run() {
String name = Thread.currentThread().getName();
try {
while (true) {
// (2) 批量轮询是否有哪些连接有数据可读,这里的1指的是阻塞的时间为1ms
if (clientSelector.select(1) > 0) {
Set<SelectionKey> set = clientSelector.selectedKeys();
Iterator<SelectionKey> keyIterator = set.iterator();
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
if (key.isReadable()) {
try {
SocketChannel clientChannel = (SocketChannel) key.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
// (3) 读取数据以块为单位批量读取
clientChannel.read(byteBuffer);
byteBuffer.flip();
System.out.println("线程" + name + ":" + Charset.defaultCharset().newDecoder().decode(byteBuffer)
.toString());
} finally {
keyIterator.remove();
key.interestOps(SelectionKey.OP_READ);
}
}
}
}
}
} catch (IOException ignored) {
}
}
}.start();
}
}
阅读量:2053
点赞量:0
收藏量:0