Commit 58ec7a335af732810cad08cca96e109a44a7d6a2

Authored by Andrew Shvayka
1 parent 54215215

Credentials revocation

Showing 17 changed files with 214 additions and 41 deletions
... ... @@ -27,6 +27,7 @@ import org.thingsboard.server.common.data.id.TenantId;
27 27 import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
28 28 import org.thingsboard.server.common.msg.device.ToDeviceActorMsg;
29 29 import org.thingsboard.server.extensions.api.device.DeviceAttributesEventNotificationMsg;
  30 +import org.thingsboard.server.extensions.api.device.DeviceCredentialsUpdateNotificationMsg;
30 31 import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg;
31 32 import org.thingsboard.server.extensions.api.plugins.msg.*;
32 33
... ... @@ -58,6 +59,8 @@ public class DeviceActor extends ContextAwareActor {
58 59 processor.processAttributesUpdate(context(), (DeviceAttributesEventNotificationMsg) msg);
59 60 } else if (msg instanceof ToDeviceRpcRequestPluginMsg) {
60 61 processor.processRpcRequest(context(), (ToDeviceRpcRequestPluginMsg) msg);
  62 + } else if (msg instanceof DeviceCredentialsUpdateNotificationMsg){
  63 + processor.processCredentialsUpdate(context(), (DeviceCredentialsUpdateNotificationMsg) msg);
61 64 }
62 65 } else if (msg instanceof TimeoutMsg) {
63 66 processor.processTimeout(context(), (TimeoutMsg) msg);
... ...
... ... @@ -32,13 +32,7 @@ import org.thingsboard.server.common.data.kv.AttributeKey;
32 32 import org.thingsboard.server.common.data.kv.AttributeKvEntry;
33 33 import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
34 34 import org.thingsboard.server.common.msg.cluster.ServerAddress;
35   -import org.thingsboard.server.common.msg.core.AttributesUpdateNotification;
36   -import org.thingsboard.server.common.msg.core.BasicCommandAckResponse;
37   -import org.thingsboard.server.common.msg.core.BasicToDeviceSessionActorMsg;
38   -import org.thingsboard.server.common.msg.core.SessionCloseMsg;
39   -import org.thingsboard.server.common.msg.core.ToDeviceRpcRequestMsg;
40   -import org.thingsboard.server.common.msg.core.ToDeviceRpcResponseMsg;
41   -import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg;
  35 +import org.thingsboard.server.common.msg.core.*;
42 36 import org.thingsboard.server.common.msg.device.ToDeviceActorMsg;
43 37 import org.thingsboard.server.common.msg.kv.BasicAttributeKVMsg;
44 38 import org.thingsboard.server.common.msg.session.FromDeviceMsg;
... ... @@ -47,6 +41,7 @@ import org.thingsboard.server.common.msg.session.SessionType;
47 41 import org.thingsboard.server.common.msg.session.ToDeviceMsg;
48 42 import org.thingsboard.server.extensions.api.device.DeviceAttributes;
49 43 import org.thingsboard.server.extensions.api.device.DeviceAttributesEventNotificationMsg;
  44 +import org.thingsboard.server.extensions.api.device.DeviceCredentialsUpdateNotificationMsg;
50 45 import org.thingsboard.server.extensions.api.plugins.msg.FromDeviceRpcResponse;
51 46 import org.thingsboard.server.extensions.api.plugins.msg.RpcError;
52 47 import org.thingsboard.server.extensions.api.plugins.msg.TimeoutIntMsg;
... ... @@ -74,6 +69,7 @@ import java.util.stream.Collectors;
74 69 public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
75 70
76 71 private final DeviceId deviceId;
  72 + private final Map<SessionId, SessionInfo> sessions;
77 73 private final Map<SessionId, SessionInfo> attributeSubscriptions;
78 74 private final Map<SessionId, SessionInfo> rpcSubscriptions;
79 75
... ... @@ -85,6 +81,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
85 81 public DeviceActorMessageProcessor(ActorSystemContext systemContext, LoggingAdapter logger, DeviceId deviceId) {
86 82 super(systemContext, logger);
87 83 this.deviceId = deviceId;
  84 + this.sessions = new HashMap<>();
88 85 this.attributeSubscriptions = new HashMap<>();
89 86 this.rpcSubscriptions = new HashMap<>();
90 87 this.rpcPendingMap = new HashMap<>();
... ... @@ -281,7 +278,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
281 278 if (!msg.isAdded()) {
282 279 logger.debug("[{}] Clearing attributes/rpc subscription for server [{}]", deviceId, msg.getServerAddress());
283 280 Predicate<Map.Entry<SessionId, SessionInfo>> filter = e -> e.getValue().getServer()
284   - .map(serverAddress -> serverAddress.equals(msg.getServerAddress())).orElse(false);
  281 + .map(serverAddress -> serverAddress.equals(msg.getServerAddress())).orElse(false);
285 282 attributeSubscriptions.entrySet().removeIf(filter);
286 283 rpcSubscriptions.entrySet().removeIf(filter);
287 284 }
... ... @@ -342,8 +339,12 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
342 339 private void processSessionStateMsgs(ToDeviceActorMsg msg) {
343 340 SessionId sessionId = msg.getSessionId();
344 341 FromDeviceMsg inMsg = msg.getPayload();
345   - if (inMsg instanceof SessionCloseMsg) {
  342 + if (inMsg instanceof SessionOpenMsg) {
  343 + logger.debug("[{}] Processing new session [{}]", deviceId, sessionId);
  344 + sessions.put(sessionId, new SessionInfo(SessionType.ASYNC, msg.getServerAddress()));
  345 + } else if (inMsg instanceof SessionCloseMsg) {
346 346 logger.debug("[{}] Canceling subscriptions for closed session [{}]", deviceId, sessionId);
  347 + sessions.remove(sessionId);
347 348 attributeSubscriptions.remove(sessionId);
348 349 rpcSubscriptions.remove(sessionId);
349 350 }
... ... @@ -363,4 +364,11 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
363 364 return systemContext.getAttributesService().findAll(this.deviceId, attributeType);
364 365 }
365 366
  367 + public void processCredentialsUpdate(ActorContext context, DeviceCredentialsUpdateNotificationMsg msg) {
  368 + sessions.forEach((k, v) -> {
  369 + sendMsgToSessionActor(new BasicToDeviceSessionActorMsg(new SessionCloseNotification(), k), v.getServer());
  370 + });
  371 + attributeSubscriptions.clear();
  372 + rpcSubscriptions.clear();
  373 + }
366 374 }
... ...
... ... @@ -15,6 +15,7 @@
15 15 */
16 16 package org.thingsboard.server.actors.service;
17 17
  18 +import org.thingsboard.server.common.data.id.DeviceId;
18 19 import org.thingsboard.server.common.data.id.PluginId;
19 20 import org.thingsboard.server.common.data.id.RuleId;
20 21 import org.thingsboard.server.common.data.id.TenantId;
... ... @@ -28,4 +29,6 @@ public interface ActorService extends SessionMsgProcessor, WebSocketMsgProcessor
28 29 void onPluginStateChange(TenantId tenantId, PluginId pluginId, ComponentLifecycleEvent state);
29 30
30 31 void onRuleStateChange(TenantId tenantId, RuleId ruleId, ComponentLifecycleEvent state);
  32 +
  33 + void onCredentialsUpdate(TenantId tenantId, DeviceId deviceId);
31 34 }
... ...
... ... @@ -32,16 +32,19 @@ import org.thingsboard.server.actors.rpc.RpcSessionCreateRequestMsg;
32 32 import org.thingsboard.server.actors.rpc.RpcSessionTellMsg;
33 33 import org.thingsboard.server.actors.session.SessionManagerActor;
34 34 import org.thingsboard.server.actors.stats.StatsActor;
  35 +import org.thingsboard.server.common.data.id.DeviceId;
35 36 import org.thingsboard.server.common.data.id.PluginId;
36 37 import org.thingsboard.server.common.data.id.RuleId;
37 38 import org.thingsboard.server.common.data.id.TenantId;
38 39 import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
39 40 import org.thingsboard.server.common.msg.aware.SessionAwareMsg;
40 41 import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
  42 +import org.thingsboard.server.common.msg.cluster.ServerAddress;
41 43 import org.thingsboard.server.common.msg.cluster.ToAllNodesMsg;
42 44 import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg;
43 45 import org.thingsboard.server.common.msg.device.ToDeviceActorMsg;
44 46 import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
  47 +import org.thingsboard.server.extensions.api.device.DeviceCredentialsUpdateNotificationMsg;
45 48 import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg;
46 49 import org.thingsboard.server.extensions.api.plugins.msg.ToPluginActorMsg;
47 50 import org.thingsboard.server.extensions.api.plugins.rest.PluginRestMsg;
... ... @@ -56,6 +59,7 @@ import scala.concurrent.duration.Duration;
56 59
57 60 import javax.annotation.PostConstruct;
58 61 import javax.annotation.PreDestroy;
  62 +import java.util.Optional;
59 63
60 64 @Service
61 65 @Slf4j
... ... @@ -221,6 +225,17 @@ public class DefaultActorService implements ActorService {
221 225 broadcast(ComponentLifecycleMsg.forRule(tenantId, ruleId, state));
222 226 }
223 227
  228 + @Override
  229 + public void onCredentialsUpdate(TenantId tenantId, DeviceId deviceId) {
  230 + DeviceCredentialsUpdateNotificationMsg msg = new DeviceCredentialsUpdateNotificationMsg(tenantId, deviceId);
  231 + Optional<ServerAddress> address = actorContext.getRoutingService().resolve(deviceId);
  232 + if (address.isPresent()) {
  233 + rpcService.tell(address.get(), msg);
  234 + } else {
  235 + onMsg(msg);
  236 + }
  237 + }
  238 +
224 239 public void broadcast(ToAllNodesMsg msg) {
225 240 rpcService.broadcast(msg);
226 241 appActor.tell(msg, ActorRef.noSender());
... ...
... ... @@ -20,15 +20,14 @@ import org.thingsboard.server.actors.shared.SessionTimeoutMsg;
20 20 import org.thingsboard.server.common.data.id.SessionId;
21 21 import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
22 22 import org.thingsboard.server.common.msg.cluster.ServerAddress;
23   -import org.thingsboard.server.common.msg.core.AttributesSubscribeMsg;
24   -import org.thingsboard.server.common.msg.core.ResponseMsg;
25   -import org.thingsboard.server.common.msg.core.RpcSubscribeMsg;
  23 +import org.thingsboard.server.common.msg.core.*;
26 24 import org.thingsboard.server.common.msg.core.SessionCloseMsg;
27 25 import org.thingsboard.server.common.msg.device.ToDeviceActorMsg;
28 26 import org.thingsboard.server.common.msg.session.*;
29 27
30 28 import akka.actor.ActorContext;
31 29 import akka.event.LoggingAdapter;
  30 +import org.thingsboard.server.common.msg.session.ctrl.*;
32 31 import org.thingsboard.server.common.msg.session.ex.SessionException;
33 32
34 33 import java.util.HashMap;
... ... @@ -37,7 +36,8 @@ import java.util.Optional;
37 36
38 37 class ASyncMsgProcessor extends AbstractSessionActorMsgProcessor {
39 38
40   - Map<Integer, ToDeviceActorMsg> pendingMap = new HashMap<>();
  39 + private boolean firstMsg = true;
  40 + private Map<Integer, ToDeviceActorMsg> pendingMap = new HashMap<>();
41 41 private Optional<ServerAddress> currentTargetServer;
42 42 private boolean subscribedToAttributeUpdates;
43 43 private boolean subscribedToRpcCommands;
... ... @@ -49,6 +49,10 @@ class ASyncMsgProcessor extends AbstractSessionActorMsgProcessor {
49 49 @Override
50 50 protected void processToDeviceActorMsg(ActorContext ctx, ToDeviceActorSessionMsg msg) {
51 51 updateSessionCtx(msg, SessionType.ASYNC);
  52 + if (firstMsg) {
  53 + toDeviceMsg(new SessionOpenMsg()).ifPresent(m -> forwardToAppActor(ctx, m));
  54 + firstMsg = false;
  55 + }
52 56 ToDeviceActorMsg pendingMsg = toDeviceMsg(msg);
53 57 FromDeviceMsg fromDeviceMsg = pendingMsg.getPayload();
54 58 switch (fromDeviceMsg.getMsgType()) {
... ... @@ -80,17 +84,21 @@ class ASyncMsgProcessor extends AbstractSessionActorMsgProcessor {
80 84 @Override
81 85 public void processToDeviceMsg(ActorContext context, ToDeviceMsg msg) {
82 86 try {
83   - switch (msg.getMsgType()) {
84   - case STATUS_CODE_RESPONSE:
85   - case GET_ATTRIBUTES_RESPONSE:
86   - ResponseMsg responseMsg = (ResponseMsg) msg;
87   - if (responseMsg.getRequestId() >= 0) {
88   - logger.debug("[{}] Pending request processed: {}", responseMsg.getRequestId(), responseMsg);
89   - pendingMap.remove(responseMsg.getRequestId());
90   - }
91   - break;
  87 + if (msg.getMsgType() != MsgType.SESSION_CLOSE) {
  88 + switch (msg.getMsgType()) {
  89 + case STATUS_CODE_RESPONSE:
  90 + case GET_ATTRIBUTES_RESPONSE:
  91 + ResponseMsg responseMsg = (ResponseMsg) msg;
  92 + if (responseMsg.getRequestId() >= 0) {
  93 + logger.debug("[{}] Pending request processed: {}", responseMsg.getRequestId(), responseMsg);
  94 + pendingMap.remove(responseMsg.getRequestId());
  95 + }
  96 + break;
  97 + }
  98 + sessionCtx.onMsg(new BasicSessionActorToAdaptorMsg(this.sessionCtx, msg));
  99 + } else {
  100 + sessionCtx.onMsg(org.thingsboard.server.common.msg.session.ctrl.SessionCloseMsg.onCredentialsRevoked(sessionCtx.getSessionId()));
92 101 }
93   - sessionCtx.onMsg(new BasicSessionActorToAdaptorMsg(this.sessionCtx, msg));
94 102 } catch (SessionException e) {
95 103 logger.warning("Failed to push session response msg", e);
96 104 }
... ... @@ -102,7 +110,7 @@ class ASyncMsgProcessor extends AbstractSessionActorMsgProcessor {
102 110 }
103 111
104 112 protected void cleanupSession(ActorContext ctx) {
105   - toDeviceMsg(new SessionCloseMsg()).ifPresent(msg -> forwardToAppActor(ctx, msg));
  113 + toDeviceMsg(new SessionCloseMsg()).ifPresent(m -> forwardToAppActor(ctx, m));
106 114 }
107 115
108 116 @Override
... ... @@ -110,6 +118,7 @@ class ASyncMsgProcessor extends AbstractSessionActorMsgProcessor {
110 118 if (pendingMap.size() > 0 || subscribedToAttributeUpdates || subscribedToRpcCommands) {
111 119 Optional<ServerAddress> newTargetServer = systemContext.getRoutingService().resolve(getDeviceId());
112 120 if (!newTargetServer.equals(currentTargetServer)) {
  121 + firstMsg = true;
113 122 currentTargetServer = newTargetServer;
114 123 pendingMap.values().forEach(v -> {
115 124 forwardToAppActor(context, v, currentTargetServer);
... ...
... ... @@ -52,7 +52,7 @@ class SyncMsgProcessor extends AbstractSessionActorMsgProcessor {
52 52 public void processTimeoutMsg(ActorContext context, SessionTimeoutMsg msg) {
53 53 if (pendingResponse) {
54 54 try {
55   - sessionCtx.onMsg(new SessionCloseMsg(sessionId, true));
  55 + sessionCtx.onMsg(SessionCloseMsg.onTimeout(sessionId));
56 56 } catch (SessionException e) {
57 57 logger.warning("Failed to push session close msg", e);
58 58 }
... ...
... ... @@ -28,6 +28,7 @@ import org.thingsboard.server.common.data.security.DeviceCredentials;
28 28 import org.thingsboard.server.dao.exception.IncorrectParameterException;
29 29 import org.thingsboard.server.dao.model.ModelConstants;
30 30 import org.thingsboard.server.exception.ThingsboardException;
  31 +import org.thingsboard.server.extensions.api.device.DeviceCredentialsUpdateNotificationMsg;
31 32
32 33 @RestController
33 34 @RequestMapping("/api")
... ... @@ -48,7 +49,7 @@ public class DeviceController extends BaseController {
48 49
49 50 @PreAuthorize("hasAuthority('TENANT_ADMIN')")
50 51 @RequestMapping(value = "/device", method = RequestMethod.POST)
51   - @ResponseBody
  52 + @ResponseBody
52 53 public Device saveDevice(@RequestBody Device device) throws ThingsboardException {
53 54 try {
54 55 device.setTenantId(getCurrentUser().getTenantId());
... ... @@ -74,7 +75,7 @@ public class DeviceController extends BaseController {
74 75
75 76 @PreAuthorize("hasAuthority('TENANT_ADMIN')")
76 77 @RequestMapping(value = "/customer/{customerId}/device/{deviceId}", method = RequestMethod.POST)
77   - @ResponseBody
  78 + @ResponseBody
78 79 public Device assignDeviceToCustomer(@PathVariable("customerId") String strCustomerId,
79 80 @PathVariable("deviceId") String strDeviceId) throws ThingsboardException {
80 81 checkParameter("customerId", strCustomerId);
... ... @@ -85,7 +86,7 @@ public class DeviceController extends BaseController {
85 86
86 87 DeviceId deviceId = new DeviceId(toUUID(strDeviceId));
87 88 checkDeviceId(deviceId);
88   -
  89 +
89 90 return checkNotNull(deviceService.assignDeviceToCustomer(deviceId, customerId));
90 91 } catch (Exception e) {
91 92 throw handleException(e);
... ... @@ -94,7 +95,7 @@ public class DeviceController extends BaseController {
94 95
95 96 @PreAuthorize("hasAuthority('TENANT_ADMIN')")
96 97 @RequestMapping(value = "/customer/device/{deviceId}", method = RequestMethod.DELETE)
97   - @ResponseBody
  98 + @ResponseBody
98 99 public Device unassignDeviceFromCustomer(@PathVariable("deviceId") String strDeviceId) throws ThingsboardException {
99 100 checkParameter("deviceId", strDeviceId);
100 101 try {
... ... @@ -125,19 +126,21 @@ public class DeviceController extends BaseController {
125 126
126 127 @PreAuthorize("hasAuthority('TENANT_ADMIN')")
127 128 @RequestMapping(value = "/device/credentials", method = RequestMethod.POST)
128   - @ResponseBody
  129 + @ResponseBody
129 130 public DeviceCredentials saveDeviceCredentials(@RequestBody DeviceCredentials deviceCredentials) throws ThingsboardException {
130 131 checkNotNull(deviceCredentials);
131 132 try {
132 133 checkDeviceId(deviceCredentials.getDeviceId());
133   - return checkNotNull(deviceCredentialsService.updateDeviceCredentials(deviceCredentials));
  134 + DeviceCredentials result = checkNotNull(deviceCredentialsService.updateDeviceCredentials(deviceCredentials));
  135 + actorService.onCredentialsUpdate(getCurrentUser().getTenantId(), deviceCredentials.getDeviceId());
  136 + return result;
134 137 } catch (Exception e) {
135 138 throw handleException(e);
136 139 }
137 140 }
138 141
139 142 @PreAuthorize("hasAuthority('TENANT_ADMIN')")
140   - @RequestMapping(value = "/tenant/devices", params = { "limit" }, method = RequestMethod.GET)
  143 + @RequestMapping(value = "/tenant/devices", params = {"limit"}, method = RequestMethod.GET)
141 144 @ResponseBody
142 145 public TextPageData<Device> getTenantDevices(
143 146 @RequestParam int limit,
... ... @@ -154,7 +157,7 @@ public class DeviceController extends BaseController {
154 157 }
155 158
156 159 @PreAuthorize("hasAnyAuthority('TENANT_ADMIN', 'CUSTOMER_USER')")
157   - @RequestMapping(value = "/customer/{customerId}/devices", params = { "limit" }, method = RequestMethod.GET)
  160 + @RequestMapping(value = "/customer/{customerId}/devices", params = {"limit"}, method = RequestMethod.GET)
158 161 @ResponseBody
159 162 public TextPageData<Device> getCustomerDevices(
160 163 @PathVariable("customerId") String strCustomerId,
... ...
... ... @@ -29,7 +29,7 @@ server:
29 29 # Zookeeper connection parameters. Used for service discovery.
30 30 zk:
31 31 # Enable/disable zookeeper discovery service.
32   - enabled: "${ZOOKEEPER_ENABLED:false}"
  32 + enabled: "${ZOOKEEPER_ENABLED:true}"
33 33 # Zookeeper connect string
34 34 url: "${ZOOKEEPER_URL:localhost:2181}"
35 35 # Zookeeper retry interval in milliseconds
... ...
  1 +/**
  2 + * Copyright © 2016 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.common.msg.core;
  17 +
  18 +import lombok.ToString;
  19 +import org.thingsboard.server.common.msg.kv.AttributesKVMsg;
  20 +import org.thingsboard.server.common.msg.session.MsgType;
  21 +import org.thingsboard.server.common.msg.session.ToDeviceMsg;
  22 +
  23 +@ToString
  24 +public class SessionCloseNotification implements ToDeviceMsg {
  25 +
  26 + private static final long serialVersionUID = 1L;
  27 +
  28 + @Override
  29 + public boolean isSuccess() {
  30 + return true;
  31 + }
  32 +
  33 + @Override
  34 + public MsgType getMsgType() {
  35 + return MsgType.SESSION_CLOSE;
  36 + }
  37 +
  38 +}
... ...
  1 +/**
  2 + * Copyright © 2016 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.common.msg.core;
  17 +
  18 +import org.thingsboard.server.common.msg.session.FromDeviceMsg;
  19 +import org.thingsboard.server.common.msg.session.MsgType;
  20 +
  21 +/**
  22 + * @author Andrew Shvayka
  23 + */
  24 +public class SessionOpenMsg implements FromDeviceMsg {
  25 + @Override
  26 + public MsgType getMsgType() {
  27 + return MsgType.SESSION_OPEN;
  28 + }
  29 +}
... ...
... ... @@ -28,7 +28,7 @@ public enum MsgType {
28 28
29 29 RULE_ENGINE_ERROR,
30 30
31   - SESSION_CLOSE;
  31 + SESSION_OPEN, SESSION_CLOSE;
32 32
33 33 private final boolean requiresRulesProcessing;
34 34
... ...
... ... @@ -21,11 +21,25 @@ import org.thingsboard.server.common.msg.session.SessionCtrlMsg;
21 21 public class SessionCloseMsg implements SessionCtrlMsg {
22 22
23 23 private final SessionId sessionId;
  24 + private final boolean revoked;
24 25 private final boolean timeout;
25 26
26   - public SessionCloseMsg(SessionId sessionId, boolean timeout) {
  27 + public static SessionCloseMsg onError(SessionId sessionId) {
  28 + return new SessionCloseMsg(sessionId, false, false);
  29 + }
  30 +
  31 + public static SessionCloseMsg onTimeout(SessionId sessionId) {
  32 + return new SessionCloseMsg(sessionId, false, true);
  33 + }
  34 +
  35 + public static SessionCloseMsg onCredentialsRevoked(SessionId sessionId) {
  36 + return new SessionCloseMsg(sessionId, true, false);
  37 + }
  38 +
  39 + private SessionCloseMsg(SessionId sessionId, boolean unauthorized, boolean timeout) {
27 40 super();
28 41 this.sessionId = sessionId;
  42 + this.revoked = unauthorized;
29 43 this.timeout = timeout;
30 44 }
31 45
... ... @@ -34,6 +48,10 @@ public class SessionCloseMsg implements SessionCtrlMsg {
34 48 return sessionId;
35 49 }
36 50
  51 + public boolean isCredentialsRevoked() {
  52 + return revoked;
  53 + }
  54 +
37 55 public boolean isTimeout() {
38 56 return timeout;
39 57 }
... ...
  1 +/**
  2 + * Copyright © 2016 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.extensions.api.device;
  17 +
  18 +import lombok.Data;
  19 +import lombok.Getter;
  20 +import lombok.ToString;
  21 +import org.thingsboard.server.common.data.id.DeviceId;
  22 +import org.thingsboard.server.common.data.id.TenantId;
  23 +import org.thingsboard.server.common.data.kv.AttributeKey;
  24 +
  25 +import java.util.Set;
  26 +
  27 +/**
  28 + * @author Andrew Shvayka
  29 + */
  30 +@Data
  31 +public class DeviceCredentialsUpdateNotificationMsg implements ToDeviceActorNotificationMsg {
  32 +
  33 + private final TenantId tenantId;
  34 + private final DeviceId deviceId;
  35 +
  36 +}
... ...
  1 +restUrl=http://localhost:8080
  2 +mqttUrls=tcp://localhost:1883
  3 +deviceCount=1
  4 +durationMs=60000
  5 +iterationIntervalMs=1000
... ...
... ... @@ -36,6 +36,7 @@ import org.slf4j.Logger;
36 36 import org.slf4j.LoggerFactory;
37 37
38 38 import java.util.concurrent.atomic.AtomicInteger;
  39 +
39 40 @Slf4j
40 41 public class CoapSessionCtx extends DeviceAwareSessionContext {
41 42
... ... @@ -87,6 +88,8 @@ public class CoapSessionCtx extends DeviceAwareSessionContext {
87 88 private void onSessionClose(SessionCloseMsg msg) {
88 89 if (msg.isTimeout()) {
89 90 exchange.respond(ResponseCode.SERVICE_UNAVAILABLE);
  91 + } else if (msg.isCredentialsRevoked()) {
  92 + exchange.respond(ResponseCode.UNAUTHORIZED);
90 93 } else {
91 94 exchange.respond(ResponseCode.INTERNAL_SERVER_ERROR);
92 95 }
... ... @@ -120,7 +123,7 @@ public class CoapSessionCtx extends DeviceAwareSessionContext {
120 123
121 124 public void close() {
122 125 log.info("[{}] Closing processing context. Timeout: {}", sessionId, exchange.advanced().isTimedOut());
123   - processor.process(new SessionCloseMsg(sessionId, exchange.advanced().isTimedOut()));
  126 + processor.process(exchange.advanced().isTimedOut() ? SessionCloseMsg.onTimeout(sessionId) : SessionCloseMsg.onError(sessionId));
124 127 }
125 128
126 129 @Override
... ...
... ... @@ -210,7 +210,6 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
210 210 }
211 211
212 212 private void processDisconnect(ChannelHandlerContext ctx) {
213   - processor.process(new SessionCloseMsg(sessionCtx.getSessionId(), false));
214 213 ctx.close();
215 214 }
216 215
... ... @@ -255,6 +254,6 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
255 254
256 255 @Override
257 256 public void operationComplete(Future<? super Void> future) throws Exception {
258   - processor.process(new SessionCloseMsg(sessionCtx.getSessionId(), false));
  257 + processor.process(SessionCloseMsg.onError(sessionCtx.getSessionId()));
259 258 }
260 259 }
... ...
... ... @@ -16,12 +16,13 @@
16 16 package org.thingsboard.server.transport.mqtt.session;
17 17
18 18 import io.netty.channel.ChannelHandlerContext;
19   -import io.netty.handler.codec.mqtt.MqttMessage;
  19 +import io.netty.handler.codec.mqtt.*;
20 20 import lombok.extern.slf4j.Slf4j;
21 21 import org.thingsboard.server.common.data.id.SessionId;
22 22 import org.thingsboard.server.common.msg.session.SessionActorToAdaptorMsg;
23 23 import org.thingsboard.server.common.msg.session.SessionCtrlMsg;
24 24 import org.thingsboard.server.common.msg.session.SessionType;
  25 +import org.thingsboard.server.common.msg.session.ctrl.SessionCloseMsg;
25 26 import org.thingsboard.server.common.msg.session.ex.SessionException;
26 27 import org.thingsboard.server.common.transport.SessionMsgProcessor;
27 28 import org.thingsboard.server.common.transport.adaptor.AdaptorException;
... ... @@ -75,7 +76,10 @@ public class MqttSessionCtx extends DeviceAwareSessionContext {
75 76
76 77 @Override
77 78 public void onMsg(SessionCtrlMsg msg) throws SessionException {
78   -
  79 + if (msg instanceof SessionCloseMsg) {
  80 + pushToNetwork(new MqttMessage(new MqttFixedHeader(MqttMessageType.DISCONNECT, false, MqttQoS.AT_MOST_ONCE, false, 0)));
  81 + channel.close();
  82 + }
79 83 }
80 84
81 85 @Override
... ...