Commit 055331a89e9ad373e2a5585fdf833b370deadede

Authored by Andrew Shvayka
Committed by GitHub
2 parents 77b9a8c1 a9151b51

Merge pull request #5590 from YevhenBondarenko/fix/lwm2m

[3.3.3] fixed NPE
... ... @@ -278,8 +278,8 @@ public class LwM2mClientContextImpl implements LwM2mClientContext {
278 278 public LwM2mClient getClientBySessionInfo(TransportProtos.SessionInfoProto sessionInfo) {
279 279 LwM2mClient lwM2mClient = null;
280 280 UUID sessionId = new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB());
281   - Predicate<LwM2mClient> isClientFilter = c ->
282   - sessionId.equals((new UUID(c.getSession().getSessionIdMSB(), c.getSession().getSessionIdLSB())));
  281 + Predicate<LwM2mClient> isClientFilter =
  282 + c -> c.getSession() != null && sessionId.equals((new UUID(c.getSession().getSessionIdMSB(), c.getSession().getSessionIdLSB())));
283 283 if (this.lwM2mClientsByEndpoint.size() > 0) {
284 284 lwM2mClient = this.lwM2mClientsByEndpoint.values().stream().filter(isClientFilter).findAny().orElse(null);
285 285 }
... ...
... ... @@ -17,15 +17,12 @@ package org.thingsboard.server.transport.lwm2m.server.rpc;
17 17
18 18 import lombok.RequiredArgsConstructor;
19 19 import lombok.extern.slf4j.Slf4j;
  20 +import org.apache.commons.lang3.exception.ExceptionUtils;
20 21 import org.eclipse.leshan.core.ResponseCode;
21   -import org.eclipse.leshan.core.request.ReadCompositeRequest;
22   -import org.eclipse.leshan.core.response.ReadCompositeResponse;
23 22 import org.springframework.stereotype.Service;
24 23 import org.thingsboard.common.util.JacksonUtil;
25 24 import org.thingsboard.server.common.data.StringUtils;
26   -import org.thingsboard.server.common.data.rpc.RpcStatus;
27 25 import org.thingsboard.server.common.transport.TransportService;
28   -import org.thingsboard.server.common.transport.TransportServiceCallback;
29 26 import org.thingsboard.server.gen.transport.TransportProtos;
30 27 import org.thingsboard.server.queue.util.TbLwM2mTransportComponent;
31 28 import org.thingsboard.server.transport.lwm2m.config.LwM2MTransportServerConfig;
... ... @@ -63,11 +60,9 @@ import org.thingsboard.server.transport.lwm2m.server.rpc.composite.RpcReadRespon
63 60 import org.thingsboard.server.transport.lwm2m.server.rpc.composite.RpcWriteCompositeRequest;
64 61 import org.thingsboard.server.transport.lwm2m.server.uplink.LwM2mUplinkMsgHandler;
65 62
66   -import java.util.Map;
67 63 import java.util.Set;
68 64 import java.util.UUID;
69 65 import java.util.concurrent.ConcurrentHashMap;
70   -import java.util.stream.Collectors;
71 66
72 67 @Slf4j
73 68 @Service
... ... @@ -85,91 +80,96 @@ public class DefaultLwM2MRpcRequestHandler implements LwM2MRpcRequestHandler {
85 80 @Override
86 81 public void onToDeviceRpcRequest(TransportProtos.ToDeviceRpcRequestMsg rpcRequest, TransportProtos.SessionInfoProto sessionInfo) {
87 82 log.debug("Received params: {}", rpcRequest.getParams());
88   - LwM2mOperationType operationType = LwM2mOperationType.fromType(rpcRequest.getMethodName());
89   - if (operationType == null) {
90   - this.sendErrorRpcResponse(sessionInfo, rpcRequest.getRequestId(), ResponseCode.METHOD_NOT_ALLOWED, "Unsupported operation type: " + rpcRequest.getMethodName());
91   - return;
92   - }
93   - LwM2mClient client = clientContext.getClientBySessionInfo(sessionInfo);
94   - if (client.getRegistration() == null) {
95   - this.sendErrorRpcResponse(sessionInfo, rpcRequest.getRequestId(), ResponseCode.INTERNAL_SERVER_ERROR, "Registration is empty");
96   - return;
97   - }
98   - UUID rpcId = new UUID(rpcRequest.getRequestIdMSB(), rpcRequest.getRequestIdLSB());
99   -
100   - if (rpcId.equals(client.getLastSentRpcId())) {
101   - log.debug("[{}]][{}] Rpc has already sent!", client.getEndpoint(), rpcId);
102   - return;
103   - }
104 83 try {
105   - if (operationType.isHasObjectId()) {
106   - String objectId = getIdFromParameters(client, rpcRequest);
107   - switch (operationType) {
108   - case READ:
109   - sendReadRequest(client, rpcRequest, objectId);
110   - break;
111   - case OBSERVE:
112   - sendObserveRequest(client, rpcRequest, objectId);
113   - break;
114   - case DISCOVER:
115   - sendDiscoverRequest(client, rpcRequest, objectId);
116   - break;
117   - case EXECUTE:
118   - sendExecuteRequest(client, rpcRequest, objectId);
119   - break;
120   - case WRITE_ATTRIBUTES:
121   - sendWriteAttributesRequest(client, rpcRequest, objectId);
122   - break;
123   - case OBSERVE_CANCEL:
124   - sendCancelObserveRequest(client, rpcRequest, objectId);
125   - break;
126   - case DELETE:
127   - sendDeleteRequest(client, rpcRequest, objectId);
128   - break;
129   - case WRITE_UPDATE:
130   - sendWriteUpdateRequest(client, rpcRequest, objectId);
131   - break;
132   - case WRITE_REPLACE:
133   - sendWriteReplaceRequest(client, rpcRequest, objectId);
134   - break;
135   - default:
136   - throw new IllegalArgumentException("Unsupported operation: " + operationType.name());
137   - }
138   - } else if (operationType.isComposite()) {
139   - if (clientContext.isComposite(client)) {
  84 + LwM2mOperationType operationType = LwM2mOperationType.fromType(rpcRequest.getMethodName());
  85 + if (operationType == null) {
  86 + this.sendErrorRpcResponse(sessionInfo, rpcRequest.getRequestId(), ResponseCode.METHOD_NOT_ALLOWED, "Unsupported operation type: " + rpcRequest.getMethodName());
  87 + return;
  88 + }
  89 + LwM2mClient client = clientContext.getClientBySessionInfo(sessionInfo);
  90 + if (client.getRegistration() == null) {
  91 + this.sendErrorRpcResponse(sessionInfo, rpcRequest.getRequestId(), ResponseCode.INTERNAL_SERVER_ERROR, "Registration is empty");
  92 + return;
  93 + }
  94 + UUID rpcId = new UUID(rpcRequest.getRequestIdMSB(), rpcRequest.getRequestIdLSB());
  95 +
  96 + if (rpcId.equals(client.getLastSentRpcId())) {
  97 + log.debug("[{}]][{}] Rpc has already sent!", client.getEndpoint(), rpcId);
  98 + return;
  99 + }
  100 + try {
  101 + if (operationType.isHasObjectId()) {
  102 + String objectId = getIdFromParameters(client, rpcRequest);
140 103 switch (operationType) {
141   - case READ_COMPOSITE:
142   - sendReadCompositeRequest(client, rpcRequest);
  104 + case READ:
  105 + sendReadRequest(client, rpcRequest, objectId);
  106 + break;
  107 + case OBSERVE:
  108 + sendObserveRequest(client, rpcRequest, objectId);
  109 + break;
  110 + case DISCOVER:
  111 + sendDiscoverRequest(client, rpcRequest, objectId);
  112 + break;
  113 + case EXECUTE:
  114 + sendExecuteRequest(client, rpcRequest, objectId);
  115 + break;
  116 + case WRITE_ATTRIBUTES:
  117 + sendWriteAttributesRequest(client, rpcRequest, objectId);
143 118 break;
144   - case WRITE_COMPOSITE:
145   - sendWriteCompositeRequest(client, rpcRequest);
  119 + case OBSERVE_CANCEL:
  120 + sendCancelObserveRequest(client, rpcRequest, objectId);
  121 + break;
  122 + case DELETE:
  123 + sendDeleteRequest(client, rpcRequest, objectId);
  124 + break;
  125 + case WRITE_UPDATE:
  126 + sendWriteUpdateRequest(client, rpcRequest, objectId);
  127 + break;
  128 + case WRITE_REPLACE:
  129 + sendWriteReplaceRequest(client, rpcRequest, objectId);
146 130 break;
147 131 default:
148 132 throw new IllegalArgumentException("Unsupported operation: " + operationType.name());
149 133 }
  134 + } else if (operationType.isComposite()) {
  135 + if (clientContext.isComposite(client)) {
  136 + switch (operationType) {
  137 + case READ_COMPOSITE:
  138 + sendReadCompositeRequest(client, rpcRequest);
  139 + break;
  140 + case WRITE_COMPOSITE:
  141 + sendWriteCompositeRequest(client, rpcRequest);
  142 + break;
  143 + default:
  144 + throw new IllegalArgumentException("Unsupported operation: " + operationType.name());
  145 + }
  146 + } else {
  147 + this.sendErrorRpcResponse(sessionInfo, rpcRequest.getRequestId(),
  148 + ResponseCode.INTERNAL_SERVER_ERROR, "This device does not support Composite Operation");
  149 + }
150 150 } else {
151   - this.sendErrorRpcResponse(sessionInfo, rpcRequest.getRequestId(),
152   - ResponseCode.INTERNAL_SERVER_ERROR, "This device does not support Composite Operation");
153   - }
154   - } else {
155   - switch (operationType) {
156   - case OBSERVE_CANCEL_ALL:
157   - sendCancelAllObserveRequest(client, rpcRequest);
158   - break;
159   - case OBSERVE_READ_ALL:
160   - sendObserveAllRequest(client, rpcRequest);
161   - break;
162   - case DISCOVER_ALL:
163   - sendDiscoverAllRequest(client, rpcRequest);
164   - break;
165   - case FW_UPDATE:
166   - //TODO: implement and add break statement
167   - default:
168   - throw new IllegalArgumentException("Unsupported operation: " + operationType.name());
  151 + switch (operationType) {
  152 + case OBSERVE_CANCEL_ALL:
  153 + sendCancelAllObserveRequest(client, rpcRequest);
  154 + break;
  155 + case OBSERVE_READ_ALL:
  156 + sendObserveAllRequest(client, rpcRequest);
  157 + break;
  158 + case DISCOVER_ALL:
  159 + sendDiscoverAllRequest(client, rpcRequest);
  160 + break;
  161 + case FW_UPDATE:
  162 + //TODO: implement and add break statement
  163 + default:
  164 + throw new IllegalArgumentException("Unsupported operation: " + operationType.name());
  165 + }
169 166 }
  167 + } catch (IllegalArgumentException e) {
  168 + this.sendErrorRpcResponse(sessionInfo, rpcRequest.getRequestId(), ResponseCode.BAD_REQUEST, e.getMessage());
170 169 }
171   - } catch (IllegalArgumentException e) {
172   - this.sendErrorRpcResponse(sessionInfo, rpcRequest.getRequestId(), ResponseCode.BAD_REQUEST, e.getMessage());
  170 + } catch (Exception e) {
  171 + log.error("[{}] Failed to send RPC: [{}]", sessionInfo, rpcRequest, e);
  172 + this.sendErrorRpcResponse(sessionInfo, rpcRequest.getRequestId(), ResponseCode.INTERNAL_SERVER_ERROR, ExceptionUtils.getRootCauseMessage(e));
173 173 }
174 174 }
175 175
... ...