Commit a9151b5100539b0ce4a734726eb1aacbbe3a22dc

Authored by YevhenBondarenko
1 parent 77b9a8c1

fixed NPE

@@ -278,8 +278,8 @@ public class LwM2mClientContextImpl implements LwM2mClientContext { @@ -278,8 +278,8 @@ public class LwM2mClientContextImpl implements LwM2mClientContext {
278 public LwM2mClient getClientBySessionInfo(TransportProtos.SessionInfoProto sessionInfo) { 278 public LwM2mClient getClientBySessionInfo(TransportProtos.SessionInfoProto sessionInfo) {
279 LwM2mClient lwM2mClient = null; 279 LwM2mClient lwM2mClient = null;
280 UUID sessionId = new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB()); 280 UUID sessionId = new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB());
281 - Predicate<LwM2mClient> isClientFilter = c ->  
282 - sessionId.equals((new UUID(c.getSession().getSessionIdMSB(), c.getSession().getSessionIdLSB()))); 281 + Predicate<LwM2mClient> isClientFilter =
  282 + c -> c.getSession() != null && sessionId.equals((new UUID(c.getSession().getSessionIdMSB(), c.getSession().getSessionIdLSB())));
283 if (this.lwM2mClientsByEndpoint.size() > 0) { 283 if (this.lwM2mClientsByEndpoint.size() > 0) {
284 lwM2mClient = this.lwM2mClientsByEndpoint.values().stream().filter(isClientFilter).findAny().orElse(null); 284 lwM2mClient = this.lwM2mClientsByEndpoint.values().stream().filter(isClientFilter).findAny().orElse(null);
285 } 285 }
@@ -17,15 +17,12 @@ package org.thingsboard.server.transport.lwm2m.server.rpc; @@ -17,15 +17,12 @@ package org.thingsboard.server.transport.lwm2m.server.rpc;
17 17
18 import lombok.RequiredArgsConstructor; 18 import lombok.RequiredArgsConstructor;
19 import lombok.extern.slf4j.Slf4j; 19 import lombok.extern.slf4j.Slf4j;
  20 +import org.apache.commons.lang3.exception.ExceptionUtils;
20 import org.eclipse.leshan.core.ResponseCode; 21 import org.eclipse.leshan.core.ResponseCode;
21 -import org.eclipse.leshan.core.request.ReadCompositeRequest;  
22 -import org.eclipse.leshan.core.response.ReadCompositeResponse;  
23 import org.springframework.stereotype.Service; 22 import org.springframework.stereotype.Service;
24 import org.thingsboard.common.util.JacksonUtil; 23 import org.thingsboard.common.util.JacksonUtil;
25 import org.thingsboard.server.common.data.StringUtils; 24 import org.thingsboard.server.common.data.StringUtils;
26 -import org.thingsboard.server.common.data.rpc.RpcStatus;  
27 import org.thingsboard.server.common.transport.TransportService; 25 import org.thingsboard.server.common.transport.TransportService;
28 -import org.thingsboard.server.common.transport.TransportServiceCallback;  
29 import org.thingsboard.server.gen.transport.TransportProtos; 26 import org.thingsboard.server.gen.transport.TransportProtos;
30 import org.thingsboard.server.queue.util.TbLwM2mTransportComponent; 27 import org.thingsboard.server.queue.util.TbLwM2mTransportComponent;
31 import org.thingsboard.server.transport.lwm2m.config.LwM2MTransportServerConfig; 28 import org.thingsboard.server.transport.lwm2m.config.LwM2MTransportServerConfig;
@@ -63,11 +60,9 @@ import org.thingsboard.server.transport.lwm2m.server.rpc.composite.RpcReadRespon @@ -63,11 +60,9 @@ import org.thingsboard.server.transport.lwm2m.server.rpc.composite.RpcReadRespon
63 import org.thingsboard.server.transport.lwm2m.server.rpc.composite.RpcWriteCompositeRequest; 60 import org.thingsboard.server.transport.lwm2m.server.rpc.composite.RpcWriteCompositeRequest;
64 import org.thingsboard.server.transport.lwm2m.server.uplink.LwM2mUplinkMsgHandler; 61 import org.thingsboard.server.transport.lwm2m.server.uplink.LwM2mUplinkMsgHandler;
65 62
66 -import java.util.Map;  
67 import java.util.Set; 63 import java.util.Set;
68 import java.util.UUID; 64 import java.util.UUID;
69 import java.util.concurrent.ConcurrentHashMap; 65 import java.util.concurrent.ConcurrentHashMap;
70 -import java.util.stream.Collectors;  
71 66
72 @Slf4j 67 @Slf4j
73 @Service 68 @Service
@@ -85,91 +80,96 @@ public class DefaultLwM2MRpcRequestHandler implements LwM2MRpcRequestHandler { @@ -85,91 +80,96 @@ public class DefaultLwM2MRpcRequestHandler implements LwM2MRpcRequestHandler {
85 @Override 80 @Override
86 public void onToDeviceRpcRequest(TransportProtos.ToDeviceRpcRequestMsg rpcRequest, TransportProtos.SessionInfoProto sessionInfo) { 81 public void onToDeviceRpcRequest(TransportProtos.ToDeviceRpcRequestMsg rpcRequest, TransportProtos.SessionInfoProto sessionInfo) {
87 log.debug("Received params: {}", rpcRequest.getParams()); 82 log.debug("Received params: {}", rpcRequest.getParams());
88 - LwM2mOperationType operationType = LwM2mOperationType.fromType(rpcRequest.getMethodName());  
89 - if (operationType == null) {  
90 - this.sendErrorRpcResponse(sessionInfo, rpcRequest.getRequestId(), ResponseCode.METHOD_NOT_ALLOWED, "Unsupported operation type: " + rpcRequest.getMethodName());  
91 - return;  
92 - }  
93 - LwM2mClient client = clientContext.getClientBySessionInfo(sessionInfo);  
94 - if (client.getRegistration() == null) {  
95 - this.sendErrorRpcResponse(sessionInfo, rpcRequest.getRequestId(), ResponseCode.INTERNAL_SERVER_ERROR, "Registration is empty");  
96 - return;  
97 - }  
98 - UUID rpcId = new UUID(rpcRequest.getRequestIdMSB(), rpcRequest.getRequestIdLSB());  
99 -  
100 - if (rpcId.equals(client.getLastSentRpcId())) {  
101 - log.debug("[{}]][{}] Rpc has already sent!", client.getEndpoint(), rpcId);  
102 - return;  
103 - }  
104 try { 83 try {
105 - if (operationType.isHasObjectId()) {  
106 - String objectId = getIdFromParameters(client, rpcRequest);  
107 - switch (operationType) {  
108 - case READ:  
109 - sendReadRequest(client, rpcRequest, objectId);  
110 - break;  
111 - case OBSERVE:  
112 - sendObserveRequest(client, rpcRequest, objectId);  
113 - break;  
114 - case DISCOVER:  
115 - sendDiscoverRequest(client, rpcRequest, objectId);  
116 - break;  
117 - case EXECUTE:  
118 - sendExecuteRequest(client, rpcRequest, objectId);  
119 - break;  
120 - case WRITE_ATTRIBUTES:  
121 - sendWriteAttributesRequest(client, rpcRequest, objectId);  
122 - break;  
123 - case OBSERVE_CANCEL:  
124 - sendCancelObserveRequest(client, rpcRequest, objectId);  
125 - break;  
126 - case DELETE:  
127 - sendDeleteRequest(client, rpcRequest, objectId);  
128 - break;  
129 - case WRITE_UPDATE:  
130 - sendWriteUpdateRequest(client, rpcRequest, objectId);  
131 - break;  
132 - case WRITE_REPLACE:  
133 - sendWriteReplaceRequest(client, rpcRequest, objectId);  
134 - break;  
135 - default:  
136 - throw new IllegalArgumentException("Unsupported operation: " + operationType.name());  
137 - }  
138 - } else if (operationType.isComposite()) {  
139 - if (clientContext.isComposite(client)) { 84 + LwM2mOperationType operationType = LwM2mOperationType.fromType(rpcRequest.getMethodName());
  85 + if (operationType == null) {
  86 + this.sendErrorRpcResponse(sessionInfo, rpcRequest.getRequestId(), ResponseCode.METHOD_NOT_ALLOWED, "Unsupported operation type: " + rpcRequest.getMethodName());
  87 + return;
  88 + }
  89 + LwM2mClient client = clientContext.getClientBySessionInfo(sessionInfo);
  90 + if (client.getRegistration() == null) {
  91 + this.sendErrorRpcResponse(sessionInfo, rpcRequest.getRequestId(), ResponseCode.INTERNAL_SERVER_ERROR, "Registration is empty");
  92 + return;
  93 + }
  94 + UUID rpcId = new UUID(rpcRequest.getRequestIdMSB(), rpcRequest.getRequestIdLSB());
  95 +
  96 + if (rpcId.equals(client.getLastSentRpcId())) {
  97 + log.debug("[{}]][{}] Rpc has already sent!", client.getEndpoint(), rpcId);
  98 + return;
  99 + }
  100 + try {
  101 + if (operationType.isHasObjectId()) {
  102 + String objectId = getIdFromParameters(client, rpcRequest);
140 switch (operationType) { 103 switch (operationType) {
141 - case READ_COMPOSITE:  
142 - sendReadCompositeRequest(client, rpcRequest); 104 + case READ:
  105 + sendReadRequest(client, rpcRequest, objectId);
  106 + break;
  107 + case OBSERVE:
  108 + sendObserveRequest(client, rpcRequest, objectId);
  109 + break;
  110 + case DISCOVER:
  111 + sendDiscoverRequest(client, rpcRequest, objectId);
  112 + break;
  113 + case EXECUTE:
  114 + sendExecuteRequest(client, rpcRequest, objectId);
  115 + break;
  116 + case WRITE_ATTRIBUTES:
  117 + sendWriteAttributesRequest(client, rpcRequest, objectId);
143 break; 118 break;
144 - case WRITE_COMPOSITE:  
145 - sendWriteCompositeRequest(client, rpcRequest); 119 + case OBSERVE_CANCEL:
  120 + sendCancelObserveRequest(client, rpcRequest, objectId);
  121 + break;
  122 + case DELETE:
  123 + sendDeleteRequest(client, rpcRequest, objectId);
  124 + break;
  125 + case WRITE_UPDATE:
  126 + sendWriteUpdateRequest(client, rpcRequest, objectId);
  127 + break;
  128 + case WRITE_REPLACE:
  129 + sendWriteReplaceRequest(client, rpcRequest, objectId);
146 break; 130 break;
147 default: 131 default:
148 throw new IllegalArgumentException("Unsupported operation: " + operationType.name()); 132 throw new IllegalArgumentException("Unsupported operation: " + operationType.name());
149 } 133 }
  134 + } else if (operationType.isComposite()) {
  135 + if (clientContext.isComposite(client)) {
  136 + switch (operationType) {
  137 + case READ_COMPOSITE:
  138 + sendReadCompositeRequest(client, rpcRequest);
  139 + break;
  140 + case WRITE_COMPOSITE:
  141 + sendWriteCompositeRequest(client, rpcRequest);
  142 + break;
  143 + default:
  144 + throw new IllegalArgumentException("Unsupported operation: " + operationType.name());
  145 + }
  146 + } else {
  147 + this.sendErrorRpcResponse(sessionInfo, rpcRequest.getRequestId(),
  148 + ResponseCode.INTERNAL_SERVER_ERROR, "This device does not support Composite Operation");
  149 + }
150 } else { 150 } else {
151 - this.sendErrorRpcResponse(sessionInfo, rpcRequest.getRequestId(),  
152 - ResponseCode.INTERNAL_SERVER_ERROR, "This device does not support Composite Operation");  
153 - }  
154 - } else {  
155 - switch (operationType) {  
156 - case OBSERVE_CANCEL_ALL:  
157 - sendCancelAllObserveRequest(client, rpcRequest);  
158 - break;  
159 - case OBSERVE_READ_ALL:  
160 - sendObserveAllRequest(client, rpcRequest);  
161 - break;  
162 - case DISCOVER_ALL:  
163 - sendDiscoverAllRequest(client, rpcRequest);  
164 - break;  
165 - case FW_UPDATE:  
166 - //TODO: implement and add break statement  
167 - default:  
168 - throw new IllegalArgumentException("Unsupported operation: " + operationType.name()); 151 + switch (operationType) {
  152 + case OBSERVE_CANCEL_ALL:
  153 + sendCancelAllObserveRequest(client, rpcRequest);
  154 + break;
  155 + case OBSERVE_READ_ALL:
  156 + sendObserveAllRequest(client, rpcRequest);
  157 + break;
  158 + case DISCOVER_ALL:
  159 + sendDiscoverAllRequest(client, rpcRequest);
  160 + break;
  161 + case FW_UPDATE:
  162 + //TODO: implement and add break statement
  163 + default:
  164 + throw new IllegalArgumentException("Unsupported operation: " + operationType.name());
  165 + }
169 } 166 }
  167 + } catch (IllegalArgumentException e) {
  168 + this.sendErrorRpcResponse(sessionInfo, rpcRequest.getRequestId(), ResponseCode.BAD_REQUEST, e.getMessage());
170 } 169 }
171 - } catch (IllegalArgumentException e) {  
172 - this.sendErrorRpcResponse(sessionInfo, rpcRequest.getRequestId(), ResponseCode.BAD_REQUEST, e.getMessage()); 170 + } catch (Exception e) {
  171 + log.error("[{}] Failed to send RPC: [{}]", sessionInfo, rpcRequest, e);
  172 + this.sendErrorRpcResponse(sessionInfo, rpcRequest.getRequestId(), ResponseCode.INTERNAL_SERVER_ERROR, ExceptionUtils.getRootCauseMessage(e));
173 } 173 }
174 } 174 }
175 175