Commit 7fc712a3f7aaaf243eda351c7d095032ba70b396
1 parent
42134788
NPE fix for data aggregation function in case of TEXT values
Showing
2 changed files
with
106 additions
and
96 deletions
... | ... | @@ -17,6 +17,7 @@ package org.thingsboard.server.dao.timeseries; |
17 | 17 | |
18 | 18 | import com.datastax.driver.core.ResultSet; |
19 | 19 | import com.datastax.driver.core.Row; |
20 | +import lombok.extern.slf4j.Slf4j; | |
20 | 21 | import org.thingsboard.server.common.data.kv.*; |
21 | 22 | |
22 | 23 | import javax.annotation.Nullable; |
... | ... | @@ -26,6 +27,7 @@ import java.util.Optional; |
26 | 27 | /** |
27 | 28 | * Created by ashvayka on 20.02.17. |
28 | 29 | */ |
30 | +@Slf4j | |
29 | 31 | public class AggregatePartitionsFunction implements com.google.common.base.Function<List<ResultSet>, Optional<TsKvEntry>> { |
30 | 32 | |
31 | 33 | private static final int LONG_CNT_POS = 0; |
... | ... | @@ -50,111 +52,118 @@ public class AggregatePartitionsFunction implements com.google.common.base.Funct |
50 | 52 | @Nullable |
51 | 53 | @Override |
52 | 54 | public Optional<TsKvEntry> apply(@Nullable List<ResultSet> rsList) { |
53 | - if (rsList == null || rsList.size() == 0) { | |
54 | - return Optional.empty(); | |
55 | - } | |
56 | - long count = 0; | |
57 | - DataType dataType = null; | |
58 | - | |
59 | - Boolean bValue = null; | |
60 | - String sValue = null; | |
61 | - Double dValue = null; | |
62 | - Long lValue = null; | |
63 | - | |
64 | - for (ResultSet rs : rsList) { | |
65 | - for (Row row : rs.all()) { | |
66 | - long curCount; | |
67 | - | |
68 | - Long curLValue = null; | |
69 | - Double curDValue = null; | |
70 | - Boolean curBValue = null; | |
71 | - String curSValue = null; | |
72 | - | |
73 | - long longCount = row.getLong(LONG_CNT_POS); | |
74 | - long doubleCount = row.getLong(DOUBLE_CNT_POS); | |
75 | - long boolCount = row.getLong(BOOL_CNT_POS); | |
76 | - long strCount = row.getLong(STR_CNT_POS); | |
77 | - | |
78 | - if (longCount > 0) { | |
79 | - dataType = DataType.LONG; | |
80 | - curCount = longCount; | |
81 | - curLValue = getLongValue(row); | |
82 | - } else if (doubleCount > 0) { | |
83 | - dataType = DataType.DOUBLE; | |
84 | - curCount = doubleCount; | |
85 | - curDValue = getDoubleValue(row); | |
86 | - } else if (boolCount > 0) { | |
87 | - dataType = DataType.BOOLEAN; | |
88 | - curCount = boolCount; | |
89 | - curBValue = getBooleanValue(row); | |
90 | - } else if (strCount > 0) { | |
91 | - dataType = DataType.STRING; | |
92 | - curCount = strCount; | |
93 | - curSValue = getStringValue(row); | |
94 | - } else { | |
95 | - continue; | |
96 | - } | |
97 | - | |
98 | - if (aggregation == Aggregation.COUNT) { | |
99 | - count += curCount; | |
100 | - } else if (aggregation == Aggregation.AVG || aggregation == Aggregation.SUM) { | |
101 | - count += curCount; | |
102 | - if (curDValue != null) { | |
103 | - dValue = dValue == null ? curDValue : dValue + curDValue; | |
104 | - } else if (curLValue != null) { | |
105 | - lValue = lValue == null ? curLValue : lValue + curLValue; | |
55 | + try { | |
56 | + log.trace("[{}][{}][{}] Going to aggregate data", key, ts, aggregation); | |
57 | + if (rsList == null || rsList.size() == 0) { | |
58 | + return Optional.empty(); | |
59 | + } | |
60 | + long count = 0; | |
61 | + DataType dataType = null; | |
62 | + | |
63 | + Boolean bValue = null; | |
64 | + String sValue = null; | |
65 | + Double dValue = null; | |
66 | + Long lValue = null; | |
67 | + | |
68 | + for (ResultSet rs : rsList) { | |
69 | + for (Row row : rs.all()) { | |
70 | + long curCount; | |
71 | + | |
72 | + Long curLValue = null; | |
73 | + Double curDValue = null; | |
74 | + Boolean curBValue = null; | |
75 | + String curSValue = null; | |
76 | + | |
77 | + long longCount = row.getLong(LONG_CNT_POS); | |
78 | + long doubleCount = row.getLong(DOUBLE_CNT_POS); | |
79 | + long boolCount = row.getLong(BOOL_CNT_POS); | |
80 | + long strCount = row.getLong(STR_CNT_POS); | |
81 | + | |
82 | + if (longCount > 0) { | |
83 | + dataType = DataType.LONG; | |
84 | + curCount = longCount; | |
85 | + curLValue = getLongValue(row); | |
86 | + } else if (doubleCount > 0) { | |
87 | + dataType = DataType.DOUBLE; | |
88 | + curCount = doubleCount; | |
89 | + curDValue = getDoubleValue(row); | |
90 | + } else if (boolCount > 0) { | |
91 | + dataType = DataType.BOOLEAN; | |
92 | + curCount = boolCount; | |
93 | + curBValue = getBooleanValue(row); | |
94 | + } else if (strCount > 0) { | |
95 | + dataType = DataType.STRING; | |
96 | + curCount = strCount; | |
97 | + curSValue = getStringValue(row); | |
98 | + } else { | |
99 | + continue; | |
106 | 100 | } |
107 | - } else if (aggregation == Aggregation.MIN) { | |
108 | - if (curDValue != null) { | |
109 | - dValue = dValue == null ? curDValue : Math.min(dValue, curDValue); | |
110 | - } else if (curLValue != null) { | |
111 | - lValue = lValue == null ? curLValue : Math.min(lValue, curLValue); | |
112 | - } else if (curBValue != null) { | |
113 | - bValue = bValue == null ? curBValue : bValue && curBValue; | |
114 | - } else if (curSValue != null) { | |
115 | - if (sValue == null || curSValue.compareTo(sValue) < 0) { | |
116 | - sValue = curSValue; | |
101 | + | |
102 | + if (aggregation == Aggregation.COUNT) { | |
103 | + count += curCount; | |
104 | + } else if (aggregation == Aggregation.AVG || aggregation == Aggregation.SUM) { | |
105 | + count += curCount; | |
106 | + if (curDValue != null) { | |
107 | + dValue = dValue == null ? curDValue : dValue + curDValue; | |
108 | + } else if (curLValue != null) { | |
109 | + lValue = lValue == null ? curLValue : lValue + curLValue; | |
117 | 110 | } |
118 | - } | |
119 | - } else if (aggregation == Aggregation.MAX) { | |
120 | - if (curDValue != null) { | |
121 | - dValue = dValue == null ? curDValue : Math.max(dValue, curDValue); | |
122 | - } else if (curLValue != null) { | |
123 | - lValue = lValue == null ? curLValue : Math.max(lValue, curLValue); | |
124 | - } else if (curBValue != null) { | |
125 | - bValue = bValue == null ? curBValue : bValue || curBValue; | |
126 | - } else if (curSValue != null) { | |
127 | - if (sValue == null || curSValue.compareTo(sValue) > 0) { | |
128 | - sValue = curSValue; | |
111 | + } else if (aggregation == Aggregation.MIN) { | |
112 | + if (curDValue != null) { | |
113 | + dValue = dValue == null ? curDValue : Math.min(dValue, curDValue); | |
114 | + } else if (curLValue != null) { | |
115 | + lValue = lValue == null ? curLValue : Math.min(lValue, curLValue); | |
116 | + } else if (curBValue != null) { | |
117 | + bValue = bValue == null ? curBValue : bValue && curBValue; | |
118 | + } else if (curSValue != null) { | |
119 | + if (sValue == null || curSValue.compareTo(sValue) < 0) { | |
120 | + sValue = curSValue; | |
121 | + } | |
122 | + } | |
123 | + } else if (aggregation == Aggregation.MAX) { | |
124 | + if (curDValue != null) { | |
125 | + dValue = dValue == null ? curDValue : Math.max(dValue, curDValue); | |
126 | + } else if (curLValue != null) { | |
127 | + lValue = lValue == null ? curLValue : Math.max(lValue, curLValue); | |
128 | + } else if (curBValue != null) { | |
129 | + bValue = bValue == null ? curBValue : bValue || curBValue; | |
130 | + } else if (curSValue != null) { | |
131 | + if (sValue == null || curSValue.compareTo(sValue) > 0) { | |
132 | + sValue = curSValue; | |
133 | + } | |
129 | 134 | } |
130 | 135 | } |
131 | 136 | } |
132 | 137 | } |
133 | - } | |
134 | - if (dataType == null) { | |
135 | - return Optional.empty(); | |
136 | - } else if (aggregation == Aggregation.COUNT) { | |
137 | - return Optional.of(new BasicTsKvEntry(ts, new LongDataEntry(key, (long) count))); | |
138 | - } else if (aggregation == Aggregation.AVG || aggregation == Aggregation.SUM) { | |
139 | - if (count == 0 || (dataType == DataType.DOUBLE && dValue == null) || (dataType == DataType.LONG && lValue == null)) { | |
138 | + if (dataType == null) { | |
140 | 139 | return Optional.empty(); |
141 | - } else if (dataType == DataType.DOUBLE) { | |
142 | - return Optional.of(new BasicTsKvEntry(ts, new DoubleDataEntry(key, aggregation == Aggregation.SUM ? dValue : (dValue / count)))); | |
143 | - } else if (dataType == DataType.LONG) { | |
144 | - return Optional.of(new BasicTsKvEntry(ts, new LongDataEntry(key, aggregation == Aggregation.SUM ? lValue : (lValue / count)))); | |
145 | - } | |
146 | - } else if (aggregation == Aggregation.MIN || aggregation == Aggregation.MAX) { | |
147 | - if (dataType == DataType.DOUBLE) { | |
148 | - return Optional.of(new BasicTsKvEntry(ts, new DoubleDataEntry(key, dValue))); | |
149 | - } else if (dataType == DataType.LONG) { | |
150 | - return Optional.of(new BasicTsKvEntry(ts, new LongDataEntry(key, lValue))); | |
151 | - } else if (dataType == DataType.STRING) { | |
152 | - return Optional.of(new BasicTsKvEntry(ts, new StringDataEntry(key, sValue))); | |
153 | - } else { | |
154 | - return Optional.of(new BasicTsKvEntry(ts, new BooleanDataEntry(key, bValue))); | |
140 | + } else if (aggregation == Aggregation.COUNT) { | |
141 | + return Optional.of(new BasicTsKvEntry(ts, new LongDataEntry(key, (long) count))); | |
142 | + } else if (aggregation == Aggregation.AVG || aggregation == Aggregation.SUM) { | |
143 | + if (count == 0 || (dataType == DataType.DOUBLE && dValue == null) || (dataType == DataType.LONG && lValue == null)) { | |
144 | + return Optional.empty(); | |
145 | + } else if (dataType == DataType.DOUBLE) { | |
146 | + return Optional.of(new BasicTsKvEntry(ts, new DoubleDataEntry(key, aggregation == Aggregation.SUM ? dValue : (dValue / count)))); | |
147 | + } else if (dataType == DataType.LONG) { | |
148 | + return Optional.of(new BasicTsKvEntry(ts, new LongDataEntry(key, aggregation == Aggregation.SUM ? lValue : (lValue / count)))); | |
149 | + } | |
150 | + } else if (aggregation == Aggregation.MIN || aggregation == Aggregation.MAX) { | |
151 | + if (dataType == DataType.DOUBLE) { | |
152 | + return Optional.of(new BasicTsKvEntry(ts, new DoubleDataEntry(key, dValue))); | |
153 | + } else if (dataType == DataType.LONG) { | |
154 | + return Optional.of(new BasicTsKvEntry(ts, new LongDataEntry(key, lValue))); | |
155 | + } else if (dataType == DataType.STRING) { | |
156 | + return Optional.of(new BasicTsKvEntry(ts, new StringDataEntry(key, sValue))); | |
157 | + } else { | |
158 | + return Optional.of(new BasicTsKvEntry(ts, new BooleanDataEntry(key, bValue))); | |
159 | + } | |
155 | 160 | } |
161 | + log.trace("[{}][{}][{}] Aggregated data is empty.", key, ts, aggregation); | |
162 | + return Optional.empty(); | |
163 | + }catch (Exception e){ | |
164 | + log.error("[{}][{}][{}] Failed to aggregate data", key, ts, aggregation, e); | |
165 | + return Optional.empty(); | |
156 | 166 | } |
157 | - return null; | |
158 | 167 | } |
159 | 168 | |
160 | 169 | private Boolean getBooleanValue(Row row) { | ... | ... |
... | ... | @@ -215,6 +215,7 @@ public class BaseTimeseriesDao extends AbstractAsyncDao implements TimeseriesDao |
215 | 215 | PreparedStatement proto = getFetchStmt(aggregation); |
216 | 216 | List<ResultSetFuture> futures = new ArrayList<>(partitions.size()); |
217 | 217 | for (Long partition : partitions) { |
218 | + log.trace("Fetching data for partition [{}] for entityType {} and entityId {}", partition, entityType, entityId); | |
218 | 219 | BoundStatement stmt = proto.bind(); |
219 | 220 | stmt.setString(0, entityType); |
220 | 221 | stmt.setUUID(1, entityId); | ... | ... |