博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
使用WebSocket帮助应用程序群集节点间通信
阅读量:4967 次
发布时间:2019-06-12

本文共 2981 字,大约阅读时间需要 9 分钟。

【序列化message传输方式】两种方式都是转成二进制。

1.使用Java序列化器,ObjectXXXputStream

2.使用ByteBuffer.wrap(bytes)。

 

在一个标准群集场景中,节点通过一个数据包发送到协定好的多播IP地址:Port上,建立起通信。比如使用TCP插头。

【使用Servlet模拟群集场景】

 【1.连接上@ServerEndPoint】

  【节点做的事】

//ws://localhost:8080/cluster/clusterNodeSocket/clusterNode1/query URI uri = new URI("ws", "localhost:8080", path, null, null); //连接上websocket this.session = ContainerProvider.getWebSocketContainer()         .connectToServer(this, uri); 【Server做的事】
2     public void onOpen(Session session, @PathParam("nodeId") String nodeId) 3     {
8 ClusterMessage message = new ClusterMessage(nodeId, "Joined the cluster."); 11 //通知所有节点 有新的节点加入 因为这是在onOpen发生的,也就是终端连接上的代表加入12 byte[] bytes = ClusterNodeEndpoint.toByteArray(message);13 for(Session node : ClusterNodeEndpoint.nodes)14 //发送ByteBuffer 因为ClusterMessage实现了序列化,想要在websocket上传送序列化数据,必须做成二进制。15 node.getBasicRemote().sendBinary(ByteBuffer.wrap(bytes)); 22 ClusterNodeEndpoint.nodes.add(session);23 }

 

【2.Servlet负责路由请求和接收消息、Server负责传递给其他节点消息】

【节点处理get请求】

2     protected void doGet(HttpServletRequest request, HttpServletResponse response) 3             throws ServletException, IOException 4     {
6 //构造Message准备发给节点 7 ClusterMessage message = new ClusterMessage(this.nodeId, 8 "request:{ip:\"" + request.getRemoteAddr() + 9 "\",queryString:\"" + request.getQueryString() + "\"}");10 11 //使用序列化机制发送消息12 try(OutputStream output = this.session.getBasicRemote().getSendStream();13 ObjectOutputStream stream = new ObjectOutputStream(output))14 {15 stream.writeObject(message);16 }17 response.getWriter().append("OK");18 }

【节点接收消息】

1  @OnMessage 2     public void onMessage(InputStream input) 3     { 4         try(ObjectInputStream stream = new ObjectInputStream(input)) 5         { 6             ClusterMessage message = (ClusterMessage)stream.readObject(); 7             System.out.println("INFO (Node " + this.nodeId + 8                     "): Message received from cluster; node = " + 9                     message.getNodeId() + ", message = " + message.getMessage());10         }11         catch(IOException | ClassNotFoundException e)12         {13             e.printStackTrace();14         }15     }

【Server传递给其他节点消息】

1 @OnMessage 2     public void onMessage(Session session, byte[] message) 3     { 4         try 5         { 6             for(Session node : ClusterNodeEndpoint.nodes) 7             { 8                 //向其他节点发送消息(消息来自当前节点) 9                 if(node != session)11                     node.getBasicRemote().sendBinary(ByteBuffer.wrap(message));12             }13         }14         catch(IOException e)15         {16             System.err.println("ERROR: Exception when handling message on server");17             e.printStackTrace();18         }19     }

 

转载于:https://www.cnblogs.com/chenhui7373/p/8654592.html

你可能感兴趣的文章
Android 数据库打包随APK发布
查看>>
uploadify图片上传配置
查看>>
[SDOI2016]模式字符串
查看>>
文字滚动
查看>>
HDU 5317 RGCDQ
查看>>
洗牌算法
查看>>
优先队列小结
查看>>
vim + DoxygenToolkit.vim环境搭建
查看>>
JQuery的选择器的简单介绍
查看>>
ss清除浮动float的三种方法总结,为什么清浮动?浮动会有那些影响?
查看>>
phpQuery—基于jQuery的PHP实现
查看>>
C# 队列(Queue) 和堆栈(Stack)
查看>>
线性表
查看>>
开通博客
查看>>
【排障】Outlook Express 2G收件箱大小限制
查看>>
HyperLedger Fabric ca 1.2 正式环境部署
查看>>
编译链接过程总结
查看>>
数学图形(1.11) 玫瑰线
查看>>
python类中super()和__init__()的关系【复习】
查看>>
工厂方法模式(Factory Method Pattern)
查看>>