Commit 595174709561e4786d663c03d3ea41885163cc2d
1 parent
60046e83
When firing an RpcBroadcastMsg message or a ToAllNodesMsg from one node, the oth…
…er nodes never get notified because the messages doesn't have a serverAddress. As a result firing such a method is not forwarded to the other cluster nodes. This PR adds the serverAddress for each node that the message is sent to so the other nodes get the message and can react to component lifecycle changes.
Showing
1 changed file
with
13 additions
and
7 deletions
... | ... | @@ -5,7 +5,7 @@ |
5 | 5 | * you may not use this file except in compliance with the License. |
6 | 6 | * You may obtain a copy of the License at |
7 | 7 | * |
8 | - * http://www.apache.org/licenses/LICENSE-2.0 | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | 9 | * |
10 | 10 | * Unless required by applicable law or agreed to in writing, software |
11 | 11 | * distributed under the License is distributed on an "AS IS" BASIS, |
... | ... | @@ -29,11 +29,7 @@ import org.thingsboard.server.common.msg.cluster.ServerAddress; |
29 | 29 | import org.thingsboard.server.gen.cluster.ClusterAPIProtos; |
30 | 30 | import org.thingsboard.server.service.cluster.discovery.ServerInstance; |
31 | 31 | |
32 | -import java.util.HashMap; | |
33 | -import java.util.LinkedList; | |
34 | -import java.util.Map; | |
35 | -import java.util.Queue; | |
36 | -import java.util.UUID; | |
32 | +import java.util.*; | |
37 | 33 | |
38 | 34 | /** |
39 | 35 | * @author Andrew Shvayka |
... | ... | @@ -88,7 +84,17 @@ public class RpcManagerActor extends ContextAwareActor { |
88 | 84 | |
89 | 85 | private void onMsg(RpcBroadcastMsg msg) { |
90 | 86 | log.debug("Forwarding msg to session actors {}", msg); |
91 | - sessionActors.keySet().forEach(address -> onMsg(msg.getMsg())); | |
87 | + sessionActors.keySet().forEach(address -> { | |
88 | + ClusterAPIProtos.ClusterMessage msgWithServerAddress = msg.getMsg() | |
89 | + .toBuilder() | |
90 | + .setServerAddress(ClusterAPIProtos.ServerAddress | |
91 | + .newBuilder() | |
92 | + .setHost(address.getHost()) | |
93 | + .setPort(address.getPort()) | |
94 | + .build()) | |
95 | + .build(); | |
96 | + onMsg(msgWithServerAddress); | |
97 | + }); | |
92 | 98 | pendingMsgs.values().forEach(queue -> queue.add(msg.getMsg())); |
93 | 99 | } |
94 | 100 | ... | ... |