Commit b8291be084bc2b843f61842194c82ff90d157239

Authored by Valerii Sosliuk
Committed by Andrew Shvayka
1 parent 3228312c

Update Latest TS by timestamp

There is a bug when the historic data arrives and overrides the value
with morerecent timestamp in ts_kv_latest table. This PR adds
possibility to update the ts_kv_latest table only if the value that
arrives has the newer timestamp than the one that is already in
ts_kv_latest
@@ -243,6 +243,7 @@ sql: @@ -243,6 +243,7 @@ sql:
243 batch_max_delay: "${SQL_TS_LATEST_BATCH_MAX_DELAY_MS:100}" 243 batch_max_delay: "${SQL_TS_LATEST_BATCH_MAX_DELAY_MS:100}"
244 stats_print_interval_ms: "${SQL_TS_LATEST_BATCH_STATS_PRINT_MS:10000}" 244 stats_print_interval_ms: "${SQL_TS_LATEST_BATCH_STATS_PRINT_MS:10000}"
245 batch_threads: "${SQL_TS_LATEST_BATCH_THREADS:4}" 245 batch_threads: "${SQL_TS_LATEST_BATCH_THREADS:4}"
  246 + update_by_latest_ts: "${SQL_TS_UPDATE_BY_LATEST_TIMESTAMP:true}"
246 # Specify whether to sort entities before batch update. Should be enabled for cluster mode to avoid deadlocks 247 # Specify whether to sort entities before batch update. Should be enabled for cluster mode to avoid deadlocks
247 batch_sort: "${SQL_BATCH_SORT:false}" 248 batch_sort: "${SQL_BATCH_SORT:false}"
248 # Specify whether to remove null characters from strValue of attributes and timeseries before insert 249 # Specify whether to remove null characters from strValue of attributes and timeseries before insert
@@ -15,6 +15,7 @@ @@ -15,6 +15,7 @@
15 */ 15 */
16 package org.thingsboard.server.dao.sqlts.insert.latest.psql; 16 package org.thingsboard.server.dao.sqlts.insert.latest.psql;
17 17
  18 +import org.springframework.beans.factory.annotation.Value;
18 import org.springframework.jdbc.core.BatchPreparedStatementSetter; 19 import org.springframework.jdbc.core.BatchPreparedStatementSetter;
19 import org.springframework.stereotype.Repository; 20 import org.springframework.stereotype.Repository;
20 import org.springframework.transaction.TransactionStatus; 21 import org.springframework.transaction.TransactionStatus;
@@ -37,20 +38,34 @@ import java.util.List; @@ -37,20 +38,34 @@ import java.util.List;
37 @Transactional 38 @Transactional
38 public class PsqlLatestInsertTsRepository extends AbstractInsertRepository implements InsertLatestTsRepository { 39 public class PsqlLatestInsertTsRepository extends AbstractInsertRepository implements InsertLatestTsRepository {
39 40
  41 + @Value("${sql.ts_latest.update_by_latest_ts:true}")
  42 + private Boolean updateByLatestTs;
  43 +
40 private static final String BATCH_UPDATE = 44 private static final String BATCH_UPDATE =
41 - "UPDATE ts_kv_latest SET ts = ?, bool_v = ?, str_v = ?, long_v = ?, dbl_v = ?, json_v = cast(? AS json) WHERE entity_id = ? and key = ?"; 45 + "UPDATE ts_kv_latest SET ts = ?, bool_v = ?, str_v = ?, long_v = ?, dbl_v = ?, json_v = cast(? AS json) WHERE entity_id = ? AND key = ?;";
42 46
43 47
44 private static final String INSERT_OR_UPDATE = 48 private static final String INSERT_OR_UPDATE =
45 "INSERT INTO ts_kv_latest (entity_id, key, ts, bool_v, str_v, long_v, dbl_v, json_v) VALUES(?, ?, ?, ?, ?, ?, ?, cast(? AS json)) " + 49 "INSERT INTO ts_kv_latest (entity_id, key, ts, bool_v, str_v, long_v, dbl_v, json_v) VALUES(?, ?, ?, ?, ?, ?, ?, cast(? AS json)) " +
46 "ON CONFLICT (entity_id, key) DO UPDATE SET ts = ?, bool_v = ?, str_v = ?, long_v = ?, dbl_v = ?, json_v = cast(? AS json);"; 50 "ON CONFLICT (entity_id, key) DO UPDATE SET ts = ?, bool_v = ?, str_v = ?, long_v = ?, dbl_v = ?, json_v = cast(? AS json);";
47 51
  52 + private static final String BATCH_UPDATE_BY_LATEST_TS =
  53 + "UPDATE ts_kv_latest SET ts = ?, bool_v = ?, str_v = ?, long_v = ?, dbl_v = ?, json_v = cast(? AS json) WHERE entity_id = ? AND key = ? AND ts <= ?;";
  54 +
  55 +
  56 + private static final String INSERT_OR_UPDATE_BY_LATEST_TS =
  57 + "INSERT INTO ts_kv_latest (entity_id, key, ts, bool_v, str_v, long_v, dbl_v, json_v) VALUES(?, ?, ?, ?, ?, ?, ?, cast(? AS json)) " +
  58 + "ON CONFLICT (entity_id, key) DO UPDATE SET ts = ?, bool_v = ?, str_v = ?, long_v = ?, dbl_v = ?, json_v = cast(? AS json) WHERE ts_kv_latest.ts <= ?;";
  59 +
48 @Override 60 @Override
49 public void saveOrUpdate(List<TsKvLatestEntity> entities) { 61 public void saveOrUpdate(List<TsKvLatestEntity> entities) {
50 transactionTemplate.execute(new TransactionCallbackWithoutResult() { 62 transactionTemplate.execute(new TransactionCallbackWithoutResult() {
51 @Override 63 @Override
52 protected void doInTransactionWithoutResult(TransactionStatus status) { 64 protected void doInTransactionWithoutResult(TransactionStatus status) {
53 - int[] result = jdbcTemplate.batchUpdate(BATCH_UPDATE, new BatchPreparedStatementSetter() { 65 + String batchUpdateQuery = updateByLatestTs ? BATCH_UPDATE_BY_LATEST_TS : BATCH_UPDATE;
  66 + String insertOrUpdateQuery = updateByLatestTs ? INSERT_OR_UPDATE_BY_LATEST_TS : INSERT_OR_UPDATE;
  67 +
  68 + int[] result = jdbcTemplate.batchUpdate(batchUpdateQuery, new BatchPreparedStatementSetter() {
54 @Override 69 @Override
55 public void setValues(PreparedStatement ps, int i) throws SQLException { 70 public void setValues(PreparedStatement ps, int i) throws SQLException {
56 TsKvLatestEntity tsKvLatestEntity = entities.get(i); 71 TsKvLatestEntity tsKvLatestEntity = entities.get(i);
@@ -80,6 +95,9 @@ public class PsqlLatestInsertTsRepository extends AbstractInsertRepository imple @@ -80,6 +95,9 @@ public class PsqlLatestInsertTsRepository extends AbstractInsertRepository imple
80 95
81 ps.setObject(7, tsKvLatestEntity.getEntityId()); 96 ps.setObject(7, tsKvLatestEntity.getEntityId());
82 ps.setInt(8, tsKvLatestEntity.getKey()); 97 ps.setInt(8, tsKvLatestEntity.getKey());
  98 + if (updateByLatestTs) {
  99 + ps.setLong(9, tsKvLatestEntity.getTs());
  100 + }
83 } 101 }
84 102
85 @Override 103 @Override
@@ -102,7 +120,7 @@ public class PsqlLatestInsertTsRepository extends AbstractInsertRepository imple @@ -102,7 +120,7 @@ public class PsqlLatestInsertTsRepository extends AbstractInsertRepository imple
102 } 120 }
103 } 121 }
104 122
105 - jdbcTemplate.batchUpdate(INSERT_OR_UPDATE, new BatchPreparedStatementSetter() { 123 + jdbcTemplate.batchUpdate(insertOrUpdateQuery, new BatchPreparedStatementSetter() {
106 @Override 124 @Override
107 public void setValues(PreparedStatement ps, int i) throws SQLException { 125 public void setValues(PreparedStatement ps, int i) throws SQLException {
108 TsKvLatestEntity tsKvLatestEntity = insertEntities.get(i); 126 TsKvLatestEntity tsKvLatestEntity = insertEntities.get(i);
@@ -111,6 +129,9 @@ public class PsqlLatestInsertTsRepository extends AbstractInsertRepository imple @@ -111,6 +129,9 @@ public class PsqlLatestInsertTsRepository extends AbstractInsertRepository imple
111 129
112 ps.setLong(3, tsKvLatestEntity.getTs()); 130 ps.setLong(3, tsKvLatestEntity.getTs());
113 ps.setLong(9, tsKvLatestEntity.getTs()); 131 ps.setLong(9, tsKvLatestEntity.getTs());
  132 + if (updateByLatestTs) {
  133 + ps.setLong(15, tsKvLatestEntity.getTs());
  134 + }
114 135
115 if (tsKvLatestEntity.getBooleanValue() != null) { 136 if (tsKvLatestEntity.getBooleanValue() != null) {
116 ps.setBoolean(4, tsKvLatestEntity.getBooleanValue()); 137 ps.setBoolean(4, tsKvLatestEntity.getBooleanValue());