Commit e9f5fd27066f2bfeaf1316d70d83d59c7e52a0cf

Authored by YevhenBondarenko
Committed by Andrew Shvayka
1 parent 61a5047e

added ability to use redis cluster for lwm2m redis stores.

@@ -146,6 +146,10 @@ @@ -146,6 +146,10 @@
146 <scope>runtime</scope> 146 <scope>runtime</scope>
147 </dependency> 147 </dependency>
148 <dependency> 148 <dependency>
  149 + <groupId>org.springframework.integration</groupId>
  150 + <artifactId>spring-integration-redis</artifactId>
  151 + </dependency>
  152 + <dependency>
149 <groupId>org.springframework.boot</groupId> 153 <groupId>org.springframework.boot</groupId>
150 <artifactId>spring-boot-starter-security</artifactId> 154 <artifactId>spring-boot-starter-security</artifactId>
151 </dependency> 155 </dependency>
@@ -82,7 +82,7 @@ public class MqttTopics { @@ -82,7 +82,7 @@ public class MqttTopics {
82 public static final String DEVICE_FIRMWARE_REQUEST_TOPIC_PATTERN = BASE_DEVICE_API_TOPIC_V2 + FIRMWARE + REQUEST + "/" + REQUEST_ID_PATTERN + CHUNK + CHUNK_PATTERN; 82 public static final String DEVICE_FIRMWARE_REQUEST_TOPIC_PATTERN = BASE_DEVICE_API_TOPIC_V2 + FIRMWARE + REQUEST + "/" + REQUEST_ID_PATTERN + CHUNK + CHUNK_PATTERN;
83 public static final String DEVICE_FIRMWARE_RESPONSES_TOPIC = BASE_DEVICE_API_TOPIC_V2 + FIRMWARE + RESPONSE + "/" + SUB_TOPIC + CHUNK + SUB_TOPIC; 83 public static final String DEVICE_FIRMWARE_RESPONSES_TOPIC = BASE_DEVICE_API_TOPIC_V2 + FIRMWARE + RESPONSE + "/" + SUB_TOPIC + CHUNK + SUB_TOPIC;
84 public static final String DEVICE_FIRMWARE_ERROR_TOPIC = BASE_DEVICE_API_TOPIC_V2 + FIRMWARE + ERROR; 84 public static final String DEVICE_FIRMWARE_ERROR_TOPIC = BASE_DEVICE_API_TOPIC_V2 + FIRMWARE + ERROR;
85 - public static final String DEVICE_FIRMWARE_RESPONSES_TOPIC_FORMAT = BASE_DEVICE_API_TOPIC_V2 + "%s" + RESPONSE + "/"+ "%s" + CHUNK + "%d"; 85 + public static final String DEVICE_FIRMWARE_RESPONSES_TOPIC_FORMAT = BASE_DEVICE_API_TOPIC_V2 + "/%s" + RESPONSE + "/"+ "%s" + CHUNK + "%d";
86 86
87 public static final String DEVICE_SOFTWARE_REQUEST_TOPIC_PATTERN = BASE_DEVICE_API_TOPIC_V2 + SOFTWARE + REQUEST + "/" + REQUEST_ID_PATTERN + CHUNK + CHUNK_PATTERN; 87 public static final String DEVICE_SOFTWARE_REQUEST_TOPIC_PATTERN = BASE_DEVICE_API_TOPIC_V2 + SOFTWARE + REQUEST + "/" + REQUEST_ID_PATTERN + CHUNK + CHUNK_PATTERN;
88 public static final String DEVICE_SOFTWARE_RESPONSES_TOPIC = BASE_DEVICE_API_TOPIC_V2 + SOFTWARE + RESPONSE + "/" + SUB_TOPIC + CHUNK + SUB_TOPIC; 88 public static final String DEVICE_SOFTWARE_RESPONSES_TOPIC = BASE_DEVICE_API_TOPIC_V2 + SOFTWARE + RESPONSE + "/" + SUB_TOPIC + CHUNK + SUB_TOPIC;
@@ -53,6 +53,10 @@ @@ -53,6 +53,10 @@
53 <artifactId>spring-context</artifactId> 53 <artifactId>spring-context</artifactId>
54 </dependency> 54 </dependency>
55 <dependency> 55 <dependency>
  56 + <groupId>org.springframework.integration</groupId>
  57 + <artifactId>spring-integration-redis</artifactId>
  58 + </dependency>
  59 + <dependency>
56 <groupId>org.slf4j</groupId> 60 <groupId>org.slf4j</groupId>
57 <artifactId>slf4j-api</artifactId> 61 <artifactId>slf4j-api</artifactId>
58 </dependency> 62 </dependency>
@@ -24,7 +24,6 @@ import org.eclipse.leshan.core.util.Hex; @@ -24,7 +24,6 @@ import org.eclipse.leshan.core.util.Hex;
24 import org.eclipse.leshan.server.californium.LeshanServer; 24 import org.eclipse.leshan.server.californium.LeshanServer;
25 import org.eclipse.leshan.server.californium.LeshanServerBuilder; 25 import org.eclipse.leshan.server.californium.LeshanServerBuilder;
26 import org.eclipse.leshan.server.californium.registration.CaliforniumRegistrationStore; 26 import org.eclipse.leshan.server.californium.registration.CaliforniumRegistrationStore;
27 -import org.eclipse.leshan.server.californium.registration.InMemoryRegistrationStore;  
28 import org.eclipse.leshan.server.model.LwM2mModelProvider; 27 import org.eclipse.leshan.server.model.LwM2mModelProvider;
29 import org.eclipse.leshan.server.security.DefaultAuthorizer; 28 import org.eclipse.leshan.server.security.DefaultAuthorizer;
30 import org.eclipse.leshan.server.security.EditableSecurityStore; 29 import org.eclipse.leshan.server.security.EditableSecurityStore;
@@ -127,8 +126,8 @@ public class DefaultLwM2mTransportService implements LwM2MTransportService { @@ -127,8 +126,8 @@ public class DefaultLwM2mTransportService implements LwM2MTransportService {
127 builder.setEncoder(new DefaultLwM2mNodeEncoder(LwM2mValueConverterImpl.getInstance())); 126 builder.setEncoder(new DefaultLwM2mNodeEncoder(LwM2mValueConverterImpl.getInstance()));
128 127
129 /* InMemoryRegistrationStore(ScheduledExecutorService schedExecutor, long cleanPeriodInSec) */ 128 /* InMemoryRegistrationStore(ScheduledExecutorService schedExecutor, long cleanPeriodInSec) */
130 - InMemoryRegistrationStore registrationStore = new InMemoryRegistrationStore(this.registrationStoreExecutor, this.config.getCleanPeriodInSec());  
131 - builder.setRegistrationStore(registrationStore); 129 +//// InMemoryRegistrationStore registrationStore = new InMemoryRegistrationStore(this.registrationStoreExecutor, this.config.getCleanPeriodInSec());
  130 +// builder.setRegistrationStore(registrationStore);
132 131
133 /* Create CoAP Config */ 132 /* Create CoAP Config */
134 builder.setCoapConfig(getCoapConfig(config.getPort(), config.getSecurePort())); 133 builder.setCoapConfig(getCoapConfig(config.getPort(), config.getSecurePort()));
@@ -26,9 +26,7 @@ import org.eclipse.leshan.core.util.NamedThreadFactory; @@ -26,9 +26,7 @@ import org.eclipse.leshan.core.util.NamedThreadFactory;
26 import org.eclipse.leshan.core.util.Validate; 26 import org.eclipse.leshan.core.util.Validate;
27 import org.eclipse.leshan.server.californium.observation.ObserveUtil; 27 import org.eclipse.leshan.server.californium.observation.ObserveUtil;
28 import org.eclipse.leshan.server.californium.registration.CaliforniumRegistrationStore; 28 import org.eclipse.leshan.server.californium.registration.CaliforniumRegistrationStore;
29 -import org.eclipse.leshan.server.redis.JedisLock;  
30 import org.eclipse.leshan.server.redis.RedisRegistrationStore; 29 import org.eclipse.leshan.server.redis.RedisRegistrationStore;
31 -import org.eclipse.leshan.server.redis.SingleInstanceJedisLock;  
32 import org.eclipse.leshan.server.redis.serialization.ObservationSerDes; 30 import org.eclipse.leshan.server.redis.serialization.ObservationSerDes;
33 import org.eclipse.leshan.server.redis.serialization.RegistrationSerDes; 31 import org.eclipse.leshan.server.redis.serialization.RegistrationSerDes;
34 import org.eclipse.leshan.server.registration.Deregistration; 32 import org.eclipse.leshan.server.registration.Deregistration;
@@ -38,11 +36,12 @@ import org.eclipse.leshan.server.registration.RegistrationUpdate; @@ -38,11 +36,12 @@ import org.eclipse.leshan.server.registration.RegistrationUpdate;
38 import org.eclipse.leshan.server.registration.UpdatedRegistration; 36 import org.eclipse.leshan.server.registration.UpdatedRegistration;
39 import org.slf4j.Logger; 37 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory; 38 import org.slf4j.LoggerFactory;
  39 +import org.springframework.data.redis.connection.RedisClusterConnection;
  40 +import org.springframework.data.redis.connection.RedisConnection;
41 import org.springframework.data.redis.connection.RedisConnectionFactory; 41 import org.springframework.data.redis.connection.RedisConnectionFactory;
42 -import redis.clients.jedis.Jedis;  
43 -import redis.clients.jedis.ScanParams;  
44 -import redis.clients.jedis.ScanResult;  
45 -import redis.clients.jedis.Transaction; 42 +import org.springframework.data.redis.core.Cursor;
  43 +import org.springframework.data.redis.core.ScanOptions;
  44 +import org.springframework.integration.redis.util.RedisLockRegistry;
46 45
47 import java.net.InetSocketAddress; 46 import java.net.InetSocketAddress;
48 import java.util.ArrayList; 47 import java.util.ArrayList;
@@ -50,13 +49,14 @@ import java.util.Arrays; @@ -50,13 +49,14 @@ import java.util.Arrays;
50 import java.util.Collection; 49 import java.util.Collection;
51 import java.util.Collections; 50 import java.util.Collections;
52 import java.util.Iterator; 51 import java.util.Iterator;
  52 +import java.util.LinkedList;
53 import java.util.List; 53 import java.util.List;
54 -import java.util.NoSuchElementException;  
55 import java.util.Set; 54 import java.util.Set;
56 import java.util.concurrent.Executors; 55 import java.util.concurrent.Executors;
57 import java.util.concurrent.ScheduledExecutorService; 56 import java.util.concurrent.ScheduledExecutorService;
58 import java.util.concurrent.ScheduledFuture; 57 import java.util.concurrent.ScheduledFuture;
59 import java.util.concurrent.TimeUnit; 58 import java.util.concurrent.TimeUnit;
  59 +import java.util.concurrent.locks.Lock;
60 60
61 import static java.nio.charset.StandardCharsets.UTF_8; 61 import static java.nio.charset.StandardCharsets.UTF_8;
62 62
@@ -92,7 +92,7 @@ public class TbLwM2mRedisRegistrationStore implements CaliforniumRegistrationSto @@ -92,7 +92,7 @@ public class TbLwM2mRedisRegistrationStore implements CaliforniumRegistrationSto
92 private final int cleanLimit; // maximum number to clean in a clean period 92 private final int cleanLimit; // maximum number to clean in a clean period
93 private final long gracePeriod; // in seconds 93 private final long gracePeriod; // in seconds
94 94
95 - private final JedisLock lock; 95 + private final RedisLockRegistry redisLock;
96 96
97 public TbLwM2mRedisRegistrationStore(RedisConnectionFactory connectionFactory) { 97 public TbLwM2mRedisRegistrationStore(RedisConnectionFactory connectionFactory) {
98 this(connectionFactory, DEFAULT_CLEAN_PERIOD, DEFAULT_GRACE_PERIOD, DEFAULT_CLEAN_LIMIT); // default clean period 60s 98 this(connectionFactory, DEFAULT_CLEAN_PERIOD, DEFAULT_GRACE_PERIOD, DEFAULT_CLEAN_LIMIT); // default clean period 60s
@@ -106,20 +106,12 @@ public class TbLwM2mRedisRegistrationStore implements CaliforniumRegistrationSto @@ -106,20 +106,12 @@ public class TbLwM2mRedisRegistrationStore implements CaliforniumRegistrationSto
106 106
107 public TbLwM2mRedisRegistrationStore(RedisConnectionFactory connectionFactory, ScheduledExecutorService schedExecutor, long cleanPeriodInSec, 107 public TbLwM2mRedisRegistrationStore(RedisConnectionFactory connectionFactory, ScheduledExecutorService schedExecutor, long cleanPeriodInSec,
108 long lifetimeGracePeriodInSec, int cleanLimit) { 108 long lifetimeGracePeriodInSec, int cleanLimit) {
109 - this(connectionFactory, schedExecutor, cleanPeriodInSec, lifetimeGracePeriodInSec, cleanLimit, new SingleInstanceJedisLock());  
110 - }  
111 -  
112 - /**  
113 - * @since 1.1  
114 - */  
115 - public TbLwM2mRedisRegistrationStore(RedisConnectionFactory connectionFactory, ScheduledExecutorService schedExecutor, long cleanPeriodInSec,  
116 - long lifetimeGracePeriodInSec, int cleanLimit, JedisLock redisLock) {  
117 this.connectionFactory = connectionFactory; 109 this.connectionFactory = connectionFactory;
118 this.schedExecutor = schedExecutor; 110 this.schedExecutor = schedExecutor;
119 this.cleanPeriod = cleanPeriodInSec; 111 this.cleanPeriod = cleanPeriodInSec;
120 this.cleanLimit = cleanLimit; 112 this.cleanLimit = cleanLimit;
121 this.gracePeriod = lifetimeGracePeriodInSec; 113 this.gracePeriod = lifetimeGracePeriodInSec;
122 - this.lock = redisLock; 114 + this.redisLock = new RedisLockRegistry(connectionFactory, "Registration");
123 } 115 }
124 116
125 /* *************** Redis Key utility function **************** */ 117 /* *************** Redis Key utility function **************** */
@@ -135,76 +127,79 @@ public class TbLwM2mRedisRegistrationStore implements CaliforniumRegistrationSto @@ -135,76 +127,79 @@ public class TbLwM2mRedisRegistrationStore implements CaliforniumRegistrationSto
135 return (prefix + registrationID).getBytes(); 127 return (prefix + registrationID).getBytes();
136 } 128 }
137 129
138 - private byte[] toLockKey(String endpoint) {  
139 - return toKey(LOCK_EP, endpoint); 130 + private String toLockKey(String endpoint) {
  131 + return new String(toKey(LOCK_EP, endpoint));
140 } 132 }
141 133
142 - private byte[] toLockKey(byte[] endpoint) {  
143 - return toKey(LOCK_EP.getBytes(UTF_8), endpoint); 134 + private String toLockKey(byte[] endpoint) {
  135 + return new String(toKey(LOCK_EP.getBytes(UTF_8), endpoint));
144 } 136 }
145 137
146 /* *************** Leshan Registration API **************** */ 138 /* *************** Leshan Registration API **************** */
147 139
148 @Override 140 @Override
149 public Deregistration addRegistration(Registration registration) { 141 public Deregistration addRegistration(Registration registration) {
150 - try (Jedis j = (Jedis) connectionFactory.getConnection().getNativeConnection()) {  
151 - byte[] lockValue = null;  
152 - byte[] lockKey = toLockKey(registration.getEndpoint()); 142 + Lock lock = null;
  143 + try (var connection = connectionFactory.getConnection()) {
  144 + String lockKey = toLockKey(registration.getEndpoint());
153 145
154 try { 146 try {
155 - lockValue = lock.acquire(j, lockKey);  
156 - 147 + lock = redisLock.obtain(lockKey);
  148 + lock.lock();
157 // add registration 149 // add registration
158 byte[] k = toEndpointKey(registration.getEndpoint()); 150 byte[] k = toEndpointKey(registration.getEndpoint());
159 - byte[] old = j.getSet(k, serializeReg(registration)); 151 + byte[] old = connection.getSet(k, serializeReg(registration));
160 152
161 // add registration: secondary indexes 153 // add registration: secondary indexes
162 byte[] regid_idx = toRegIdKey(registration.getId()); 154 byte[] regid_idx = toRegIdKey(registration.getId());
163 - j.set(regid_idx, registration.getEndpoint().getBytes(UTF_8)); 155 + connection.set(regid_idx, registration.getEndpoint().getBytes(UTF_8));
164 byte[] addr_idx = toRegAddrKey(registration.getSocketAddress()); 156 byte[] addr_idx = toRegAddrKey(registration.getSocketAddress());
165 - j.set(addr_idx, registration.getEndpoint().getBytes(UTF_8)); 157 + connection.set(addr_idx, registration.getEndpoint().getBytes(UTF_8));
166 158
167 // Add or update expiration 159 // Add or update expiration
168 - addOrUpdateExpiration(j, registration); 160 + addOrUpdateExpiration(connection, registration);
169 161
170 if (old != null) { 162 if (old != null) {
171 Registration oldRegistration = deserializeReg(old); 163 Registration oldRegistration = deserializeReg(old);
172 // remove old secondary index 164 // remove old secondary index
173 if (!registration.getId().equals(oldRegistration.getId())) 165 if (!registration.getId().equals(oldRegistration.getId()))
174 - j.del(toRegIdKey(oldRegistration.getId())); 166 + connection.del(toRegIdKey(oldRegistration.getId()));
175 if (!oldRegistration.getSocketAddress().equals(registration.getSocketAddress())) { 167 if (!oldRegistration.getSocketAddress().equals(registration.getSocketAddress())) {
176 - removeAddrIndex(j, oldRegistration); 168 + removeAddrIndex(connection, oldRegistration);
177 } 169 }
178 // remove old observation 170 // remove old observation
179 - Collection<Observation> obsRemoved = unsafeRemoveAllObservations(j, oldRegistration.getId()); 171 + Collection<Observation> obsRemoved = unsafeRemoveAllObservations(connection, oldRegistration.getId());
180 172
181 return new Deregistration(oldRegistration, obsRemoved); 173 return new Deregistration(oldRegistration, obsRemoved);
182 } 174 }
183 175
184 return null; 176 return null;
185 } finally { 177 } finally {
186 - lock.release(j, lockKey, lockValue); 178 + if (lock != null) {
  179 + lock.unlock();
  180 + }
187 } 181 }
188 } 182 }
189 } 183 }
190 184
191 @Override 185 @Override
192 public UpdatedRegistration updateRegistration(RegistrationUpdate update) { 186 public UpdatedRegistration updateRegistration(RegistrationUpdate update) {
193 - try (Jedis j = (Jedis) connectionFactory.getConnection().getNativeConnection()) { 187 + Lock lock = null;
  188 + try (var connection = connectionFactory.getConnection()) {
194 189
195 // Fetch the registration ep by registration ID index 190 // Fetch the registration ep by registration ID index
196 - byte[] ep = j.get(toRegIdKey(update.getRegistrationId())); 191 + byte[] ep = connection.get(toRegIdKey(update.getRegistrationId()));
197 if (ep == null) { 192 if (ep == null) {
198 return null; 193 return null;
199 } 194 }
200 195
201 - byte[] lockValue = null;  
202 - byte[] lockKey = toLockKey(ep); 196 + String lockKey = toLockKey(ep);
203 try { 197 try {
204 - lockValue = lock.acquire(j, lockKey); 198 + lock = redisLock.obtain(lockKey);
  199 + lock.lock();
205 200
206 // Fetch the registration 201 // Fetch the registration
207 - byte[] data = j.get(toEndpointKey(ep)); 202 + byte[] data = connection.get(toEndpointKey(ep));
208 if (data == null) { 203 if (data == null) {
209 return null; 204 return null;
210 } 205 }
@@ -214,40 +209,42 @@ public class TbLwM2mRedisRegistrationStore implements CaliforniumRegistrationSto @@ -214,40 +209,42 @@ public class TbLwM2mRedisRegistrationStore implements CaliforniumRegistrationSto
214 Registration updatedRegistration = update.update(r); 209 Registration updatedRegistration = update.update(r);
215 210
216 // Store the new registration 211 // Store the new registration
217 - j.set(toEndpointKey(updatedRegistration.getEndpoint()), serializeReg(updatedRegistration)); 212 + connection.set(toEndpointKey(updatedRegistration.getEndpoint()), serializeReg(updatedRegistration));
218 213
219 // Add or update expiration 214 // Add or update expiration
220 - addOrUpdateExpiration(j, updatedRegistration); 215 + addOrUpdateExpiration(connection, updatedRegistration);
221 216
222 // Update secondary index : 217 // Update secondary index :
223 // If registration is already associated to this address we don't care as we only want to keep the most 218 // If registration is already associated to this address we don't care as we only want to keep the most
224 // recent binding. 219 // recent binding.
225 byte[] addr_idx = toRegAddrKey(updatedRegistration.getSocketAddress()); 220 byte[] addr_idx = toRegAddrKey(updatedRegistration.getSocketAddress());
226 - j.set(addr_idx, updatedRegistration.getEndpoint().getBytes(UTF_8)); 221 + connection.set(addr_idx, updatedRegistration.getEndpoint().getBytes(UTF_8));
227 if (!r.getSocketAddress().equals(updatedRegistration.getSocketAddress())) { 222 if (!r.getSocketAddress().equals(updatedRegistration.getSocketAddress())) {
228 - removeAddrIndex(j, r); 223 + removeAddrIndex(connection, r);
229 } 224 }
230 225
231 return new UpdatedRegistration(r, updatedRegistration); 226 return new UpdatedRegistration(r, updatedRegistration);
232 227
233 } finally { 228 } finally {
234 - lock.release(j, lockKey, lockValue); 229 + if (lock != null) {
  230 + lock.unlock();
  231 + }
235 } 232 }
236 } 233 }
237 } 234 }
238 235
239 @Override 236 @Override
240 public Registration getRegistration(String registrationId) { 237 public Registration getRegistration(String registrationId) {
241 - try (Jedis j = (Jedis) connectionFactory.getConnection().getNativeConnection()) {  
242 - return getRegistration(j, registrationId); 238 + try (var connection = connectionFactory.getConnection()) {
  239 + return getRegistration(connection, registrationId);
243 } 240 }
244 } 241 }
245 242
246 @Override 243 @Override
247 public Registration getRegistrationByEndpoint(String endpoint) { 244 public Registration getRegistrationByEndpoint(String endpoint) {
248 Validate.notNull(endpoint); 245 Validate.notNull(endpoint);
249 - try (Jedis j = (Jedis) connectionFactory.getConnection().getNativeConnection()) {  
250 - byte[] data = j.get(toEndpointKey(endpoint)); 246 + try (var connection = connectionFactory.getConnection()) {
  247 + byte[] data = connection.get(toEndpointKey(endpoint));
251 if (data == null) { 248 if (data == null) {
252 return null; 249 return null;
253 } 250 }
@@ -258,12 +255,12 @@ public class TbLwM2mRedisRegistrationStore implements CaliforniumRegistrationSto @@ -258,12 +255,12 @@ public class TbLwM2mRedisRegistrationStore implements CaliforniumRegistrationSto
258 @Override 255 @Override
259 public Registration getRegistrationByAdress(InetSocketAddress address) { 256 public Registration getRegistrationByAdress(InetSocketAddress address) {
260 Validate.notNull(address); 257 Validate.notNull(address);
261 - try (Jedis j = (Jedis) connectionFactory.getConnection().getNativeConnection()) {  
262 - byte[] ep = j.get(toRegAddrKey(address)); 258 + try (var connection = connectionFactory.getConnection()) {
  259 + byte[] ep = connection.get(toRegAddrKey(address));
263 if (ep == null) { 260 if (ep == null) {
264 return null; 261 return null;
265 } 262 }
266 - byte[] data = j.get(toEndpointKey(ep)); 263 + byte[] data = connection.get(toEndpointKey(ep));
267 if (data == null) { 264 if (data == null) {
268 return null; 265 return null;
269 } 266 }
@@ -273,140 +270,99 @@ public class TbLwM2mRedisRegistrationStore implements CaliforniumRegistrationSto @@ -273,140 +270,99 @@ public class TbLwM2mRedisRegistrationStore implements CaliforniumRegistrationSto
273 270
274 @Override 271 @Override
275 public Iterator<Registration> getAllRegistrations() { 272 public Iterator<Registration> getAllRegistrations() {
276 - return new TbLwM2mRedisRegistrationStore.RedisIterator(connectionFactory, new ScanParams().match(REG_EP + "*").count(100));  
277 - }  
278 -  
279 - protected class RedisIterator implements Iterator<Registration> {  
280 -  
281 - private final RedisConnectionFactory connectionFactory;  
282 - private final ScanParams scanParams;  
283 -  
284 - private String cursor;  
285 - private List<Registration> scanResult;  
286 -  
287 - public RedisIterator(RedisConnectionFactory connectionFactory, ScanParams scanParams) {  
288 - this.connectionFactory = connectionFactory;  
289 - this.scanParams = scanParams;  
290 - // init scan result  
291 - scanNext("0");  
292 - }  
293 -  
294 - private void scanNext(String cursor) {  
295 - try (Jedis j = (Jedis) connectionFactory.getConnection().getNativeConnection()) {  
296 - do {  
297 - ScanResult<byte[]> sr = j.scan(cursor.getBytes(), scanParams);  
298 -  
299 - this.scanResult = new ArrayList<>();  
300 - if (sr.getResult() != null && !sr.getResult().isEmpty()) {  
301 - for (byte[] value : j.mget(sr.getResult().toArray(new byte[][]{}))) {  
302 - this.scanResult.add(deserializeReg(value));  
303 - }  
304 - }  
305 -  
306 - cursor = sr.getCursor();  
307 - } while (!"0".equals(cursor) && scanResult.isEmpty());  
308 -  
309 - this.cursor = cursor;  
310 - }  
311 - }  
312 -  
313 - @Override  
314 - public boolean hasNext() {  
315 - if (!scanResult.isEmpty()) {  
316 - return true;  
317 - }  
318 - if ("0".equals(cursor)) {  
319 - // no more elements to scan  
320 - return false;  
321 - }  
322 -  
323 - // read more elements  
324 - scanNext(cursor);  
325 - return !scanResult.isEmpty();  
326 - }  
327 -  
328 - @Override  
329 - public Registration next() {  
330 - if (!hasNext()) {  
331 - throw new NoSuchElementException(); 273 + try (var connection = connectionFactory.getConnection()) {
  274 + Collection<Registration> list = new LinkedList<>();
  275 + ScanOptions scanOptions = ScanOptions.scanOptions().count(100).match(REG_EP + "*").build();
  276 + List<Cursor<byte[]>> scans = new ArrayList<>();
  277 + if (connection instanceof RedisClusterConnection) {
  278 + ((RedisClusterConnection) connection).clusterGetNodes().forEach(node -> {
  279 + scans.add(((RedisClusterConnection) connection).scan(node, scanOptions));
  280 + });
  281 + } else {
  282 + scans.add(connection.scan(scanOptions));
332 } 283 }
333 - return scanResult.remove(0);  
334 - }  
335 284
336 - @Override  
337 - public void remove() {  
338 - throw new UnsupportedOperationException(); 285 + scans.forEach(scan -> {
  286 + scan.forEachRemaining(key -> {
  287 + byte[] element = connection.get(key);
  288 + list.add(deserializeReg(element));
  289 + });
  290 + });
  291 + return list.iterator();
339 } 292 }
340 } 293 }
341 294
342 @Override 295 @Override
343 public Deregistration removeRegistration(String registrationId) { 296 public Deregistration removeRegistration(String registrationId) {
344 - try (Jedis j = (Jedis) connectionFactory.getConnection().getNativeConnection()) {  
345 - return removeRegistration(j, registrationId, false); 297 + try (var connection = connectionFactory.getConnection()) {
  298 + return removeRegistration(connection, registrationId, false);
346 } 299 }
347 } 300 }
348 301
349 - private Deregistration removeRegistration(Jedis j, String registrationId, boolean removeOnlyIfNotAlive) { 302 + private Deregistration removeRegistration(RedisConnection connection, String registrationId, boolean removeOnlyIfNotAlive) {
350 // fetch the client ep by registration ID index 303 // fetch the client ep by registration ID index
351 - byte[] ep = j.get(toRegIdKey(registrationId)); 304 + byte[] ep = connection.get(toRegIdKey(registrationId));
352 if (ep == null) { 305 if (ep == null) {
353 return null; 306 return null;
354 } 307 }
355 308
356 - byte[] lockValue = null;  
357 - byte[] lockKey = toLockKey(ep); 309 + Lock lock = null;
  310 + String lockKey = toLockKey(ep);
358 try { 311 try {
359 - lockValue = lock.acquire(j, lockKey); 312 + lock = redisLock.obtain(lockKey);
  313 + lock.lock();
360 314
361 // fetch the client 315 // fetch the client
362 - byte[] data = j.get(toEndpointKey(ep)); 316 + byte[] data = connection.get(toEndpointKey(ep));
363 if (data == null) { 317 if (data == null) {
364 return null; 318 return null;
365 } 319 }
366 Registration r = deserializeReg(data); 320 Registration r = deserializeReg(data);
367 321
368 if (!removeOnlyIfNotAlive || !r.isAlive(gracePeriod)) { 322 if (!removeOnlyIfNotAlive || !r.isAlive(gracePeriod)) {
369 - long nbRemoved = j.del(toRegIdKey(r.getId())); 323 + long nbRemoved = connection.del(toRegIdKey(r.getId()));
370 if (nbRemoved > 0) { 324 if (nbRemoved > 0) {
371 - j.del(toEndpointKey(r.getEndpoint()));  
372 - Collection<Observation> obsRemoved = unsafeRemoveAllObservations(j, r.getId());  
373 - removeAddrIndex(j, r);  
374 - removeExpiration(j, r); 325 + connection.del(toEndpointKey(r.getEndpoint()));
  326 + Collection<Observation> obsRemoved = unsafeRemoveAllObservations(connection, r.getId());
  327 + removeAddrIndex(connection, r);
  328 + removeExpiration(connection, r);
375 return new Deregistration(r, obsRemoved); 329 return new Deregistration(r, obsRemoved);
376 } 330 }
377 } 331 }
378 return null; 332 return null;
379 } finally { 333 } finally {
380 - lock.release(j, lockKey, lockValue); 334 + if (lock != null) {
  335 + lock.unlock();
  336 + }
381 } 337 }
382 } 338 }
383 339
384 - private void removeAddrIndex(Jedis j, Registration registration) { 340 + private void removeAddrIndex(RedisConnection connection, Registration registration) {
385 // Watch the key to remove. 341 // Watch the key to remove.
386 byte[] regAddrKey = toRegAddrKey(registration.getSocketAddress()); 342 byte[] regAddrKey = toRegAddrKey(registration.getSocketAddress());
387 - j.watch(regAddrKey); 343 + connection.watch(regAddrKey);
388 344
389 - byte[] epFromAddr = j.get(regAddrKey); 345 + byte[] epFromAddr = connection.get(regAddrKey);
390 // Delete the key if needed. 346 // Delete the key if needed.
391 if (Arrays.equals(epFromAddr, registration.getEndpoint().getBytes(UTF_8))) { 347 if (Arrays.equals(epFromAddr, registration.getEndpoint().getBytes(UTF_8))) {
392 // Try to delete the key 348 // Try to delete the key
393 - Transaction transaction = j.multi();  
394 - transaction.del(regAddrKey);  
395 - transaction.exec(); 349 + connection.multi();
  350 + connection.del(regAddrKey);
  351 + connection.exec();
396 // if transaction failed this is not an issue as the socket address is probably reused and we don't neeed to 352 // if transaction failed this is not an issue as the socket address is probably reused and we don't neeed to
397 // delete it anymore. 353 // delete it anymore.
398 } else { 354 } else {
399 // the key must not be deleted. 355 // the key must not be deleted.
400 - j.unwatch(); 356 + connection.unwatch();
401 } 357 }
402 } 358 }
403 359
404 - private void addOrUpdateExpiration(Jedis j, Registration registration) {  
405 - j.zadd(EXP_EP, registration.getExpirationTimeStamp(gracePeriod), registration.getEndpoint().getBytes(UTF_8)); 360 + private void addOrUpdateExpiration(RedisConnection connection, Registration registration) {
  361 + connection.zAdd(EXP_EP, registration.getExpirationTimeStamp(gracePeriod), registration.getEndpoint().getBytes(UTF_8));
406 } 362 }
407 363
408 - private void removeExpiration(Jedis j, Registration registration) {  
409 - j.zrem(EXP_EP, registration.getEndpoint().getBytes(UTF_8)); 364 + private void removeExpiration(RedisConnection connection, Registration registration) {
  365 + connection.zRem(EXP_EP, registration.getEndpoint().getBytes(UTF_8));
410 } 366 }
411 367
412 private byte[] toRegIdKey(String registrationId) { 368 private byte[] toRegIdKey(String registrationId) {
@@ -441,33 +397,35 @@ public class TbLwM2mRedisRegistrationStore implements CaliforniumRegistrationSto @@ -441,33 +397,35 @@ public class TbLwM2mRedisRegistrationStore implements CaliforniumRegistrationSto
441 */ 397 */
442 @Override 398 @Override
443 public Collection<Observation> addObservation(String registrationId, Observation observation) { 399 public Collection<Observation> addObservation(String registrationId, Observation observation) {
444 -  
445 List<Observation> removed = new ArrayList<>(); 400 List<Observation> removed = new ArrayList<>();
446 - try (Jedis j = (Jedis) connectionFactory.getConnection().getNativeConnection()) { 401 + try (var connection = connectionFactory.getConnection()) {
447 402
448 // fetch the client ep by registration ID index 403 // fetch the client ep by registration ID index
449 - byte[] ep = j.get(toRegIdKey(registrationId)); 404 + byte[] ep = connection.get(toRegIdKey(registrationId));
450 if (ep == null) { 405 if (ep == null) {
451 return null; 406 return null;
452 } 407 }
453 408
454 - byte[] lockValue = null;  
455 - byte[] lockKey = toLockKey(ep); 409 + Lock lock = null;
  410 + String lockKey = toLockKey(ep);
456 411
457 try { 412 try {
458 - lockValue = lock.acquire(j, lockKey); 413 + lock = redisLock.obtain(lockKey);
  414 + lock.lock();
459 415
460 // cancel existing observations for the same path and registration id. 416 // cancel existing observations for the same path and registration id.
461 - for (Observation obs : getObservations(j, registrationId)) { 417 + for (Observation obs : getObservations(connection, registrationId)) {
462 if (observation.getPath().equals(obs.getPath()) 418 if (observation.getPath().equals(obs.getPath())
463 && !Arrays.equals(observation.getId(), obs.getId())) { 419 && !Arrays.equals(observation.getId(), obs.getId())) {
464 removed.add(obs); 420 removed.add(obs);
465 - unsafeRemoveObservation(j, registrationId, obs.getId()); 421 + unsafeRemoveObservation(connection, registrationId, obs.getId());
466 } 422 }
467 } 423 }
468 424
469 } finally { 425 } finally {
470 - lock.release(j, lockKey, lockValue); 426 + if (lock != null) {
  427 + lock.unlock();
  428 + }
471 } 429 }
472 } 430 }
473 return removed; 431 return removed;
@@ -475,29 +433,32 @@ public class TbLwM2mRedisRegistrationStore implements CaliforniumRegistrationSto @@ -475,29 +433,32 @@ public class TbLwM2mRedisRegistrationStore implements CaliforniumRegistrationSto
475 433
476 @Override 434 @Override
477 public Observation removeObservation(String registrationId, byte[] observationId) { 435 public Observation removeObservation(String registrationId, byte[] observationId) {
478 - try (Jedis j = (Jedis) connectionFactory.getConnection().getNativeConnection()) { 436 + try (var connection = connectionFactory.getConnection()) {
479 437
480 // fetch the client ep by registration ID index 438 // fetch the client ep by registration ID index
481 - byte[] ep = j.get(toRegIdKey(registrationId)); 439 + byte[] ep = connection.get(toRegIdKey(registrationId));
482 if (ep == null) { 440 if (ep == null) {
483 return null; 441 return null;
484 } 442 }
485 443
486 // remove observation 444 // remove observation
487 - byte[] lockValue = null;  
488 - byte[] lockKey = toLockKey(ep); 445 + Lock lock = null;
  446 + String lockKey = toLockKey(ep);
489 try { 447 try {
490 - lockValue = lock.acquire(j, lockKey); 448 + lock = redisLock.obtain(lockKey);
  449 + lock.lock();
491 450
492 Observation observation = build(get(new Token(observationId))); 451 Observation observation = build(get(new Token(observationId)));
493 if (observation != null && registrationId.equals(observation.getRegistrationId())) { 452 if (observation != null && registrationId.equals(observation.getRegistrationId())) {
494 - unsafeRemoveObservation(j, registrationId, observationId); 453 + unsafeRemoveObservation(connection, registrationId, observationId);
495 return observation; 454 return observation;
496 } 455 }
497 return null; 456 return null;
498 457
499 } finally { 458 } finally {
500 - lock.release(j, lockKey, lockValue); 459 + if (lock != null) {
  460 + lock.unlock();
  461 + }
501 } 462 }
502 } 463 }
503 } 464 }
@@ -509,15 +470,15 @@ public class TbLwM2mRedisRegistrationStore implements CaliforniumRegistrationSto @@ -509,15 +470,15 @@ public class TbLwM2mRedisRegistrationStore implements CaliforniumRegistrationSto
509 470
510 @Override 471 @Override
511 public Collection<Observation> getObservations(String registrationId) { 472 public Collection<Observation> getObservations(String registrationId) {
512 - try (Jedis j = (Jedis) connectionFactory.getConnection().getNativeConnection()) {  
513 - return getObservations(j, registrationId); 473 + try (var connection = connectionFactory.getConnection()) {
  474 + return getObservations(connection, registrationId);
514 } 475 }
515 } 476 }
516 477
517 - private Collection<Observation> getObservations(Jedis j, String registrationId) { 478 + private Collection<Observation> getObservations(RedisConnection connection, String registrationId) {
518 Collection<Observation> result = new ArrayList<>(); 479 Collection<Observation> result = new ArrayList<>();
519 - for (byte[] token : j.lrange(toKey(OBS_TKNS_REGID_IDX, registrationId), 0, -1)) {  
520 - byte[] obs = j.get(toKey(OBS_TKN, token)); 480 + for (byte[] token : connection.lRange(toKey(OBS_TKNS_REGID_IDX, registrationId), 0, -1)) {
  481 + byte[] obs = connection.get(toKey(OBS_TKN, token));
521 if (obs != null) { 482 if (obs != null) {
522 result.add(build(deserializeObs(obs))); 483 result.add(build(deserializeObs(obs)));
523 } 484 }
@@ -527,22 +488,24 @@ public class TbLwM2mRedisRegistrationStore implements CaliforniumRegistrationSto @@ -527,22 +488,24 @@ public class TbLwM2mRedisRegistrationStore implements CaliforniumRegistrationSto
527 488
528 @Override 489 @Override
529 public Collection<Observation> removeObservations(String registrationId) { 490 public Collection<Observation> removeObservations(String registrationId) {
530 - try (Jedis j = (Jedis) connectionFactory.getConnection().getNativeConnection()) { 491 + try (var connection = connectionFactory.getConnection()) {
531 // check registration exists 492 // check registration exists
532 - Registration registration = getRegistration(j, registrationId); 493 + Registration registration = getRegistration(connection, registrationId);
533 if (registration == null) 494 if (registration == null)
534 return Collections.emptyList(); 495 return Collections.emptyList();
535 496
536 // get endpoint and create lock 497 // get endpoint and create lock
537 String endpoint = registration.getEndpoint(); 498 String endpoint = registration.getEndpoint();
538 - byte[] lockValue = null;  
539 - byte[] lockKey = toKey(LOCK_EP, endpoint); 499 + Lock lock = null;
  500 + String lockKey = toLockKey(endpoint);
540 try { 501 try {
541 - lockValue = lock.acquire(j, lockKey);  
542 -  
543 - return unsafeRemoveAllObservations(j, registrationId); 502 + lock = redisLock.obtain(lockKey);
  503 + lock.lock();
  504 + return unsafeRemoveAllObservations(connection, registrationId);
544 } finally { 505 } finally {
545 - lock.release(j, lockKey, lockValue); 506 + if (lock != null) {
  507 + lock.unlock();
  508 + }
546 } 509 }
547 } 510 }
548 } 511 }
@@ -565,31 +528,32 @@ public class TbLwM2mRedisRegistrationStore implements CaliforniumRegistrationSto @@ -565,31 +528,32 @@ public class TbLwM2mRedisRegistrationStore implements CaliforniumRegistrationSto
565 String endpoint = ObserveUtil.validateCoapObservation(obs); 528 String endpoint = ObserveUtil.validateCoapObservation(obs);
566 org.eclipse.californium.core.observe.Observation previousObservation = null; 529 org.eclipse.californium.core.observe.Observation previousObservation = null;
567 530
568 - try (Jedis j = (Jedis) connectionFactory.getConnection().getNativeConnection()) {  
569 - byte[] lockValue = null;  
570 - byte[] lockKey = toKey(LOCK_EP, endpoint); 531 + try (var connection = connectionFactory.getConnection()) {
  532 + Lock lock = null;
  533 + String lockKey = toLockKey(endpoint);
571 try { 534 try {
572 - lockValue = lock.acquire(j, lockKey); 535 + lock = redisLock.obtain(lockKey);
  536 + lock.lock();
573 537
574 String registrationId = ObserveUtil.extractRegistrationId(obs); 538 String registrationId = ObserveUtil.extractRegistrationId(obs);
575 - if (!j.exists(toRegIdKey(registrationId))) 539 + if (!connection.exists(toRegIdKey(registrationId)))
576 throw new ObservationStoreException("no registration for this Id"); 540 throw new ObservationStoreException("no registration for this Id");
577 byte[] key = toKey(OBS_TKN, obs.getRequest().getToken().getBytes()); 541 byte[] key = toKey(OBS_TKN, obs.getRequest().getToken().getBytes());
578 byte[] serializeObs = serializeObs(obs); 542 byte[] serializeObs = serializeObs(obs);
579 byte[] previousValue; 543 byte[] previousValue;
580 if (ifAbsent) { 544 if (ifAbsent) {
581 - previousValue = j.get(key); 545 + previousValue = connection.get(key);
582 if (previousValue == null || previousValue.length == 0) { 546 if (previousValue == null || previousValue.length == 0) {
583 - j.set(key, serializeObs); 547 + connection.set(key, serializeObs);
584 } else { 548 } else {
585 return deserializeObs(previousValue); 549 return deserializeObs(previousValue);
586 } 550 }
587 } else { 551 } else {
588 - previousValue = j.getSet(key, serializeObs); 552 + previousValue = connection.getSet(key, serializeObs);
589 } 553 }
590 554
591 // secondary index to get the list by registrationId 555 // secondary index to get the list by registrationId
592 - j.lpush(toKey(OBS_TKNS_REGID_IDX, registrationId), obs.getRequest().getToken().getBytes()); 556 + connection.lPush(toKey(OBS_TKNS_REGID_IDX, registrationId), obs.getRequest().getToken().getBytes());
593 557
594 // log any collisions 558 // log any collisions
595 if (previousValue != null && previousValue.length != 0) { 559 if (previousValue != null && previousValue.length != 0) {
@@ -599,7 +563,9 @@ public class TbLwM2mRedisRegistrationStore implements CaliforniumRegistrationSto @@ -599,7 +563,9 @@ public class TbLwM2mRedisRegistrationStore implements CaliforniumRegistrationSto
599 previousObservation.getRequest(), obs.getRequest()); 563 previousObservation.getRequest(), obs.getRequest());
600 } 564 }
601 } finally { 565 } finally {
602 - lock.release(j, lockKey, lockValue); 566 + if (lock != null) {
  567 + lock.unlock();
  568 + }
603 } 569 }
604 } 570 }
605 return previousObservation; 571 return previousObservation;
@@ -607,17 +573,17 @@ public class TbLwM2mRedisRegistrationStore implements CaliforniumRegistrationSto @@ -607,17 +573,17 @@ public class TbLwM2mRedisRegistrationStore implements CaliforniumRegistrationSto
607 573
608 @Override 574 @Override
609 public void remove(Token token) { 575 public void remove(Token token) {
610 - try (Jedis j = (Jedis) connectionFactory.getConnection().getNativeConnection()) { 576 + try (var connection = connectionFactory.getConnection()) {
611 byte[] tokenKey = toKey(OBS_TKN, token.getBytes()); 577 byte[] tokenKey = toKey(OBS_TKN, token.getBytes());
612 578
613 // fetch the observation by token 579 // fetch the observation by token
614 - byte[] serializedObs = j.get(tokenKey); 580 + byte[] serializedObs = connection.get(tokenKey);
615 if (serializedObs == null) 581 if (serializedObs == null)
616 return; 582 return;
617 583
618 org.eclipse.californium.core.observe.Observation obs = deserializeObs(serializedObs); 584 org.eclipse.californium.core.observe.Observation obs = deserializeObs(serializedObs);
619 String registrationId = ObserveUtil.extractRegistrationId(obs); 585 String registrationId = ObserveUtil.extractRegistrationId(obs);
620 - Registration registration = getRegistration(j, registrationId); 586 + Registration registration = getRegistration(connection, registrationId);
621 if (registration == null) { 587 if (registration == null) {
622 LOG.warn("Unable to remove observation {}, registration {} does not exist anymore", obs.getRequest(), 588 LOG.warn("Unable to remove observation {}, registration {} does not exist anymore", obs.getRequest(),
623 registrationId); 589 registrationId);
@@ -625,14 +591,17 @@ public class TbLwM2mRedisRegistrationStore implements CaliforniumRegistrationSto @@ -625,14 +591,17 @@ public class TbLwM2mRedisRegistrationStore implements CaliforniumRegistrationSto
625 } 591 }
626 592
627 String endpoint = registration.getEndpoint(); 593 String endpoint = registration.getEndpoint();
628 - byte[] lockValue = null;  
629 - byte[] lockKey = toKey(LOCK_EP, endpoint); 594 + Lock lock = null;
  595 + String lockKey = toLockKey(endpoint);
630 try { 596 try {
631 - lockValue = lock.acquire(j, lockKey); 597 + lock = redisLock.obtain(lockKey);
  598 + lock.lock();
632 599
633 - unsafeRemoveObservation(j, registrationId, token.getBytes()); 600 + unsafeRemoveObservation(connection, registrationId, token.getBytes());
634 } finally { 601 } finally {
635 - lock.release(j, lockKey, lockValue); 602 + if (lock != null) {
  603 + lock.unlock();
  604 + }
636 } 605 }
637 } 606 }
638 607
@@ -640,8 +609,8 @@ public class TbLwM2mRedisRegistrationStore implements CaliforniumRegistrationSto @@ -640,8 +609,8 @@ public class TbLwM2mRedisRegistrationStore implements CaliforniumRegistrationSto
640 609
641 @Override 610 @Override
642 public org.eclipse.californium.core.observe.Observation get(Token token) { 611 public org.eclipse.californium.core.observe.Observation get(Token token) {
643 - try (Jedis j = (Jedis) connectionFactory.getConnection().getNativeConnection()) {  
644 - byte[] obs = j.get(toKey(OBS_TKN, token.getBytes())); 612 + try (var connection = connectionFactory.getConnection()) {
  613 + byte[] obs = connection.get(toKey(OBS_TKN, token.getBytes()));
645 if (obs == null) { 614 if (obs == null) {
646 return null; 615 return null;
647 } else { 616 } else {
@@ -652,12 +621,12 @@ public class TbLwM2mRedisRegistrationStore implements CaliforniumRegistrationSto @@ -652,12 +621,12 @@ public class TbLwM2mRedisRegistrationStore implements CaliforniumRegistrationSto
652 621
653 /* *************** Observation utility functions **************** */ 622 /* *************** Observation utility functions **************** */
654 623
655 - private Registration getRegistration(Jedis j, String registrationId) {  
656 - byte[] ep = j.get(toRegIdKey(registrationId)); 624 + private Registration getRegistration(RedisConnection connection, String registrationId) {
  625 + byte[] ep = connection.get(toRegIdKey(registrationId));
657 if (ep == null) { 626 if (ep == null) {
658 return null; 627 return null;
659 } 628 }
660 - byte[] data = j.get(toEndpointKey(ep)); 629 + byte[] data = connection.get(toEndpointKey(ep));
661 if (data == null) { 630 if (data == null) {
662 return null; 631 return null;
663 } 632 }
@@ -665,25 +634,25 @@ public class TbLwM2mRedisRegistrationStore implements CaliforniumRegistrationSto @@ -665,25 +634,25 @@ public class TbLwM2mRedisRegistrationStore implements CaliforniumRegistrationSto
665 return deserializeReg(data); 634 return deserializeReg(data);
666 } 635 }
667 636
668 - private void unsafeRemoveObservation(Jedis j, String registrationId, byte[] observationId) {  
669 - if (j.del(toKey(OBS_TKN, observationId)) > 0L) {  
670 - j.lrem(toKey(OBS_TKNS_REGID_IDX, registrationId), 0, observationId); 637 + private void unsafeRemoveObservation(RedisConnection connection, String registrationId, byte[] observationId) {
  638 + if (connection.del(toKey(OBS_TKN, observationId)) > 0L) {
  639 + connection.lRem(toKey(OBS_TKNS_REGID_IDX, registrationId), 0, observationId);
671 } 640 }
672 } 641 }
673 642
674 - private Collection<Observation> unsafeRemoveAllObservations(Jedis j, String registrationId) { 643 + private Collection<Observation> unsafeRemoveAllObservations(RedisConnection connection, String registrationId) {
675 Collection<Observation> removed = new ArrayList<>(); 644 Collection<Observation> removed = new ArrayList<>();
676 byte[] regIdKey = toKey(OBS_TKNS_REGID_IDX, registrationId); 645 byte[] regIdKey = toKey(OBS_TKNS_REGID_IDX, registrationId);
677 646
678 // fetch all observations by token 647 // fetch all observations by token
679 - for (byte[] token : j.lrange(regIdKey, 0, -1)) {  
680 - byte[] obs = j.get(toKey(OBS_TKN, token)); 648 + for (byte[] token : connection.lRange(regIdKey, 0, -1)) {
  649 + byte[] obs = connection.get(toKey(OBS_TKN, token));
681 if (obs != null) { 650 if (obs != null) {
682 removed.add(build(deserializeObs(obs))); 651 removed.add(build(deserializeObs(obs)));
683 } 652 }
684 - j.del(toKey(OBS_TKN, token)); 653 + connection.del(toKey(OBS_TKN, token));
685 } 654 }
686 - j.del(regIdKey); 655 + connection.del(regIdKey);
687 656
688 return removed; 657 return removed;
689 } 658 }
@@ -754,14 +723,14 @@ public class TbLwM2mRedisRegistrationStore implements CaliforniumRegistrationSto @@ -754,14 +723,14 @@ public class TbLwM2mRedisRegistrationStore implements CaliforniumRegistrationSto
754 @Override 723 @Override
755 public void run() { 724 public void run() {
756 725
757 - try (Jedis j = (Jedis) connectionFactory.getConnection().getNativeConnection()) {  
758 - Set<byte[]> endpointsExpired = j.zrangeByScore(EXP_EP, Double.NEGATIVE_INFINITY, 726 + try (var connection = connectionFactory.getConnection()) {
  727 + Set<byte[]> endpointsExpired = connection.zRangeByScore(EXP_EP, Double.NEGATIVE_INFINITY,
759 System.currentTimeMillis(), 0, cleanLimit); 728 System.currentTimeMillis(), 0, cleanLimit);
760 729
761 for (byte[] endpoint : endpointsExpired) { 730 for (byte[] endpoint : endpointsExpired) {
762 - Registration r = deserializeReg(j.get(toEndpointKey(endpoint))); 731 + Registration r = deserializeReg(connection.get(toEndpointKey(endpoint)));
763 if (!r.isAlive(gracePeriod)) { 732 if (!r.isAlive(gracePeriod)) {
764 - Deregistration dereg = removeRegistration(j, r.getId(), true); 733 + Deregistration dereg = removeRegistration(connection, r.getId(), true);
765 if (dereg != null) 734 if (dereg != null)
766 expirationListener.registrationExpired(dereg.getRegistration(), dereg.getObservations()); 735 expirationListener.registrationExpired(dereg.getRegistration(), dereg.getObservations());
767 } 736 }
@@ -20,13 +20,15 @@ import org.eclipse.leshan.server.security.EditableSecurityStore; @@ -20,13 +20,15 @@ import org.eclipse.leshan.server.security.EditableSecurityStore;
20 import org.eclipse.leshan.server.security.NonUniqueSecurityInfoException; 20 import org.eclipse.leshan.server.security.NonUniqueSecurityInfoException;
21 import org.eclipse.leshan.server.security.SecurityInfo; 21 import org.eclipse.leshan.server.security.SecurityInfo;
22 import org.eclipse.leshan.server.security.SecurityStoreListener; 22 import org.eclipse.leshan.server.security.SecurityStoreListener;
  23 +import org.springframework.data.redis.connection.RedisClusterConnection;
23 import org.springframework.data.redis.connection.RedisConnectionFactory; 24 import org.springframework.data.redis.connection.RedisConnectionFactory;
24 -import redis.clients.jedis.Jedis;  
25 -import redis.clients.jedis.ScanParams;  
26 -import redis.clients.jedis.ScanResult; 25 +import org.springframework.data.redis.core.Cursor;
  26 +import org.springframework.data.redis.core.ScanOptions;
27 27
  28 +import java.util.ArrayList;
28 import java.util.Collection; 29 import java.util.Collection;
29 import java.util.LinkedList; 30 import java.util.LinkedList;
  31 +import java.util.List;
30 32
31 public class TbLwM2mRedisSecurityStore implements EditableSecurityStore { 33 public class TbLwM2mRedisSecurityStore implements EditableSecurityStore {
32 private static final String SEC_EP = "SEC#EP#"; 34 private static final String SEC_EP = "SEC#EP#";
@@ -42,8 +44,8 @@ public class TbLwM2mRedisSecurityStore implements EditableSecurityStore { @@ -42,8 +44,8 @@ public class TbLwM2mRedisSecurityStore implements EditableSecurityStore {
42 44
43 @Override 45 @Override
44 public SecurityInfo getByEndpoint(String endpoint) { 46 public SecurityInfo getByEndpoint(String endpoint) {
45 - try (Jedis j = (Jedis) connectionFactory.getConnection().getNativeConnection()) {  
46 - byte[] data = j.get((SEC_EP + endpoint).getBytes()); 47 + try (var connection = connectionFactory.getConnection()) {
  48 + byte[] data = connection.get((SEC_EP + endpoint).getBytes());
47 if (data == null) { 49 if (data == null) {
48 return null; 50 return null;
49 } else { 51 } else {
@@ -54,12 +56,12 @@ public class TbLwM2mRedisSecurityStore implements EditableSecurityStore { @@ -54,12 +56,12 @@ public class TbLwM2mRedisSecurityStore implements EditableSecurityStore {
54 56
55 @Override 57 @Override
56 public SecurityInfo getByIdentity(String identity) { 58 public SecurityInfo getByIdentity(String identity) {
57 - try (Jedis j = (Jedis) connectionFactory.getConnection().getNativeConnection()) {  
58 - String ep = j.hget(PSKID_SEC, identity); 59 + try (var connection = connectionFactory.getConnection()) {
  60 + byte[] ep = connection.hGet(PSKID_SEC.getBytes(), identity.getBytes());
59 if (ep == null) { 61 if (ep == null) {
60 return null; 62 return null;
61 } else { 63 } else {
62 - byte[] data = j.get((SEC_EP + ep).getBytes()); 64 + byte[] data = connection.get((SEC_EP + new String(ep)).getBytes());
63 if (data == null) { 65 if (data == null) {
64 return null; 66 return null;
65 } else { 67 } else {
@@ -71,18 +73,24 @@ public class TbLwM2mRedisSecurityStore implements EditableSecurityStore { @@ -71,18 +73,24 @@ public class TbLwM2mRedisSecurityStore implements EditableSecurityStore {
71 73
72 @Override 74 @Override
73 public Collection<SecurityInfo> getAll() { 75 public Collection<SecurityInfo> getAll() {
74 - try (Jedis j = (Jedis) connectionFactory.getConnection().getNativeConnection()) {  
75 - ScanParams params = new ScanParams().match(SEC_EP + "*").count(100); 76 + try (var connection = connectionFactory.getConnection()) {
76 Collection<SecurityInfo> list = new LinkedList<>(); 77 Collection<SecurityInfo> list = new LinkedList<>();
77 - String cursor = "0";  
78 - do {  
79 - ScanResult<byte[]> res = j.scan(cursor.getBytes(), params);  
80 - for (byte[] key : res.getResult()) {  
81 - byte[] element = j.get(key); 78 + ScanOptions scanOptions = ScanOptions.scanOptions().count(100).match(SEC_EP + "*").build();
  79 + List<Cursor<byte[]>> scans = new ArrayList<>();
  80 + if (connection instanceof RedisClusterConnection) {
  81 + ((RedisClusterConnection) connection).clusterGetNodes().forEach(node -> {
  82 + scans.add(((RedisClusterConnection) connection).scan(node, scanOptions));
  83 + });
  84 + } else {
  85 + scans.add(connection.scan(scanOptions));
  86 + }
  87 +
  88 + scans.forEach(scan -> {
  89 + scan.forEachRemaining(key -> {
  90 + byte[] element = connection.get(key);
82 list.add(deserialize(element)); 91 list.add(deserialize(element));
83 - }  
84 - cursor = res.getCursor();  
85 - } while (!"0".equals(cursor)); 92 + });
  93 + });
86 return list; 94 return list;
87 } 95 }
88 } 96 }
@@ -90,21 +98,21 @@ public class TbLwM2mRedisSecurityStore implements EditableSecurityStore { @@ -90,21 +98,21 @@ public class TbLwM2mRedisSecurityStore implements EditableSecurityStore {
90 @Override 98 @Override
91 public SecurityInfo add(SecurityInfo info) throws NonUniqueSecurityInfoException { 99 public SecurityInfo add(SecurityInfo info) throws NonUniqueSecurityInfoException {
92 byte[] data = serialize(info); 100 byte[] data = serialize(info);
93 - try (Jedis j = (Jedis) connectionFactory.getConnection().getNativeConnection()) { 101 + try (var connection = connectionFactory.getConnection()) {
94 if (info.getIdentity() != null) { 102 if (info.getIdentity() != null) {
95 // populate the secondary index (security info by PSK id) 103 // populate the secondary index (security info by PSK id)
96 - String oldEndpoint = j.hget(PSKID_SEC, info.getIdentity());  
97 - if (oldEndpoint != null && !oldEndpoint.equals(info.getEndpoint())) { 104 + String oldEndpoint = new String(connection.hGet(PSKID_SEC.getBytes(), info.getIdentity().getBytes()));
  105 + if (!oldEndpoint.equals(info.getEndpoint())) {
98 throw new NonUniqueSecurityInfoException("PSK Identity " + info.getIdentity() + " is already used"); 106 throw new NonUniqueSecurityInfoException("PSK Identity " + info.getIdentity() + " is already used");
99 } 107 }
100 - j.hset(PSKID_SEC.getBytes(), info.getIdentity().getBytes(), info.getEndpoint().getBytes()); 108 + connection.hSet(PSKID_SEC.getBytes(), info.getIdentity().getBytes(), info.getEndpoint().getBytes());
101 } 109 }
102 110
103 - byte[] previousData = j.getSet((SEC_EP + info.getEndpoint()).getBytes(), data); 111 + byte[] previousData = connection.getSet((SEC_EP + info.getEndpoint()).getBytes(), data);
104 SecurityInfo previous = previousData == null ? null : deserialize(previousData); 112 SecurityInfo previous = previousData == null ? null : deserialize(previousData);
105 String previousIdentity = previous == null ? null : previous.getIdentity(); 113 String previousIdentity = previous == null ? null : previous.getIdentity();
106 if (previousIdentity != null && !previousIdentity.equals(info.getIdentity())) { 114 if (previousIdentity != null && !previousIdentity.equals(info.getIdentity())) {
107 - j.hdel(PSKID_SEC, previousIdentity); 115 + connection.hDel(PSKID_SEC.getBytes(), previousIdentity.getBytes());
108 } 116 }
109 117
110 return previous; 118 return previous;
@@ -113,15 +121,15 @@ public class TbLwM2mRedisSecurityStore implements EditableSecurityStore { @@ -113,15 +121,15 @@ public class TbLwM2mRedisSecurityStore implements EditableSecurityStore {
113 121
114 @Override 122 @Override
115 public SecurityInfo remove(String endpoint, boolean infosAreCompromised) { 123 public SecurityInfo remove(String endpoint, boolean infosAreCompromised) {
116 - try (Jedis j = (Jedis) connectionFactory.getConnection().getNativeConnection()) {  
117 - byte[] data = j.get((SEC_EP + endpoint).getBytes()); 124 + try (var connection = connectionFactory.getConnection()) {
  125 + byte[] data = connection.get((SEC_EP + endpoint).getBytes());
118 126
119 if (data != null) { 127 if (data != null) {
120 SecurityInfo info = deserialize(data); 128 SecurityInfo info = deserialize(data);
121 if (info.getIdentity() != null) { 129 if (info.getIdentity() != null) {
122 - j.hdel(PSKID_SEC.getBytes(), info.getIdentity().getBytes()); 130 + connection.hDel(PSKID_SEC.getBytes(), info.getIdentity().getBytes());
123 } 131 }
124 - j.del((SEC_EP + endpoint).getBytes()); 132 + connection.del((SEC_EP + endpoint).getBytes());
125 if (listener != null) { 133 if (listener != null) {
126 listener.securityInfoRemoved(infosAreCompromised, info); 134 listener.securityInfoRemoved(infosAreCompromised, info);
127 } 135 }
@@ -26,6 +26,7 @@ import org.springframework.context.annotation.Lazy; @@ -26,6 +26,7 @@ import org.springframework.context.annotation.Lazy;
26 import org.springframework.stereotype.Component; 26 import org.springframework.stereotype.Component;
27 import org.thingsboard.server.cache.TBRedisCacheConfiguration; 27 import org.thingsboard.server.cache.TBRedisCacheConfiguration;
28 import org.thingsboard.server.queue.util.TbLwM2mTransportComponent; 28 import org.thingsboard.server.queue.util.TbLwM2mTransportComponent;
  29 +import org.thingsboard.server.transport.lwm2m.config.LwM2MTransportServerConfig;
29 import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClientContext; 30 import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClientContext;
30 31
31 import java.util.Optional; 32 import java.util.Optional;
@@ -38,6 +39,9 @@ public class TbLwM2mStoreFactory { @@ -38,6 +39,9 @@ public class TbLwM2mStoreFactory {
38 private Optional<TBRedisCacheConfiguration> redisConfiguration; 39 private Optional<TBRedisCacheConfiguration> redisConfiguration;
39 40
40 @Autowired 41 @Autowired
  42 + private LwM2MTransportServerConfig config;
  43 +
  44 + @Autowired
41 @Lazy 45 @Lazy
42 private LwM2mClientContext clientContext; 46 private LwM2mClientContext clientContext;
43 47
@@ -47,7 +51,7 @@ public class TbLwM2mStoreFactory { @@ -47,7 +51,7 @@ public class TbLwM2mStoreFactory {
47 @Bean 51 @Bean
48 private CaliforniumRegistrationStore registrationStore() { 52 private CaliforniumRegistrationStore registrationStore() {
49 return redisConfiguration.isPresent() && useRedis ? 53 return redisConfiguration.isPresent() && useRedis ?
50 - new TbLwM2mRedisRegistrationStore(redisConfiguration.get().redisConnectionFactory()) : new InMemoryRegistrationStore(); 54 + new TbLwM2mRedisRegistrationStore(redisConfiguration.get().redisConnectionFactory()) : new InMemoryRegistrationStore(config.getCleanPeriodInSec());
51 } 55 }
52 56
53 @Bean 57 @Bean
@@ -1431,6 +1431,11 @@ @@ -1431,6 +1431,11 @@
1431 <version>${spring-data-redis.version}</version> 1431 <version>${spring-data-redis.version}</version>
1432 </dependency> 1432 </dependency>
1433 <dependency> 1433 <dependency>
  1434 + <groupId>org.springframework.integration</groupId>
  1435 + <artifactId>spring-integration-redis</artifactId>
  1436 + <version>${spring.version}</version>
  1437 + </dependency>
  1438 + <dependency>
1434 <groupId>redis.clients</groupId> 1439 <groupId>redis.clients</groupId>
1435 <artifactId>jedis</artifactId> 1440 <artifactId>jedis</artifactId>
1436 <version>${jedis.version}</version> 1441 <version>${jedis.version}</version>
@@ -98,7 +98,6 @@ transport: @@ -98,7 +98,6 @@ transport:
98 # Enable/disable http/mqtt/coap transport protocols (has higher priority than certain protocol's 'enabled' property) 98 # Enable/disable http/mqtt/coap transport protocols (has higher priority than certain protocol's 'enabled' property)
99 api_enabled: "${TB_TRANSPORT_API_ENABLED:true}" 99 api_enabled: "${TB_TRANSPORT_API_ENABLED:true}"
100 # Local LwM2M transport parameters 100 # Local LwM2M transport parameters
101 - # Local LwM2M transport parameters  
102 lwm2m: 101 lwm2m:
103 # Enable/disable lvm2m transport protocol. 102 # Enable/disable lvm2m transport protocol.
104 enabled: "${LWM2M_ENABLED:true}" 103 enabled: "${LWM2M_ENABLED:true}"
@@ -144,6 +143,8 @@ transport: @@ -144,6 +143,8 @@ transport:
144 recommended_supported_groups: "${LWM2M_RECOMMENDED_SUPPORTED_GROUPS:true}" 143 recommended_supported_groups: "${LWM2M_RECOMMENDED_SUPPORTED_GROUPS:true}"
145 response_pool_size: "${LWM2M_RESPONSE_POOL_SIZE:100}" 144 response_pool_size: "${LWM2M_RESPONSE_POOL_SIZE:100}"
146 registered_pool_size: "${LWM2M_REGISTERED_POOL_SIZE:10}" 145 registered_pool_size: "${LWM2M_REGISTERED_POOL_SIZE:10}"
  146 + registration_store_pool_size: "${LWM2M_REGISTRATION_STORE_POOL_SIZE:100}"
  147 + clean_period_in_sec: "${LWM2M_CLEAN_PERIOD_IN_SEC:2}"
147 update_registered_pool_size: "${LWM2M_UPDATE_REGISTERED_POOL_SIZE:10}" 148 update_registered_pool_size: "${LWM2M_UPDATE_REGISTERED_POOL_SIZE:10}"
148 un_registered_pool_size: "${LWM2M_UN_REGISTERED_POOL_SIZE:10}" 149 un_registered_pool_size: "${LWM2M_UN_REGISTERED_POOL_SIZE:10}"
149 log_max_length: "${LWM2M_LOG_MAX_LENGTH:100}" 150 log_max_length: "${LWM2M_LOG_MAX_LENGTH:100}"