Commit 9c792bb95605154b7f06505a61f4cbd1b64e1c7a

Authored by Andrew Shvayka
1 parent 1d6e1ea2

TMP commit

... ... @@ -207,6 +207,10 @@ public class ActorSystemContext {
207 207 @Getter
208 208 private DeviceStateService deviceStateService;
209 209
  210 + @Value("${cluster.partition_id}")
  211 + @Getter
  212 + private long queuePartitionId;
  213 +
210 214 @Value("${actors.session.sync.timeout}")
211 215 @Getter
212 216 private long syncSessionTimeout;
... ...
... ... @@ -114,12 +114,12 @@ class DefaultTbContext implements TbContext {
114 114
115 115 @Override
116 116 public TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, String data) {
117   - return new TbMsg(UUIDs.timeBased(), type, originator, metaData.copy(), data, nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId(), 0L);
  117 + return new TbMsg(UUIDs.timeBased(), type, originator, metaData.copy(), data, nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId(), mainCtx.getQueuePartitionId());
118 118 }
119 119
120 120 @Override
121 121 public TbMsg transformMsg(TbMsg origMsg, String type, EntityId originator, TbMsgMetaData metaData, String data) {
122   - return new TbMsg(origMsg.getId(), type, originator, metaData.copy(), data, nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId(), 0L);
  122 + return new TbMsg(origMsg.getId(), type, originator, metaData.copy(), data, nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId(), mainCtx.getQueuePartitionId());
123 123 }
124 124
125 125 @Override
... ...
... ... @@ -95,12 +95,12 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
95 95
96 96 private void reprocess(List<RuleNode> ruleNodeList) {
97 97 for (RuleNode ruleNode : ruleNodeList) {
98   - for (TbMsg tbMsg : queue.findUnprocessed(ruleNode.getId().getId(), 0L)) {
  98 + for (TbMsg tbMsg : queue.findUnprocessed(ruleNode.getId().getId(), systemContext.getQueuePartitionId())) {
99 99 pushMsgToNode(nodeActors.get(ruleNode.getId()), tbMsg);
100 100 }
101 101 }
102 102 if (firstNode != null) {
103   - for (TbMsg tbMsg : queue.findUnprocessed(entityId.getId(), 0L)) {
  103 + for (TbMsg tbMsg : queue.findUnprocessed(entityId.getId(), systemContext.getQueuePartitionId())) {
104 104 pushMsgToNode(firstNode, tbMsg);
105 105 }
106 106 }
... ... @@ -269,6 +269,6 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
269 269
270 270 private TbMsg enrichWithRuleChainId(TbMsg tbMsg) {
271 271 // We don't put firstNodeId because it may change over time;
272   - return new TbMsg(tbMsg.getId(), tbMsg.getType(), tbMsg.getOriginator(), tbMsg.getMetaData().copy(), tbMsg.getData(), entityId, null, 0L);
  272 + return new TbMsg(tbMsg.getId(), tbMsg.getType(), tbMsg.getOriginator(), tbMsg.getMetaData().copy(), tbMsg.getData(), entityId, null, systemContext.getQueuePartitionId());
273 273 }
274 274 }
... ...
... ... @@ -88,7 +88,7 @@ public abstract class ComponentMsgProcessor<T extends EntityId> extends Abstract
88 88
89 89 protected void putToQueue(final TbMsg tbMsg, final Consumer<TbMsg> onSuccess) {
90 90 EntityId entityId = tbMsg.getRuleNodeId() != null ? tbMsg.getRuleNodeId() : tbMsg.getRuleChainId();
91   - Futures.addCallback(queue.put(tbMsg, entityId.getId(), 0), new FutureCallback<Void>() {
  91 + Futures.addCallback(queue.put(tbMsg, entityId.getId(), tbMsg.getClusterPartition()), new FutureCallback<Void>() {
92 92 @Override
93 93 public void onSuccess(@Nullable Void result) {
94 94 onSuccess.accept(tbMsg);
... ...
... ... @@ -17,6 +17,7 @@ package org.thingsboard.server.service.cluster.rpc;
17 17
18 18 import io.grpc.stub.StreamObserver;
19 19 import org.thingsboard.server.actors.rpc.RpcBroadcastMsg;
  20 +import org.thingsboard.server.common.msg.TbActorMsg;
20 21 import org.thingsboard.server.common.msg.cluster.ServerAddress;
21 22 import org.thingsboard.server.common.msg.cluster.ToAllNodesMsg;
22 23 import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg;
... ... @@ -42,4 +43,6 @@ public interface ClusterRpcService {
42 43
43 44 void tell(ClusterAPIProtos.ClusterMessage message);
44 45
  46 + void tell(ServerAddress serverAddress, TbActorMsg actorMsg);
  47 +
45 48 }
... ...
... ... @@ -43,6 +43,7 @@ import org.thingsboard.server.extensions.core.plugin.telemetry.sub.Subscription;
43 43 import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionState;
44 44 import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionUpdate;
45 45 import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
  46 +import org.thingsboard.server.service.cluster.rpc.ClusterRpcService;
46 47 import org.thingsboard.server.service.state.DefaultDeviceStateService;
47 48 import org.thingsboard.server.service.state.DeviceStateService;
48 49
... ... @@ -83,6 +84,9 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
83 84 private ClusterRoutingService routingService;
84 85
85 86 @Autowired
  87 + private ClusterRpcService rpcService;
  88 +
  89 + @Autowired
86 90 @Lazy
87 91 private DeviceStateService stateService;
88 92
... ... @@ -106,7 +110,6 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
106 110 }
107 111
108 112 private final Map<EntityId, Set<Subscription>> subscriptionsByEntityId = new HashMap<>();
109   -
110 113 private final Map<String, Map<Integer, Subscription>> subscriptionsByWsSessionId = new HashMap<>();
111 114
112 115 @Override
... ... @@ -117,6 +120,7 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
117 120 ServerAddress address = server.get();
118 121 log.trace("[{}] Forwarding subscription [{}] for device [{}] to [{}]", sessionId, sub.getSubscriptionId(), entityId, address);
119 122 subscription = new Subscription(sub, true, address);
  123 +// rpcService.tell();
120 124 // rpcHandler.onNewSubscription(ctx, address, sessionId, subscription);
121 125 } else {
122 126 log.trace("[{}] Registering local subscription [{}] for device [{}]", sessionId, sub.getSubscriptionId(), entityId);
... ...
... ... @@ -19,70 +19,6 @@ package cluster;
19 19 option java_package = "org.thingsboard.server.gen.cluster";
20 20 option java_outer_classname = "ClusterAPIProtos";
21 21
22   -//message Uid {
23   -// sint64 pluginUuidMsb = 1;
24   -// sint64 pluginUuidLsb = 2;
25   -//}
26   -//
27   -//message PluginAddress {
28   -// Uid pluginId = 1;
29   -// Uid tenantId = 2;
30   -//}
31   -//
32   -//message ToPluginRpcMessage {
33   -// PluginAddress address = 1;
34   -// int32 clazz = 2;
35   -// bytes data = 3;
36   -//}
37   -//
38   -//message ToDeviceActorRpcMessage {
39   -// bytes data = 1;
40   -//}
41   -//
42   -//message ToDeviceSessionActorRpcMessage {
43   -// bytes data = 1;
44   -//}
45   -//
46   -//message ToDeviceActorNotificationRpcMessage {
47   -// bytes data = 1;
48   -//}
49   -//
50   -//message ToAllNodesRpcMessage {
51   -// bytes data = 1;
52   -//}
53   -//
54   -//message ConnectRpcMessage {
55   -// ServerAddress serverAddress = 1;
56   -//}
57   -//
58   -//message ToDeviceRpcRequestRpcMessage {
59   -// Uid deviceTenantId = 2;
60   -// Uid deviceId = 3;
61   -//
62   -// Uid msgId = 4;
63   -// bool oneway = 5;
64   -// int64 expTime = 6;
65   -// string method = 7;
66   -// string params = 8;
67   -//}
68   -//
69   -//message ToPluginRpcResponseRpcMessage {
70   -// Uid msgId = 2;
71   -// string response = 3;
72   -// string error = 4;
73   -//}
74   -//
75   -//message ToRpcServerMessage {
76   -// ConnectRpcMessage connectMsg = 1;
77   -// ToPluginRpcMessage toPluginRpcMsg = 2;
78   -// ToDeviceActorRpcMessage toDeviceActorRpcMsg = 3;
79   -// ToDeviceSessionActorRpcMessage toDeviceSessionActorRpcMsg = 4;
80   -// ToDeviceActorNotificationRpcMessage toDeviceActorNotificationRpcMsg = 5;
81   -// ToAllNodesRpcMessage toAllNodesRpcMsg = 6;
82   -// ToDeviceRpcRequestRpcMessage toDeviceRpcRequestRpcMsg = 7;
83   -// ToPluginRpcResponseRpcMessage toPluginRpcResponseRpcMsg = 8;
84   -//}
85   -
86 22 service ClusterRpcService {
87 23 rpc handleMsgs(stream ClusterMessage) returns (stream ClusterMessage) {}
88 24 }
... ... @@ -103,7 +39,6 @@ message MessageMataInfo {
103 39 repeated string tags = 2;
104 40 }
105 41
106   -
107 42 enum MessageType {
108 43
109 44 //Cluster control messages
... ... @@ -113,6 +48,7 @@ enum MessageType {
113 48 RPC_BROADCAST_MSG = 3;
114 49 CONNECT_RPC_MESSAGE =4;
115 50
116   - //CLUSTER_DATA_MESSAGE
117   - CLUSTER_NETWORK_SERVER_DATA_MESSAGE = 5;
  51 + CLUSTER_ACTOR_MESSAGE = 5;
  52 + CLUSTER_TELEMETRY_MESSAGE = 6;
  53 + CLUSTER_DEVICE_RPC_MESSAGE = 7;
118 54 }
... ...
... ... @@ -58,6 +58,8 @@ cluster:
58 58 hash_function_name: "${CLUSTER_HASH_FUNCTION_NAME:murmur3_128}"
59 59 # Amount of virtual nodes in consistent hash ring.
60 60 vitrual_nodes_size: "${CLUSTER_VIRTUAL_NODES_SIZE:16}"
  61 + # Queue partition id for current node
  62 + partition_id: "${QUEUE_PARTITION_ID:0}"
61 63
62 64 # Plugins configuration parameters
63 65 plugins:
... ... @@ -308,6 +310,7 @@ rule:
308 310 max_size: 10000
309 311
310 312
  313 +
311 314 # PostgreSQL DAO Configuration
312 315 #spring:
313 316 # data:
... ...
  1 +package org.thingsboard.server.mqtt;
  2 +
  3 +import org.junit.rules.TestRule;
  4 +import org.junit.runner.Description;
  5 +import org.junit.runners.model.Statement;
  6 +
  7 +/**
  8 + * Created by ashvayka on 11.05.18.
  9 + */
  10 +public class DbConfigurationTestRule implements TestRule {
  11 +
  12 + @Override
  13 + public Statement apply(Statement base, Description description) {
  14 + return null;
  15 + }
  16 +}
... ...
... ... @@ -7,18 +7,6 @@ services:
7 7 ports:
8 8 - "2181:2181"
9 9
10   - postgres-tb:
11   - image: postgres
12   - command: postgres -c 'max_connections=500'
13   - environment:
14   - - POSTGRES_USER=postgres
15   - - POSTGRES_PASSWORD=postgres
16   - - POSTGRES_DB=thingsboard
17   - networks:
18   - - core
19   - ports:
20   - - "5432:5432"
21   -
22 10 cassandra-tb:
23 11 image: cassandra
24 12 networks:
... ... @@ -36,7 +24,6 @@ services:
36 24 ports:
37 25 - "6379:6379"
38 26
39   -
40 27 networks:
41 28 core:
42 29
... ...
... ... @@ -26,7 +26,7 @@ import com.datastax.driver.core.Session;
26 26 import java.util.List;
27 27
28 28 public class CustomCassandraCQLUnit extends BaseCassandraUnit {
29   - private List<CQLDataSet> dataSets;
  29 + protected List<CQLDataSet> dataSets;
30 30
31 31 public Session session;
32 32 public Cluster cluster;
... ...