Commit 4bf8ef180692cca077a3fac3de5bea4b5ff4c7cd

Authored by Andrii Shvaika
2 parents f1611dcf c500be4e

Fetch LwM2M device profile when restart

@@ -78,7 +78,7 @@ public class LwM2MTransportServerConfig implements LwM2MSecureServerConfig { @@ -78,7 +78,7 @@ public class LwM2MTransportServerConfig implements LwM2MSecureServerConfig {
78 78
79 @Getter 79 @Getter
80 @Value("${transport.lwm2m.security.key_store:}") 80 @Value("${transport.lwm2m.security.key_store:}")
81 - private String keyStorePathFile; 81 + private String keyStoreFilePath;
82 82
83 @Getter 83 @Getter
84 @Setter 84 @Setter
@@ -141,14 +141,27 @@ public class LwM2MTransportServerConfig implements LwM2MSecureServerConfig { @@ -141,14 +141,27 @@ public class LwM2MTransportServerConfig implements LwM2MSecureServerConfig {
141 public void init() { 141 public void init() {
142 URI uri = null; 142 URI uri = null;
143 try { 143 try {
144 - uri = Resources.getResource(keyStorePathFile).toURI();  
145 - log.info("URI: {}", uri);  
146 - File keyStoreFile = new File(uri);  
147 - InputStream inKeyStore = new FileInputStream(keyStoreFile); 144 + InputStream keyStoreInputStream;
  145 + File keyStoreFile = new File(keyStoreFilePath);
  146 + if (keyStoreFile.exists()) {
  147 + log.info("Reading key store from file {}", keyStoreFilePath);
  148 + keyStoreInputStream = new FileInputStream(keyStoreFile);
  149 + } else {
  150 + InputStream classPathStream = this.getClass().getClassLoader().getResourceAsStream(keyStoreFilePath);
  151 + if (classPathStream != null) {
  152 + log.info("Reading key store from class path {}", keyStoreFilePath);
  153 + keyStoreInputStream = classPathStream;
  154 + } else {
  155 + uri = Resources.getResource(keyStoreFilePath).toURI();
  156 + log.info("Reading key store from URI {}", keyStoreFilePath);
  157 + keyStoreInputStream = new FileInputStream(new File(uri));
  158 + }
  159 + }
148 keyStoreValue = KeyStore.getInstance(keyStoreType); 160 keyStoreValue = KeyStore.getInstance(keyStoreType);
149 - keyStoreValue.load(inKeyStore, keyStorePassword == null ? null : keyStorePassword.toCharArray()); 161 + keyStoreValue.load(keyStoreInputStream, keyStorePassword == null ? null : keyStorePassword.toCharArray());
150 } catch (Exception e) { 162 } catch (Exception e) {
151 - log.info("Unable to lookup LwM2M keystore. Reason: {}, {}" , uri, e.getMessage()); 163 + log.info("Unable to lookup LwM2M keystore. Reason: {}, {}", uri, e.getMessage());
152 } 164 }
153 } 165 }
  166 +
154 } 167 }
@@ -24,6 +24,8 @@ import org.eclipse.leshan.server.registration.Registration; @@ -24,6 +24,8 @@ import org.eclipse.leshan.server.registration.Registration;
24 import org.springframework.stereotype.Service; 24 import org.springframework.stereotype.Service;
25 import org.thingsboard.server.common.data.DeviceProfile; 25 import org.thingsboard.server.common.data.DeviceProfile;
26 import org.thingsboard.server.common.data.device.profile.Lwm2mDeviceProfileTransportConfiguration; 26 import org.thingsboard.server.common.data.device.profile.Lwm2mDeviceProfileTransportConfiguration;
  27 +import org.thingsboard.server.common.data.id.DeviceProfileId;
  28 +import org.thingsboard.server.common.transport.TransportDeviceProfileCache;
27 import org.thingsboard.server.common.transport.TransportService; 29 import org.thingsboard.server.common.transport.TransportService;
28 import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse; 30 import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse;
29 import org.thingsboard.server.gen.transport.TransportProtos; 31 import org.thingsboard.server.gen.transport.TransportProtos;
@@ -61,6 +63,7 @@ public class LwM2mClientContextImpl implements LwM2mClientContext { @@ -61,6 +63,7 @@ public class LwM2mClientContextImpl implements LwM2mClientContext {
61 private final TbMainSecurityStore securityStore; 63 private final TbMainSecurityStore securityStore;
62 private final TbLwM2MClientStore clientStore; 64 private final TbLwM2MClientStore clientStore;
63 private final LwM2MSessionManager sessionManager; 65 private final LwM2MSessionManager sessionManager;
  66 + private final TransportDeviceProfileCache deviceProfileCache;
64 private final Map<String, LwM2mClient> lwM2mClientsByEndpoint = new ConcurrentHashMap<>(); 67 private final Map<String, LwM2mClient> lwM2mClientsByEndpoint = new ConcurrentHashMap<>();
65 private final Map<String, LwM2mClient> lwM2mClientsByRegistrationId = new ConcurrentHashMap<>(); 68 private final Map<String, LwM2mClient> lwM2mClientsByRegistrationId = new ConcurrentHashMap<>();
66 private final Map<UUID, Lwm2mDeviceProfileTransportConfiguration> profiles = new ConcurrentHashMap<>(); 69 private final Map<UUID, Lwm2mDeviceProfileTransportConfiguration> profiles = new ConcurrentHashMap<>();
@@ -231,12 +234,26 @@ public class LwM2mClientContextImpl implements LwM2mClientContext { @@ -231,12 +234,26 @@ public class LwM2mClientContextImpl implements LwM2mClientContext {
231 234
232 @Override 235 @Override
233 public Lwm2mDeviceProfileTransportConfiguration getProfile(UUID profileId) { 236 public Lwm2mDeviceProfileTransportConfiguration getProfile(UUID profileId) {
234 - return profiles.get(profileId); 237 + return doGetAndCache(profileId);
235 } 238 }
236 239
237 @Override 240 @Override
238 public Lwm2mDeviceProfileTransportConfiguration getProfile(Registration registration) { 241 public Lwm2mDeviceProfileTransportConfiguration getProfile(Registration registration) {
239 - return profiles.get(getClientByEndpoint(registration.getEndpoint()).getProfileId()); 242 + UUID profileId = getClientByEndpoint(registration.getEndpoint()).getProfileId();
  243 + Lwm2mDeviceProfileTransportConfiguration result = doGetAndCache(profileId);
  244 + if (result == null) {
  245 + log.debug("[{}] Fetching profile [{}]", registration.getEndpoint(), profileId);
  246 + DeviceProfile deviceProfile = deviceProfileCache.get(new DeviceProfileId(profileId));
  247 + if (deviceProfile != null) {
  248 + profileUpdate(deviceProfile);
  249 + result = doGetAndCache(profileId);
  250 + }
  251 + }
  252 + return result;
  253 + }
  254 +
  255 + private Lwm2mDeviceProfileTransportConfiguration doGetAndCache(UUID profileId) {
  256 + return profiles.get(profileId);
240 } 257 }
241 258
242 @Override 259 @Override
@@ -46,7 +46,7 @@ public class TbLwM2mRedisSecurityStore implements TbEditableSecurityStore { @@ -46,7 +46,7 @@ public class TbLwM2mRedisSecurityStore implements TbEditableSecurityStore {
46 lock = redisLock.obtain(toLockKey(endpoint)); 46 lock = redisLock.obtain(toLockKey(endpoint));
47 lock.lock(); 47 lock.lock();
48 byte[] data = connection.get((SEC_EP + endpoint).getBytes()); 48 byte[] data = connection.get((SEC_EP + endpoint).getBytes());
49 - if (data == null) { 49 + if (data == null || data.length == 0) {
50 return null; 50 return null;
51 } else { 51 } else {
52 return ((TbLwM2MSecurityInfo) serializer.asObject(data)).getSecurityInfo(); 52 return ((TbLwM2MSecurityInfo) serializer.asObject(data)).getSecurityInfo();
@@ -69,7 +69,7 @@ public class TbLwM2mRedisSecurityStore implements TbEditableSecurityStore { @@ -69,7 +69,7 @@ public class TbLwM2mRedisSecurityStore implements TbEditableSecurityStore {
69 return null; 69 return null;
70 } else { 70 } else {
71 byte[] data = connection.get((SEC_EP + new String(ep)).getBytes()); 71 byte[] data = connection.get((SEC_EP + new String(ep)).getBytes());
72 - if (data == null) { 72 + if (data == null || data.length == 0) {
73 return null; 73 return null;
74 } else { 74 } else {
75 return ((TbLwM2MSecurityInfo) serializer.asObject(data)).getSecurityInfo(); 75 return ((TbLwM2MSecurityInfo) serializer.asObject(data)).getSecurityInfo();
@@ -122,7 +122,11 @@ public class TbLwM2mRedisSecurityStore implements TbEditableSecurityStore { @@ -122,7 +122,11 @@ public class TbLwM2mRedisSecurityStore implements TbEditableSecurityStore {
122 lock = redisLock.obtain(endpoint); 122 lock = redisLock.obtain(endpoint);
123 lock.lock(); 123 lock.lock();
124 byte[] data = connection.get((SEC_EP + endpoint).getBytes()); 124 byte[] data = connection.get((SEC_EP + endpoint).getBytes());
125 - return (TbLwM2MSecurityInfo) serializer.asObject(data); 125 + if (data != null && data.length > 0) {
  126 + return (TbLwM2MSecurityInfo) serializer.asObject(data);
  127 + } else {
  128 + return null;
  129 + }
126 } finally { 130 } finally {
127 if (lock != null) { 131 if (lock != null) {
128 lock.unlock(); 132 lock.unlock();
@@ -137,7 +141,7 @@ public class TbLwM2mRedisSecurityStore implements TbEditableSecurityStore { @@ -137,7 +141,7 @@ public class TbLwM2mRedisSecurityStore implements TbEditableSecurityStore {
137 lock = redisLock.obtain(endpoint); 141 lock = redisLock.obtain(endpoint);
138 lock.lock(); 142 lock.lock();
139 byte[] data = connection.get((SEC_EP + endpoint).getBytes()); 143 byte[] data = connection.get((SEC_EP + endpoint).getBytes());
140 - if (data != null) { 144 + if (data != null && data.length > 0) {
141 SecurityInfo info = ((TbLwM2MSecurityInfo) serializer.asObject(data)).getSecurityInfo(); 145 SecurityInfo info = ((TbLwM2MSecurityInfo) serializer.asObject(data)).getSecurityInfo();
142 if (info != null && info.getIdentity() != null) { 146 if (info != null && info.getIdentity() != null) {
143 connection.hDel(PSKID_SEC.getBytes(), info.getIdentity().getBytes()); 147 connection.hDel(PSKID_SEC.getBytes(), info.getIdentity().getBytes());
@@ -321,20 +321,28 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl @@ -321,20 +321,28 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl
321 */ 321 */
322 @Override 322 @Override
323 public void onDeviceProfileUpdate(SessionInfoProto sessionInfo, DeviceProfile deviceProfile) { 323 public void onDeviceProfileUpdate(SessionInfoProto sessionInfo, DeviceProfile deviceProfile) {
324 - List<LwM2mClient> clients = clientContext.getLwM2mClients()  
325 - .stream().filter(e -> e.getProfileId().equals(deviceProfile.getUuidId())).collect(Collectors.toList());  
326 - clients.forEach(client -> client.onDeviceProfileUpdate(deviceProfile));  
327 - if (clients.size() > 0) {  
328 - this.onDeviceProfileUpdate(clients, deviceProfile); 324 + try {
  325 + List<LwM2mClient> clients = clientContext.getLwM2mClients()
  326 + .stream().filter(e -> e.getProfileId() != null)
  327 + .filter(e -> e.getProfileId().equals(deviceProfile.getUuidId())).collect(Collectors.toList());
  328 + clients.forEach(client -> client.onDeviceProfileUpdate(deviceProfile));
  329 + if (clients.size() > 0) {
  330 + this.onDeviceProfileUpdate(clients, deviceProfile);
  331 + }
  332 + } catch (Exception e) {
  333 + log.warn("[{}] failed to update profile: {}", deviceProfile.getId(), deviceProfile);
329 } 334 }
330 } 335 }
331 336
332 @Override 337 @Override
333 public void onDeviceUpdate(SessionInfoProto sessionInfo, Device device, Optional<DeviceProfile> deviceProfileOpt) { 338 public void onDeviceUpdate(SessionInfoProto sessionInfo, Device device, Optional<DeviceProfile> deviceProfileOpt) {
334 - //TODO: check, maybe device has multiple sessions/registrations? Is this possible according to the standard.  
335 - LwM2mClient client = clientContext.getClientByDeviceId(device.getUuidId());  
336 - if (client != null) {  
337 - this.onDeviceUpdate(client, device, deviceProfileOpt); 339 + try {
  340 + LwM2mClient client = clientContext.getClientByDeviceId(device.getUuidId());
  341 + if (client != null) {
  342 + this.onDeviceUpdate(client, device, deviceProfileOpt);
  343 + }
  344 + } catch (Exception e) {
  345 + log.warn("[{}] failed to update device: {}", device.getId(), device);
338 } 346 }
339 } 347 }
340 348
@@ -210,7 +210,6 @@ public class DefaultTransportService implements TransportService { @@ -210,7 +210,6 @@ public class DefaultTransportService implements TransportService {
210 } 210 }
211 records.forEach(record -> { 211 records.forEach(record -> {
212 try { 212 try {
213 - log.info("[{}] SessionIdMSB, [{}] SessionIdLSB, records", record.getValue().getSessionIdMSB(), record.getValue().getSessionIdLSB());  
214 processToTransportMsg(record.getValue()); 213 processToTransportMsg(record.getValue());
215 } catch (Throwable e) { 214 } catch (Throwable e) {
216 log.warn("Failed to process the notification.", e); 215 log.warn("Failed to process the notification.", e);
@@ -771,6 +770,7 @@ public class DefaultTransportService implements TransportService { @@ -771,6 +770,7 @@ public class DefaultTransportService implements TransportService {
771 UUID sessionId = new UUID(toSessionMsg.getSessionIdMSB(), toSessionMsg.getSessionIdLSB()); 770 UUID sessionId = new UUID(toSessionMsg.getSessionIdMSB(), toSessionMsg.getSessionIdLSB());
772 SessionMetaData md = sessions.get(sessionId); 771 SessionMetaData md = sessions.get(sessionId);
773 if (md != null) { 772 if (md != null) {
  773 + log.trace("[{}] Processing notification: {}", sessionId, toSessionMsg);
774 SessionMsgListener listener = md.getListener(); 774 SessionMsgListener listener = md.getListener();
775 transportCallbackExecutor.submit(() -> { 775 transportCallbackExecutor.submit(() -> {
776 if (toSessionMsg.hasGetAttributesResponse()) { 776 if (toSessionMsg.hasGetAttributesResponse()) {
@@ -798,12 +798,14 @@ public class DefaultTransportService implements TransportService { @@ -798,12 +798,14 @@ public class DefaultTransportService implements TransportService {
798 deregisterSession(md.getSessionInfo()); 798 deregisterSession(md.getSessionInfo());
799 } 799 }
800 } else { 800 } else {
  801 + log.trace("Processing broadcast notification: {}", toSessionMsg);
801 if (toSessionMsg.hasEntityUpdateMsg()) { 802 if (toSessionMsg.hasEntityUpdateMsg()) {
802 TransportProtos.EntityUpdateMsg msg = toSessionMsg.getEntityUpdateMsg(); 803 TransportProtos.EntityUpdateMsg msg = toSessionMsg.getEntityUpdateMsg();
803 EntityType entityType = EntityType.valueOf(msg.getEntityType()); 804 EntityType entityType = EntityType.valueOf(msg.getEntityType());
804 if (EntityType.DEVICE_PROFILE.equals(entityType)) { 805 if (EntityType.DEVICE_PROFILE.equals(entityType)) {
805 DeviceProfile deviceProfile = deviceProfileCache.put(msg.getData()); 806 DeviceProfile deviceProfile = deviceProfileCache.put(msg.getData());
806 if (deviceProfile != null) { 807 if (deviceProfile != null) {
  808 + log.info("On device profile update: {}", deviceProfile);
807 onProfileUpdate(deviceProfile); 809 onProfileUpdate(deviceProfile);
808 } 810 }
809 } else if (EntityType.TENANT_PROFILE.equals(entityType)) { 811 } else if (EntityType.TENANT_PROFILE.equals(entityType)) {