Showing
6 changed files
with
7 additions
and
7 deletions
@@ -64,7 +64,7 @@ public abstract class DataValidator<D extends BaseData<?>> { | @@ -64,7 +64,7 @@ public abstract class DataValidator<D extends BaseData<?>> { | ||
64 | return actualData.getId() != null && existentData.getId().equals(actualData.getId()); | 64 | return actualData.getId() != null && existentData.getId().equals(actualData.getId()); |
65 | } | 65 | } |
66 | 66 | ||
67 | - public static void validateEmail(String email) { | 67 | + protected static void validateEmail(String email) { |
68 | if (!doValidateEmail(email)) { | 68 | if (!doValidateEmail(email)) { |
69 | throw new DataValidationException("Invalid email address format '" + email + "'!"); | 69 | throw new DataValidationException("Invalid email address format '" + email + "'!"); |
70 | } | 70 | } |
@@ -42,8 +42,7 @@ public class TbSqlBlockingQueueWrapper<E> { | @@ -42,8 +42,7 @@ public class TbSqlBlockingQueueWrapper<E> { | ||
42 | } | 42 | } |
43 | 43 | ||
44 | public ListenableFuture<Void> add(E element) { | 44 | public ListenableFuture<Void> add(E element) { |
45 | - int hash = hashCodeFunction.apply(element); | ||
46 | - int queueIndex = (hash & 0x7FFFFFFF) % maxThreads; | 45 | + int queueIndex = element != null ? (hashCodeFunction.apply(element) & 0x7FFFFFFF) % maxThreads : 0; |
47 | return queues.get(queueIndex).add(element); | 46 | return queues.get(queueIndex).add(element); |
48 | } | 47 | } |
49 | 48 |
@@ -83,7 +83,7 @@ public class JpaAttributeDao extends JpaAbstractDaoListeningExecutorService impl | @@ -83,7 +83,7 @@ public class JpaAttributeDao extends JpaAbstractDaoListeningExecutorService impl | ||
83 | .statsPrintIntervalMs(statsPrintIntervalMs) | 83 | .statsPrintIntervalMs(statsPrintIntervalMs) |
84 | .build(); | 84 | .build(); |
85 | 85 | ||
86 | - Function<AttributeKvEntity, Integer> hashcodeFunction = entity -> entity != null ? entity.getId().getEntityId().hashCode() : 0; | 86 | + Function<AttributeKvEntity, Integer> hashcodeFunction = entity -> entity.getId().getEntityId().hashCode(); |
87 | queue = new TbSqlBlockingQueueWrapper<>(params, hashcodeFunction, batchThreads); | 87 | queue = new TbSqlBlockingQueueWrapper<>(params, hashcodeFunction, batchThreads); |
88 | queue.init(logExecutor, v -> attributeKvInsertRepository.saveOrUpdate(v)); | 88 | queue.init(logExecutor, v -> attributeKvInsertRepository.saveOrUpdate(v)); |
89 | } | 89 | } |
@@ -68,7 +68,7 @@ public abstract class AbstractChunkedAggregationTimeseriesDao extends AbstractSq | @@ -68,7 +68,7 @@ public abstract class AbstractChunkedAggregationTimeseriesDao extends AbstractSq | ||
68 | .statsPrintIntervalMs(tsStatsPrintIntervalMs) | 68 | .statsPrintIntervalMs(tsStatsPrintIntervalMs) |
69 | .build(); | 69 | .build(); |
70 | 70 | ||
71 | - Function<TsKvEntity, Integer> hashcodeFunction = entity -> entity != null ? entity.getEntityId().hashCode() : 0; | 71 | + Function<TsKvEntity, Integer> hashcodeFunction = entity -> entity.getEntityId().hashCode(); |
72 | tsQueue = new TbSqlBlockingQueueWrapper<>(tsParams, hashcodeFunction, tsBatchThreads); | 72 | tsQueue = new TbSqlBlockingQueueWrapper<>(tsParams, hashcodeFunction, tsBatchThreads); |
73 | tsQueue.init(logExecutor, v -> insertRepository.saveOrUpdate(v)); | 73 | tsQueue.init(logExecutor, v -> insertRepository.saveOrUpdate(v)); |
74 | } | 74 | } |
@@ -125,7 +125,7 @@ public abstract class AbstractSqlTimeseriesDao extends JpaAbstractDaoListeningEx | @@ -125,7 +125,7 @@ public abstract class AbstractSqlTimeseriesDao extends JpaAbstractDaoListeningEx | ||
125 | .statsPrintIntervalMs(tsLatestStatsPrintIntervalMs) | 125 | .statsPrintIntervalMs(tsLatestStatsPrintIntervalMs) |
126 | .build(); | 126 | .build(); |
127 | 127 | ||
128 | - java.util.function.Function<TsKvLatestEntity, Integer> hashcodeFunction = entity -> entity != null ? entity.getEntityId().hashCode() : 0; | 128 | + java.util.function.Function<TsKvLatestEntity, Integer> hashcodeFunction = entity -> entity.getEntityId().hashCode(); |
129 | tsLatestQueue = new TbSqlBlockingQueueWrapper<>(tsLatestParams, hashcodeFunction, tsLatestBatchThreads); | 129 | tsLatestQueue = new TbSqlBlockingQueueWrapper<>(tsLatestParams, hashcodeFunction, tsLatestBatchThreads); |
130 | 130 | ||
131 | tsLatestQueue.init(logExecutor, v -> { | 131 | tsLatestQueue.init(logExecutor, v -> { |
@@ -76,7 +76,7 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements | @@ -76,7 +76,7 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements | ||
76 | .statsPrintIntervalMs(tsStatsPrintIntervalMs) | 76 | .statsPrintIntervalMs(tsStatsPrintIntervalMs) |
77 | .build(); | 77 | .build(); |
78 | 78 | ||
79 | - Function<TimescaleTsKvEntity, Integer> hashcodeFunction = entity -> entity != null ? entity.getEntityId().hashCode() : 0; | 79 | + Function<TimescaleTsKvEntity, Integer> hashcodeFunction = entity -> entity.getEntityId().hashCode(); |
80 | tsQueue = new TbSqlBlockingQueueWrapper<>(tsParams, hashcodeFunction, timescaleBatchThreads); | 80 | tsQueue = new TbSqlBlockingQueueWrapper<>(tsParams, hashcodeFunction, timescaleBatchThreads); |
81 | 81 | ||
82 | tsQueue.init(logExecutor, v -> insertRepository.saveOrUpdate(v)); | 82 | tsQueue.init(logExecutor, v -> insertRepository.saveOrUpdate(v)); |
@@ -281,4 +281,5 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements | @@ -281,4 +281,5 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements | ||
281 | startTs, | 281 | startTs, |
282 | endTs); | 282 | endTs); |
283 | } | 283 | } |
284 | + | ||
284 | } | 285 | } |