Commit 5a80d75584d09b33a88b25fd63d9e41e95e2eafc

Authored by Andrew Shvayka
1 parent 2637babf

Added support of attribute update notifications

1 1 /**
2 2 * Copyright © 2016-2018 The Thingsboard Authors
3   - *
  3 + * <p>
4 4 * Licensed under the Apache License, Version 2.0 (the "License");
5 5 * you may not use this file except in compliance with the License.
6 6 * You may obtain a copy of the License at
7   - *
8   - * http://www.apache.org/licenses/LICENSE-2.0
9   - *
  7 + * <p>
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + * <p>
10 10 * Unless required by applicable law or agreed to in writing, software
11 11 * distributed under the License is distributed on an "AS IS" BASIS,
12 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
... ... @@ -216,12 +216,16 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
216 216
217 217 void process(ActorContext context, TransportToDeviceActorMsgWrapper wrapper) {
218 218 TransportToDeviceActorMsg msg = wrapper.getMsg();
219   -// processSubscriptionCommands(context, msg);
220 219 // processRpcResponses(context, msg);
221 220 if (msg.hasSessionEvent()) {
222 221 processSessionStateMsgs(msg.getSessionInfo(), msg.getSessionEvent());
223 222 }
224   -
  223 + if (msg.hasSubscribeToAttributes()) {
  224 + processSubscriptionCommands(context, msg.getSessionInfo(), msg.getSubscribeToAttributes());
  225 + }
  226 + if (msg.hasSubscribeToRPC()) {
  227 + processSubscriptionCommands(context, msg.getSessionInfo(), msg.getSubscribeToRPC());
  228 + }
225 229 if (msg.hasPostAttributes()) {
226 230 handlePostAttributesRequest(context, msg.getSessionInfo(), msg.getPostAttributes());
227 231 reportActivity();
... ... @@ -236,9 +240,6 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
236 240 // SessionMsgType sessionMsgType = msg.getPayload().getMsgType();
237 241 // if (sessionMsgType.requiresRulesProcessing()) {
238 242 // switch (sessionMsgType) {
239   -// case GET_ATTRIBUTES_REQUEST:
240   -// handleGetAttributesRequest(msg);
241   -// break;
242 243 // case TO_SERVER_RPC_REQUEST:
243 244 // handleClientSideRPCRequest(context, msg);
244 245 // reportActivity();
... ... @@ -262,7 +263,6 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
262 263 private void handleGetAttributesRequest(ActorContext context, SessionInfoProto sessionInfo, GetAttributeRequestMsg request) {
263 264 ListenableFuture<List<AttributeKvEntry>> clientAttributesFuture = getAttributeKvEntries(deviceId, DataConstants.CLIENT_SCOPE, toOptionalSet(request.getClientAttributeNamesList()));
264 265 ListenableFuture<List<AttributeKvEntry>> sharedAttributesFuture = getAttributeKvEntries(deviceId, DataConstants.SHARED_SCOPE, toOptionalSet(request.getSharedAttributeNamesList()));
265   - UUID sessionId = new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB());
266 266 int requestId = request.getRequestId();
267 267 Futures.addCallback(Futures.allAsList(Arrays.asList(clientAttributesFuture, sharedAttributesFuture)), new FutureCallback<List<List<AttributeKvEntry>>>() {
268 268 @Override
... ... @@ -272,7 +272,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
272 272 .addAllClientAttributeList(toTsKvProtos(result.get(0)))
273 273 .addAllSharedAttributeList(toTsKvProtos(result.get(1)))
274 274 .build();
275   - sendToTransport(responseMsg, sessionId, sessionInfo);
  275 + sendToTransport(responseMsg, sessionInfo);
276 276 }
277 277
278 278 @Override
... ... @@ -280,7 +280,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
280 280 GetAttributeResponseMsg responseMsg = GetAttributeResponseMsg.newBuilder()
281 281 .setError(t.getMessage())
282 282 .build();
283   - sendToTransport(responseMsg, sessionId, sessionInfo);
  283 + sendToTransport(responseMsg, sessionInfo);
284 284 }
285 285 });
286 286 }
... ... @@ -353,28 +353,37 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
353 353
354 354 void processAttributesUpdate(ActorContext context, DeviceAttributesEventNotificationMsg msg) {
355 355 if (attributeSubscriptions.size() > 0) {
356   - ToDeviceMsg notification = null;
  356 + boolean hasNotificationData = false;
  357 + AttributeUpdateNotificationMsg.Builder notification = AttributeUpdateNotificationMsg.newBuilder();
357 358 if (msg.isDeleted()) {
358   - List<AttributeKey> sharedKeys = msg.getDeletedKeys().stream()
  359 + List<String> sharedKeys = msg.getDeletedKeys().stream()
359 360 .filter(key -> DataConstants.SHARED_SCOPE.equals(key.getScope()))
  361 + .map(AttributeKey::getAttributeKey)
360 362 .collect(Collectors.toList());
361   - notification = new AttributesUpdateNotification(BasicAttributeKVMsg.fromDeleted(sharedKeys));
  363 + if (!sharedKeys.isEmpty()) {
  364 + notification.addAllSharedDeleted(sharedKeys);
  365 + hasNotificationData = true;
  366 + }
362 367 } else {
363 368 if (DataConstants.SHARED_SCOPE.equals(msg.getScope())) {
364 369 List<AttributeKvEntry> attributes = new ArrayList<>(msg.getValues());
365 370 if (attributes.size() > 0) {
366   - notification = new AttributesUpdateNotification(BasicAttributeKVMsg.fromShared(attributes));
  371 + List<TsKvProto> sharedUpdated = msg.getValues().stream().map(this::toTsKvProto)
  372 + .collect(Collectors.toList());
  373 + if (!sharedUpdated.isEmpty()) {
  374 + notification.addAllSharedUpdated(sharedUpdated);
  375 + hasNotificationData = true;
  376 + }
367 377 } else {
368 378 logger.debug("[{}] No public server side attributes changed!", deviceId);
369 379 }
370 380 }
371 381 }
372   - if (notification != null) {
373   - ToDeviceMsg finalNotification = notification;
374   -// attributeSubscriptions.entrySet().forEach(sub -> {
375   -// ActorSystemToDeviceSessionActorMsg response = new BasicActorSystemToDeviceSessionActorMsg(finalNotification, sub.getKey());
376   -// sendMsgToSessionActor(response, sub.getValue().getServer());
377   -// });
  382 + if (hasNotificationData) {
  383 + AttributeUpdateNotificationMsg finalNotification = notification.build();
  384 + attributeSubscriptions.entrySet().forEach(sub -> {
  385 + sendToTransport(finalNotification, sub.getKey(), sub.getValue());
  386 + });
378 387 }
379 388 } else {
380 389 logger.debug("[{}] No registered attributes subscriptions to process!", deviceId);
... ... @@ -414,25 +423,35 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
414 423 // }
415 424 }
416 425
417   -// private void processSubscriptionCommands(ActorContext context, DeviceToDeviceActorMsg msg) {
418   -// SessionId sessionId = msg.getSessionId();
419   -// SessionType sessionType = msg.getSessionType();
420   -// FromDeviceMsg inMsg = msg.getPayload();
421   -// if (inMsg.getMsgType() == SessionMsgType.SUBSCRIBE_ATTRIBUTES_REQUEST) {
422   -// logger.debug("[{}] Registering attributes subscription for session [{}]", deviceId, sessionId);
423   -// attributeSubscriptions.put(sessionId, new SessionInfo(sessionType, msg.getServerAddress()));
424   -// } else if (inMsg.getMsgType() == SessionMsgType.UNSUBSCRIBE_ATTRIBUTES_REQUEST) {
425   -// logger.debug("[{}] Canceling attributes subscription for session [{}]", deviceId, sessionId);
426   -// attributeSubscriptions.remove(sessionId);
427   -// } else if (inMsg.getMsgType() == SessionMsgType.SUBSCRIBE_RPC_COMMANDS_REQUEST) {
428   -// logger.debug("[{}] Registering rpc subscription for session [{}][{}]", deviceId, sessionId, sessionType);
429   -// rpcSubscriptions.put(sessionId, new SessionInfo(sessionType, msg.getServerAddress()));
430   -// sendPendingRequests(context, sessionId, sessionType, msg.getServerAddress());
431   -// } else if (inMsg.getMsgType() == SessionMsgType.UNSUBSCRIBE_RPC_COMMANDS_REQUEST) {
432   -// logger.debug("[{}] Canceling rpc subscription for session [{}][{}]", deviceId, sessionId, sessionType);
433   -// rpcSubscriptions.remove(sessionId);
434   -// }
435   -// }
  426 + private void processSubscriptionCommands(ActorContext context, SessionInfoProto sessionInfo, SubscribeToAttributeUpdatesMsg subscribeCmd) {
  427 + UUID sessionId = new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB());
  428 + if (subscribeCmd.getUnsubscribe()) {
  429 + logger.debug("[{}] Canceling attributes subscription for session [{}]", deviceId, sessionId);
  430 + attributeSubscriptions.remove(sessionId);
  431 + } else {
  432 + SessionInfo session = sessions.get(sessionId);
  433 + if (session == null) {
  434 + session = new SessionInfo(TransportProtos.SessionType.SYNC, sessionInfo.getNodeId());
  435 + }
  436 + logger.debug("[{}] Registering attributes subscription for session [{}]", deviceId, sessionId);
  437 + attributeSubscriptions.put(sessionId, session);
  438 + }
  439 + }
  440 +
  441 + private void processSubscriptionCommands(ActorContext context, SessionInfoProto sessionInfo, SubscribeToRPCMsg subscribeCmd) {
  442 + UUID sessionId = new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB());
  443 + if (subscribeCmd.getUnsubscribe()) {
  444 + logger.debug("[{}] Canceling rpc subscription for session [{}]", deviceId, sessionId);
  445 + rpcSubscriptions.remove(sessionId);
  446 + } else {
  447 + SessionInfo session = sessions.get(sessionId);
  448 + if (session == null) {
  449 + session = new SessionInfo(TransportProtos.SessionType.SYNC, sessionInfo.getNodeId());
  450 + }
  451 + logger.debug("[{}] Registering rpc subscription for session [{}]", deviceId, sessionId);
  452 + rpcSubscriptions.put(sessionId, session);
  453 + }
  454 + }
436 455
437 456 private void processSessionStateMsgs(SessionInfoProto sessionInfo, SessionEventMsg msg) {
438 457 UUID sessionId = new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB());
... ... @@ -521,11 +540,19 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
521 540 }
522 541 }
523 542
524   - private void sendToTransport(GetAttributeResponseMsg responseMsg, UUID sessionId, SessionInfoProto sessionInfo) {
  543 + private void sendToTransport(GetAttributeResponseMsg responseMsg, SessionInfoProto sessionInfo) {
  544 + DeviceActorToTransportMsg msg = DeviceActorToTransportMsg.newBuilder()
  545 + .setSessionIdMSB(sessionInfo.getSessionIdMSB())
  546 + .setSessionIdLSB(sessionInfo.getSessionIdLSB())
  547 + .setGetAttributesResponse(responseMsg).build();
  548 + systemContext.getRuleEngineTransportService().process(sessionInfo.getNodeId(), msg);
  549 + }
  550 +
  551 + private void sendToTransport(AttributeUpdateNotificationMsg notificationMsg, UUID sessionId, SessionInfo sessionInfo) {
525 552 DeviceActorToTransportMsg msg = DeviceActorToTransportMsg.newBuilder()
526 553 .setSessionIdMSB(sessionId.getMostSignificantBits())
527 554 .setSessionIdLSB(sessionId.getLeastSignificantBits())
528   - .setGetAttributesResponse(responseMsg).build();
  555 + .setAttributeUpdateNotification(notificationMsg).build();
529 556 systemContext.getRuleEngineTransportService().process(sessionInfo.getNodeId(), msg);
530 557 }
531 558
... ...
... ... @@ -25,5 +25,4 @@ import org.thingsboard.server.gen.transport.TransportProtos.SessionType;
25 25 public class SessionInfo {
26 26 private final SessionType type;
27 27 private final String nodeId;
28   -
29 28 }
... ...
1 1 /**
2 2 * Copyright © 2016-2018 The Thingsboard Authors
3   - *
  3 + * <p>
4 4 * Licensed under the Apache License, Version 2.0 (the "License");
5 5 * you may not use this file except in compliance with the License.
6 6 * You may obtain a copy of the License at
7   - *
8   - * http://www.apache.org/licenses/LICENSE-2.0
9   - *
  7 + * <p>
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + * <p>
10 10 * Unless required by applicable law or agreed to in writing, software
11 11 * distributed under the License is distributed on an "AS IS" BASIS,
12 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
... ... @@ -15,6 +15,7 @@
15 15 */
16 16 package org.thingsboard.server.common.transport;
17 17
  18 +import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg;
18 19 import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeResponseMsg;
19 20
20 21 /**
... ... @@ -23,4 +24,6 @@ import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeResponse
23 24 public interface SessionMsgListener {
24 25
25 26 void onGetAttributesResponse(GetAttributeResponseMsg getAttributesResponse);
  27 +
  28 + void onAttributeUpdate(AttributeUpdateNotificationMsg attributeUpdateNotification);
26 29 }
... ...
... ... @@ -15,6 +15,8 @@
15 15 */
16 16 package org.thingsboard.server.common.transport;
17 17
  18 +import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToAttributeUpdatesMsg;
  19 +import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToRPCMsg;
18 20 import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto;
19 21 import org.thingsboard.server.gen.transport.TransportProtos.PostAttributeMsg;
20 22 import org.thingsboard.server.gen.transport.TransportProtos.PostTelemetryMsg;
... ... @@ -43,6 +45,10 @@ public interface TransportService {
43 45
44 46 void process(SessionInfoProto sessionInfo, GetAttributeRequestMsg msg, TransportServiceCallback<Void> callback);
45 47
  48 + void process(SessionInfoProto sessionInfo, SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback<Void> callback);
  49 +
  50 + void process(SessionInfoProto sessionInfo, SubscribeToRPCMsg msg, TransportServiceCallback<Void> callback);
  51 +
46 52 void registerSession(SessionInfoProto sessionInfo, SessionMsgListener listener);
47 53
48 54 void deregisterSession(SessionInfoProto sessionInfo);
... ...
1 1 /**
2 2 * Copyright © 2016-2018 The Thingsboard Authors
3   - *
  3 + * <p>
4 4 * Licensed under the Apache License, Version 2.0 (the "License");
5 5 * you may not use this file except in compliance with the License.
6 6 * You may obtain a copy of the License at
7   - *
8   - * http://www.apache.org/licenses/LICENSE-2.0
9   - *
  7 + * <p>
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + * <p>
10 10 * Unless required by applicable law or agreed to in writing, software
11 11 * distributed under the License is distributed on an "AS IS" BASIS,
12 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
... ... @@ -39,6 +39,7 @@ import org.thingsboard.server.common.msg.core.ToDeviceRpcRequestMsg;
39 39 import org.thingsboard.server.common.msg.core.ToServerRpcRequestMsg;
40 40 import org.thingsboard.server.common.msg.core.ToServerRpcResponseMsg;
41 41 import org.thingsboard.server.common.msg.kv.AttributesKVMsg;
  42 +import org.thingsboard.server.gen.transport.TransportProtos;
42 43 import org.thingsboard.server.gen.transport.TransportProtos.*;
43 44
44 45 import java.util.ArrayList;
... ... @@ -153,20 +154,6 @@ public class JsonConverter {
153 154 return result;
154 155 }
155 156
156   - private static void parseNumericProto(List<KvEntry> result, Entry<String, JsonElement> valueEntry, JsonPrimitive value) {
157   - if (value.getAsString().contains(".")) {
158   - result.add(new DoubleDataEntry(valueEntry.getKey(), value.getAsDouble()));
159   - } else {
160   - try {
161   - long longValue = Long.parseLong(value.getAsString());
162   - result.add(new LongDataEntry(valueEntry.getKey(), longValue));
163   - } catch (NumberFormatException e) {
164   - throw new JsonSyntaxException("Big integer values are not supported!");
165   - }
166   - }
167   - }
168   -
169   -
170 157 private static TelemetryUploadRequest convertToTelemetry(JsonElement jsonObject, long systemTs, int requestId) throws JsonSyntaxException {
171 158 BasicTelemetryUploadRequest request = new BasicTelemetryUploadRequest(requestId);
172 159 if (jsonObject.isJsonObject()) {
... ... @@ -283,6 +270,19 @@ public class JsonConverter {
283 270 return result;
284 271 }
285 272
  273 + public static JsonElement toJson(AttributeUpdateNotificationMsg payload) {
  274 + JsonObject result = new JsonObject();
  275 + if (payload.getSharedUpdatedCount() > 0) {
  276 + payload.getSharedUpdatedList().forEach(addToObjectFromProto(result));
  277 + }
  278 + if (payload.getSharedDeletedCount() > 0) {
  279 + JsonArray attrObject = new JsonArray();
  280 + payload.getSharedDeletedList().forEach(attrObject::add);
  281 + result.add("deleted", attrObject);
  282 + }
  283 + return result;
  284 + }
  285 +
286 286 public static JsonObject toJson(AttributesKVMsg payload, boolean asMap) {
287 287 JsonObject result = new JsonObject();
288 288 if (asMap) {
... ... @@ -377,4 +377,5 @@ public class JsonConverter {
377 377 error.addProperty("error", errorMsg);
378 378 return error;
379 379 }
  380 +
380 381 }
... ...
... ... @@ -108,6 +108,11 @@ message GetAttributeResponseMsg {
108 108 string error = 5;
109 109 }
110 110
  111 +message AttributeUpdateNotificationMsg {
  112 + repeated TsKvProto sharedUpdated = 1;
  113 + repeated string sharedDeleted = 2;
  114 +}
  115 +
111 116 message ValidateDeviceTokenRequestMsg {
112 117 string token = 1;
113 118 }
... ... @@ -124,12 +129,22 @@ message SessionCloseNotificationProto {
124 129 string message = 1;
125 130 }
126 131
  132 +message SubscribeToAttributeUpdatesMsg {
  133 + bool unsubscribe = 1;
  134 +}
  135 +
  136 +message SubscribeToRPCMsg {
  137 + bool unsubscribe = 1;
  138 +}
  139 +
127 140 message TransportToDeviceActorMsg {
128 141 SessionInfoProto sessionInfo = 1;
129 142 SessionEventMsg sessionEvent = 2;
130 143 PostTelemetryMsg postTelemetry = 3;
131 144 PostAttributeMsg postAttributes = 4;
132 145 GetAttributeRequestMsg getAttributes = 5;
  146 + SubscribeToAttributeUpdatesMsg subscribeToAttributes = 6;
  147 + SubscribeToRPCMsg subscribeToRPC= 7;
133 148 }
134 149
135 150 message DeviceActorToTransportMsg {
... ... @@ -137,6 +152,7 @@ message DeviceActorToTransportMsg {
137 152 int64 sessionIdLSB = 2;
138 153 SessionCloseNotificationProto sessionCloseNotification = 3;
139 154 GetAttributeResponseMsg getAttributesResponse = 4;
  155 + AttributeUpdateNotificationMsg attributeUpdateNotification = 5;
140 156 }
141 157
142 158 /**
... ...
1 1 /**
2 2 * Copyright © 2016-2018 The Thingsboard Authors
3   - *
  3 + * <p>
4 4 * Licensed under the Apache License, Version 2.0 (the "License");
5 5 * you may not use this file except in compliance with the License.
6 6 * You may obtain a copy of the License at
7   - *
8   - * http://www.apache.org/licenses/LICENSE-2.0
9   - *
  7 + * <p>
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + * <p>
10 10 * Unless required by applicable law or agreed to in writing, software
11 11 * distributed under the License is distributed on an "AS IS" BASIS,
12 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
... ... @@ -71,6 +71,7 @@ import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_ACCEP
71 71 import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD;
72 72 import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED;
73 73 import static io.netty.handler.codec.mqtt.MqttMessageType.CONNACK;
  74 +import static io.netty.handler.codec.mqtt.MqttMessageType.PINGRESP;
74 75 import static io.netty.handler.codec.mqtt.MqttMessageType.PUBACK;
75 76 import static io.netty.handler.codec.mqtt.MqttMessageType.SUBACK;
76 77 import static io.netty.handler.codec.mqtt.MqttMessageType.UNSUBACK;
... ... @@ -148,16 +149,17 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
148 149 case UNSUBSCRIBE:
149 150 processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg);
150 151 break;
151   -// case PINGREQ:
152   -// if (checkConnected(ctx)) {
153   -// ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0)));
154   -// }
155   -// break;
156   -// case DISCONNECT:
157   -// if (checkConnected(ctx)) {
158   -// processDisconnect(ctx);
159   -// }
160   -// break;
  152 + case PINGREQ:
  153 + //TODO: should we push the notification to the rule engine?
  154 + if (checkConnected(ctx)) {
  155 + ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0)));
  156 + }
  157 + break;
  158 + case DISCONNECT:
  159 + if (checkConnected(ctx)) {
  160 + processDisconnect(ctx);
  161 + }
  162 + break;
161 163 default:
162 164 break;
163 165 }
... ... @@ -289,25 +291,20 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
289 291 MqttQoS reqQoS = subscription.qualityOfService();
290 292 try {
291 293 switch (topic) {
292   -// case MqttTopics.DEVICE_ATTRIBUTES_TOPIC: {
293   -// AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, SUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg);
294   -// processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
295   -// registerSubQoS(topic, grantedQoSList, reqQoS);
296   -// break;
297   -// }
298   -// case DEVICE_RPC_REQUESTS_SUB_TOPIC: {
299   -// AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, SUBSCRIBE_RPC_COMMANDS_REQUEST, mqttMsg);
300   -// processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
301   -// registerSubQoS(topic, grantedQoSList, reqQoS);
302   -// break;
303   -// }
304   -// case DEVICE_RPC_RESPONSE_SUB_TOPIC:
305   -// case GATEWAY_ATTRIBUTES_TOPIC:
306   -// case GATEWAY_RPC_TOPIC:
307   -// registerSubQoS(topic, grantedQoSList, reqQoS);
308   -// break;
  294 + case MqttTopics.DEVICE_ATTRIBUTES_TOPIC: {
  295 + transportService.process(sessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg.newBuilder().build(), null);
  296 + registerSubQoS(topic, grantedQoSList, reqQoS);
  297 + break;
  298 + }
  299 + case MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC: {
  300 + transportService.process(sessionInfo, TransportProtos.SubscribeToRPCMsg.newBuilder().build(), null);
  301 + registerSubQoS(topic, grantedQoSList, reqQoS);
  302 + break;
  303 + }
  304 + case MqttTopics.DEVICE_RPC_RESPONSE_SUB_TOPIC:
  305 + case MqttTopics.GATEWAY_ATTRIBUTES_TOPIC:
  306 + case MqttTopics.GATEWAY_RPC_TOPIC:
309 307 case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_TOPIC:
310   - deviceSessionCtx.setAllowAttributeResponses();
311 308 registerSubQoS(topic, grantedQoSList, reqQoS);
312 309 break;
313 310 default:
... ... @@ -337,19 +334,14 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
337 334 mqttQoSMap.remove(topicName);
338 335 try {
339 336 switch (topicName) {
340   -// case DEVICE_ATTRIBUTES_TOPIC: {
341   -// AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, UNSUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg);
342   -// processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
343   -// break;
344   -// }
345   -// case DEVICE_RPC_REQUESTS_SUB_TOPIC: {
346   -// AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, UNSUBSCRIBE_RPC_COMMANDS_REQUEST, mqttMsg);
347   -// processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
348   -// break;
349   -// }
350   - case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_TOPIC:
351   - deviceSessionCtx.setDisallowAttributeResponses();
  337 + case MqttTopics.DEVICE_ATTRIBUTES_TOPIC: {
  338 + transportService.process(sessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg.newBuilder().setUnsubscribe(true).build(), null);
  339 + break;
  340 + }
  341 + case MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC: {
  342 + transportService.process(sessionInfo, TransportProtos.SubscribeToRPCMsg.newBuilder().setUnsubscribe(true).build(), null);
352 343 break;
  344 + }
353 345 }
354 346 } catch (Exception e) {
355 347 log.warn("[{}] Failed to process unsubscription [{}] to [{}]", sessionId, mqttMsg.variableHeader().messageId(), topicName);
... ... @@ -551,7 +543,16 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
551 543 try {
552 544 adaptor.convertToPublish(deviceSessionCtx, response).ifPresent(deviceSessionCtx.getChannel()::writeAndFlush);
553 545 } catch (Exception e) {
554   - log.trace("[{}] Failed to convert device attributes to MQTT msg", sessionId, e);
  546 + log.trace("[{}] Failed to convert device attributes response to MQTT msg", sessionId, e);
  547 + }
  548 + }
  549 +
  550 + @Override
  551 + public void onAttributeUpdate(TransportProtos.AttributeUpdateNotificationMsg notification) {
  552 + try {
  553 + adaptor.convertToPublish(deviceSessionCtx, notification).ifPresent(deviceSessionCtx.getChannel()::writeAndFlush);
  554 + } catch (Exception e) {
  555 + log.trace("[{}] Failed to convert device attributes update to MQTT msg", sessionId, e);
555 556 }
556 557 }
557 558 }
... ...
1 1 /**
2 2 * Copyright © 2016-2018 The Thingsboard Authors
3   - *
  3 + * <p>
4 4 * Licensed under the Apache License, Version 2.0 (the "License");
5 5 * you may not use this file except in compliance with the License.
6 6 * You may obtain a copy of the License at
7   - *
8   - * http://www.apache.org/licenses/LICENSE-2.0
9   - *
  7 + * <p>
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + * <p>
10 10 * Unless required by applicable law or agreed to in writing, software
11 11 * distributed under the License is distributed on an "AS IS" BASIS,
12 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
... ... @@ -114,6 +114,13 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
114 114 }
115 115
116 116 @Override
  117 + public Optional<MqttMessage> convertToPublish(DeviceSessionCtx ctx, TransportProtos.AttributeUpdateNotificationMsg notificationMsg) throws AdaptorException {
  118 + return Optional.of(createMqttPublishMsg(ctx,
  119 + MqttTopics.DEVICE_ATTRIBUTES_TOPIC,
  120 + JsonConverter.toJson(notificationMsg)));
  121 + }
  122 +
  123 + @Override
117 124 public AdaptorToSessionActorMsg convertToActorMsg(DeviceSessionCtx ctx, SessionMsgType type, MqttMessage inbound) throws AdaptorException {
118 125 FromDeviceMsg msg;
119 126 switch (type) {
... ...
... ... @@ -36,4 +36,8 @@ public interface MqttTransportAdaptor extends TransportAdaptor<DeviceSessionCtx,
36 36 TransportProtos.GetAttributeRequestMsg convertToGetAttributes(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException;
37 37
38 38 Optional<MqttMessage> convertToPublish(DeviceSessionCtx ctx, TransportProtos.GetAttributeResponseMsg responseMsg) throws AdaptorException;
  39 +
  40 + Optional<MqttMessage> convertToPublish(DeviceSessionCtx ctx, TransportProtos.AttributeUpdateNotificationMsg notificationMsg) throws AdaptorException;
  41 +
  42 +
39 43 }
... ...
... ... @@ -44,7 +44,6 @@ public class DeviceSessionCtx extends MqttDeviceAwareSessionContext {
44 44 private final MqttSessionId sessionId;
45 45 @Getter
46 46 private ChannelHandlerContext channel;
47   - private volatile boolean allowAttributeResponses;
48 47 private AtomicInteger msgIdSeq = new AtomicInteger(0);
49 48
50 49 public DeviceSessionCtx(ConcurrentMap<String, Integer> mqttQoSMap) {
... ... @@ -103,14 +102,6 @@ public class DeviceSessionCtx extends MqttDeviceAwareSessionContext {
103 102 this.channel = channel;
104 103 }
105 104
106   - public void setAllowAttributeResponses() {
107   - allowAttributeResponses = true;
108   - }
109   -
110   - public void setDisallowAttributeResponses() {
111   - allowAttributeResponses = false;
112   - }
113   -
114 105 public int nextMsgId() {
115 106 return msgIdSeq.incrementAndGet();
116 107 }
... ...
1 1 /**
2 2 * Copyright © 2016-2018 The Thingsboard Authors
3   - *
  3 + * <p>
4 4 * Licensed under the Apache License, Version 2.0 (the "License");
5 5 * you may not use this file except in compliance with the License.
6 6 * You may obtain a copy of the License at
7   - *
8   - * http://www.apache.org/licenses/LICENSE-2.0
9   - *
  7 + * <p>
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + * <p>
10 10 * Unless required by applicable law or agreed to in writing, software
11 11 * distributed under the License is distributed on an "AS IS" BASIS,
12 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
... ... @@ -161,6 +161,9 @@ public class MqttTransportService implements TransportService {
161 161 if (toSessionMsg.hasGetAttributesResponse()) {
162 162 listener.onGetAttributesResponse(toSessionMsg.getGetAttributesResponse());
163 163 }
  164 + if (toSessionMsg.hasAttributeUpdateNotification()) {
  165 + listener.onAttributeUpdate(toSessionMsg.getAttributeUpdateNotification());
  166 + }
164 167 });
165 168 } else {
166 169 //TODO: should we notify the device actor about missed session?
... ... @@ -252,6 +255,24 @@ public class MqttTransportService implements TransportService {
252 255 }
253 256
254 257 @Override
  258 + public void process(SessionInfoProto sessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback<Void> callback) {
  259 + ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
  260 + TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
  261 + .setSubscribeToAttributes(msg).build()
  262 + ).build();
  263 + send(sessionInfo, toRuleEngineMsg, callback);
  264 + }
  265 +
  266 + @Override
  267 + public void process(SessionInfoProto sessionInfo, TransportProtos.SubscribeToRPCMsg msg, TransportServiceCallback<Void> callback) {
  268 + ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
  269 + TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
  270 + .setSubscribeToRPC(msg).build()
  271 + ).build();
  272 + send(sessionInfo, toRuleEngineMsg, callback);
  273 + }
  274 +
  275 + @Override
255 276 public void registerSession(SessionInfoProto sessionInfo, SessionMsgListener listener) {
256 277 sessions.putIfAbsent(toId(sessionInfo), listener);
257 278 //TODO: monitor sessions periodically: PING REQ/RESP, etc.
... ...