Commit 2637babfe3e06ee31013c08c7b83b6475d16dbce

Authored by Andrew Shvayka
1 parent f0bccc7c

Mqtt transport implementation: POST telemetry, attributes

Showing 34 changed files with 738 additions and 591 deletions
@@ -200,10 +200,6 @@ public class ActorSystemContext { @@ -200,10 +200,6 @@ public class ActorSystemContext {
200 200
201 @Autowired 201 @Autowired
202 @Getter 202 @Getter
203 - private MsgQueueService msgQueueService;  
204 -  
205 - @Autowired  
206 - @Getter  
207 private DeviceStateService deviceStateService; 203 private DeviceStateService deviceStateService;
208 204
209 @Lazy 205 @Lazy
@@ -269,10 +265,6 @@ public class ActorSystemContext { @@ -269,10 +265,6 @@ public class ActorSystemContext {
269 265
270 @Getter 266 @Getter
271 @Setter 267 @Setter
272 - private ActorRef sessionManagerActor;  
273 -  
274 - @Getter  
275 - @Setter  
276 private ActorRef statsActor; 268 private ActorRef statsActor;
277 269
278 @Getter 270 @Getter
@@ -38,7 +38,6 @@ import org.thingsboard.server.common.msg.TbActorMsg; @@ -38,7 +38,6 @@ import org.thingsboard.server.common.msg.TbActorMsg;
38 import org.thingsboard.server.common.msg.aware.TenantAwareMsg; 38 import org.thingsboard.server.common.msg.aware.TenantAwareMsg;
39 import org.thingsboard.server.common.msg.cluster.SendToClusterMsg; 39 import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
40 import org.thingsboard.server.common.msg.cluster.ServerAddress; 40 import org.thingsboard.server.common.msg.cluster.ServerAddress;
41 -import org.thingsboard.server.common.msg.core.BasicActorSystemToDeviceSessionActorMsg;  
42 import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; 41 import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
43 import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg; 42 import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
44 import org.thingsboard.server.dao.model.ModelConstants; 43 import org.thingsboard.server.dao.model.ModelConstants;
@@ -113,19 +112,12 @@ public class AppActor extends RuleChainManagerActor { @@ -113,19 +112,12 @@ public class AppActor extends RuleChainManagerActor {
113 case REMOTE_TO_RULE_CHAIN_TELL_NEXT_MSG: 112 case REMOTE_TO_RULE_CHAIN_TELL_NEXT_MSG:
114 onToDeviceActorMsg((TenantAwareMsg) msg); 113 onToDeviceActorMsg((TenantAwareMsg) msg);
115 break; 114 break;
116 - case ACTOR_SYSTEM_TO_DEVICE_SESSION_ACTOR_MSG:  
117 - onToDeviceSessionMsg((BasicActorSystemToDeviceSessionActorMsg) msg);  
118 - break;  
119 default: 115 default:
120 return false; 116 return false;
121 } 117 }
122 return true; 118 return true;
123 } 119 }
124 120
125 - private void onToDeviceSessionMsg(BasicActorSystemToDeviceSessionActorMsg msg) {  
126 - systemContext.getSessionManagerActor().tell(msg, self());  
127 - }  
128 -  
129 private void onPossibleClusterMsg(SendToClusterMsg msg) { 121 private void onPossibleClusterMsg(SendToClusterMsg msg) {
130 Optional<ServerAddress> address = systemContext.getRoutingService().resolveById(msg.getEntityId()); 122 Optional<ServerAddress> address = systemContext.getRoutingService().resolveById(msg.getEntityId());
131 if (address.isPresent()) { 123 if (address.isPresent()) {
@@ -27,7 +27,6 @@ import org.thingsboard.server.common.data.id.TenantId; @@ -27,7 +27,6 @@ import org.thingsboard.server.common.data.id.TenantId;
27 import org.thingsboard.server.common.msg.TbActorMsg; 27 import org.thingsboard.server.common.msg.TbActorMsg;
28 import org.thingsboard.server.common.msg.cluster.ClusterEventMsg; 28 import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
29 import org.thingsboard.server.common.msg.timeout.DeviceActorClientSideRpcTimeoutMsg; 29 import org.thingsboard.server.common.msg.timeout.DeviceActorClientSideRpcTimeoutMsg;
30 -import org.thingsboard.server.common.msg.timeout.DeviceActorQueueTimeoutMsg;  
31 import org.thingsboard.server.common.msg.timeout.DeviceActorServerSideRpcTimeoutMsg; 30 import org.thingsboard.server.common.msg.timeout.DeviceActorServerSideRpcTimeoutMsg;
32 import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg; 31 import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
33 import org.thingsboard.server.service.rpc.ToServerRpcResponseActorMsg; 32 import org.thingsboard.server.service.rpc.ToServerRpcResponseActorMsg;
@@ -74,12 +73,6 @@ public class DeviceActor extends ContextAwareActor { @@ -74,12 +73,6 @@ public class DeviceActor extends ContextAwareActor {
74 case DEVICE_ACTOR_CLIENT_SIDE_RPC_TIMEOUT_MSG: 73 case DEVICE_ACTOR_CLIENT_SIDE_RPC_TIMEOUT_MSG:
75 processor.processClientSideRpcTimeout(context(), (DeviceActorClientSideRpcTimeoutMsg) msg); 74 processor.processClientSideRpcTimeout(context(), (DeviceActorClientSideRpcTimeoutMsg) msg);
76 break; 75 break;
77 - case DEVICE_ACTOR_QUEUE_TIMEOUT_MSG:  
78 - processor.processQueueTimeout(context(), (DeviceActorQueueTimeoutMsg) msg);  
79 - break;  
80 - case RULE_ENGINE_QUEUE_PUT_ACK_MSG:  
81 - processor.processQueueAck(context(), (RuleEngineQueuePutAckMsg) msg);  
82 - break;  
83 default: 76 default:
84 return false; 77 return false;
85 } 78 }
1 /** 1 /**
2 * Copyright © 2016-2018 The Thingsboard Authors 2 * Copyright © 2016-2018 The Thingsboard Authors
3 - * <p> 3 + *
4 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License. 5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at 6 * You may obtain a copy of the License at
7 - * <p>  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - * <p> 7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
10 * Unless required by applicable law or agreed to in writing, software 10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, 11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,7 +16,6 @@ @@ -16,7 +16,6 @@
16 package org.thingsboard.server.actors.device; 16 package org.thingsboard.server.actors.device;
17 17
18 import akka.actor.ActorContext; 18 import akka.actor.ActorContext;
19 -import akka.actor.ActorRef;  
20 import akka.event.LoggingAdapter; 19 import akka.event.LoggingAdapter;
21 import com.datastax.driver.core.utils.UUIDs; 20 import com.datastax.driver.core.utils.UUIDs;
22 import com.google.common.util.concurrent.FutureCallback; 21 import com.google.common.util.concurrent.FutureCallback;
@@ -46,29 +45,15 @@ import org.thingsboard.server.common.msg.cluster.ClusterEventMsg; @@ -46,29 +45,15 @@ import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
46 import org.thingsboard.server.common.msg.cluster.ServerAddress; 45 import org.thingsboard.server.common.msg.cluster.ServerAddress;
47 import org.thingsboard.server.common.msg.core.ActorSystemToDeviceSessionActorMsg; 46 import org.thingsboard.server.common.msg.core.ActorSystemToDeviceSessionActorMsg;
48 import org.thingsboard.server.common.msg.core.AttributesUpdateNotification; 47 import org.thingsboard.server.common.msg.core.AttributesUpdateNotification;
49 -import org.thingsboard.server.common.msg.core.AttributesUpdateRequest;  
50 -import org.thingsboard.server.common.msg.core.BasicActorSystemToDeviceSessionActorMsg;  
51 -import org.thingsboard.server.common.msg.core.BasicCommandAckResponse;  
52 -import org.thingsboard.server.common.msg.core.BasicGetAttributesResponse;  
53 -import org.thingsboard.server.common.msg.core.BasicStatusCodeResponse;  
54 -import org.thingsboard.server.common.msg.core.GetAttributesRequest;  
55 import org.thingsboard.server.common.msg.core.RuleEngineError; 48 import org.thingsboard.server.common.msg.core.RuleEngineError;
56 import org.thingsboard.server.common.msg.core.RuleEngineErrorMsg; 49 import org.thingsboard.server.common.msg.core.RuleEngineErrorMsg;
57 -import org.thingsboard.server.common.msg.core.SessionCloseMsg;  
58 -import org.thingsboard.server.common.msg.core.SessionCloseNotification;  
59 -import org.thingsboard.server.common.msg.core.SessionOpenMsg;  
60 -import org.thingsboard.server.common.msg.core.TelemetryUploadRequest;  
61 import org.thingsboard.server.common.msg.core.ToDeviceRpcRequestMsg; 50 import org.thingsboard.server.common.msg.core.ToDeviceRpcRequestMsg;
62 -import org.thingsboard.server.common.msg.core.ToDeviceRpcResponseMsg;  
63 -import org.thingsboard.server.common.msg.core.ToServerRpcRequestMsg;  
64 import org.thingsboard.server.common.msg.kv.BasicAttributeKVMsg; 51 import org.thingsboard.server.common.msg.kv.BasicAttributeKVMsg;
65 import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest; 52 import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
66 -import org.thingsboard.server.common.msg.session.FromDeviceMsg;  
67 import org.thingsboard.server.common.msg.session.SessionMsgType; 53 import org.thingsboard.server.common.msg.session.SessionMsgType;
68 import org.thingsboard.server.common.msg.session.SessionType; 54 import org.thingsboard.server.common.msg.session.SessionType;
69 import org.thingsboard.server.common.msg.session.ToDeviceMsg; 55 import org.thingsboard.server.common.msg.session.ToDeviceMsg;
70 import org.thingsboard.server.common.msg.timeout.DeviceActorClientSideRpcTimeoutMsg; 56 import org.thingsboard.server.common.msg.timeout.DeviceActorClientSideRpcTimeoutMsg;
71 -import org.thingsboard.server.common.msg.timeout.DeviceActorQueueTimeoutMsg;  
72 import org.thingsboard.server.common.msg.timeout.DeviceActorServerSideRpcTimeoutMsg; 57 import org.thingsboard.server.common.msg.timeout.DeviceActorServerSideRpcTimeoutMsg;
73 import org.thingsboard.server.gen.transport.TransportProtos; 58 import org.thingsboard.server.gen.transport.TransportProtos;
74 import org.thingsboard.server.service.rpc.FromDeviceRpcResponse; 59 import org.thingsboard.server.service.rpc.FromDeviceRpcResponse;
@@ -88,9 +73,7 @@ import java.util.Map; @@ -88,9 +73,7 @@ import java.util.Map;
88 import java.util.Optional; 73 import java.util.Optional;
89 import java.util.Set; 74 import java.util.Set;
90 import java.util.UUID; 75 import java.util.UUID;
91 -import java.util.concurrent.TimeoutException;  
92 import java.util.function.Consumer; 76 import java.util.function.Consumer;
93 -import java.util.function.Predicate;  
94 import java.util.stream.Collectors; 77 import java.util.stream.Collectors;
95 78
96 import org.thingsboard.server.gen.transport.TransportProtos.*; 79 import org.thingsboard.server.gen.transport.TransportProtos.*;
@@ -192,19 +175,6 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso @@ -192,19 +175,6 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
192 } 175 }
193 } 176 }
194 177
195 - void processQueueAck(ActorContext context, RuleEngineQueuePutAckMsg msg) {  
196 - PendingSessionMsgData data = pendingMsgs.remove(msg.getId());  
197 - if (data != null && data.isReplyOnQueueAck()) {  
198 - int remainingAcks = data.getAckMsgCount() - 1;  
199 - data.setAckMsgCount(remainingAcks);  
200 - logger.debug("[{}] Queue put [{}] ack detected. Remaining acks: {}!", deviceId, msg.getId(), remainingAcks);  
201 - if (remainingAcks == 0) {  
202 - ToDeviceMsg toDeviceMsg = BasicStatusCodeResponse.onSuccess(data.getSessionMsgType(), data.getRequestId());  
203 - sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(toDeviceMsg, data.getSessionId()), data.getServerAddress());  
204 - }  
205 - }  
206 - }  
207 -  
208 private void sendPendingRequests(ActorContext context, SessionId sessionId, SessionType type, Optional<ServerAddress> server) { 178 private void sendPendingRequests(ActorContext context, SessionId sessionId, SessionType type, Optional<ServerAddress> server) {
209 if (!toDeviceRpcPendingMap.isEmpty()) { 179 if (!toDeviceRpcPendingMap.isEmpty()) {
210 logger.debug("[{}] Pushing {} pending RPC messages to new async session [{}]", deviceId, toDeviceRpcPendingMap.size(), sessionId); 180 logger.debug("[{}] Pushing {} pending RPC messages to new async session [{}]", deviceId, toDeviceRpcPendingMap.size(), sessionId);
@@ -239,8 +209,8 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso @@ -239,8 +209,8 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
239 body.getMethod(), 209 body.getMethod(),
240 body.getParams() 210 body.getParams()
241 ); 211 );
242 - ActorSystemToDeviceSessionActorMsg response = new BasicActorSystemToDeviceSessionActorMsg(rpcRequest, sessionId);  
243 - sendMsgToSessionActor(response, server); 212 +// ActorSystemToDeviceSessionActorMsg response = new BasicActorSystemToDeviceSessionActorMsg(rpcRequest, sessionId);
  213 +// sendMsgToSessionActor(response, server);
244 }; 214 };
245 } 215 }
246 216
@@ -292,57 +262,25 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso @@ -292,57 +262,25 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
292 private void handleGetAttributesRequest(ActorContext context, SessionInfoProto sessionInfo, GetAttributeRequestMsg request) { 262 private void handleGetAttributesRequest(ActorContext context, SessionInfoProto sessionInfo, GetAttributeRequestMsg request) {
293 ListenableFuture<List<AttributeKvEntry>> clientAttributesFuture = getAttributeKvEntries(deviceId, DataConstants.CLIENT_SCOPE, toOptionalSet(request.getClientAttributeNamesList())); 263 ListenableFuture<List<AttributeKvEntry>> clientAttributesFuture = getAttributeKvEntries(deviceId, DataConstants.CLIENT_SCOPE, toOptionalSet(request.getClientAttributeNamesList()));
294 ListenableFuture<List<AttributeKvEntry>> sharedAttributesFuture = getAttributeKvEntries(deviceId, DataConstants.SHARED_SCOPE, toOptionalSet(request.getSharedAttributeNamesList())); 264 ListenableFuture<List<AttributeKvEntry>> sharedAttributesFuture = getAttributeKvEntries(deviceId, DataConstants.SHARED_SCOPE, toOptionalSet(request.getSharedAttributeNamesList()));
295 -  
296 - Futures.addCallback(Futures.allAsList(Arrays.asList(clientAttributesFuture, sharedAttributesFuture)), new FutureCallback<List<List<AttributeKvEntry>>>() {  
297 - @Override  
298 - public void onSuccess(@Nullable List<List<AttributeKvEntry>> result) {  
299 - systemContext.getRuleEngineTransportService().process();  
300 - BasicGetAttributesResponse response = BasicGetAttributesResponse.onSuccess(request.getMsgType(),  
301 - request.getRequestId(), BasicAttributeKVMsg.from(result.get(0), result.get(1)));  
302 - sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(response, src.getSessionId()), src.getServerAddress());  
303 - }  
304 -  
305 - @Override  
306 - public void onFailure(Throwable t) {  
307 - if (t instanceof Exception) {  
308 - ToDeviceMsg toDeviceMsg = BasicStatusCodeResponse.onError(SessionMsgType.GET_ATTRIBUTES_REQUEST, request.getRequestId(), (Exception) t);  
309 - sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(toDeviceMsg, src.getSessionId()), src.getServerAddress());  
310 - } else {  
311 - logger.error("[{}] Failed to process attributes request", deviceId, t);  
312 - }  
313 - }  
314 - });  
315 - }  
316 -  
317 - private Optional<Set<String>> toOptionalSet(List<String> strings) {  
318 - if (strings == null || strings.isEmpty()) {  
319 - return Optional.empty();  
320 - } else {  
321 - return Optional.of(new HashSet<>(strings));  
322 - }  
323 - }  
324 -  
325 - private void handleGetAttributesRequest(DeviceToDeviceActorMsg src) {  
326 - GetAttributesRequest request = (GetAttributesRequest) src.getPayload();  
327 - ListenableFuture<List<AttributeKvEntry>> clientAttributesFuture = getAttributeKvEntries(deviceId, DataConstants.CLIENT_SCOPE, request.getClientAttributeNames());  
328 - ListenableFuture<List<AttributeKvEntry>> sharedAttributesFuture = getAttributeKvEntries(deviceId, DataConstants.SHARED_SCOPE, request.getSharedAttributeNames());  
329 - 265 + UUID sessionId = new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB());
  266 + int requestId = request.getRequestId();
330 Futures.addCallback(Futures.allAsList(Arrays.asList(clientAttributesFuture, sharedAttributesFuture)), new FutureCallback<List<List<AttributeKvEntry>>>() { 267 Futures.addCallback(Futures.allAsList(Arrays.asList(clientAttributesFuture, sharedAttributesFuture)), new FutureCallback<List<List<AttributeKvEntry>>>() {
331 @Override 268 @Override
332 public void onSuccess(@Nullable List<List<AttributeKvEntry>> result) { 269 public void onSuccess(@Nullable List<List<AttributeKvEntry>> result) {
333 - BasicGetAttributesResponse response = BasicGetAttributesResponse.onSuccess(request.getMsgType(),  
334 - request.getRequestId(), BasicAttributeKVMsg.from(result.get(0), result.get(1)));  
335 - sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(response, src.getSessionId()), src.getServerAddress()); 270 + GetAttributeResponseMsg responseMsg = GetAttributeResponseMsg.newBuilder()
  271 + .setRequestId(requestId)
  272 + .addAllClientAttributeList(toTsKvProtos(result.get(0)))
  273 + .addAllSharedAttributeList(toTsKvProtos(result.get(1)))
  274 + .build();
  275 + sendToTransport(responseMsg, sessionId, sessionInfo);
336 } 276 }
337 277
338 @Override 278 @Override
339 public void onFailure(Throwable t) { 279 public void onFailure(Throwable t) {
340 - if (t instanceof Exception) {  
341 - ToDeviceMsg toDeviceMsg = BasicStatusCodeResponse.onError(SessionMsgType.GET_ATTRIBUTES_REQUEST, request.getRequestId(), (Exception) t);  
342 - sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(toDeviceMsg, src.getSessionId()), src.getServerAddress());  
343 - } else {  
344 - logger.error("[{}] Failed to process attributes request", deviceId, t);  
345 - } 280 + GetAttributeResponseMsg responseMsg = GetAttributeResponseMsg.newBuilder()
  281 + .setError(t.getMessage())
  282 + .build();
  283 + sendToTransport(responseMsg, sessionId, sessionInfo);
346 } 284 }
347 }); 285 });
348 } 286 }
@@ -376,36 +314,36 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso @@ -376,36 +314,36 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
376 } 314 }
377 } 315 }
378 316
379 - private void handleClientSideRPCRequest(ActorContext context, DeviceToDeviceActorMsg src) {  
380 - ToServerRpcRequestMsg request = (ToServerRpcRequestMsg) src.getPayload();  
381 -  
382 - JsonObject json = new JsonObject();  
383 - json.addProperty("method", request.getMethod());  
384 - json.add("params", jsonParser.parse(request.getParams()));  
385 -  
386 - TbMsgMetaData requestMetaData = defaultMetaData.copy();  
387 - requestMetaData.putValue("requestId", Integer.toString(request.getRequestId()));  
388 - TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), SessionMsgType.TO_SERVER_RPC_REQUEST.name(), deviceId, requestMetaData, TbMsgDataType.JSON, gson.toJson(json), null, null, 0L);  
389 - PendingSessionMsgData msgData = new PendingSessionMsgData(src.getSessionId(), src.getServerAddress(), SessionMsgType.TO_SERVER_RPC_REQUEST, request.getRequestId(), false, 1);  
390 - pushToRuleEngineWithTimeout(context, tbMsg, msgData);  
391 -  
392 - scheduleMsgWithDelay(context, new DeviceActorClientSideRpcTimeoutMsg(request.getRequestId(), systemContext.getClientSideRpcTimeout()), systemContext.getClientSideRpcTimeout());  
393 - toServerRpcPendingMap.put(request.getRequestId(), new ToServerRpcRequestMetadata(src.getSessionId(), src.getSessionType(), src.getServerAddress()));  
394 - } 317 +// private void handleClientSideRPCRequest(ActorContext context, DeviceToDeviceActorMsg src) {
  318 +// ToServerRpcRequestMsg request = (ToServerRpcRequestMsg) src.getPayload();
  319 +//
  320 +// JsonObject json = new JsonObject();
  321 +// json.addProperty("method", request.getMethod());
  322 +// json.add("params", jsonParser.parse(request.getParams()));
  323 +//
  324 +// TbMsgMetaData requestMetaData = defaultMetaData.copy();
  325 +// requestMetaData.putValue("requestId", Integer.toString(request.getRequestId()));
  326 +// TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), SessionMsgType.TO_SERVER_RPC_REQUEST.name(), deviceId, requestMetaData, TbMsgDataType.JSON, gson.toJson(json), null, null, 0L);
  327 +// PendingSessionMsgData msgData = new PendingSessionMsgData(src.getSessionId(), src.getServerAddress(), SessionMsgType.TO_SERVER_RPC_REQUEST, request.getRequestId(), false, 1);
  328 +// pushToRuleEngineWithTimeout(context, tbMsg, msgData);
  329 +//
  330 +// scheduleMsgWithDelay(context, new DeviceActorClientSideRpcTimeoutMsg(request.getRequestId(), systemContext.getClientSideRpcTimeout()), systemContext.getClientSideRpcTimeout());
  331 +// toServerRpcPendingMap.put(request.getRequestId(), new ToServerRpcRequestMetadata(src.getSessionId(), src.getSessionType(), src.getServerAddress()));
  332 +// }
395 333
396 public void processClientSideRpcTimeout(ActorContext context, DeviceActorClientSideRpcTimeoutMsg msg) { 334 public void processClientSideRpcTimeout(ActorContext context, DeviceActorClientSideRpcTimeoutMsg msg) {
397 ToServerRpcRequestMetadata data = toServerRpcPendingMap.remove(msg.getId()); 335 ToServerRpcRequestMetadata data = toServerRpcPendingMap.remove(msg.getId());
398 if (data != null) { 336 if (data != null) {
399 logger.debug("[{}] Client side RPC request [{}] timeout detected!", deviceId, msg.getId()); 337 logger.debug("[{}] Client side RPC request [{}] timeout detected!", deviceId, msg.getId());
400 ToDeviceMsg toDeviceMsg = new RuleEngineErrorMsg(SessionMsgType.TO_SERVER_RPC_REQUEST, RuleEngineError.TIMEOUT); 338 ToDeviceMsg toDeviceMsg = new RuleEngineErrorMsg(SessionMsgType.TO_SERVER_RPC_REQUEST, RuleEngineError.TIMEOUT);
401 - sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(toDeviceMsg, data.getSessionId()), data.getServer()); 339 +// sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(toDeviceMsg, data.getSessionId()), data.getServer());
402 } 340 }
403 } 341 }
404 342
405 void processToServerRPCResponse(ActorContext context, ToServerRpcResponseActorMsg msg) { 343 void processToServerRPCResponse(ActorContext context, ToServerRpcResponseActorMsg msg) {
406 ToServerRpcRequestMetadata data = toServerRpcPendingMap.remove(msg.getMsg().getRequestId()); 344 ToServerRpcRequestMetadata data = toServerRpcPendingMap.remove(msg.getMsg().getRequestId());
407 if (data != null) { 345 if (data != null) {
408 - sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(msg.getMsg(), data.getSessionId()), data.getServer()); 346 +// sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(msg.getMsg(), data.getSessionId()), data.getServer());
409 } 347 }
410 } 348 }
411 349
@@ -433,68 +371,68 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso @@ -433,68 +371,68 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
433 } 371 }
434 if (notification != null) { 372 if (notification != null) {
435 ToDeviceMsg finalNotification = notification; 373 ToDeviceMsg finalNotification = notification;
436 - attributeSubscriptions.entrySet().forEach(sub -> {  
437 - ActorSystemToDeviceSessionActorMsg response = new BasicActorSystemToDeviceSessionActorMsg(finalNotification, sub.getKey());  
438 - sendMsgToSessionActor(response, sub.getValue().getServer());  
439 - }); 374 +// attributeSubscriptions.entrySet().forEach(sub -> {
  375 +// ActorSystemToDeviceSessionActorMsg response = new BasicActorSystemToDeviceSessionActorMsg(finalNotification, sub.getKey());
  376 +// sendMsgToSessionActor(response, sub.getValue().getServer());
  377 +// });
440 } 378 }
441 } else { 379 } else {
442 logger.debug("[{}] No registered attributes subscriptions to process!", deviceId); 380 logger.debug("[{}] No registered attributes subscriptions to process!", deviceId);
443 } 381 }
444 } 382 }
445 383
446 - private void processRpcResponses(ActorContext context, DeviceToDeviceActorMsg msg) {  
447 - SessionId sessionId = msg.getSessionId();  
448 - FromDeviceMsg inMsg = msg.getPayload();  
449 - if (inMsg.getMsgType() == SessionMsgType.TO_DEVICE_RPC_RESPONSE) {  
450 - logger.debug("[{}] Processing rpc command response [{}]", deviceId, sessionId);  
451 - ToDeviceRpcResponseMsg responseMsg = (ToDeviceRpcResponseMsg) inMsg;  
452 - ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap.remove(responseMsg.getRequestId());  
453 - boolean success = requestMd != null;  
454 - if (success) {  
455 - systemContext.getDeviceRpcService().processRpcResponseFromDevice(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(),  
456 - requestMd.getMsg().getServerAddress(), responseMsg.getData(), null));  
457 - } else {  
458 - logger.debug("[{}] Rpc command response [{}] is stale!", deviceId, responseMsg.getRequestId());  
459 - }  
460 - if (msg.getSessionType() == SessionType.SYNC) {  
461 - BasicCommandAckResponse response = success  
462 - ? BasicCommandAckResponse.onSuccess(SessionMsgType.TO_DEVICE_RPC_REQUEST, responseMsg.getRequestId())  
463 - : BasicCommandAckResponse.onError(SessionMsgType.TO_DEVICE_RPC_REQUEST, responseMsg.getRequestId(), new TimeoutException());  
464 - sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(response, msg.getSessionId()), msg.getServerAddress());  
465 - }  
466 - }  
467 - } 384 +// private void processRpcResponses(ActorContext context, DeviceToDeviceActorMsg msg) {
  385 +// SessionId sessionId = msg.getSessionId();
  386 +// FromDeviceMsg inMsg = msg.getPayload();
  387 +// if (inMsg.getMsgType() == SessionMsgType.TO_DEVICE_RPC_RESPONSE) {
  388 +// logger.debug("[{}] Processing rpc command response [{}]", deviceId, sessionId);
  389 +// ToDeviceRpcResponseMsg responseMsg = (ToDeviceRpcResponseMsg) inMsg;
  390 +// ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap.remove(responseMsg.getRequestId());
  391 +// boolean success = requestMd != null;
  392 +// if (success) {
  393 +// systemContext.getDeviceRpcService().processRpcResponseFromDevice(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(),
  394 +// requestMd.getMsg().getServerAddress(), responseMsg.getData(), null));
  395 +// } else {
  396 +// logger.debug("[{}] Rpc command response [{}] is stale!", deviceId, responseMsg.getRequestId());
  397 +// }
  398 +// if (msg.getSessionType() == SessionType.SYNC) {
  399 +// BasicCommandAckResponse response = success
  400 +// ? BasicCommandAckResponse.onSuccess(SessionMsgType.TO_DEVICE_RPC_REQUEST, responseMsg.getRequestId())
  401 +// : BasicCommandAckResponse.onError(SessionMsgType.TO_DEVICE_RPC_REQUEST, responseMsg.getRequestId(), new TimeoutException());
  402 +// sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(response, msg.getSessionId()), msg.getServerAddress());
  403 +// }
  404 +// }
  405 +// }
468 406
469 void processClusterEventMsg(ClusterEventMsg msg) { 407 void processClusterEventMsg(ClusterEventMsg msg) {
470 - if (!msg.isAdded()) {  
471 - logger.debug("[{}] Clearing attributes/rpc subscription for server [{}]", deviceId, msg.getServerAddress());  
472 - Predicate<Map.Entry<SessionId, SessionInfo>> filter = e -> e.getValue().getServer()  
473 - .map(serverAddress -> serverAddress.equals(msg.getServerAddress())).orElse(false);  
474 - attributeSubscriptions.entrySet().removeIf(filter);  
475 - rpcSubscriptions.entrySet().removeIf(filter);  
476 - } 408 +// if (!msg.isAdded()) {
  409 +// logger.debug("[{}] Clearing attributes/rpc subscription for server [{}]", deviceId, msg.getServerAddress());
  410 +// Predicate<Map.Entry<SessionId, SessionInfo>> filter = e -> e.getValue().getServer()
  411 +// .map(serverAddress -> serverAddress.equals(msg.getServerAddress())).orElse(false);
  412 +// attributeSubscriptions.entrySet().removeIf(filter);
  413 +// rpcSubscriptions.entrySet().removeIf(filter);
  414 +// }
477 } 415 }
478 416
479 - private void processSubscriptionCommands(ActorContext context, DeviceToDeviceActorMsg msg) {  
480 - SessionId sessionId = msg.getSessionId();  
481 - SessionType sessionType = msg.getSessionType();  
482 - FromDeviceMsg inMsg = msg.getPayload();  
483 - if (inMsg.getMsgType() == SessionMsgType.SUBSCRIBE_ATTRIBUTES_REQUEST) {  
484 - logger.debug("[{}] Registering attributes subscription for session [{}]", deviceId, sessionId);  
485 - attributeSubscriptions.put(sessionId, new SessionInfo(sessionType, msg.getServerAddress()));  
486 - } else if (inMsg.getMsgType() == SessionMsgType.UNSUBSCRIBE_ATTRIBUTES_REQUEST) {  
487 - logger.debug("[{}] Canceling attributes subscription for session [{}]", deviceId, sessionId);  
488 - attributeSubscriptions.remove(sessionId);  
489 - } else if (inMsg.getMsgType() == SessionMsgType.SUBSCRIBE_RPC_COMMANDS_REQUEST) {  
490 - logger.debug("[{}] Registering rpc subscription for session [{}][{}]", deviceId, sessionId, sessionType);  
491 - rpcSubscriptions.put(sessionId, new SessionInfo(sessionType, msg.getServerAddress()));  
492 - sendPendingRequests(context, sessionId, sessionType, msg.getServerAddress());  
493 - } else if (inMsg.getMsgType() == SessionMsgType.UNSUBSCRIBE_RPC_COMMANDS_REQUEST) {  
494 - logger.debug("[{}] Canceling rpc subscription for session [{}][{}]", deviceId, sessionId, sessionType);  
495 - rpcSubscriptions.remove(sessionId);  
496 - }  
497 - } 417 +// private void processSubscriptionCommands(ActorContext context, DeviceToDeviceActorMsg msg) {
  418 +// SessionId sessionId = msg.getSessionId();
  419 +// SessionType sessionType = msg.getSessionType();
  420 +// FromDeviceMsg inMsg = msg.getPayload();
  421 +// if (inMsg.getMsgType() == SessionMsgType.SUBSCRIBE_ATTRIBUTES_REQUEST) {
  422 +// logger.debug("[{}] Registering attributes subscription for session [{}]", deviceId, sessionId);
  423 +// attributeSubscriptions.put(sessionId, new SessionInfo(sessionType, msg.getServerAddress()));
  424 +// } else if (inMsg.getMsgType() == SessionMsgType.UNSUBSCRIBE_ATTRIBUTES_REQUEST) {
  425 +// logger.debug("[{}] Canceling attributes subscription for session [{}]", deviceId, sessionId);
  426 +// attributeSubscriptions.remove(sessionId);
  427 +// } else if (inMsg.getMsgType() == SessionMsgType.SUBSCRIBE_RPC_COMMANDS_REQUEST) {
  428 +// logger.debug("[{}] Registering rpc subscription for session [{}][{}]", deviceId, sessionId, sessionType);
  429 +// rpcSubscriptions.put(sessionId, new SessionInfo(sessionType, msg.getServerAddress()));
  430 +// sendPendingRequests(context, sessionId, sessionType, msg.getServerAddress());
  431 +// } else if (inMsg.getMsgType() == SessionMsgType.UNSUBSCRIBE_RPC_COMMANDS_REQUEST) {
  432 +// logger.debug("[{}] Canceling rpc subscription for session [{}][{}]", deviceId, sessionId, sessionType);
  433 +// rpcSubscriptions.remove(sessionId);
  434 +// }
  435 +// }
498 436
499 private void processSessionStateMsgs(SessionInfoProto sessionInfo, SessionEventMsg msg) { 437 private void processSessionStateMsgs(SessionInfoProto sessionInfo, SessionEventMsg msg) {
500 UUID sessionId = new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB()); 438 UUID sessionId = new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB());
@@ -506,15 +444,11 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso @@ -506,15 +444,11 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
506 closeSession(sessionIdToRemove, sessions.remove(sessionIdToRemove)); 444 closeSession(sessionIdToRemove, sessions.remove(sessionIdToRemove));
507 } 445 }
508 } 446 }
509 - sessions.put(sessionId, new SessionInfo(SessionType.ASYNC, msg.getServerAddress())); 447 + sessions.put(sessionId, new SessionInfo(TransportProtos.SessionType.ASYNC, sessionInfo.getNodeId()));
510 if (sessions.size() == 1) { 448 if (sessions.size() == 1) {
511 reportSessionOpen(); 449 reportSessionOpen();
512 } 450 }
513 - }  
514 - FromDeviceMsg inMsg = msg.getPayload();  
515 - if (inMsg instanceof SessionOpenMsg) {  
516 - logger.debug("[{}] Processing new session [{}]", deviceId, sessionId);  
517 - } else if (inMsg instanceof SessionCloseMsg) { 451 + } else if (msg.getEvent() == SessionEvent.CLOSED) {
518 logger.debug("[{}] Canceling subscriptions for closed session [{}]", deviceId, sessionId); 452 logger.debug("[{}] Canceling subscriptions for closed session [{}]", deviceId, sessionId);
519 sessions.remove(sessionId); 453 sessions.remove(sessionId);
520 attributeSubscriptions.remove(sessionId); 454 attributeSubscriptions.remove(sessionId);
@@ -532,7 +466,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso @@ -532,7 +466,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
532 systemContext.getRpcService().tell(systemContext.getEncodingService() 466 systemContext.getRpcService().tell(systemContext.getEncodingService()
533 .convertToProtoDataMessage(sessionAddress.get(), response)); 467 .convertToProtoDataMessage(sessionAddress.get(), response));
534 } else { 468 } else {
535 - systemContext.getSessionManagerActor().tell(response, ActorRef.noSender()); 469 +// systemContext.getSessionManagerActor().tell(response, ActorRef.noSender());
536 } 470 }
537 } 471 }
538 472
@@ -578,4 +512,62 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso @@ -578,4 +512,62 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
578 } 512 }
579 return json; 513 return json;
580 } 514 }
  515 +
  516 + private Optional<Set<String>> toOptionalSet(List<String> strings) {
  517 + if (strings == null || strings.isEmpty()) {
  518 + return Optional.empty();
  519 + } else {
  520 + return Optional.of(new HashSet<>(strings));
  521 + }
  522 + }
  523 +
  524 + private void sendToTransport(GetAttributeResponseMsg responseMsg, UUID sessionId, SessionInfoProto sessionInfo) {
  525 + DeviceActorToTransportMsg msg = DeviceActorToTransportMsg.newBuilder()
  526 + .setSessionIdMSB(sessionId.getMostSignificantBits())
  527 + .setSessionIdLSB(sessionId.getLeastSignificantBits())
  528 + .setGetAttributesResponse(responseMsg).build();
  529 + systemContext.getRuleEngineTransportService().process(sessionInfo.getNodeId(), msg);
  530 + }
  531 +
  532 + private List<TsKvProto> toTsKvProtos(@Nullable List<AttributeKvEntry> result) {
  533 + List<TsKvProto> clientAttributes;
  534 + if (result == null || result.isEmpty()) {
  535 + clientAttributes = Collections.emptyList();
  536 + } else {
  537 + clientAttributes = new ArrayList<>(result.size());
  538 + for (AttributeKvEntry attrEntry : result) {
  539 + clientAttributes.add(toTsKvProto(attrEntry));
  540 + }
  541 + }
  542 + return clientAttributes;
  543 + }
  544 +
  545 + private TsKvProto toTsKvProto(AttributeKvEntry attrEntry) {
  546 + return TsKvProto.newBuilder().setTs(attrEntry.getLastUpdateTs())
  547 + .setKv(toKeyValueProto(attrEntry)).build();
  548 + }
  549 +
  550 + private KeyValueProto toKeyValueProto(KvEntry kvEntry) {
  551 + KeyValueProto.Builder builder = KeyValueProto.newBuilder();
  552 + builder.setKey(kvEntry.getKey());
  553 + switch (kvEntry.getDataType()) {
  554 + case BOOLEAN:
  555 + builder.setType(KeyValueType.BOOLEAN_V);
  556 + builder.setBoolV(kvEntry.getBooleanValue().get());
  557 + break;
  558 + case DOUBLE:
  559 + builder.setType(KeyValueType.DOUBLE_V);
  560 + builder.setDoubleV(kvEntry.getDoubleValue().get());
  561 + break;
  562 + case LONG:
  563 + builder.setType(KeyValueType.LONG_V);
  564 + builder.setLongV(kvEntry.getLongValue().get());
  565 + break;
  566 + case STRING:
  567 + builder.setType(KeyValueType.STRING_V);
  568 + builder.setStringV(kvEntry.getStrValue().get());
  569 + break;
  570 + }
  571 + return builder.build();
  572 + }
581 } 573 }
1 -/**  
2 - * Copyright © 2016-2018 The Thingsboard Authors  
3 - *  
4 - * Licensed under the Apache License, Version 2.0 (the "License");  
5 - * you may not use this file except in compliance with the License.  
6 - * You may obtain a copy of the License at  
7 - *  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - *  
10 - * Unless required by applicable law or agreed to in writing, software  
11 - * distributed under the License is distributed on an "AS IS" BASIS,  
12 - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
13 - * See the License for the specific language governing permissions and  
14 - * limitations under the License.  
15 - */  
16 -package org.thingsboard.server.actors.device;  
17 -  
18 -import lombok.Data;  
19 -import org.thingsboard.server.common.msg.MsgType;  
20 -import org.thingsboard.server.common.msg.TbActorMsg;  
21 -  
22 -import java.util.UUID;  
23 -  
24 -/**  
25 - * Created by ashvayka on 15.03.18.  
26 - */  
27 -@Data  
28 -public final class RuleEngineQueuePutAckMsg implements TbActorMsg {  
29 -  
30 - private final UUID id;  
31 -  
32 - @Override  
33 - public MsgType getMsgType() {  
34 - return MsgType.RULE_ENGINE_QUEUE_PUT_ACK_MSG;  
35 - }  
36 -}  
1 /** 1 /**
2 * Copyright © 2016-2018 The Thingsboard Authors 2 * Copyright © 2016-2018 The Thingsboard Authors
3 - * <p> 3 + *
4 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License. 5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at 6 * You may obtain a copy of the License at
7 - * <p>  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - * <p> 7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
10 * Unless required by applicable law or agreed to in writing, software 10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, 11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -25,7 +25,6 @@ import java.util.Optional; @@ -25,7 +25,6 @@ import java.util.Optional;
25 25
26 import org.thingsboard.server.actors.ActorSystemContext; 26 import org.thingsboard.server.actors.ActorSystemContext;
27 import org.thingsboard.server.actors.device.DeviceActorToRuleEngineMsg; 27 import org.thingsboard.server.actors.device.DeviceActorToRuleEngineMsg;
28 -import org.thingsboard.server.actors.device.RuleEngineQueuePutAckMsg;  
29 import org.thingsboard.server.actors.service.DefaultActorService; 28 import org.thingsboard.server.actors.service.DefaultActorService;
30 import org.thingsboard.server.actors.shared.ComponentMsgProcessor; 29 import org.thingsboard.server.actors.shared.ComponentMsgProcessor;
31 import org.thingsboard.server.common.data.EntityType; 30 import org.thingsboard.server.common.data.EntityType;
@@ -90,26 +89,12 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh @@ -90,26 +89,12 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
90 nodeActors.put(ruleNode.getId(), new RuleNodeCtx(tenantId, self, ruleNodeActor, ruleNode)); 89 nodeActors.put(ruleNode.getId(), new RuleNodeCtx(tenantId, self, ruleNodeActor, ruleNode));
91 } 90 }
92 initRoutes(ruleChain, ruleNodeList); 91 initRoutes(ruleChain, ruleNodeList);
93 - reprocess(ruleNodeList);  
94 started = true; 92 started = true;
95 } else { 93 } else {
96 onUpdate(context); 94 onUpdate(context);
97 } 95 }
98 } 96 }
99 97
100 - private void reprocess(List<RuleNode> ruleNodeList) {  
101 - for (RuleNode ruleNode : ruleNodeList) {  
102 - for (TbMsg tbMsg : queue.findUnprocessed(tenantId, ruleNode.getId().getId(), systemContext.getQueuePartitionId())) {  
103 - pushMsgToNode(nodeActors.get(ruleNode.getId()), tbMsg, "");  
104 - }  
105 - }  
106 - if (firstNode != null) {  
107 - for (TbMsg tbMsg : queue.findUnprocessed(tenantId, entityId.getId(), systemContext.getQueuePartitionId())) {  
108 - pushMsgToNode(firstNode, tbMsg, "");  
109 - }  
110 - }  
111 - }  
112 -  
113 @Override 98 @Override
114 public void onUpdate(ActorContext context) throws Exception { 99 public void onUpdate(ActorContext context) throws Exception {
115 RuleChain ruleChain = service.findRuleChainById(entityId); 100 RuleChain ruleChain = service.findRuleChainById(entityId);
@@ -134,7 +119,6 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh @@ -134,7 +119,6 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
134 }); 119 });
135 120
136 initRoutes(ruleChain, ruleNodeList); 121 initRoutes(ruleChain, ruleNodeList);
137 - reprocess(ruleNodeList);  
138 } 122 }
139 123
140 @Override 124 @Override
@@ -188,17 +172,14 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh @@ -188,17 +172,14 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
188 void onServiceToRuleEngineMsg(ServiceToRuleEngineMsg envelope) { 172 void onServiceToRuleEngineMsg(ServiceToRuleEngineMsg envelope) {
189 checkActive(); 173 checkActive();
190 if (firstNode != null) { 174 if (firstNode != null) {
191 - putToQueue(enrichWithRuleChainId(envelope.getTbMsg()), msg -> pushMsgToNode(firstNode, msg, "")); 175 + pushMsgToNode(firstNode, enrichWithRuleChainId(envelope.getTbMsg()), "");
192 } 176 }
193 } 177 }
194 178
195 void onDeviceActorToRuleEngineMsg(DeviceActorToRuleEngineMsg envelope) { 179 void onDeviceActorToRuleEngineMsg(DeviceActorToRuleEngineMsg envelope) {
196 checkActive(); 180 checkActive();
197 if (firstNode != null) { 181 if (firstNode != null) {
198 - putToQueue(enrichWithRuleChainId(envelope.getTbMsg()), msg -> {  
199 - pushMsgToNode(firstNode, msg, "");  
200 - envelope.getCallbackRef().tell(new RuleEngineQueuePutAckMsg(msg.getId()), self);  
201 - }); 182 + pushMsgToNode(firstNode, enrichWithRuleChainId(envelope.getTbMsg()), "");
202 } 183 }
203 } 184 }
204 185
@@ -206,15 +187,16 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh @@ -206,15 +187,16 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
206 checkActive(); 187 checkActive();
207 if (envelope.isEnqueue()) { 188 if (envelope.isEnqueue()) {
208 if (firstNode != null) { 189 if (firstNode != null) {
209 - putToQueue(enrichWithRuleChainId(envelope.getMsg()), msg -> pushMsgToNode(firstNode, msg, envelope.getFromRelationType())); 190 + pushMsgToNode(firstNode, enrichWithRuleChainId(envelope.getMsg()), envelope.getFromRelationType());
210 } 191 }
211 } else { 192 } else {
212 if (firstNode != null) { 193 if (firstNode != null) {
213 pushMsgToNode(firstNode, envelope.getMsg(), envelope.getFromRelationType()); 194 pushMsgToNode(firstNode, envelope.getMsg(), envelope.getFromRelationType());
214 } else { 195 } else {
215 - TbMsg msg = envelope.getMsg();  
216 - EntityId ackId = msg.getRuleNodeId() != null ? msg.getRuleNodeId() : msg.getRuleChainId();  
217 - queue.ack(tenantId, envelope.getMsg(), ackId.getId(), msg.getClusterPartition()); 196 +// TODO: Ack this message in Kafka
  197 +// TbMsg msg = envelope.getMsg();
  198 +// EntityId ackId = msg.getRuleNodeId() != null ? msg.getRuleNodeId() : msg.getRuleChainId();
  199 +// queue.ack(tenantId, envelope.getMsg(), ackId.getId(), msg.getClusterPartition());
218 } 200 }
219 } 201 }
220 } 202 }
@@ -249,7 +231,8 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh @@ -249,7 +231,8 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
249 EntityId ackId = msg.getRuleNodeId() != null ? msg.getRuleNodeId() : msg.getRuleChainId(); 231 EntityId ackId = msg.getRuleNodeId() != null ? msg.getRuleNodeId() : msg.getRuleChainId();
250 if (relationsCount == 0) { 232 if (relationsCount == 0) {
251 if (ackId != null) { 233 if (ackId != null) {
252 - queue.ack(tenantId, msg, ackId.getId(), msg.getClusterPartition()); 234 +// TODO: Ack this message in Kafka
  235 +// queue.ack(tenantId, msg, ackId.getId(), msg.getClusterPartition());
253 } 236 }
254 } else if (relationsCount == 1) { 237 } else if (relationsCount == 1) {
255 for (RuleNodeRelation relation : relations) { 238 for (RuleNodeRelation relation : relations) {
@@ -269,7 +252,8 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh @@ -269,7 +252,8 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
269 } 252 }
270 //TODO: Ideally this should happen in async way when all targets confirm that the copied messages are successfully written to corresponding target queues. 253 //TODO: Ideally this should happen in async way when all targets confirm that the copied messages are successfully written to corresponding target queues.
271 if (ackId != null) { 254 if (ackId != null) {
272 - queue.ack(tenantId, msg, ackId.getId(), msg.getClusterPartition()); 255 +// TODO: Ack this message in Kafka
  256 +// queue.ack(tenantId, msg, ackId.getId(), msg.getClusterPartition());
273 } 257 }
274 } 258 }
275 } 259 }
@@ -296,7 +280,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh @@ -296,7 +280,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
296 RuleNodeId targetId = new RuleNodeId(target.getId()); 280 RuleNodeId targetId = new RuleNodeId(target.getId());
297 RuleNodeCtx targetNodeCtx = nodeActors.get(targetId); 281 RuleNodeCtx targetNodeCtx = nodeActors.get(targetId);
298 TbMsg copy = msg.copy(UUIDs.timeBased(), entityId, targetId, DEFAULT_CLUSTER_PARTITION); 282 TbMsg copy = msg.copy(UUIDs.timeBased(), entityId, targetId, DEFAULT_CLUSTER_PARTITION);
299 - putToQueue(copy, queuedMsg -> pushMsgToNode(targetNodeCtx, queuedMsg, fromRelationType)); 283 + pushMsgToNode(targetNodeCtx, copy, fromRelationType);
300 } 284 }
301 285
302 private void pushToTarget(TbMsg msg, EntityId target, String fromRelationType) { 286 private void pushToTarget(TbMsg msg, EntityId target, String fromRelationType) {
@@ -30,7 +30,6 @@ import org.thingsboard.server.actors.app.AppActor; @@ -30,7 +30,6 @@ import org.thingsboard.server.actors.app.AppActor;
30 import org.thingsboard.server.actors.rpc.RpcBroadcastMsg; 30 import org.thingsboard.server.actors.rpc.RpcBroadcastMsg;
31 import org.thingsboard.server.actors.rpc.RpcManagerActor; 31 import org.thingsboard.server.actors.rpc.RpcManagerActor;
32 import org.thingsboard.server.actors.rpc.RpcSessionCreateRequestMsg; 32 import org.thingsboard.server.actors.rpc.RpcSessionCreateRequestMsg;
33 -import org.thingsboard.server.actors.session.SessionManagerActor;  
34 import org.thingsboard.server.actors.stats.StatsActor; 33 import org.thingsboard.server.actors.stats.StatsActor;
35 import org.thingsboard.server.common.data.Device; 34 import org.thingsboard.server.common.data.Device;
36 import org.thingsboard.server.common.data.id.DeviceId; 35 import org.thingsboard.server.common.data.id.DeviceId;
@@ -90,8 +89,6 @@ public class DefaultActorService implements ActorService { @@ -90,8 +89,6 @@ public class DefaultActorService implements ActorService {
90 89
91 private ActorRef appActor; 90 private ActorRef appActor;
92 91
93 - private ActorRef sessionManagerActor;  
94 -  
95 private ActorRef rpcManagerActor; 92 private ActorRef rpcManagerActor;
96 93
97 @PostConstruct 94 @PostConstruct
@@ -104,10 +101,6 @@ public class DefaultActorService implements ActorService { @@ -104,10 +101,6 @@ public class DefaultActorService implements ActorService {
104 appActor = system.actorOf(Props.create(new AppActor.ActorCreator(actorContext)).withDispatcher(APP_DISPATCHER_NAME), "appActor"); 101 appActor = system.actorOf(Props.create(new AppActor.ActorCreator(actorContext)).withDispatcher(APP_DISPATCHER_NAME), "appActor");
105 actorContext.setAppActor(appActor); 102 actorContext.setAppActor(appActor);
106 103
107 - sessionManagerActor = system.actorOf(Props.create(new SessionManagerActor.ActorCreator(actorContext)).withDispatcher(CORE_DISPATCHER_NAME),  
108 - "sessionManagerActor");  
109 - actorContext.setSessionManagerActor(sessionManagerActor);  
110 -  
111 rpcManagerActor = system.actorOf(Props.create(new RpcManagerActor.ActorCreator(actorContext)).withDispatcher(CORE_DISPATCHER_NAME), 104 rpcManagerActor = system.actorOf(Props.create(new RpcManagerActor.ActorCreator(actorContext)).withDispatcher(CORE_DISPATCHER_NAME),
112 "rpcManagerActor"); 105 "rpcManagerActor");
113 106
@@ -135,12 +128,6 @@ public class DefaultActorService implements ActorService { @@ -135,12 +128,6 @@ public class DefaultActorService implements ActorService {
135 } 128 }
136 129
137 @Override 130 @Override
138 - public void process(SessionAwareMsg msg) {  
139 - log.debug("Processing session aware msg: {}", msg);  
140 - sessionManagerActor.tell(msg, ActorRef.noSender());  
141 - }  
142 -  
143 - @Override  
144 public void onServerAdded(ServerInstance server) { 131 public void onServerAdded(ServerInstance server) {
145 log.trace("Processing onServerAdded msg: {}", server); 132 log.trace("Processing onServerAdded msg: {}", server);
146 broadcast(new ClusterEventMsg(server.getServerAddress(), true)); 133 broadcast(new ClusterEventMsg(server.getServerAddress(), true));
@@ -194,7 +181,6 @@ public class DefaultActorService implements ActorService { @@ -194,7 +181,6 @@ public class DefaultActorService implements ActorService {
194 181
195 private void broadcast(ClusterEventMsg msg) { 182 private void broadcast(ClusterEventMsg msg) {
196 this.appActor.tell(msg, ActorRef.noSender()); 183 this.appActor.tell(msg, ActorRef.noSender());
197 - this.sessionManagerActor.tell(msg, ActorRef.noSender());  
198 this.rpcManagerActor.tell(msg, ActorRef.noSender()); 184 this.rpcManagerActor.tell(msg, ActorRef.noSender());
199 } 185 }
200 186
@@ -35,14 +35,12 @@ public abstract class ComponentMsgProcessor<T extends EntityId> extends Abstract @@ -35,14 +35,12 @@ public abstract class ComponentMsgProcessor<T extends EntityId> extends Abstract
35 35
36 protected final TenantId tenantId; 36 protected final TenantId tenantId;
37 protected final T entityId; 37 protected final T entityId;
38 - protected final MsgQueueService queue;  
39 protected ComponentLifecycleState state; 38 protected ComponentLifecycleState state;
40 39
41 protected ComponentMsgProcessor(ActorSystemContext systemContext, LoggingAdapter logger, TenantId tenantId, T id) { 40 protected ComponentMsgProcessor(ActorSystemContext systemContext, LoggingAdapter logger, TenantId tenantId, T id) {
42 super(systemContext, logger); 41 super(systemContext, logger);
43 this.tenantId = tenantId; 42 this.tenantId = tenantId;
44 this.entityId = id; 43 this.entityId = id;
45 - this.queue = systemContext.getMsgQueueService();  
46 } 44 }
47 45
48 public abstract void start(ActorContext context) throws Exception; 46 public abstract void start(ActorContext context) throws Exception;
@@ -86,18 +84,4 @@ public abstract class ComponentMsgProcessor<T extends EntityId> extends Abstract @@ -86,18 +84,4 @@ public abstract class ComponentMsgProcessor<T extends EntityId> extends Abstract
86 } 84 }
87 } 85 }
88 86
89 - protected void putToQueue(final TbMsg tbMsg, final Consumer<TbMsg> onSuccess) {  
90 - EntityId entityId = tbMsg.getRuleNodeId() != null ? tbMsg.getRuleNodeId() : tbMsg.getRuleChainId();  
91 - Futures.addCallback(queue.put(this.tenantId, tbMsg, entityId.getId(), tbMsg.getClusterPartition()), new FutureCallback<Void>() {  
92 - @Override  
93 - public void onSuccess(@Nullable Void result) {  
94 - onSuccess.accept(tbMsg);  
95 - }  
96 -  
97 - @Override  
98 - public void onFailure(Throwable t) {  
99 - logger.debug("Failed to push message [{}] to queue due to [{}]", tbMsg, t);  
100 - }  
101 - });  
102 - }  
103 } 87 }
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
1 package org.thingsboard.server.service.transport; 16 package org.thingsboard.server.service.transport;
2 17
3 import akka.actor.ActorRef; 18 import akka.actor.ActorRef;
@@ -30,6 +45,7 @@ import javax.annotation.PostConstruct; @@ -30,6 +45,7 @@ import javax.annotation.PostConstruct;
30 import javax.annotation.PreDestroy; 45 import javax.annotation.PreDestroy;
31 import java.time.Duration; 46 import java.time.Duration;
32 import java.util.Optional; 47 import java.util.Optional;
  48 +import java.util.UUID;
33 import java.util.concurrent.ExecutorService; 49 import java.util.concurrent.ExecutorService;
34 import java.util.concurrent.Executors; 50 import java.util.concurrent.Executors;
35 import java.util.function.Consumer; 51 import java.util.function.Consumer;
@@ -136,6 +152,7 @@ public class RemoteRuleEngineTransportService implements RuleEngineTransportServ @@ -136,6 +152,7 @@ public class RemoteRuleEngineTransportService implements RuleEngineTransportServ
136 @Override 152 @Override
137 public void process(String nodeId, DeviceActorToTransportMsg msg, Runnable onSuccess, Consumer<Throwable> onFailure) { 153 public void process(String nodeId, DeviceActorToTransportMsg msg, Runnable onSuccess, Consumer<Throwable> onFailure) {
138 notificationsProducer.send(notificationsTopic + "." + nodeId, 154 notificationsProducer.send(notificationsTopic + "." + nodeId,
  155 + new UUID(msg.getSessionIdMSB(), msg.getSessionIdLSB()).toString(),
139 ToTransportMsg.newBuilder().setToDeviceSessionMsg(msg).build() 156 ToTransportMsg.newBuilder().setToDeviceSessionMsg(msg).build()
140 , new QueueCallbackAdaptor(onSuccess, onFailure)); 157 , new QueueCallbackAdaptor(onSuccess, onFailure));
141 } 158 }
1 /** 1 /**
2 * Copyright © 2016-2018 The Thingsboard Authors 2 * Copyright © 2016-2018 The Thingsboard Authors
3 - * <p> 3 + *
4 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License. 5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at 6 * You may obtain a copy of the License at
7 - * <p>  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - * <p> 7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
10 * Unless required by applicable law or agreed to in writing, software 10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, 11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,7 +16,6 @@ @@ -16,7 +16,6 @@
16 package org.thingsboard.server.service.transport; 16 package org.thingsboard.server.service.transport;
17 17
18 import org.thingsboard.server.gen.transport.TransportProtos.DeviceActorToTransportMsg; 18 import org.thingsboard.server.gen.transport.TransportProtos.DeviceActorToTransportMsg;
19 -import org.thingsboard.server.gen.transport.TransportProtos.;  
20 19
21 import java.util.function.Consumer; 20 import java.util.function.Consumer;
22 21
1 /** 1 /**
2 * Copyright © 2016-2018 The Thingsboard Authors 2 * Copyright © 2016-2018 The Thingsboard Authors
3 - * <p> 3 + *
4 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License. 5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at 6 * You may obtain a copy of the License at
7 - * <p>  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - * <p> 7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
10 * Unless required by applicable law or agreed to in writing, software 10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, 11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
1 package org.thingsboard.server.service.transport.msg; 16 package org.thingsboard.server.service.transport.msg;
2 17
3 import lombok.Data; 18 import lombok.Data;
@@ -7,7 +22,6 @@ import org.thingsboard.server.common.msg.MsgType; @@ -7,7 +22,6 @@ import org.thingsboard.server.common.msg.MsgType;
7 import org.thingsboard.server.common.msg.TbActorMsg; 22 import org.thingsboard.server.common.msg.TbActorMsg;
8 import org.thingsboard.server.common.msg.aware.DeviceAwareMsg; 23 import org.thingsboard.server.common.msg.aware.DeviceAwareMsg;
9 import org.thingsboard.server.common.msg.aware.TenantAwareMsg; 24 import org.thingsboard.server.common.msg.aware.TenantAwareMsg;
10 -import org.thingsboard.server.common.msg.cluster.ServerAddress;  
11 import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg; 25 import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg;
12 26
13 import java.io.Serializable; 27 import java.io.Serializable;
@@ -91,8 +91,6 @@ public enum MsgType { @@ -91,8 +91,6 @@ public enum MsgType {
91 91
92 DEVICE_ACTOR_CLIENT_SIDE_RPC_TIMEOUT_MSG, 92 DEVICE_ACTOR_CLIENT_SIDE_RPC_TIMEOUT_MSG,
93 93
94 - DEVICE_ACTOR_QUEUE_TIMEOUT_MSG,  
95 -  
96 /** 94 /**
97 * Message that is sent from the Device Actor to Rule Engine. Requires acknowledgement 95 * Message that is sent from the Device Actor to Rule Engine. Requires acknowledgement
98 */ 96 */
@@ -101,7 +99,6 @@ public enum MsgType { @@ -101,7 +99,6 @@ public enum MsgType {
101 /** 99 /**
102 * Message that is sent from Rule Engine to the Device Actor when message is successfully pushed to queue. 100 * Message that is sent from Rule Engine to the Device Actor when message is successfully pushed to queue.
103 */ 101 */
104 - RULE_ENGINE_QUEUE_PUT_ACK_MSG,  
105 ACTOR_SYSTEM_TO_DEVICE_SESSION_ACTOR_MSG, 102 ACTOR_SYSTEM_TO_DEVICE_SESSION_ACTOR_MSG,
106 TRANSPORT_TO_DEVICE_SESSION_ACTOR_MSG, 103 TRANSPORT_TO_DEVICE_SESSION_ACTOR_MSG,
107 SESSION_TIMEOUT_MSG, 104 SESSION_TIMEOUT_MSG,
1 -/**  
2 - * Copyright © 2016-2018 The Thingsboard Authors  
3 - *  
4 - * Licensed under the Apache License, Version 2.0 (the "License");  
5 - * you may not use this file except in compliance with the License.  
6 - * You may obtain a copy of the License at  
7 - *  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - *  
10 - * Unless required by applicable law or agreed to in writing, software  
11 - * distributed under the License is distributed on an "AS IS" BASIS,  
12 - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
13 - * See the License for the specific language governing permissions and  
14 - * limitations under the License.  
15 - */  
16 -package org.thingsboard.server.common.msg.core;  
17 -  
18 -import org.thingsboard.server.common.data.id.SessionId;  
19 -import org.thingsboard.server.common.msg.MsgType;  
20 -import org.thingsboard.server.common.msg.session.ToDeviceMsg;  
21 -  
22 -public class BasicActorSystemToDeviceSessionActorMsg implements ActorSystemToDeviceSessionActorMsg {  
23 -  
24 - private final ToDeviceMsg msg;  
25 - private final SessionId sessionId;  
26 -  
27 - public BasicActorSystemToDeviceSessionActorMsg(ToDeviceMsg msg, SessionId sessionId) {  
28 - super();  
29 - this.msg = msg;  
30 - this.sessionId = sessionId;  
31 - }  
32 -  
33 - @Override  
34 - public SessionId getSessionId() {  
35 - return sessionId;  
36 - }  
37 -  
38 - @Override  
39 - public ToDeviceMsg getMsg() {  
40 - return msg;  
41 - }  
42 -  
43 - @Override  
44 - public String toString() {  
45 - return "BasicActorSystemToDeviceSessionActorMsg [msg=" + msg + ", sessionId=" + sessionId + "]";  
46 - }  
47 -  
48 - @Override  
49 - public MsgType getMsgType() {  
50 - return MsgType.ACTOR_SYSTEM_TO_DEVICE_SESSION_ACTOR_MSG;  
51 - }  
52 -}  
1 -/**  
2 - * Copyright © 2016-2018 The Thingsboard Authors  
3 - *  
4 - * Licensed under the Apache License, Version 2.0 (the "License");  
5 - * you may not use this file except in compliance with the License.  
6 - * You may obtain a copy of the License at  
7 - *  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - *  
10 - * Unless required by applicable law or agreed to in writing, software  
11 - * distributed under the License is distributed on an "AS IS" BASIS,  
12 - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
13 - * See the License for the specific language governing permissions and  
14 - * limitations under the License.  
15 - */  
16 -package org.thingsboard.server.common.msg.timeout;  
17 -  
18 -import org.thingsboard.server.common.msg.MsgType;  
19 -import org.thingsboard.server.common.msg.timeout.TimeoutMsg;  
20 -  
21 -import java.util.UUID;  
22 -  
23 -/**  
24 - * @author Andrew Shvayka  
25 - */  
26 -public final class DeviceActorQueueTimeoutMsg extends TimeoutMsg<UUID> {  
27 -  
28 - public DeviceActorQueueTimeoutMsg(UUID id, long timeout) {  
29 - super(id, timeout);  
30 - }  
31 -  
32 - @Override  
33 - public MsgType getMsgType() {  
34 - return MsgType.DEVICE_ACTOR_QUEUE_TIMEOUT_MSG;  
35 - }  
36 -}  
1 /** 1 /**
2 * Copyright © 2016-2018 The Thingsboard Authors 2 * Copyright © 2016-2018 The Thingsboard Authors
3 - * <p> 3 + *
4 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License. 5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at 6 * You may obtain a copy of the License at
7 - * <p>  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - * <p> 7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
10 * Unless required by applicable law or agreed to in writing, software 10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, 11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1 /** 1 /**
2 * Copyright © 2016-2018 The Thingsboard Authors 2 * Copyright © 2016-2018 The Thingsboard Authors
3 - * <p> 3 + *
4 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License. 5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at 6 * You may obtain a copy of the License at
7 - * <p>  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - * <p> 7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
10 * Unless required by applicable law or agreed to in writing, software 10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, 11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -106,6 +106,10 @@ public class TBKafkaProducerTemplate<T> { @@ -106,6 +106,10 @@ public class TBKafkaProducerTemplate<T> {
106 return send(topic, key, value, null, headers, callback); 106 return send(topic, key, value, null, headers, callback);
107 } 107 }
108 108
  109 + public Future<RecordMetadata> send(String topic, String key, T value, Callback callback) {
  110 + return send(topic, key, value, null, null, callback);
  111 + }
  112 +
109 public Future<RecordMetadata> send(String topic, String key, T value, Long timestamp, Iterable<Header> headers, Callback callback) { 113 public Future<RecordMetadata> send(String topic, String key, T value, Long timestamp, Iterable<Header> headers, Callback callback) {
110 byte[] data = encoder.encode(value); 114 byte[] data = encoder.encode(value);
111 ProducerRecord<String, byte[]> record; 115 ProducerRecord<String, byte[]> record;
@@ -160,7 +160,7 @@ public class TbKafkaRequestTemplate<Request, Response> extends AbstractTbKafkaTe @@ -160,7 +160,7 @@ public class TbKafkaRequestTemplate<Request, Response> extends AbstractTbKafkaTe
160 SettableFuture<Response> future = SettableFuture.create(); 160 SettableFuture<Response> future = SettableFuture.create();
161 pendingRequests.putIfAbsent(requestId, new ResponseMetaData<>(tickTs + maxRequestTimeout, future)); 161 pendingRequests.putIfAbsent(requestId, new ResponseMetaData<>(tickTs + maxRequestTimeout, future));
162 request = requestTemplate.enrich(request, responseTemplate.getTopic(), requestId); 162 request = requestTemplate.enrich(request, responseTemplate.getTopic(), requestId);
163 - requestTemplate.send(key, request, headers); 163 + requestTemplate.send(key, request, headers, null);
164 return future; 164 return future;
165 } 165 }
166 166
1 /** 1 /**
2 * Copyright © 2016-2018 The Thingsboard Authors 2 * Copyright © 2016-2018 The Thingsboard Authors
3 - * <p> 3 + *
4 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License. 5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at 6 * You may obtain a copy of the License at
7 - * <p>  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - * <p> 7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
10 * Unless required by applicable law or agreed to in writing, software 10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, 11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1 /** 1 /**
2 * Copyright © 2016-2018 The Thingsboard Authors 2 * Copyright © 2016-2018 The Thingsboard Authors
3 - * <p> 3 + *
4 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License. 5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at 6 * You may obtain a copy of the License at
7 - * <p>  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - * <p> 7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
10 * Unless required by applicable law or agreed to in writing, software 10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, 11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,8 +20,6 @@ import org.thingsboard.server.common.msg.aware.SessionAwareMsg; @@ -20,8 +20,6 @@ import org.thingsboard.server.common.msg.aware.SessionAwareMsg;
20 20
21 public interface SessionMsgProcessor { 21 public interface SessionMsgProcessor {
22 22
23 - void process(SessionAwareMsg msg);  
24 -  
25 void onDeviceAdded(Device device); 23 void onDeviceAdded(Device device);
26 24
27 } 25 }
@@ -20,6 +20,7 @@ import org.thingsboard.server.common.msg.session.SessionMsgType; @@ -20,6 +20,7 @@ import org.thingsboard.server.common.msg.session.SessionMsgType;
20 import org.thingsboard.server.common.msg.session.SessionActorToAdaptorMsg; 20 import org.thingsboard.server.common.msg.session.SessionActorToAdaptorMsg;
21 import org.thingsboard.server.common.msg.session.SessionContext; 21 import org.thingsboard.server.common.msg.session.SessionContext;
22 import org.thingsboard.server.common.transport.adaptor.AdaptorException; 22 import org.thingsboard.server.common.transport.adaptor.AdaptorException;
  23 +import org.thingsboard.server.gen.transport.TransportProtos;
23 24
24 import java.util.Optional; 25 import java.util.Optional;
25 26
1 /** 1 /**
2 * Copyright © 2016-2018 The Thingsboard Authors 2 * Copyright © 2016-2018 The Thingsboard Authors
3 - * <p> 3 + *
4 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License. 5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at 6 * You may obtain a copy of the License at
7 - * <p>  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - * <p> 7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
10 * Unless required by applicable law or agreed to in writing, software 10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, 11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -22,6 +22,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.SessionEventMsg; @@ -22,6 +22,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.SessionEventMsg;
22 import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceCredentialsResponseMsg; 22 import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceCredentialsResponseMsg;
23 import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenRequestMsg; 23 import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenRequestMsg;
24 import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg; 24 import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg;
  25 +import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg;
25 26
26 /** 27 /**
27 * Created by ashvayka on 04.10.18. 28 * Created by ashvayka on 04.10.18.
@@ -40,6 +41,8 @@ public interface TransportService { @@ -40,6 +41,8 @@ public interface TransportService {
40 41
41 void process(SessionInfoProto sessionInfo, PostAttributeMsg msg, TransportServiceCallback<Void> callback); 42 void process(SessionInfoProto sessionInfo, PostAttributeMsg msg, TransportServiceCallback<Void> callback);
42 43
  44 + void process(SessionInfoProto sessionInfo, GetAttributeRequestMsg msg, TransportServiceCallback<Void> callback);
  45 +
43 void registerSession(SessionInfoProto sessionInfo, SessionMsgListener listener); 46 void registerSession(SessionInfoProto sessionInfo, SessionMsgListener listener);
44 47
45 void deregisterSession(SessionInfoProto sessionInfo); 48 void deregisterSession(SessionInfoProto sessionInfo);
@@ -15,18 +15,38 @@ @@ -15,18 +15,38 @@
15 */ 15 */
16 package org.thingsboard.server.common.transport.adaptor; 16 package org.thingsboard.server.common.transport.adaptor;
17 17
  18 +import com.google.gson.Gson;
  19 +import com.google.gson.JsonArray;
  20 +import com.google.gson.JsonElement;
  21 +import com.google.gson.JsonObject;
  22 +import com.google.gson.JsonParser;
  23 +import com.google.gson.JsonPrimitive;
  24 +import com.google.gson.JsonSyntaxException;
  25 +import org.thingsboard.server.common.data.kv.AttributeKey;
  26 +import org.thingsboard.server.common.data.kv.AttributeKvEntry;
  27 +import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
  28 +import org.thingsboard.server.common.data.kv.BooleanDataEntry;
  29 +import org.thingsboard.server.common.data.kv.DoubleDataEntry;
  30 +import org.thingsboard.server.common.data.kv.KvEntry;
  31 +import org.thingsboard.server.common.data.kv.LongDataEntry;
  32 +import org.thingsboard.server.common.data.kv.StringDataEntry;
  33 +import org.thingsboard.server.common.msg.core.AttributesUpdateRequest;
  34 +import org.thingsboard.server.common.msg.core.BasicAttributesUpdateRequest;
  35 +import org.thingsboard.server.common.msg.core.BasicRequest;
  36 +import org.thingsboard.server.common.msg.core.BasicTelemetryUploadRequest;
  37 +import org.thingsboard.server.common.msg.core.TelemetryUploadRequest;
  38 +import org.thingsboard.server.common.msg.core.ToDeviceRpcRequestMsg;
  39 +import org.thingsboard.server.common.msg.core.ToServerRpcRequestMsg;
  40 +import org.thingsboard.server.common.msg.core.ToServerRpcResponseMsg;
  41 +import org.thingsboard.server.common.msg.kv.AttributesKVMsg;
  42 +import org.thingsboard.server.gen.transport.TransportProtos.*;
  43 +
18 import java.util.ArrayList; 44 import java.util.ArrayList;
19 import java.util.List; 45 import java.util.List;
20 import java.util.Map.Entry; 46 import java.util.Map.Entry;
21 import java.util.function.Consumer; 47 import java.util.function.Consumer;
22 import java.util.stream.Collectors; 48 import java.util.stream.Collectors;
23 49
24 -import com.google.gson.*;  
25 -import org.thingsboard.server.common.msg.core.*;  
26 -  
27 -import org.thingsboard.server.common.data.kv.*;  
28 -import org.thingsboard.server.common.msg.kv.AttributesKVMsg;  
29 -  
30 public class JsonConverter { 50 public class JsonConverter {
31 51
32 private static final Gson GSON = new Gson(); 52 private static final Gson GSON = new Gson();
@@ -44,6 +64,109 @@ public class JsonConverter { @@ -44,6 +64,109 @@ public class JsonConverter {
44 return convertToTelemetry(jsonObject, System.currentTimeMillis(), requestId); 64 return convertToTelemetry(jsonObject, System.currentTimeMillis(), requestId);
45 } 65 }
46 66
  67 + public static PostTelemetryMsg convertToTelemetryProto(JsonElement jsonObject) throws JsonSyntaxException {
  68 + long systemTs = System.currentTimeMillis();
  69 + PostTelemetryMsg.Builder builder = PostTelemetryMsg.newBuilder();
  70 + if (jsonObject.isJsonObject()) {
  71 + parseObject(builder, systemTs, jsonObject);
  72 + } else if (jsonObject.isJsonArray()) {
  73 + jsonObject.getAsJsonArray().forEach(je -> {
  74 + if (je.isJsonObject()) {
  75 + parseObject(builder, systemTs, je.getAsJsonObject());
  76 + } else {
  77 + throw new JsonSyntaxException(CAN_T_PARSE_VALUE + je);
  78 + }
  79 + });
  80 + } else {
  81 + throw new JsonSyntaxException(CAN_T_PARSE_VALUE + jsonObject);
  82 + }
  83 + return builder.build();
  84 + }
  85 +
  86 + public static PostAttributeMsg convertToAttributesProto(JsonElement jsonObject) throws JsonSyntaxException {
  87 + if (jsonObject.isJsonObject()) {
  88 + PostAttributeMsg.Builder result = PostAttributeMsg.newBuilder();
  89 + List<KeyValueProto> keyValueList = parseProtoValues(jsonObject.getAsJsonObject());
  90 + result.addAllKv(keyValueList);
  91 + return result.build();
  92 + } else {
  93 + throw new JsonSyntaxException(CAN_T_PARSE_VALUE + jsonObject);
  94 + }
  95 + }
  96 +
  97 +
  98 + private static void parseObject(PostTelemetryMsg.Builder builder, long systemTs, JsonElement jsonObject) {
  99 + JsonObject jo = jsonObject.getAsJsonObject();
  100 + if (jo.has("ts") && jo.has("values")) {
  101 + parseWithTs(builder, jo);
  102 + } else {
  103 + parseWithoutTs(builder, systemTs, jo);
  104 + }
  105 + }
  106 +
  107 + private static void parseWithoutTs(PostTelemetryMsg.Builder request, long systemTs, JsonObject jo) {
  108 + TsKvListProto.Builder builder = TsKvListProto.newBuilder();
  109 + builder.setTs(systemTs);
  110 + builder.addAllKv(parseProtoValues(jo));
  111 + request.addTsKvList(builder.build());
  112 + }
  113 +
  114 + public static void parseWithTs(PostTelemetryMsg.Builder request, JsonObject jo) {
  115 + TsKvListProto.Builder builder = TsKvListProto.newBuilder();
  116 + builder.setTs(jo.get("ts").getAsLong());
  117 + builder.addAllKv(parseProtoValues(jo.get("values").getAsJsonObject()));
  118 + request.addTsKvList(builder.build());
  119 + }
  120 +
  121 + public static List<KeyValueProto> parseProtoValues(JsonObject valuesObject) {
  122 + List<KeyValueProto> result = new ArrayList<>();
  123 + for (Entry<String, JsonElement> valueEntry : valuesObject.entrySet()) {
  124 + JsonElement element = valueEntry.getValue();
  125 + if (element.isJsonPrimitive()) {
  126 + JsonPrimitive value = element.getAsJsonPrimitive();
  127 + if (value.isString()) {
  128 + result.add(KeyValueProto.newBuilder().setKey(valueEntry.getKey()).setType(KeyValueType.STRING_V)
  129 + .setStringV(value.getAsString()).build());
  130 + } else if (value.isBoolean()) {
  131 + result.add(KeyValueProto.newBuilder().setKey(valueEntry.getKey()).setType(KeyValueType.BOOLEAN_V)
  132 + .setBoolV(value.getAsBoolean()).build());
  133 + } else if (value.isNumber()) {
  134 + if (value.getAsString().contains(".")) {
  135 + result.add(KeyValueProto.newBuilder().setKey(valueEntry.getKey()).setType(KeyValueType.DOUBLE_V)
  136 + .setDoubleV(value.getAsDouble()).build());
  137 + } else {
  138 + try {
  139 + long longValue = Long.parseLong(value.getAsString());
  140 + result.add(KeyValueProto.newBuilder().setKey(valueEntry.getKey()).setType(KeyValueType.LONG_V)
  141 + .setLongV(longValue).build());
  142 + } catch (NumberFormatException e) {
  143 + throw new JsonSyntaxException("Big integer values are not supported!");
  144 + }
  145 + }
  146 + } else {
  147 + throw new JsonSyntaxException(CAN_T_PARSE_VALUE + value);
  148 + }
  149 + } else {
  150 + throw new JsonSyntaxException(CAN_T_PARSE_VALUE + element);
  151 + }
  152 + }
  153 + return result;
  154 + }
  155 +
  156 + private static void parseNumericProto(List<KvEntry> result, Entry<String, JsonElement> valueEntry, JsonPrimitive value) {
  157 + if (value.getAsString().contains(".")) {
  158 + result.add(new DoubleDataEntry(valueEntry.getKey(), value.getAsDouble()));
  159 + } else {
  160 + try {
  161 + long longValue = Long.parseLong(value.getAsString());
  162 + result.add(new LongDataEntry(valueEntry.getKey(), longValue));
  163 + } catch (NumberFormatException e) {
  164 + throw new JsonSyntaxException("Big integer values are not supported!");
  165 + }
  166 + }
  167 + }
  168 +
  169 +
47 private static TelemetryUploadRequest convertToTelemetry(JsonElement jsonObject, long systemTs, int requestId) throws JsonSyntaxException { 170 private static TelemetryUploadRequest convertToTelemetry(JsonElement jsonObject, long systemTs, int requestId) throws JsonSyntaxException {
48 BasicTelemetryUploadRequest request = new BasicTelemetryUploadRequest(requestId); 171 BasicTelemetryUploadRequest request = new BasicTelemetryUploadRequest(requestId);
49 if (jsonObject.isJsonObject()) { 172 if (jsonObject.isJsonObject()) {
@@ -140,6 +263,26 @@ public class JsonConverter { @@ -140,6 +263,26 @@ public class JsonConverter {
140 } 263 }
141 } 264 }
142 265
  266 + public static JsonObject toJson(GetAttributeResponseMsg payload) {
  267 + JsonObject result = new JsonObject();
  268 + if (payload.getClientAttributeListCount() > 0) {
  269 + JsonObject attrObject = new JsonObject();
  270 + payload.getClientAttributeListList().forEach(addToObjectFromProto(attrObject));
  271 + result.add("client", attrObject);
  272 + }
  273 + if (payload.getSharedAttributeListCount() > 0) {
  274 + JsonObject attrObject = new JsonObject();
  275 + payload.getSharedAttributeListList().forEach(addToObjectFromProto(attrObject));
  276 + result.add("shared", attrObject);
  277 + }
  278 + if (payload.getDeletedAttributeKeysCount() > 0) {
  279 + JsonArray attrObject = new JsonArray();
  280 + payload.getDeletedAttributeKeysList().forEach(attrObject::add);
  281 + result.add("deleted", attrObject);
  282 + }
  283 + return result;
  284 + }
  285 +
143 public static JsonObject toJson(AttributesKVMsg payload, boolean asMap) { 286 public static JsonObject toJson(AttributesKVMsg payload, boolean asMap) {
144 JsonObject result = new JsonObject(); 287 JsonObject result = new JsonObject();
145 if (asMap) { 288 if (asMap) {
@@ -166,8 +309,29 @@ public class JsonConverter { @@ -166,8 +309,29 @@ public class JsonConverter {
166 } 309 }
167 310
168 private static Consumer<AttributeKey> addToObject(JsonArray result) { 311 private static Consumer<AttributeKey> addToObject(JsonArray result) {
169 - return key -> {  
170 - result.add(key.getAttributeKey()); 312 + return key -> result.add(key.getAttributeKey());
  313 + }
  314 +
  315 + private static Consumer<TsKvProto> addToObjectFromProto(JsonObject result) {
  316 + return de -> {
  317 + JsonPrimitive value;
  318 + switch (de.getKv().getType()) {
  319 + case BOOLEAN_V:
  320 + value = new JsonPrimitive(de.getKv().getBoolV());
  321 + break;
  322 + case DOUBLE_V:
  323 + value = new JsonPrimitive(de.getKv().getDoubleV());
  324 + break;
  325 + case LONG_V:
  326 + value = new JsonPrimitive(de.getKv().getLongV());
  327 + break;
  328 + case STRING_V:
  329 + value = new JsonPrimitive(de.getKv().getStringV());
  330 + break;
  331 + default:
  332 + throw new IllegalArgumentException("Unsupported data type: " + de.getKv().getType());
  333 + }
  334 + result.add(de.getKv().getKey(), value);
171 }; 335 };
172 } 336 }
173 337
1 /** 1 /**
2 * Copyright © 2016-2018 The Thingsboard Authors 2 * Copyright © 2016-2018 The Thingsboard Authors
3 - * <p> 3 + *
4 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License. 5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at 6 * You may obtain a copy of the License at
7 - * <p>  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - * <p> 7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
10 * Unless required by applicable law or agreed to in writing, software 10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, 11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -23,12 +23,13 @@ option java_outer_classname = "TransportProtos"; @@ -23,12 +23,13 @@ option java_outer_classname = "TransportProtos";
23 * Data Structures; 23 * Data Structures;
24 */ 24 */
25 message SessionInfoProto { 25 message SessionInfoProto {
26 - int64 sessionIdMSB = 1;  
27 - int64 sessionIdLSB = 2;  
28 - int64 tenantIdMSB = 3;  
29 - int64 tenantIdLSB = 4;  
30 - int64 deviceIdMSB = 5;  
31 - int64 deviceIdLSB = 6; 26 + string nodeId = 1;
  27 + int64 sessionIdMSB = 2;
  28 + int64 sessionIdLSB = 3;
  29 + int64 tenantIdMSB = 4;
  30 + int64 tenantIdLSB = 5;
  31 + int64 deviceIdMSB = 6;
  32 + int64 deviceIdLSB = 7;
32 } 33 }
33 34
34 enum SessionEvent { 35 enum SessionEvent {
@@ -57,6 +58,11 @@ message KeyValueProto { @@ -57,6 +58,11 @@ message KeyValueProto {
57 string string_v = 6; 58 string string_v = 6;
58 } 59 }
59 60
  61 +message TsKvProto {
  62 + int64 ts = 1;
  63 + KeyValueProto kv = 2;
  64 +}
  65 +
60 message TsKvListProto { 66 message TsKvListProto {
61 int64 ts = 1; 67 int64 ts = 1;
62 repeated KeyValueProto kv = 2; 68 repeated KeyValueProto kv = 2;
@@ -76,9 +82,8 @@ message DeviceInfoProto { @@ -76,9 +82,8 @@ message DeviceInfoProto {
76 * Messages that use Data Structures; 82 * Messages that use Data Structures;
77 */ 83 */
78 message SessionEventMsg { 84 message SessionEventMsg {
79 - string nodeId = 1;  
80 - SessionType sessionType = 2;  
81 - SessionEvent event = 3; 85 + SessionType sessionType = 1;
  86 + SessionEvent event = 2;
82 } 87 }
83 88
84 message PostTelemetryMsg { 89 message PostTelemetryMsg {
@@ -90,14 +95,17 @@ message PostAttributeMsg { @@ -90,14 +95,17 @@ message PostAttributeMsg {
90 } 95 }
91 96
92 message GetAttributeRequestMsg { 97 message GetAttributeRequestMsg {
93 - repeated string clientAttributeNames = 1;  
94 - repeated string sharedAttributeNames = 2; 98 + int32 requestId = 1;
  99 + repeated string clientAttributeNames = 2;
  100 + repeated string sharedAttributeNames = 3;
95 } 101 }
96 102
97 message GetAttributeResponseMsg { 103 message GetAttributeResponseMsg {
98 - repeated TsKvListProto clientAttributeList = 1;  
99 - repeated TsKvListProto sharedAttributeList = 2;  
100 - repeated string deletedAttributeKeys = 3; 104 + int32 requestId = 1;
  105 + repeated TsKvProto clientAttributeList = 2;
  106 + repeated TsKvProto sharedAttributeList = 3;
  107 + repeated string deletedAttributeKeys = 4;
  108 + string error = 5;
101 } 109 }
102 110
103 message ValidateDeviceTokenRequestMsg { 111 message ValidateDeviceTokenRequestMsg {
@@ -106,30 +106,30 @@ public class CoapServerTest { @@ -106,30 +106,30 @@ public class CoapServerTest {
106 public static SessionMsgProcessor sessionMsgProcessor() { 106 public static SessionMsgProcessor sessionMsgProcessor() {
107 return new SessionMsgProcessor() { 107 return new SessionMsgProcessor() {
108 108
109 - @Override  
110 - public void process(SessionAwareMsg toActorMsg) {  
111 - if (toActorMsg instanceof TransportToDeviceSessionActorMsg) {  
112 - AdaptorToSessionActorMsg sessionMsg = ((TransportToDeviceSessionActorMsg) toActorMsg).getSessionMsg();  
113 - try {  
114 - FromDeviceMsg deviceMsg = sessionMsg.getMsg();  
115 - ToDeviceMsg toDeviceMsg = null;  
116 - if (deviceMsg.getMsgType() == SessionMsgType.POST_TELEMETRY_REQUEST) {  
117 - toDeviceMsg = BasicStatusCodeResponse.onSuccess(deviceMsg.getMsgType(), BasicRequest.DEFAULT_REQUEST_ID);  
118 - } else if (deviceMsg.getMsgType() == SessionMsgType.GET_ATTRIBUTES_REQUEST) {  
119 - List<AttributeKvEntry> data = new ArrayList<>();  
120 - data.add(new BaseAttributeKvEntry(new StringDataEntry("key1", "value1"), System.currentTimeMillis()));  
121 - data.add(new BaseAttributeKvEntry(new LongDataEntry("key2", 42L), System.currentTimeMillis()));  
122 - BasicAttributeKVMsg kv = BasicAttributeKVMsg.fromClient(data);  
123 - toDeviceMsg = BasicGetAttributesResponse.onSuccess(deviceMsg.getMsgType(), BasicRequest.DEFAULT_REQUEST_ID, kv);  
124 - }  
125 - if (toDeviceMsg != null) {  
126 - sessionMsg.getSessionContext().onMsg(new BasicSessionActorToAdaptorMsg(sessionMsg.getSessionContext(), toDeviceMsg));  
127 - }  
128 - } catch (Exception e) {  
129 - e.printStackTrace();  
130 - }  
131 - }  
132 - } 109 +// @Override
  110 +// public void process(SessionAwareMsg toActorMsg) {
  111 +// if (toActorMsg instanceof TransportToDeviceSessionActorMsg) {
  112 +// AdaptorToSessionActorMsg sessionMsg = ((TransportToDeviceSessionActorMsg) toActorMsg).getSessionMsg();
  113 +// try {
  114 +// FromDeviceMsg deviceMsg = sessionMsg.getMsg();
  115 +// ToDeviceMsg toDeviceMsg = null;
  116 +// if (deviceMsg.getMsgType() == SessionMsgType.POST_TELEMETRY_REQUEST) {
  117 +// toDeviceMsg = BasicStatusCodeResponse.onSuccess(deviceMsg.getMsgType(), BasicRequest.DEFAULT_REQUEST_ID);
  118 +// } else if (deviceMsg.getMsgType() == SessionMsgType.GET_ATTRIBUTES_REQUEST) {
  119 +// List<AttributeKvEntry> data = new ArrayList<>();
  120 +// data.add(new BaseAttributeKvEntry(new StringDataEntry("key1", "value1"), System.currentTimeMillis()));
  121 +// data.add(new BaseAttributeKvEntry(new LongDataEntry("key2", 42L), System.currentTimeMillis()));
  122 +// BasicAttributeKVMsg kv = BasicAttributeKVMsg.fromClient(data);
  123 +// toDeviceMsg = BasicGetAttributesResponse.onSuccess(deviceMsg.getMsgType(), BasicRequest.DEFAULT_REQUEST_ID, kv);
  124 +// }
  125 +// if (toDeviceMsg != null) {
  126 +// sessionMsg.getSessionContext().onMsg(new BasicSessionActorToAdaptorMsg(sessionMsg.getSessionContext(), toDeviceMsg));
  127 +// }
  128 +// } catch (Exception e) {
  129 +// e.printStackTrace();
  130 +// }
  131 +// }
  132 +// }
133 133
134 @Override 134 @Override
135 public void onDeviceAdded(Device device) { 135 public void onDeviceAdded(Device device) {
1 /** 1 /**
2 * Copyright © 2016-2018 The Thingsboard Authors 2 * Copyright © 2016-2018 The Thingsboard Authors
3 - * <p> 3 + *
4 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License. 5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at 6 * You may obtain a copy of the License at
7 - * <p>  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - * <p> 7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
10 * Unless required by applicable law or agreed to in writing, software 10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, 11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -26,18 +26,25 @@ import io.netty.handler.codec.mqtt.MqttFixedHeader; @@ -26,18 +26,25 @@ import io.netty.handler.codec.mqtt.MqttFixedHeader;
26 import io.netty.handler.codec.mqtt.MqttMessage; 26 import io.netty.handler.codec.mqtt.MqttMessage;
27 import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader; 27 import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
28 import io.netty.handler.codec.mqtt.MqttPubAckMessage; 28 import io.netty.handler.codec.mqtt.MqttPubAckMessage;
  29 +import io.netty.handler.codec.mqtt.MqttPublishMessage;
29 import io.netty.handler.codec.mqtt.MqttQoS; 30 import io.netty.handler.codec.mqtt.MqttQoS;
30 import io.netty.handler.codec.mqtt.MqttSubAckMessage; 31 import io.netty.handler.codec.mqtt.MqttSubAckMessage;
31 import io.netty.handler.codec.mqtt.MqttSubAckPayload; 32 import io.netty.handler.codec.mqtt.MqttSubAckPayload;
  33 +import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
  34 +import io.netty.handler.codec.mqtt.MqttTopicSubscription;
  35 +import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
32 import io.netty.handler.ssl.SslHandler; 36 import io.netty.handler.ssl.SslHandler;
33 import io.netty.util.concurrent.Future; 37 import io.netty.util.concurrent.Future;
34 import io.netty.util.concurrent.GenericFutureListener; 38 import io.netty.util.concurrent.GenericFutureListener;
35 import lombok.extern.slf4j.Slf4j; 39 import lombok.extern.slf4j.Slf4j;
36 import org.springframework.util.StringUtils; 40 import org.springframework.util.StringUtils;
  41 +import org.thingsboard.server.common.transport.SessionMsgListener;
37 import org.thingsboard.server.common.transport.TransportService; 42 import org.thingsboard.server.common.transport.TransportService;
38 import org.thingsboard.server.common.transport.TransportServiceCallback; 43 import org.thingsboard.server.common.transport.TransportServiceCallback;
  44 +import org.thingsboard.server.common.transport.adaptor.AdaptorException;
39 import org.thingsboard.server.common.transport.quota.QuotaService; 45 import org.thingsboard.server.common.transport.quota.QuotaService;
40 import org.thingsboard.server.dao.EncryptionUtil; 46 import org.thingsboard.server.dao.EncryptionUtil;
  47 +import org.thingsboard.server.gen.transport.TransportProtos;
41 import org.thingsboard.server.gen.transport.TransportProtos.DeviceInfoProto; 48 import org.thingsboard.server.gen.transport.TransportProtos.DeviceInfoProto;
42 import org.thingsboard.server.gen.transport.TransportProtos.SessionEvent; 49 import org.thingsboard.server.gen.transport.TransportProtos.SessionEvent;
43 import org.thingsboard.server.gen.transport.TransportProtos.SessionEventMsg; 50 import org.thingsboard.server.gen.transport.TransportProtos.SessionEventMsg;
@@ -54,6 +61,7 @@ import javax.net.ssl.SSLPeerUnverifiedException; @@ -54,6 +61,7 @@ import javax.net.ssl.SSLPeerUnverifiedException;
54 import javax.security.cert.X509Certificate; 61 import javax.security.cert.X509Certificate;
55 import java.io.IOException; 62 import java.io.IOException;
56 import java.net.InetSocketAddress; 63 import java.net.InetSocketAddress;
  64 +import java.util.ArrayList;
57 import java.util.List; 65 import java.util.List;
58 import java.util.UUID; 66 import java.util.UUID;
59 import java.util.concurrent.ConcurrentHashMap; 67 import java.util.concurrent.ConcurrentHashMap;
@@ -65,14 +73,16 @@ import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUS @@ -65,14 +73,16 @@ import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUS
65 import static io.netty.handler.codec.mqtt.MqttMessageType.CONNACK; 73 import static io.netty.handler.codec.mqtt.MqttMessageType.CONNACK;
66 import static io.netty.handler.codec.mqtt.MqttMessageType.PUBACK; 74 import static io.netty.handler.codec.mqtt.MqttMessageType.PUBACK;
67 import static io.netty.handler.codec.mqtt.MqttMessageType.SUBACK; 75 import static io.netty.handler.codec.mqtt.MqttMessageType.SUBACK;
  76 +import static io.netty.handler.codec.mqtt.MqttMessageType.UNSUBACK;
68 import static io.netty.handler.codec.mqtt.MqttQoS.AT_LEAST_ONCE; 77 import static io.netty.handler.codec.mqtt.MqttQoS.AT_LEAST_ONCE;
69 import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE; 78 import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE;
  79 +import static io.netty.handler.codec.mqtt.MqttQoS.FAILURE;
70 80
71 /** 81 /**
72 * @author Andrew Shvayka 82 * @author Andrew Shvayka
73 */ 83 */
74 @Slf4j 84 @Slf4j
75 -public class MqttTransportHandler extends ChannelInboundHandlerAdapter implements GenericFutureListener<Future<? super Void>> { 85 +public class MqttTransportHandler extends ChannelInboundHandlerAdapter implements GenericFutureListener<Future<? super Void>>, SessionMsgListener {
76 86
77 public static final MqttQoS MAX_SUPPORTED_QOS_LVL = AT_LEAST_ONCE; 87 public static final MqttQoS MAX_SUPPORTED_QOS_LVL = AT_LEAST_ONCE;
78 88
@@ -84,8 +94,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @@ -84,8 +94,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
84 private final SslHandler sslHandler; 94 private final SslHandler sslHandler;
85 private final ConcurrentMap<String, Integer> mqttQoSMap; 95 private final ConcurrentMap<String, Integer> mqttQoSMap;
86 96
87 - private final SessionInfoProto sessionInfo;  
88 - 97 + private volatile SessionInfoProto sessionInfo;
89 private volatile InetSocketAddress address; 98 private volatile InetSocketAddress address;
90 private volatile DeviceSessionCtx deviceSessionCtx; 99 private volatile DeviceSessionCtx deviceSessionCtx;
91 private volatile GatewaySessionCtx gatewaySessionCtx; 100 private volatile GatewaySessionCtx gatewaySessionCtx;
@@ -98,11 +107,6 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @@ -98,11 +107,6 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
98 this.quotaService = context.getQuotaService(); 107 this.quotaService = context.getQuotaService();
99 this.sslHandler = context.getSslHandler(); 108 this.sslHandler = context.getSslHandler();
100 this.mqttQoSMap = new ConcurrentHashMap<>(); 109 this.mqttQoSMap = new ConcurrentHashMap<>();
101 - this.sessionInfo = SessionInfoProto.newBuilder()  
102 - .setNodeId(context.getNodeId())  
103 - .setSessionIdMSB(sessionId.getMostSignificantBits())  
104 - .setSessionIdLSB(sessionId.getLeastSignificantBits())  
105 - .build();  
106 this.deviceSessionCtx = new DeviceSessionCtx(mqttQoSMap); 110 this.deviceSessionCtx = new DeviceSessionCtx(mqttQoSMap);
107 } 111 }
108 112
@@ -135,15 +139,15 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @@ -135,15 +139,15 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
135 case CONNECT: 139 case CONNECT:
136 processConnect(ctx, (MqttConnectMessage) msg); 140 processConnect(ctx, (MqttConnectMessage) msg);
137 break; 141 break;
138 -// case PUBLISH:  
139 -// processPublish(ctx, (MqttPublishMessage) msg);  
140 -// break;  
141 -// case SUBSCRIBE:  
142 -// processSubscribe(ctx, (MqttSubscribeMessage) msg);  
143 -// break;  
144 -// case UNSUBSCRIBE:  
145 -// processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg);  
146 -// break; 142 + case PUBLISH:
  143 + processPublish(ctx, (MqttPublishMessage) msg);
  144 + break;
  145 + case SUBSCRIBE:
  146 + processSubscribe(ctx, (MqttSubscribeMessage) msg);
  147 + break;
  148 + case UNSUBSCRIBE:
  149 + processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg);
  150 + break;
147 // case PINGREQ: 151 // case PINGREQ:
148 // if (checkConnected(ctx)) { 152 // if (checkConnected(ctx)) {
149 // ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0))); 153 // ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0)));
@@ -160,24 +164,25 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @@ -160,24 +164,25 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
160 164
161 } 165 }
162 166
163 -// private void processPublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg) {  
164 -// if (!checkConnected(ctx)) {  
165 -// return;  
166 -// }  
167 -// String topicName = mqttMsg.variableHeader().topicName();  
168 -// int msgId = mqttMsg.variableHeader().packetId();  
169 -// log.trace("[{}] Processing publish msg [{}][{}]!", sessionId, topicName, msgId);  
170 -//  
171 -// if (topicName.startsWith(BASE_GATEWAY_API_TOPIC)) {  
172 -// if (gatewaySessionCtx != null) {  
173 -// gatewaySessionCtx.setChannel(ctx); 167 + private void processPublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg) {
  168 + if (!checkConnected(ctx)) {
  169 + return;
  170 + }
  171 + String topicName = mqttMsg.variableHeader().topicName();
  172 + int msgId = mqttMsg.variableHeader().packetId();
  173 + log.trace("[{}] Processing publish msg [{}][{}]!", sessionId, topicName, msgId);
  174 +
  175 + if (topicName.startsWith(MqttTopics.BASE_GATEWAY_API_TOPIC)) {
  176 + if (gatewaySessionCtx != null) {
  177 + gatewaySessionCtx.setChannel(ctx);
174 // handleMqttPublishMsg(topicName, msgId, mqttMsg); 178 // handleMqttPublishMsg(topicName, msgId, mqttMsg);
175 -// }  
176 -// } else {  
177 -// processDevicePublish(ctx, mqttMsg, topicName, msgId);  
178 -// }  
179 -// }  
180 -// 179 + }
  180 + } else {
  181 + processDevicePublish(ctx, mqttMsg, topicName, msgId);
  182 + }
  183 + }
  184 +
  185 + //
181 // private void handleMqttPublishMsg(String topicName, int msgId, MqttPublishMessage mqttMsg) { 186 // private void handleMqttPublishMsg(String topicName, int msgId, MqttPublishMessage mqttMsg) {
182 // try { 187 // try {
183 // switch (topicName) { 188 // switch (topicName) {
@@ -205,7 +210,23 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @@ -205,7 +210,23 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
205 // } 210 // }
206 // } 211 // }
207 // 212 //
208 -// private void processDevicePublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg, String topicName, int msgId) { 213 + private void processDevicePublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg, String topicName, int msgId) {
  214 + try {
  215 + if (topicName.equals(MqttTopics.DEVICE_TELEMETRY_TOPIC)) {
  216 + TransportProtos.PostTelemetryMsg postTelemetryMsg = adaptor.convertToPostTelemetry(deviceSessionCtx, mqttMsg);
  217 + transportService.process(sessionInfo, postTelemetryMsg, getPubAckCallback(ctx, msgId, postTelemetryMsg));
  218 + } else if (topicName.equals(MqttTopics.DEVICE_ATTRIBUTES_TOPIC)) {
  219 + TransportProtos.PostAttributeMsg postAttributeMsg = adaptor.convertToPostAttributes(deviceSessionCtx, mqttMsg);
  220 + transportService.process(sessionInfo, postAttributeMsg, getPubAckCallback(ctx, msgId, postAttributeMsg));
  221 + } else if (topicName.startsWith(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX)) {
  222 + TransportProtos.GetAttributeRequestMsg getAttributeMsg = adaptor.convertToGetAttributes(deviceSessionCtx, mqttMsg);
  223 + transportService.process(sessionInfo, getAttributeMsg, getPubAckCallback(ctx, msgId, getAttributeMsg));
  224 + }
  225 + } catch (AdaptorException e) {
  226 + log.warn("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e);
  227 + log.info("[{}] Closing current session due to invalid publish msg [{}][{}]", sessionId, topicName, msgId);
  228 + ctx.close();
  229 + }
209 // AdaptorToSessionActorMsg msg = null; 230 // AdaptorToSessionActorMsg msg = null;
210 // try { 231 // try {
211 // if (topicName.equals(DEVICE_TELEMETRY_TOPIC)) { 232 // if (topicName.equals(DEVICE_TELEMETRY_TOPIC)) {
@@ -237,20 +258,38 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @@ -237,20 +258,38 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
237 // log.info("[{}] Closing current session due to invalid publish msg [{}][{}]", sessionId, topicName, msgId); 258 // log.info("[{}] Closing current session due to invalid publish msg [{}][{}]", sessionId, topicName, msgId);
238 // ctx.close(); 259 // ctx.close();
239 // } 260 // }
240 -// }  
241 -//  
242 -// private void processSubscribe(ChannelHandlerContext ctx, MqttSubscribeMessage mqttMsg) {  
243 -// if (!checkConnected(ctx)) {  
244 -// return;  
245 -// }  
246 -// log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId());  
247 -// List<Integer> grantedQoSList = new ArrayList<>();  
248 -// for (MqttTopicSubscription subscription : mqttMsg.payload().topicSubscriptions()) {  
249 -// String topic = subscription.topicName();  
250 -// MqttQoS reqQoS = subscription.qualityOfService();  
251 -// try {  
252 -// switch (topic) {  
253 -// case DEVICE_ATTRIBUTES_TOPIC: { 261 + }
  262 +
  263 + private <T> TransportServiceCallback<Void> getPubAckCallback(final ChannelHandlerContext ctx, final int msgId, final T msg) {
  264 + return new TransportServiceCallback<Void>() {
  265 + @Override
  266 + public void onSuccess(Void dummy) {
  267 + log.trace("[{}] Published msg: {}", sessionId, msg);
  268 + if (msgId > 0) {
  269 + ctx.writeAndFlush(createMqttPubAckMsg(msgId));
  270 + }
  271 + }
  272 +
  273 + @Override
  274 + public void onError(Throwable e) {
  275 + log.trace("[{}] Failed to publish msg: {}", sessionId, msg, e);
  276 + ctx.close();
  277 + }
  278 + };
  279 + }
  280 +
  281 + private void processSubscribe(ChannelHandlerContext ctx, MqttSubscribeMessage mqttMsg) {
  282 + if (!checkConnected(ctx)) {
  283 + return;
  284 + }
  285 + log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId());
  286 + List<Integer> grantedQoSList = new ArrayList<>();
  287 + for (MqttTopicSubscription subscription : mqttMsg.payload().topicSubscriptions()) {
  288 + String topic = subscription.topicName();
  289 + MqttQoS reqQoS = subscription.qualityOfService();
  290 + try {
  291 + switch (topic) {
  292 +// case MqttTopics.DEVICE_ATTRIBUTES_TOPIC: {
254 // AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, SUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg); 293 // AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, SUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg);
255 // processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg)); 294 // processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
256 // registerSubQoS(topic, grantedQoSList, reqQoS); 295 // registerSubQoS(topic, grantedQoSList, reqQoS);
@@ -267,37 +306,37 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @@ -267,37 +306,37 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
267 // case GATEWAY_RPC_TOPIC: 306 // case GATEWAY_RPC_TOPIC:
268 // registerSubQoS(topic, grantedQoSList, reqQoS); 307 // registerSubQoS(topic, grantedQoSList, reqQoS);
269 // break; 308 // break;
270 -// case DEVICE_ATTRIBUTES_RESPONSES_TOPIC:  
271 -// deviceSessionCtx.setAllowAttributeResponses();  
272 -// registerSubQoS(topic, grantedQoSList, reqQoS);  
273 -// break;  
274 -// default:  
275 -// log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS);  
276 -// grantedQoSList.add(FAILURE.value());  
277 -// break;  
278 -// }  
279 -// } catch (AdaptorException e) {  
280 -// log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS);  
281 -// grantedQoSList.add(FAILURE.value());  
282 -// }  
283 -// }  
284 -// ctx.writeAndFlush(createSubAckMessage(mqttMsg.variableHeader().messageId(), grantedQoSList));  
285 -// }  
286 -//  
287 -// private void registerSubQoS(String topic, List<Integer> grantedQoSList, MqttQoS reqQoS) {  
288 -// grantedQoSList.add(getMinSupportedQos(reqQoS));  
289 -// mqttQoSMap.put(topic, getMinSupportedQos(reqQoS));  
290 -// }  
291 -//  
292 -// private void processUnsubscribe(ChannelHandlerContext ctx, MqttUnsubscribeMessage mqttMsg) {  
293 -// if (!checkConnected(ctx)) {  
294 -// return;  
295 -// }  
296 -// log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId());  
297 -// for (String topicName : mqttMsg.payload().topics()) {  
298 -// mqttQoSMap.remove(topicName);  
299 -// try {  
300 -// switch (topicName) { 309 + case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_TOPIC:
  310 + deviceSessionCtx.setAllowAttributeResponses();
  311 + registerSubQoS(topic, grantedQoSList, reqQoS);
  312 + break;
  313 + default:
  314 + log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS);
  315 + grantedQoSList.add(FAILURE.value());
  316 + break;
  317 + }
  318 + } catch (Exception e) {
  319 + log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS);
  320 + grantedQoSList.add(FAILURE.value());
  321 + }
  322 + }
  323 + ctx.writeAndFlush(createSubAckMessage(mqttMsg.variableHeader().messageId(), grantedQoSList));
  324 + }
  325 +
  326 + private void registerSubQoS(String topic, List<Integer> grantedQoSList, MqttQoS reqQoS) {
  327 + grantedQoSList.add(getMinSupportedQos(reqQoS));
  328 + mqttQoSMap.put(topic, getMinSupportedQos(reqQoS));
  329 + }
  330 +
  331 + private void processUnsubscribe(ChannelHandlerContext ctx, MqttUnsubscribeMessage mqttMsg) {
  332 + if (!checkConnected(ctx)) {
  333 + return;
  334 + }
  335 + log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId());
  336 + for (String topicName : mqttMsg.payload().topics()) {
  337 + mqttQoSMap.remove(topicName);
  338 + try {
  339 + switch (topicName) {
301 // case DEVICE_ATTRIBUTES_TOPIC: { 340 // case DEVICE_ATTRIBUTES_TOPIC: {
302 // AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, UNSUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg); 341 // AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, UNSUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg);
303 // processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg)); 342 // processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
@@ -308,23 +347,23 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @@ -308,23 +347,23 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
308 // processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg)); 347 // processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
309 // break; 348 // break;
310 // } 349 // }
311 -// case DEVICE_ATTRIBUTES_RESPONSES_TOPIC:  
312 -// deviceSessionCtx.setDisallowAttributeResponses();  
313 -// break;  
314 -// }  
315 -// } catch (AdaptorException e) {  
316 -// log.warn("[{}] Failed to process unsubscription [{}] to [{}]", sessionId, mqttMsg.variableHeader().messageId(), topicName);  
317 -// }  
318 -// }  
319 -// ctx.writeAndFlush(createUnSubAckMessage(mqttMsg.variableHeader().messageId()));  
320 -// }  
321 -//  
322 -// private MqttMessage createUnSubAckMessage(int msgId) {  
323 -// MqttFixedHeader mqttFixedHeader =  
324 -// new MqttFixedHeader(UNSUBACK, false, AT_LEAST_ONCE, false, 0);  
325 -// MqttMessageIdVariableHeader mqttMessageIdVariableHeader = MqttMessageIdVariableHeader.from(msgId);  
326 -// return new MqttMessage(mqttFixedHeader, mqttMessageIdVariableHeader);  
327 -// } 350 + case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_TOPIC:
  351 + deviceSessionCtx.setDisallowAttributeResponses();
  352 + break;
  353 + }
  354 + } catch (Exception e) {
  355 + log.warn("[{}] Failed to process unsubscription [{}] to [{}]", sessionId, mqttMsg.variableHeader().messageId(), topicName);
  356 + }
  357 + }
  358 + ctx.writeAndFlush(createUnSubAckMessage(mqttMsg.variableHeader().messageId()));
  359 + }
  360 +
  361 + private MqttMessage createUnSubAckMessage(int msgId) {
  362 + MqttFixedHeader mqttFixedHeader =
  363 + new MqttFixedHeader(UNSUBACK, false, AT_LEAST_ONCE, false, 0);
  364 + MqttMessageIdVariableHeader mqttMessageIdVariableHeader = MqttMessageIdVariableHeader.from(msgId);
  365 + return new MqttMessage(mqttFixedHeader, mqttMessageIdVariableHeader);
  366 + }
328 367
329 private void processConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) { 368 private void processConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) {
330 log.info("[{}] Processing connect msg for client: {}!", sessionId, msg.payload().clientIdentifier()); 369 log.info("[{}] Processing connect msg for client: {}!", sessionId, msg.payload().clientIdentifier());
@@ -346,15 +385,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @@ -346,15 +385,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
346 new TransportServiceCallback<ValidateDeviceCredentialsResponseMsg>() { 385 new TransportServiceCallback<ValidateDeviceCredentialsResponseMsg>() {
347 @Override 386 @Override
348 public void onSuccess(ValidateDeviceCredentialsResponseMsg msg) { 387 public void onSuccess(ValidateDeviceCredentialsResponseMsg msg) {
349 - if (!msg.hasDeviceInfo()) {  
350 - ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED));  
351 - ctx.close();  
352 - } else {  
353 - ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED));  
354 - deviceSessionCtx.setDeviceInfo(msg.getDeviceInfo());  
355 - transportService.process(deviceSessionCtx, getSessionEventMsg(SessionEvent.OPEN), null);  
356 - checkGatewaySession();  
357 - } 388 + onValidateDeviceResponse(msg, ctx);
358 } 389 }
359 390
360 @Override 391 @Override
@@ -375,15 +406,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @@ -375,15 +406,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
375 new TransportServiceCallback<ValidateDeviceCredentialsResponseMsg>() { 406 new TransportServiceCallback<ValidateDeviceCredentialsResponseMsg>() {
376 @Override 407 @Override
377 public void onSuccess(ValidateDeviceCredentialsResponseMsg msg) { 408 public void onSuccess(ValidateDeviceCredentialsResponseMsg msg) {
378 - if (!msg.hasDeviceInfo()) {  
379 - ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED));  
380 - ctx.close();  
381 - } else {  
382 - ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED));  
383 - deviceSessionCtx.setDeviceInfo(msg.getDeviceInfo());  
384 - transportService.process(deviceSessionCtx, getSessionEventMsg(SessionEvent.OPEN), null);  
385 - checkGatewaySession();  
386 - } 409 + onValidateDeviceResponse(msg, ctx);
387 } 410 }
388 411
389 @Override 412 @Override
@@ -415,7 +438,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @@ -415,7 +438,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
415 private void processDisconnect(ChannelHandlerContext ctx) { 438 private void processDisconnect(ChannelHandlerContext ctx) {
416 ctx.close(); 439 ctx.close();
417 if (deviceSessionCtx.isConnected()) { 440 if (deviceSessionCtx.isConnected()) {
418 - transportService.process(deviceSessionCtx, getSessionEventMsg(SessionEvent.CLOSED), null); 441 + transportService.process(sessionInfo, getSessionEventMsg(SessionEvent.CLOSED), null);
  442 + transportService.deregisterSession(sessionInfo);
419 if (gatewaySessionCtx != null) { 443 if (gatewaySessionCtx != null) {
420 gatewaySessionCtx.onGatewayDisconnect(); 444 gatewaySessionCtx.onGatewayDisconnect();
421 } 445 }
@@ -488,16 +512,46 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @@ -488,16 +512,46 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
488 512
489 private SessionEventMsg getSessionEventMsg(SessionEvent event) { 513 private SessionEventMsg getSessionEventMsg(SessionEvent event) {
490 return SessionEventMsg.newBuilder() 514 return SessionEventMsg.newBuilder()
491 - .setSessionInfo(sessionInfo)  
492 - .setDeviceIdMSB(deviceSessionCtx.getDeviceIdMSB())  
493 - .setDeviceIdLSB(deviceSessionCtx.getDeviceIdLSB()) 515 + .setSessionType(TransportProtos.SessionType.ASYNC)
494 .setEvent(event).build(); 516 .setEvent(event).build();
495 } 517 }
496 518
497 @Override 519 @Override
498 public void operationComplete(Future<? super Void> future) throws Exception { 520 public void operationComplete(Future<? super Void> future) throws Exception {
499 if (deviceSessionCtx.isConnected()) { 521 if (deviceSessionCtx.isConnected()) {
500 - transportService.process(deviceSessionCtx, getSessionEventMsg(SessionEvent.CLOSED), null); 522 + transportService.process(sessionInfo, getSessionEventMsg(SessionEvent.CLOSED), null);
  523 + transportService.deregisterSession(sessionInfo);
  524 + }
  525 + }
  526 +
  527 + private void onValidateDeviceResponse(ValidateDeviceCredentialsResponseMsg msg, ChannelHandlerContext ctx) {
  528 + if (!msg.hasDeviceInfo()) {
  529 + ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED));
  530 + ctx.close();
  531 + } else {
  532 + ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED));
  533 + deviceSessionCtx.setDeviceInfo(msg.getDeviceInfo());
  534 + sessionInfo = SessionInfoProto.newBuilder()
  535 + .setNodeId(context.getNodeId())
  536 + .setSessionIdMSB(sessionId.getMostSignificantBits())
  537 + .setSessionIdLSB(sessionId.getLeastSignificantBits())
  538 + .setDeviceIdMSB(msg.getDeviceInfo().getDeviceIdMSB())
  539 + .setDeviceIdLSB(msg.getDeviceInfo().getDeviceIdLSB())
  540 + .setTenantIdMSB(msg.getDeviceInfo().getTenantIdMSB())
  541 + .setTenantIdLSB(msg.getDeviceInfo().getTenantIdLSB())
  542 + .build();
  543 + transportService.process(sessionInfo, getSessionEventMsg(SessionEvent.OPEN), null);
  544 + transportService.registerSession(sessionInfo, this);
  545 + checkGatewaySession();
  546 + }
  547 + }
  548 +
  549 + @Override
  550 + public void onGetAttributesResponse(TransportProtos.GetAttributeResponseMsg response) {
  551 + try {
  552 + adaptor.convertToPublish(deviceSessionCtx, response).ifPresent(deviceSessionCtx.getChannel()::writeAndFlush);
  553 + } catch (Exception e) {
  554 + log.trace("[{}] Failed to convert device attributes to MQTT msg", sessionId, e);
501 } 555 }
502 } 556 }
503 } 557 }
@@ -17,6 +17,7 @@ package org.thingsboard.server.transport.mqtt.adaptors; @@ -17,6 +17,7 @@ package org.thingsboard.server.transport.mqtt.adaptors;
17 17
18 import com.google.gson.Gson; 18 import com.google.gson.Gson;
19 import com.google.gson.JsonElement; 19 import com.google.gson.JsonElement;
  20 +import com.google.gson.JsonObject;
20 import com.google.gson.JsonParser; 21 import com.google.gson.JsonParser;
21 import com.google.gson.JsonSyntaxException; 22 import com.google.gson.JsonSyntaxException;
22 import io.netty.buffer.ByteBuf; 23 import io.netty.buffer.ByteBuf;
@@ -25,12 +26,14 @@ import io.netty.buffer.UnpooledByteBufAllocator; @@ -25,12 +26,14 @@ import io.netty.buffer.UnpooledByteBufAllocator;
25 import io.netty.handler.codec.mqtt.*; 26 import io.netty.handler.codec.mqtt.*;
26 import lombok.extern.slf4j.Slf4j; 27 import lombok.extern.slf4j.Slf4j;
27 import org.springframework.stereotype.Component; 28 import org.springframework.stereotype.Component;
  29 +import org.springframework.util.StringUtils;
28 import org.thingsboard.server.common.data.id.SessionId; 30 import org.thingsboard.server.common.data.id.SessionId;
29 import org.thingsboard.server.common.msg.core.*; 31 import org.thingsboard.server.common.msg.core.*;
30 import org.thingsboard.server.common.msg.kv.AttributesKVMsg; 32 import org.thingsboard.server.common.msg.kv.AttributesKVMsg;
31 import org.thingsboard.server.common.msg.session.*; 33 import org.thingsboard.server.common.msg.session.*;
32 import org.thingsboard.server.common.transport.adaptor.AdaptorException; 34 import org.thingsboard.server.common.transport.adaptor.AdaptorException;
33 import org.thingsboard.server.common.transport.adaptor.JsonConverter; 35 import org.thingsboard.server.common.transport.adaptor.JsonConverter;
  36 +import org.thingsboard.server.gen.transport.TransportProtos;
34 import org.thingsboard.server.transport.mqtt.MqttTopics; 37 import org.thingsboard.server.transport.mqtt.MqttTopics;
35 import org.thingsboard.server.transport.mqtt.session.DeviceSessionCtx; 38 import org.thingsboard.server.transport.mqtt.session.DeviceSessionCtx;
36 import org.thingsboard.server.transport.mqtt.MqttTransportHandler; 39 import org.thingsboard.server.transport.mqtt.MqttTransportHandler;
@@ -53,6 +56,64 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor { @@ -53,6 +56,64 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
53 private static final ByteBufAllocator ALLOCATOR = new UnpooledByteBufAllocator(false); 56 private static final ByteBufAllocator ALLOCATOR = new UnpooledByteBufAllocator(false);
54 57
55 @Override 58 @Override
  59 + public TransportProtos.PostTelemetryMsg convertToPostTelemetry(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException {
  60 + String payload = validatePayload(ctx.getSessionId(), inbound.payload());
  61 + try {
  62 + return JsonConverter.convertToTelemetryProto(new JsonParser().parse(payload));
  63 + } catch (IllegalStateException | JsonSyntaxException ex) {
  64 + throw new AdaptorException(ex);
  65 + }
  66 + }
  67 +
  68 + @Override
  69 + public TransportProtos.PostAttributeMsg convertToPostAttributes(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException {
  70 + String payload = validatePayload(ctx.getSessionId(), inbound.payload());
  71 + try {
  72 + return JsonConverter.convertToAttributesProto(new JsonParser().parse(payload));
  73 + } catch (IllegalStateException | JsonSyntaxException ex) {
  74 + throw new AdaptorException(ex);
  75 + }
  76 + }
  77 +
  78 + @Override
  79 + public TransportProtos.GetAttributeRequestMsg convertToGetAttributes(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException {
  80 + String topicName = inbound.variableHeader().topicName();
  81 + try {
  82 + TransportProtos.GetAttributeRequestMsg.Builder result = TransportProtos.GetAttributeRequestMsg.newBuilder();
  83 + result.setRequestId(Integer.valueOf(topicName.substring(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX.length())));
  84 + String payload = inbound.payload().toString(UTF8);
  85 + JsonElement requestBody = new JsonParser().parse(payload);
  86 + Set<String> clientKeys = toStringSet(requestBody, "clientKeys");
  87 + Set<String> sharedKeys = toStringSet(requestBody, "sharedKeys");
  88 + if (clientKeys != null) {
  89 + result.addAllClientAttributeNames(clientKeys);
  90 + }
  91 + if (sharedKeys != null) {
  92 + result.addAllSharedAttributeNames(sharedKeys);
  93 + }
  94 + return result.build();
  95 + } catch (RuntimeException e) {
  96 + log.warn("Failed to decode get attributes request", e);
  97 + throw new AdaptorException(e);
  98 + }
  99 + }
  100 +
  101 + @Override
  102 + public Optional<MqttMessage> convertToPublish(DeviceSessionCtx ctx, TransportProtos.GetAttributeResponseMsg responseMsg) throws AdaptorException {
  103 + if (!StringUtils.isEmpty(responseMsg.getError())) {
  104 + throw new AdaptorException(responseMsg.getError());
  105 + } else {
  106 + Integer requestId = responseMsg.getRequestId();
  107 + if (requestId >= 0) {
  108 + return Optional.of(createMqttPublishMsg(ctx,
  109 + MqttTopics.DEVICE_ATTRIBUTES_RESPONSE_TOPIC_PREFIX + requestId,
  110 + JsonConverter.toJson(responseMsg)));
  111 + }
  112 + return Optional.empty();
  113 + }
  114 + }
  115 +
  116 + @Override
56 public AdaptorToSessionActorMsg convertToActorMsg(DeviceSessionCtx ctx, SessionMsgType type, MqttMessage inbound) throws AdaptorException { 117 public AdaptorToSessionActorMsg convertToActorMsg(DeviceSessionCtx ctx, SessionMsgType type, MqttMessage inbound) throws AdaptorException {
57 FromDeviceMsg msg; 118 FromDeviceMsg msg;
58 switch (type) { 119 switch (type) {
@@ -16,11 +16,24 @@ @@ -16,11 +16,24 @@
16 package org.thingsboard.server.transport.mqtt.adaptors; 16 package org.thingsboard.server.transport.mqtt.adaptors;
17 17
18 import io.netty.handler.codec.mqtt.MqttMessage; 18 import io.netty.handler.codec.mqtt.MqttMessage;
  19 +import io.netty.handler.codec.mqtt.MqttPublishMessage;
19 import org.thingsboard.server.common.transport.TransportAdaptor; 20 import org.thingsboard.server.common.transport.TransportAdaptor;
  21 +import org.thingsboard.server.common.transport.adaptor.AdaptorException;
  22 +import org.thingsboard.server.gen.transport.TransportProtos;
20 import org.thingsboard.server.transport.mqtt.session.DeviceSessionCtx; 23 import org.thingsboard.server.transport.mqtt.session.DeviceSessionCtx;
21 24
  25 +import java.util.Optional;
  26 +
22 /** 27 /**
23 * @author Andrew Shvayka 28 * @author Andrew Shvayka
24 */ 29 */
25 public interface MqttTransportAdaptor extends TransportAdaptor<DeviceSessionCtx, MqttMessage, MqttMessage> { 30 public interface MqttTransportAdaptor extends TransportAdaptor<DeviceSessionCtx, MqttMessage, MqttMessage> {
  31 +
  32 + TransportProtos.PostTelemetryMsg convertToPostTelemetry(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException;
  33 +
  34 + TransportProtos.PostAttributeMsg convertToPostAttributes(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException;
  35 +
  36 + TransportProtos.GetAttributeRequestMsg convertToGetAttributes(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException;
  37 +
  38 + Optional<MqttMessage> convertToPublish(DeviceSessionCtx ctx, TransportProtos.GetAttributeResponseMsg responseMsg) throws AdaptorException;
26 } 39 }
@@ -17,6 +17,7 @@ package org.thingsboard.server.transport.mqtt.session; @@ -17,6 +17,7 @@ package org.thingsboard.server.transport.mqtt.session;
17 17
18 import io.netty.channel.ChannelHandlerContext; 18 import io.netty.channel.ChannelHandlerContext;
19 import io.netty.handler.codec.mqtt.*; 19 import io.netty.handler.codec.mqtt.*;
  20 +import lombok.Getter;
20 import lombok.extern.slf4j.Slf4j; 21 import lombok.extern.slf4j.Slf4j;
21 import org.thingsboard.server.common.data.id.SessionId; 22 import org.thingsboard.server.common.data.id.SessionId;
22 import org.thingsboard.server.common.msg.session.SessionActorToAdaptorMsg; 23 import org.thingsboard.server.common.msg.session.SessionActorToAdaptorMsg;
@@ -41,12 +42,13 @@ import java.util.concurrent.atomic.AtomicInteger; @@ -41,12 +42,13 @@ import java.util.concurrent.atomic.AtomicInteger;
41 public class DeviceSessionCtx extends MqttDeviceAwareSessionContext { 42 public class DeviceSessionCtx extends MqttDeviceAwareSessionContext {
42 43
43 private final MqttSessionId sessionId; 44 private final MqttSessionId sessionId;
  45 + @Getter
44 private ChannelHandlerContext channel; 46 private ChannelHandlerContext channel;
45 private volatile boolean allowAttributeResponses; 47 private volatile boolean allowAttributeResponses;
46 private AtomicInteger msgIdSeq = new AtomicInteger(0); 48 private AtomicInteger msgIdSeq = new AtomicInteger(0);
47 49
48 public DeviceSessionCtx(ConcurrentMap<String, Integer> mqttQoSMap) { 50 public DeviceSessionCtx(ConcurrentMap<String, Integer> mqttQoSMap) {
49 - super(null, null, null); 51 + super(null, null, mqttQoSMap);
50 this.sessionId = new MqttSessionId(); 52 this.sessionId = new MqttSessionId();
51 } 53 }
52 54
1 /** 1 /**
2 * Copyright © 2016-2018 The Thingsboard Authors 2 * Copyright © 2016-2018 The Thingsboard Authors
3 - * <p> 3 + *
4 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License. 5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at 6 * You may obtain a copy of the License at
7 - * <p>  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - * <p> 7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
10 * Unless required by applicable law or agreed to in writing, software 10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, 11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -111,8 +111,8 @@ public class MqttTransportService implements TransportService { @@ -111,8 +111,8 @@ public class MqttTransportService implements TransportService {
111 TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder<TransportApiResponseMsg> responseBuilder = TBKafkaConsumerTemplate.builder(); 111 TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder<TransportApiResponseMsg> responseBuilder = TBKafkaConsumerTemplate.builder();
112 responseBuilder.settings(kafkaSettings); 112 responseBuilder.settings(kafkaSettings);
113 responseBuilder.topic(transportApiResponsesTopic + "." + transportContext.getNodeId()); 113 responseBuilder.topic(transportApiResponsesTopic + "." + transportContext.getNodeId());
114 - responseBuilder.clientId(transportContext.getNodeId());  
115 - responseBuilder.groupId(null); 114 + responseBuilder.clientId("transport-api-client-" + transportContext.getNodeId());
  115 + responseBuilder.groupId("transport-api-client");
116 responseBuilder.autoCommit(true); 116 responseBuilder.autoCommit(true);
117 responseBuilder.autoCommitIntervalMs(autoCommitInterval); 117 responseBuilder.autoCommitIntervalMs(autoCommitInterval);
118 responseBuilder.decoder(new TransportApiResponseDecoder()); 118 responseBuilder.decoder(new TransportApiResponseDecoder());
@@ -137,8 +137,8 @@ public class MqttTransportService implements TransportService { @@ -137,8 +137,8 @@ public class MqttTransportService implements TransportService {
137 TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder<ToTransportMsg> mainConsumerBuilder = TBKafkaConsumerTemplate.builder(); 137 TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder<ToTransportMsg> mainConsumerBuilder = TBKafkaConsumerTemplate.builder();
138 mainConsumerBuilder.settings(kafkaSettings); 138 mainConsumerBuilder.settings(kafkaSettings);
139 mainConsumerBuilder.topic(notificationsTopic + "." + transportContext.getNodeId()); 139 mainConsumerBuilder.topic(notificationsTopic + "." + transportContext.getNodeId());
140 - mainConsumerBuilder.clientId(transportContext.getNodeId());  
141 - mainConsumerBuilder.groupId(null); 140 + mainConsumerBuilder.clientId("transport-" + transportContext.getNodeId());
  141 + mainConsumerBuilder.groupId("transport");
142 mainConsumerBuilder.autoCommit(true); 142 mainConsumerBuilder.autoCommit(true);
143 mainConsumerBuilder.autoCommitIntervalMs(notificationsAutoCommitInterval); 143 mainConsumerBuilder.autoCommitIntervalMs(notificationsAutoCommitInterval);
144 mainConsumerBuilder.decoder(new ToTransportMsgResponseDecoder()); 144 mainConsumerBuilder.decoder(new ToTransportMsgResponseDecoder());
@@ -243,6 +243,15 @@ public class MqttTransportService implements TransportService { @@ -243,6 +243,15 @@ public class MqttTransportService implements TransportService {
243 } 243 }
244 244
245 @Override 245 @Override
  246 + public void process(SessionInfoProto sessionInfo, TransportProtos.GetAttributeRequestMsg msg, TransportServiceCallback<Void> callback) {
  247 + ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
  248 + TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
  249 + .setGetAttributes(msg).build()
  250 + ).build();
  251 + send(sessionInfo, toRuleEngineMsg, callback);
  252 + }
  253 +
  254 + @Override
246 public void registerSession(SessionInfoProto sessionInfo, SessionMsgListener listener) { 255 public void registerSession(SessionInfoProto sessionInfo, SessionMsgListener listener) {
247 sessions.putIfAbsent(toId(sessionInfo), listener); 256 sessions.putIfAbsent(toId(sessionInfo), listener);
248 //TODO: monitor sessions periodically: PING REQ/RESP, etc. 257 //TODO: monitor sessions periodically: PING REQ/RESP, etc.
@@ -271,9 +280,13 @@ public class MqttTransportService implements TransportService { @@ -271,9 +280,13 @@ public class MqttTransportService implements TransportService {
271 @Override 280 @Override
272 public void onCompletion(RecordMetadata metadata, Exception exception) { 281 public void onCompletion(RecordMetadata metadata, Exception exception) {
273 if (exception == null) { 282 if (exception == null) {
274 - callback.onSuccess(null); 283 + if (callback != null) {
  284 + callback.onSuccess(null);
  285 + }
275 } else { 286 } else {
276 - callback.onError(exception); 287 + if (callback != null) {
  288 + callback.onError(exception);
  289 + }
277 } 290 }
278 } 291 }
279 } 292 }
1 /** 1 /**
2 * Copyright © 2016-2018 The Thingsboard Authors 2 * Copyright © 2016-2018 The Thingsboard Authors
3 - * <p> 3 + *
4 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License. 5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at 6 * You may obtain a copy of the License at
7 - * <p>  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - * <p> 7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
10 * Unless required by applicable law or agreed to in writing, software 10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, 11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.