Commit 1bfe950f583f770113d01c87f2170d4113729ec7

Authored by Volodymyr Babak
1 parent 2af6e5d3

Fixes for downlink msg. Refactoring

application/src/main/data/upgrade/2.6.0/schema_update.cql renamed from application/src/main/data/upgrade/2.4.x/schema_update.cql
application/src/main/data/upgrade/2.6.0/schema_update.sql renamed from application/src/main/data/upgrade/2.4.x/schema_update.sql
... ... @@ -26,6 +26,7 @@ import com.google.protobuf.ByteString;
26 26 import io.grpc.stub.StreamObserver;
27 27 import lombok.Data;
28 28 import lombok.extern.slf4j.Slf4j;
  29 +import org.apache.commons.codec.binary.Base64;
29 30 import org.thingsboard.server.common.data.Customer;
30 31 import org.thingsboard.server.common.data.Dashboard;
31 32 import org.thingsboard.server.common.data.DataConstants;
... ... @@ -81,6 +82,7 @@ import org.thingsboard.server.gen.edge.UplinkResponseMsg;
81 82 import org.thingsboard.server.gen.edge.UserUpdateMsg;
82 83 import org.thingsboard.server.service.edge.EdgeContextComponent;
83 84
  85 +import java.io.Closeable;
84 86 import java.io.IOException;
85 87 import java.util.Collections;
86 88 import java.util.List;
... ... @@ -95,7 +97,7 @@ import static org.thingsboard.server.gen.edge.UpdateMsgType.ENTITY_CREATED_RPC_M
95 97
96 98 @Slf4j
97 99 @Data
98   -public final class EdgeGrpcSession implements Cloneable {
  100 +public final class EdgeGrpcSession implements Closeable {
99 101
100 102 private static final ReentrantLock entityCreationLock = new ReentrantLock();
101 103
... ... @@ -168,7 +170,7 @@ public final class EdgeGrpcSession implements Cloneable {
168 170 UUID ifOffset = null;
169 171 do {
170 172 pageData = ctx.getEdgeService().findQueueEvents(edge.getTenantId(), edge.getId(), pageLink);
171   - if (!pageData.getData().isEmpty()) {
  173 + if (isConnected() && !pageData.getData().isEmpty()) {
172 174 log.trace("[{}] [{}] event(s) are going to be processed.", this.sessionId, pageData.getData().size());
173 175 for (Event event : pageData.getData()) {
174 176 log.trace("[{}] Processing event [{}]", this.sessionId, event);
... ... @@ -196,7 +198,7 @@ public final class EdgeGrpcSession implements Cloneable {
196 198 ifOffset = event.getUuidId();
197 199 }
198 200 }
199   - if (pageData.hasNext()) {
  201 + if (isConnected() && pageData.hasNext()) {
200 202 pageLink = pageData.getNextPageLink();
201 203 try {
202 204 Thread.sleep(ctx.getEdgeEventStorageSettings().getSleepIntervalBetweenBatches());
... ... @@ -204,7 +206,7 @@ public final class EdgeGrpcSession implements Cloneable {
204 206 log.error("Error during sleep between batches", e);
205 207 }
206 208 }
207   - } while (pageData.hasNext());
  209 + } while (isConnected() && pageData.hasNext());
208 210
209 211 if (ifOffset != null) {
210 212 Long newStartTs = UUIDs.unixTimestamp(ifOffset);
... ... @@ -287,7 +289,7 @@ public final class EdgeGrpcSession implements Cloneable {
287 289
288 290 private void processCustomDownlinkMessage(EdgeQueueEntry entry) throws IOException {
289 291 log.trace("Executing processCustomDownlinkMessage, entry [{}]", entry);
290   - TbMsg tbMsg = objectMapper.readValue(entry.getData(), TbMsg.class);
  292 + TbMsg tbMsg = TbMsg.fromBytes(Base64.decodeBase64(entry.getData()), TbMsgCallback.EMPTY);
291 293 String entityName = null;
292 294 switch (entry.getEntityType()) {
293 295 case DEVICE:
... ... @@ -700,4 +702,14 @@ public final class EdgeGrpcSession implements Cloneable {
700 702 .setType(edge.getType().toString())
701 703 .build();
702 704 }
  705 +
  706 + @Override
  707 + public void close() {
  708 + connected = false;
  709 + try {
  710 + outputStream.onCompleted();
  711 + } catch (Exception e) {
  712 + log.debug("[{}] Failed to close output stream: {}", sessionId, e.getMessage());
  713 + }
  714 + }
703 715 }
... ...
... ... @@ -308,7 +308,7 @@ public class CassandraDatabaseUpgradeService extends AbstractCassandraDatabaseUp
308 308 break;
309 309 case "2.5.0":
310 310 log.info("Updating schema ...");
311   - schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", "2.4.x", SCHEMA_UPDATE_CQL);
  311 + schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", "2.6.0", SCHEMA_UPDATE_CQL);
312 312 loadCql(schemaUpdateFile);
313 313
314 314 try {
... ...
... ... @@ -211,26 +211,6 @@ public class SqlDatabaseUpgradeService implements DatabaseEntitiesUpgradeService
211 211 case "2.4.3":
212 212 try (Connection conn = DriverManager.getConnection(dbUrl, dbUserName, dbPassword)) {
213 213 log.info("Updating schema ...");
214   - schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", "2.4.x", SCHEMA_UPDATE_SQL);
215   - loadSql(schemaUpdateFile, conn);
216   - try {
217   - conn.createStatement().execute("ALTER TABLE asset ADD edge_id varchar(31)"); //NOSONAR, ignoring because method used to execute thingsboard database upgrade script
218   - } catch (Exception e) {}
219   - try {
220   - conn.createStatement().execute("ALTER TABLE device ADD edge_id varchar(31)"); //NOSONAR, ignoring because method used to execute thingsboard database upgrade script
221   - } catch (Exception e) {}
222   - try {
223   - conn.createStatement().execute("ALTER TABLE entity_view ADD edge_id varchar(31)"); //NOSONAR, ignoring because method used to execute thingsboard database upgrade script
224   - } catch (Exception e) {}
225   - try {
226   - conn.createStatement().execute("ALTER TABLE dashboard ADD assigned_edges varchar(10000000)"); //NOSONAR, ignoring because method used to execute thingsboard database upgrade script
227   - } catch (Exception e) {}
228   - try {
229   - conn.createStatement().execute("ALTER TABLE rule_chain ADD assigned_edges varchar(10000000)"); //NOSONAR, ignoring because method used to execute thingsboard database upgrade script
230   - } catch (Exception e) {}
231   - try {
232   - conn.createStatement().execute("ALTER TABLE rule_chain ADD type varchar(255) DEFAULT 'SYSTEM'"); //NOSONAR, ignoring because method used to execute thingsboard database upgrade script
233   - } catch (Exception e) {}
234 214 try {
235 215 conn.createStatement().execute("ALTER TABLE attribute_kv ADD COLUMN json_v json;");
236 216 } catch (Exception e) {
... ... @@ -253,6 +233,32 @@ public class SqlDatabaseUpgradeService implements DatabaseEntitiesUpgradeService
253 233 log.info("Schema updated.");
254 234 }
255 235 break;
  236 + case "2.5.0":
  237 + try (Connection conn = DriverManager.getConnection(dbUrl, dbUserName, dbPassword)) {
  238 + log.info("Updating schema ...");
  239 + schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", "2.6.0", SCHEMA_UPDATE_SQL);
  240 + loadSql(schemaUpdateFile, conn);
  241 + try {
  242 + conn.createStatement().execute("ALTER TABLE asset ADD edge_id varchar(31)"); //NOSONAR, ignoring because method used to execute thingsboard database upgrade script
  243 + } catch (Exception e) {}
  244 + try {
  245 + conn.createStatement().execute("ALTER TABLE device ADD edge_id varchar(31)"); //NOSONAR, ignoring because method used to execute thingsboard database upgrade script
  246 + } catch (Exception e) {}
  247 + try {
  248 + conn.createStatement().execute("ALTER TABLE entity_view ADD edge_id varchar(31)"); //NOSONAR, ignoring because method used to execute thingsboard database upgrade script
  249 + } catch (Exception e) {}
  250 + try {
  251 + conn.createStatement().execute("ALTER TABLE dashboard ADD assigned_edges varchar(10000000)"); //NOSONAR, ignoring because method used to execute thingsboard database upgrade script
  252 + } catch (Exception e) {}
  253 + try {
  254 + conn.createStatement().execute("ALTER TABLE rule_chain ADD assigned_edges varchar(10000000)"); //NOSONAR, ignoring because method used to execute thingsboard database upgrade script
  255 + } catch (Exception e) {}
  256 + try {
  257 + conn.createStatement().execute("ALTER TABLE rule_chain ADD type varchar(255) DEFAULT 'SYSTEM'"); //NOSONAR, ignoring because method used to execute thingsboard database upgrade script
  258 + } catch (Exception e) {}
  259 + log.info("Schema updated.");
  260 + }
  261 + break;
256 262 default:
257 263 throw new RuntimeException("Unable to upgrade SQL database, unsupported fromVersion: " + fromVersion);
258 264 }
... ...
... ... @@ -124,52 +124,26 @@ public class DashboardInfo extends SearchTextBased<DashboardId> implements HasNa
124 124 }
125 125
126 126 public boolean isAssignedToEdge(EdgeId edgeId) {
127   - return this.assignedEdges != null && this.assignedEdges.contains(new ShortEdgeInfo(edgeId, null, null));
  127 + return EdgeUtils.isAssignedToEdge(this.assignedEdges, edgeId);
128 128 }
129 129
130 130 public ShortEdgeInfo getAssignedEdgeInfo(EdgeId edgeId) {
131   - if (this.assignedEdges != null) {
132   - for (ShortEdgeInfo edgeInfo : this.assignedEdges) {
133   - if (edgeInfo.getEdgeId().equals(edgeId)) {
134   - return edgeInfo;
135   - }
136   - }
137   - }
138   - return null;
  131 + return EdgeUtils.getAssignedEdgeInfo(this.assignedEdges, edgeId);
139 132 }
140 133
141 134 public boolean addAssignedEdge(Edge edge) {
142   - ShortEdgeInfo edgeInfo = edge.toShortEdgeInfo();
143   - if (this.assignedEdges != null && this.assignedEdges.contains(edgeInfo)) {
144   - return false;
145   - } else {
146   - if (this.assignedEdges == null) {
147   - this.assignedEdges = new HashSet<>();
148   - }
149   - this.assignedEdges.add(edgeInfo);
150   - return true;
  135 + if (this.assignedEdges == null) {
  136 + this.assignedEdges = new HashSet<>();
151 137 }
  138 + return EdgeUtils.addAssignedEdge(this.assignedEdges, edge);
152 139 }
153 140
154 141 public boolean updateAssignedEdge(Edge edge) {
155   - ShortEdgeInfo edgeInfo = edge.toShortEdgeInfo();
156   - if (this.assignedEdges != null && this.assignedEdges.contains(edgeInfo)) {
157   - this.assignedEdges.remove(edgeInfo);
158   - this.assignedEdges.add(edgeInfo);
159   - return true;
160   - } else {
161   - return false;
162   - }
  142 + return EdgeUtils.updateAssignedEdge(this.assignedEdges, edge);
163 143 }
164 144
165 145 public boolean removeAssignedEdge(Edge edge) {
166   - ShortEdgeInfo edgeInfo = edge.toShortEdgeInfo();
167   - if (this.assignedEdges != null && this.assignedEdges.contains(edgeInfo)) {
168   - this.assignedEdges.remove(edgeInfo);
169   - return true;
170   - } else {
171   - return false;
172   - }
  146 + return EdgeUtils.removeAssignedEdge(this.assignedEdges, edge);
173 147 }
174 148
175 149 @Override
... ...
  1 +package org.thingsboard.server.common.data;
  2 +
  3 +import org.thingsboard.server.common.data.edge.Edge;
  4 +import org.thingsboard.server.common.data.id.EdgeId;
  5 +
  6 +import java.util.Set;
  7 +
  8 +public final class EdgeUtils {
  9 +
  10 + private EdgeUtils() {}
  11 +
  12 + public static boolean isAssignedToEdge(Set<ShortEdgeInfo> assignedEdges, EdgeId edgeId) {
  13 + return assignedEdges != null && assignedEdges.contains(new ShortEdgeInfo(edgeId, null, null));
  14 + }
  15 +
  16 + public static ShortEdgeInfo getAssignedEdgeInfo(Set<ShortEdgeInfo> assignedEdges, EdgeId edgeId) {
  17 + if (assignedEdges != null) {
  18 + for (ShortEdgeInfo edgeInfo : assignedEdges) {
  19 + if (edgeInfo.getEdgeId().equals(edgeId)) {
  20 + return edgeInfo;
  21 + }
  22 + }
  23 + }
  24 + return null;
  25 + }
  26 +
  27 + public static boolean addAssignedEdge(Set<ShortEdgeInfo> assignedEdges, Edge edge) {
  28 + ShortEdgeInfo edgeInfo = edge.toShortEdgeInfo();
  29 + if (assignedEdges != null && assignedEdges.contains(edgeInfo)) {
  30 + return false;
  31 + } else {
  32 + if (assignedEdges != null) {
  33 + assignedEdges.add(edgeInfo);
  34 + return true;
  35 + } else {
  36 + return false;
  37 + }
  38 + }
  39 + }
  40 +
  41 + public static boolean updateAssignedEdge(Set<ShortEdgeInfo> assignedEdges, Edge edge) {
  42 + ShortEdgeInfo edgeInfo = edge.toShortEdgeInfo();
  43 + if (assignedEdges != null && assignedEdges.contains(edgeInfo)) {
  44 + assignedEdges.remove(edgeInfo);
  45 + assignedEdges.add(edgeInfo);
  46 + return true;
  47 + } else {
  48 + return false;
  49 + }
  50 + }
  51 +
  52 + public static boolean removeAssignedEdge(Set<ShortEdgeInfo> assignedEdges, Edge edge) {
  53 + ShortEdgeInfo edgeInfo = edge.toShortEdgeInfo();
  54 + if (assignedEdges != null && assignedEdges.contains(edgeInfo)) {
  55 + assignedEdges.remove(edgeInfo);
  56 + return true;
  57 + } else {
  58 + return false;
  59 + }
  60 + }
  61 +}
... ...
... ... @@ -20,6 +20,7 @@ import com.fasterxml.jackson.databind.JsonNode;
20 20 import lombok.Data;
21 21 import lombok.EqualsAndHashCode;
22 22 import lombok.extern.slf4j.Slf4j;
  23 +import org.thingsboard.server.common.data.EdgeUtils;
23 24 import org.thingsboard.server.common.data.HasName;
24 25 import org.thingsboard.server.common.data.HasTenantId;
25 26 import org.thingsboard.server.common.data.SearchTextBasedWithAdditionalInfo;
... ... @@ -89,52 +90,28 @@ public class RuleChain extends SearchTextBasedWithAdditionalInfo<RuleChainId> im
89 90 setJson(data, json -> this.configuration = json, bytes -> this.configurationBytes = bytes);
90 91 }
91 92
  93 +
  94 +
92 95 public boolean isAssignedToEdge(EdgeId edgeId) {
93   - return this.assignedEdges != null && this.assignedEdges.contains(new ShortEdgeInfo(edgeId, null, null));
  96 + return EdgeUtils.isAssignedToEdge(this.assignedEdges, edgeId);
94 97 }
95 98
96 99 public ShortEdgeInfo getAssignedEdgeInfo(EdgeId edgeId) {
97   - if (this.assignedEdges != null) {
98   - for (ShortEdgeInfo edgeInfo : this.assignedEdges) {
99   - if (edgeInfo.getEdgeId().equals(edgeId)) {
100   - return edgeInfo;
101   - }
102   - }
103   - }
104   - return null;
  100 + return EdgeUtils.getAssignedEdgeInfo(this.assignedEdges, edgeId);
105 101 }
106 102
107 103 public boolean addAssignedEdge(Edge edge) {
108   - ShortEdgeInfo edgeInfo = edge.toShortEdgeInfo();
109   - if (this.assignedEdges != null && this.assignedEdges.contains(edgeInfo)) {
110   - return false;
111   - } else {
112   - if (this.assignedEdges == null) {
113   - this.assignedEdges = new HashSet<>();
114   - }
115   - this.assignedEdges.add(edgeInfo);
116   - return true;
  104 + if (this.assignedEdges == null) {
  105 + this.assignedEdges = new HashSet<>();
117 106 }
  107 + return EdgeUtils.addAssignedEdge(this.assignedEdges, edge);
118 108 }
119 109
120 110 public boolean updateAssignedEdge(Edge edge) {
121   - ShortEdgeInfo edgeInfo = edge.toShortEdgeInfo();
122   - if (this.assignedEdges != null && this.assignedEdges.contains(edgeInfo)) {
123   - this.assignedEdges.remove(edgeInfo);
124   - this.assignedEdges.add(edgeInfo);
125   - return true;
126   - } else {
127   - return false;
128   - }
  111 + return EdgeUtils.updateAssignedEdge(this.assignedEdges, edge);
129 112 }
130 113
131 114 public boolean removeAssignedEdge(Edge edge) {
132   - ShortEdgeInfo edgeInfo = edge.toShortEdgeInfo();
133   - if (this.assignedEdges != null && this.assignedEdges.contains(edgeInfo)) {
134   - this.assignedEdges.remove(edgeInfo);
135   - return true;
136   - } else {
137   - return false;
138   - }
  115 + return EdgeUtils.removeAssignedEdge(this.assignedEdges, edge);
139 116 }
140 117 }
... ...
... ... @@ -22,6 +22,7 @@ import com.google.common.util.concurrent.Futures;
22 22 import com.google.common.util.concurrent.ListenableFuture;
23 23 import com.google.common.util.concurrent.MoreExecutors;
24 24 import lombok.extern.slf4j.Slf4j;
  25 +import org.apache.commons.codec.binary.Base64;
25 26 import org.springframework.beans.factory.annotation.Autowired;
26 27 import org.springframework.cache.Cache;
27 28 import org.springframework.cache.CacheManager;
... ... @@ -301,7 +302,7 @@ public class EdgeServiceImpl extends AbstractEntityService implements EdgeServic
301 302 log.trace("Executing unassignCustomerEdges, tenantId [{}], customerId [{}]", tenantId, customerId);
302 303 validateId(tenantId, INCORRECT_TENANT_ID + tenantId);
303 304 validateId(customerId, INCORRECT_CUSTOMER_ID + customerId);
304   - customerEdgeUnasigner.removeEntities(tenantId, customerId);
  305 + customerEdgeUnassigner.removeEntities(tenantId, customerId);
305 306 }
306 307
307 308 @Override
... ... @@ -387,7 +388,7 @@ public class EdgeServiceImpl extends AbstractEntityService implements EdgeServic
387 388 EdgeQueueEntityType edgeQueueEntityType = getEdgeQueueTypeByEntityType(tbMsg.getOriginator().getEntityType());
388 389 if (edgeId != null && edgeQueueEntityType != null) {
389 390 try {
390   - saveEventToEdgeQueue(tenantId, edgeId, edgeQueueEntityType, tbMsg.getType(), mapper.writeValueAsString(tbMsg), callback);
  391 + saveEventToEdgeQueue(tenantId, edgeId, edgeQueueEntityType, tbMsg.getType(), Base64.encodeBase64String(TbMsg.toByteArray(tbMsg)), callback);
391 392 } catch (IOException e) {
392 393 log.error("Error while saving custom tbMsg into Edge Queue", e);
393 394 }
... ... @@ -723,7 +724,7 @@ public class EdgeServiceImpl extends AbstractEntityService implements EdgeServic
723 724 }
724 725 };
725 726
726   - private PaginatedRemover<CustomerId, Edge> customerEdgeUnasigner = new PaginatedRemover<CustomerId, Edge>() {
  727 + private PaginatedRemover<CustomerId, Edge> customerEdgeUnassigner = new PaginatedRemover<CustomerId, Edge>() {
727 728
728 729 @Override
729 730 protected List<Edge> findEntities(TenantId tenantId, CustomerId id, TextPageLink pageLink) {
... ...