Showing
15 changed files
with
107 additions
and
133 deletions
... | ... | @@ -258,12 +258,14 @@ public final class EdgeGrpcSession implements Closeable { |
258 | 258 | |
259 | 259 | void processHandleMessages() throws ExecutionException, InterruptedException { |
260 | 260 | if (isConnected()) { |
261 | - Integer queueStartTs = getQueueStartTs().get(); | |
261 | + Long queueStartTs = getQueueStartTs().get(); | |
262 | 262 | TimePageLink pageLink = new TimePageLink( |
263 | 263 | ctx.getEdgeEventStorageSettings().getMaxReadRecordsCount(), |
264 | - queueStartTs, | |
264 | + 0, | |
265 | 265 | null, |
266 | - new SortOrder("createdTime", SortOrder.Direction.ASC)); | |
266 | + new SortOrder("createdTime", SortOrder.Direction.ASC), | |
267 | + queueStartTs, | |
268 | + null); | |
267 | 269 | PageData<EdgeEvent> pageData; |
268 | 270 | UUID ifOffset = null; |
269 | 271 | boolean success = true; |
... | ... | @@ -398,15 +400,15 @@ public final class EdgeGrpcSession implements Closeable { |
398 | 400 | } |
399 | 401 | |
400 | 402 | |
401 | - private ListenableFuture<Integer> getQueueStartTs() { | |
403 | + private ListenableFuture<Long> getQueueStartTs() { | |
402 | 404 | ListenableFuture<Optional<AttributeKvEntry>> future = |
403 | 405 | ctx.getAttributesService().find(edge.getTenantId(), edge.getId(), DataConstants.SERVER_SCOPE, QUEUE_START_TS_ATTR_KEY); |
404 | 406 | return Futures.transform(future, attributeKvEntryOpt -> { |
405 | 407 | if (attributeKvEntryOpt != null && attributeKvEntryOpt.isPresent()) { |
406 | 408 | AttributeKvEntry attributeKvEntry = attributeKvEntryOpt.get(); |
407 | - return attributeKvEntry.getLongValue().isPresent() ? attributeKvEntry.getLongValue().get().intValue() : 0; | |
409 | + return attributeKvEntry.getLongValue().isPresent() ? attributeKvEntry.getLongValue().get() : 0L; | |
408 | 410 | } else { |
409 | - return 0; | |
411 | + return 0L; | |
410 | 412 | } |
411 | 413 | }, ctx.getDbCallbackExecutor()); |
412 | 414 | } | ... | ... |
... | ... | @@ -73,14 +73,11 @@ public class DeviceProcessor extends BaseProcessor { |
73 | 73 | saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, ActionType.ENTITY_EXISTS_REQUEST, device.getId(), null); |
74 | 74 | } |
75 | 75 | } else { |
76 | - Device deviceById = deviceService.findDeviceById(edge.getTenantId(), edgeDeviceId); | |
77 | - if (deviceById != null) { | |
78 | - // this ID already used by other device - create new device and update ID on the edge | |
79 | - device = createDevice(tenantId, edge, deviceUpdateMsg); | |
80 | - saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, ActionType.ENTITY_EXISTS_REQUEST, device.getId(), null); | |
81 | - } else { | |
82 | - device = createDevice(tenantId, edge, deviceUpdateMsg); | |
83 | - } | |
76 | + device = createDevice(tenantId, edge, deviceUpdateMsg); | |
77 | + saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, ActionType.ENTITY_EXISTS_REQUEST, device.getId(), null); | |
78 | + | |
79 | + // TODO: voba - properly handle device credentials from edge to cloud | |
80 | + // saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, ActionType.CREDENTIALS_REQUEST, device.getId(), null); | |
84 | 81 | } |
85 | 82 | // TODO: voba - assign device only in case device is not assigned yet. Missing functionality to check this relation prior assignment |
86 | 83 | deviceService.assignDeviceToEdge(edge.getTenantId(), device.getId(), edge.getId()); |
... | ... | @@ -142,22 +139,19 @@ public class DeviceProcessor extends BaseProcessor { |
142 | 139 | Device device; |
143 | 140 | try { |
144 | 141 | deviceCreationLock.lock(); |
145 | - DeviceId deviceId = new DeviceId(new UUID(deviceUpdateMsg.getIdMSB(), deviceUpdateMsg.getIdLSB())); | |
146 | 142 | device = new Device(); |
147 | 143 | device.setTenantId(edge.getTenantId()); |
148 | 144 | device.setCustomerId(edge.getCustomerId()); |
149 | - device.setId(deviceId); | |
150 | 145 | device.setName(deviceUpdateMsg.getName()); |
151 | 146 | device.setType(deviceUpdateMsg.getType()); |
152 | 147 | device.setLabel(deviceUpdateMsg.getLabel()); |
153 | 148 | device.setAdditionalInfo(JacksonUtil.toJsonNode(deviceUpdateMsg.getAdditionalInfo())); |
154 | 149 | device = deviceService.saveDevice(device); |
155 | - createDeviceCredentials(device); | |
150 | + // TODO: voba - is this still required? | |
151 | +// createDeviceCredentials(device); | |
156 | 152 | createRelationFromEdge(tenantId, edge.getId(), device.getId()); |
157 | 153 | deviceStateService.onDeviceAdded(device); |
158 | 154 | pushDeviceCreatedEventToRuleEngine(tenantId, edge, device); |
159 | - | |
160 | - saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, ActionType.CREDENTIALS_REQUEST, deviceId, null); | |
161 | 155 | } finally { |
162 | 156 | deviceCreationLock.unlock(); |
163 | 157 | } | ... | ... |
... | ... | @@ -75,7 +75,7 @@ public class BaseEdgeEventControllerTest extends AbstractControllerTest { |
75 | 75 | |
76 | 76 | @Test |
77 | 77 | public void testGetEdgeEvents() throws Exception { |
78 | - Thread.sleep(1000); | |
78 | + Thread.sleep(500); | |
79 | 79 | Edge edge = constructEdge("TestEdge", "default"); |
80 | 80 | edge = doPost("/api/edge", edge, Edge.class); |
81 | 81 | |
... | ... | @@ -83,18 +83,18 @@ public class BaseEdgeEventControllerTest extends AbstractControllerTest { |
83 | 83 | Device savedDevice = doPost("/api/device", device, Device.class); |
84 | 84 | |
85 | 85 | doPost("/api/edge/" + edge.getId().toString() + "/device/" + savedDevice.getId().toString(), Device.class); |
86 | - Thread.sleep(1000); | |
86 | + Thread.sleep(500); | |
87 | 87 | |
88 | 88 | Asset asset = constructAsset("TestAsset", "default"); |
89 | 89 | Asset savedAsset = doPost("/api/asset", asset, Asset.class); |
90 | 90 | |
91 | 91 | doPost("/api/edge/" + edge.getId().toString() + "/asset/" + savedAsset.getId().toString(), Asset.class); |
92 | - Thread.sleep(1000); | |
92 | + Thread.sleep(500); | |
93 | 93 | |
94 | 94 | EntityRelation relation = new EntityRelation(savedAsset.getId(), savedDevice.getId(), EntityRelation.CONTAINS_TYPE); |
95 | 95 | |
96 | 96 | doPost("/api/relation", relation); |
97 | - Thread.sleep(1000); | |
97 | + Thread.sleep(500); | |
98 | 98 | |
99 | 99 | List<EdgeEvent> edgeEvents = doGetTypedWithTimePageLink("/api/edge/" + edge.getId().toString() + "/events?", |
100 | 100 | new TypeReference<PageData<EdgeEvent>>() { |
... | ... | @@ -102,10 +102,10 @@ public class BaseEdgeEventControllerTest extends AbstractControllerTest { |
102 | 102 | |
103 | 103 | Assert.assertFalse(edgeEvents.isEmpty()); |
104 | 104 | Assert.assertEquals(4, edgeEvents.size()); |
105 | - Assert.assertEquals(EdgeEventType.RELATION, edgeEvents.get(0).getType()); | |
106 | - Assert.assertEquals(EdgeEventType.ASSET, edgeEvents.get(1).getType()); | |
107 | - Assert.assertEquals(EdgeEventType.DEVICE, edgeEvents.get(2).getType()); | |
108 | - Assert.assertEquals(EdgeEventType.RULE_CHAIN, edgeEvents.get(3).getType()); | |
105 | + Assert.assertEquals(EdgeEventType.RULE_CHAIN, edgeEvents.get(0).getType()); | |
106 | + Assert.assertEquals(EdgeEventType.DEVICE, edgeEvents.get(1).getType()); | |
107 | + Assert.assertEquals(EdgeEventType.ASSET, edgeEvents.get(2).getType()); | |
108 | + Assert.assertEquals(EdgeEventType.RELATION, edgeEvents.get(3).getType()); | |
109 | 109 | } |
110 | 110 | |
111 | 111 | private Device constructDevice(String name, String type) { | ... | ... |
... | ... | @@ -161,6 +161,7 @@ abstract public class BaseEdgeTest extends AbstractControllerTest { |
161 | 161 | testTimeseries(); |
162 | 162 | testAttributes(); |
163 | 163 | testSendMessagesToCloud(); |
164 | + // TODO: voba - test conflict messages in case device with current name already exist or ID is already used | |
164 | 165 | } |
165 | 166 | |
166 | 167 | private void testReceivedInitialData() throws Exception { |
... | ... | @@ -808,7 +809,7 @@ abstract public class BaseEdgeTest extends AbstractControllerTest { |
808 | 809 | private void sendDevice() throws Exception { |
809 | 810 | UUID uuid = Uuids.timeBased(); |
810 | 811 | |
811 | - UplinkMsg.Builder builder = UplinkMsg.newBuilder(); | |
812 | + UplinkMsg.Builder builder = UplinkMsg.newBuilder(); | |
812 | 813 | DeviceUpdateMsg.Builder deviceUpdateMsgBuilder = DeviceUpdateMsg.newBuilder(); |
813 | 814 | deviceUpdateMsgBuilder.setIdMSB(uuid.getMostSignificantBits()); |
814 | 815 | deviceUpdateMsgBuilder.setIdLSB(uuid.getLeastSignificantBits()); |
... | ... | @@ -816,11 +817,17 @@ abstract public class BaseEdgeTest extends AbstractControllerTest { |
816 | 817 | deviceUpdateMsgBuilder.setType("test"); |
817 | 818 | deviceUpdateMsgBuilder.setMsgType(UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE); |
818 | 819 | builder.addDeviceUpdateMsg(deviceUpdateMsgBuilder.build()); |
819 | - edgeImitator.expectResponsesAmount(1); | |
820 | + edgeImitator.expectResponsesAmount(2); | |
820 | 821 | edgeImitator.sendUplinkMsg(builder.build()); |
821 | 822 | edgeImitator.waitForResponses(); |
822 | 823 | |
823 | - Device device = doGet("/api/device/" + uuid.toString(), Device.class); | |
824 | + AbstractMessage latestMessage = edgeImitator.getLatestMessage(); | |
825 | + Assert.assertTrue(latestMessage instanceof DeviceUpdateMsg); | |
826 | + DeviceUpdateMsg deviceUpdateMsg = (DeviceUpdateMsg) latestMessage; | |
827 | + | |
828 | + UUID savedDeviceId = new UUID(deviceUpdateMsg.getIdMSB(), deviceUpdateMsg.getIdLSB()); | |
829 | + | |
830 | + Device device = doGet("/api/device/" + savedDeviceId, Device.class); | |
824 | 831 | Assert.assertNotNull(device); |
825 | 832 | Assert.assertEquals("Edge Device 2", device.getName()); |
826 | 833 | } | ... | ... |
1 | -/** | |
2 | - * Copyright © 2016-2020 The Thingsboard Authors | |
3 | - * | |
4 | - * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | - * you may not use this file except in compliance with the License. | |
6 | - * You may obtain a copy of the License at | |
7 | - * | |
8 | - * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | - * | |
10 | - * Unless required by applicable law or agreed to in writing, software | |
11 | - * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | - * See the License for the specific language governing permissions and | |
14 | - * limitations under the License. | |
15 | - */ | |
16 | -package org.thingsboard.server.edge; | |
17 | - | |
18 | -import org.cassandraunit.dataset.cql.ClassPathCQLDataSet; | |
19 | -import org.junit.BeforeClass; | |
20 | -import org.junit.ClassRule; | |
21 | -import org.junit.extensions.cpsuite.ClasspathSuite; | |
22 | -import org.junit.runner.RunWith; | |
23 | -import org.thingsboard.server.dao.CustomCassandraCQLUnit; | |
24 | -import org.thingsboard.server.queue.memory.InMemoryStorage; | |
25 | - | |
26 | -import java.util.Arrays; | |
27 | - | |
28 | -@RunWith(ClasspathSuite.class) | |
29 | -@ClasspathSuite.ClassnameFilters({ | |
30 | - "org.thingsboard.server.edge.nosql.*Test"}) | |
31 | -public class EdgeNoSqlTestSuite { | |
32 | - | |
33 | - @ClassRule | |
34 | - public static CustomCassandraCQLUnit cassandraUnit = | |
35 | - new CustomCassandraCQLUnit( | |
36 | - Arrays.asList( | |
37 | - new ClassPathCQLDataSet("cassandra/schema-ts.cql", false, false), | |
38 | - new ClassPathCQLDataSet("cassandra/schema-entities.cql", false, false), | |
39 | - new ClassPathCQLDataSet("cassandra/system-data.cql", false, false), | |
40 | - new ClassPathCQLDataSet("cassandra/system-test.cql", false, false)), | |
41 | - "cassandra-test.yaml", 30000l); | |
42 | - | |
43 | - @BeforeClass | |
44 | - public static void cleanupInMemStorage(){ | |
45 | - InMemoryStorage.getInstance().cleanup(); | |
46 | - } | |
47 | -} |
... | ... | @@ -22,6 +22,7 @@ import org.thingsboard.server.common.data.asset.AssetInfo; |
22 | 22 | import org.thingsboard.server.common.data.id.TenantId; |
23 | 23 | import org.thingsboard.server.common.data.page.PageData; |
24 | 24 | import org.thingsboard.server.common.data.page.PageLink; |
25 | +import org.thingsboard.server.common.data.page.TimePageLink; | |
25 | 26 | import org.thingsboard.server.dao.Dao; |
26 | 27 | |
27 | 28 | import java.util.List; |
... | ... | @@ -174,5 +175,5 @@ public interface AssetDao extends Dao<Asset> { |
174 | 175 | * @param pageLink the page link |
175 | 176 | * @return the list of asset objects |
176 | 177 | */ |
177 | - PageData<Asset> findAssetsByTenantIdAndEdgeId(UUID tenantId, UUID edgeId, PageLink pageLink); | |
178 | + PageData<Asset> findAssetsByTenantIdAndEdgeId(UUID tenantId, UUID edgeId, TimePageLink pageLink); | |
178 | 179 | } | ... | ... |
... | ... | @@ -21,7 +21,6 @@ import lombok.Data; |
21 | 21 | import lombok.EqualsAndHashCode; |
22 | 22 | import org.hibernate.annotations.Type; |
23 | 23 | import org.hibernate.annotations.TypeDef; |
24 | -import org.thingsboard.server.common.data.UUIDConverter; | |
25 | 24 | import org.thingsboard.server.common.data.edge.Edge; |
26 | 25 | import org.thingsboard.server.common.data.id.CustomerId; |
27 | 26 | import org.thingsboard.server.common.data.id.EdgeId; |
... | ... | @@ -34,6 +33,7 @@ import org.thingsboard.server.dao.util.mapping.JsonStringType; |
34 | 33 | |
35 | 34 | import javax.persistence.Column; |
36 | 35 | import javax.persistence.MappedSuperclass; |
36 | +import java.util.UUID; | |
37 | 37 | |
38 | 38 | import static org.thingsboard.server.dao.model.ModelConstants.EDGE_CLOUD_ENDPOINT_KEY_PROPERTY; |
39 | 39 | import static org.thingsboard.server.dao.model.ModelConstants.EDGE_CUSTOMER_ID_PROPERTY; |
... | ... | @@ -53,14 +53,14 @@ import static org.thingsboard.server.dao.model.ModelConstants.SEARCH_TEXT_PROPER |
53 | 53 | @MappedSuperclass |
54 | 54 | public abstract class AbstractEdgeEntity<T extends Edge> extends BaseSqlEntity<T> implements SearchTextEntity<T> { |
55 | 55 | |
56 | - @Column(name = EDGE_TENANT_ID_PROPERTY) | |
57 | - private String tenantId; | |
56 | + @Column(name = EDGE_TENANT_ID_PROPERTY, columnDefinition = "uuid") | |
57 | + private UUID tenantId; | |
58 | 58 | |
59 | - @Column(name = EDGE_CUSTOMER_ID_PROPERTY) | |
60 | - private String customerId; | |
59 | + @Column(name = EDGE_CUSTOMER_ID_PROPERTY, columnDefinition = "uuid") | |
60 | + private UUID customerId; | |
61 | 61 | |
62 | - @Column(name = EDGE_ROOT_RULE_CHAIN_ID_PROPERTY) | |
63 | - private String rootRuleChainId; | |
62 | + @Column(name = EDGE_ROOT_RULE_CHAIN_ID_PROPERTY, columnDefinition = "uuid") | |
63 | + private UUID rootRuleChainId; | |
64 | 64 | |
65 | 65 | @Column(name = EDGE_TYPE_PROPERTY) |
66 | 66 | private String type; |
... | ... | @@ -103,13 +103,13 @@ public abstract class AbstractEdgeEntity<T extends Edge> extends BaseSqlEntity<T |
103 | 103 | this.setUuid(edge.getId().getId()); |
104 | 104 | } |
105 | 105 | if (edge.getTenantId() != null) { |
106 | - this.tenantId = UUIDConverter.fromTimeUUID(edge.getTenantId().getId()); | |
106 | + this.tenantId = edge.getTenantId().getId(); | |
107 | 107 | } |
108 | 108 | if (edge.getCustomerId() != null) { |
109 | - this.customerId = UUIDConverter.fromTimeUUID(edge.getCustomerId().getId()); | |
109 | + this.customerId = edge.getCustomerId().getId(); | |
110 | 110 | } |
111 | 111 | if (edge.getRootRuleChainId() != null) { |
112 | - this.rootRuleChainId = UUIDConverter.fromTimeUUID(edge.getRootRuleChainId().getId()); | |
112 | + this.rootRuleChainId = edge.getRootRuleChainId().getId(); | |
113 | 113 | } |
114 | 114 | this.type = edge.getType(); |
115 | 115 | this.name = edge.getName(); |
... | ... | @@ -157,13 +157,13 @@ public abstract class AbstractEdgeEntity<T extends Edge> extends BaseSqlEntity<T |
157 | 157 | Edge edge = new Edge(new EdgeId(getUuid())); |
158 | 158 | edge.setCreatedTime(Uuids.unixTimestamp(getUuid())); |
159 | 159 | if (tenantId != null) { |
160 | - edge.setTenantId(new TenantId(UUIDConverter.fromString(tenantId))); | |
160 | + edge.setTenantId(new TenantId(tenantId)); | |
161 | 161 | } |
162 | 162 | if (customerId != null) { |
163 | - edge.setCustomerId(new CustomerId(UUIDConverter.fromString(customerId))); | |
163 | + edge.setCustomerId(new CustomerId(customerId)); | |
164 | 164 | } |
165 | 165 | if (rootRuleChainId != null) { |
166 | - edge.setRootRuleChainId(new RuleChainId(UUIDConverter.fromString(rootRuleChainId))); | |
166 | + edge.setRootRuleChainId(new RuleChainId(rootRuleChainId)); | |
167 | 167 | } |
168 | 168 | edge.setType(type); |
169 | 169 | edge.setName(name); | ... | ... |
... | ... | @@ -91,6 +91,7 @@ public class EdgeEventEntity extends BaseSqlEntity<EdgeEvent> implements BaseEnt |
91 | 91 | } else { |
92 | 92 | this.ts = System.currentTimeMillis(); |
93 | 93 | } |
94 | + this.setCreatedTime(edgeEvent.getCreatedTime()); | |
94 | 95 | if (edgeEvent.getTenantId() != null) { |
95 | 96 | this.tenantId = edgeEvent.getTenantId().getId(); |
96 | 97 | } | ... | ... |
... | ... | @@ -57,9 +57,6 @@ public class JpaAssetDao extends JpaAbstractSearchTextDao<AssetEntity, Asset> im |
57 | 57 | @Autowired |
58 | 58 | private AssetRepository assetRepository; |
59 | 59 | |
60 | - @Autowired | |
61 | - private RelationDao relationDao; | |
62 | - | |
63 | 60 | @Override |
64 | 61 | protected Class<AssetEntity> getEntityClass() { |
65 | 62 | return AssetEntity.class; |
... | ... | @@ -190,7 +187,7 @@ public class JpaAssetDao extends JpaAbstractSearchTextDao<AssetEntity, Asset> im |
190 | 187 | } |
191 | 188 | |
192 | 189 | @Override |
193 | - public PageData<Asset> findAssetsByTenantIdAndEdgeId(UUID tenantId, UUID edgeId, PageLink pageLink) { | |
190 | + public PageData<Asset> findAssetsByTenantIdAndEdgeId(UUID tenantId, UUID edgeId, TimePageLink pageLink) { | |
194 | 191 | log.debug("Try to find assets by tenantId [{}], edgeId [{}] and pageLink [{}]", tenantId, edgeId, pageLink); |
195 | 192 | return DaoUtil.toPageData(assetRepository |
196 | 193 | .findByTenantIdAndEdgeId( | ... | ... |
... | ... | @@ -171,7 +171,7 @@ public interface DeviceRepository extends PagingAndSortingRepository<DeviceEntit |
171 | 171 | Long countByDeviceProfileId(UUID deviceProfileId); |
172 | 172 | |
173 | 173 | @Query("SELECT d FROM DeviceEntity d, RelationEntity re WHERE d.tenantId = :tenantId " + |
174 | - "AND d.id = re.toId AND re.toType = 'ASSET' AND re.relationTypeGroup = 'EDGE' " + | |
174 | + "AND d.id = re.toId AND re.toType = 'DEVICE' AND re.relationTypeGroup = 'EDGE' " + | |
175 | 175 | "AND re.relationType = 'Contains' AND re.fromId = :edgeId AND re.fromType = 'EDGE' " + |
176 | 176 | "AND LOWER(d.searchText) LIKE LOWER(CONCAT(:searchText, '%'))") |
177 | 177 | Page<DeviceEntity> findByTenantIdAndEdgeId(@Param("tenantId") UUID tenantId, | ... | ... |
... | ... | @@ -36,6 +36,19 @@ public interface EdgeEventRepository extends PagingAndSortingRepository<EdgeEven |
36 | 36 | Page<EdgeEventEntity> findEdgeEventsByTenantIdAndEdgeId(@Param("tenantId") UUID tenantId, |
37 | 37 | @Param("edgeId") UUID edgeId, |
38 | 38 | @Param("startTime") Long startTime, |
39 | - @Param("startTime") Long endTime, | |
39 | + @Param("endTime") Long endTime, | |
40 | 40 | Pageable pageable); |
41 | + | |
42 | + @Query("SELECT e FROM EdgeEventEntity e WHERE " + | |
43 | + "e.tenantId = :tenantId " + | |
44 | + "AND e.edgeId = :edgeId " + | |
45 | + "AND (:startTime IS NULL OR e.createdTime >= :startTime) " + | |
46 | + "AND (:endTime IS NULL OR e.createdTime <= :endTime) " + | |
47 | + "AND e.edgeEventAction <> 'TIMESERIES_UPDATED'" | |
48 | + ) | |
49 | + Page<EdgeEventEntity> findEdgeEventsByTenantIdAndEdgeIdWithoutTimeseriesUpdated(@Param("tenantId") UUID tenantId, | |
50 | + @Param("edgeId") UUID edgeId, | |
51 | + @Param("startTime") Long startTime, | |
52 | + @Param("endTime") Long endTime, | |
53 | + Pageable pageable); | |
41 | 54 | } | ... | ... |
... | ... | @@ -5,7 +5,7 @@ |
5 | 5 | * you may not use this file except in compliance with the License. |
6 | 6 | * You may obtain a copy of the License at |
7 | 7 | * |
8 | - * http://www.apache.org/licenses/LICENSE-2.0 | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | 9 | * |
10 | 10 | * Unless required by applicable law or agreed to in writing, software |
11 | 11 | * distributed under the License is distributed on an "AS IS" BASIS, |
... | ... | @@ -18,17 +18,20 @@ package org.thingsboard.server.dao.sql.edge; |
18 | 18 | import com.datastax.oss.driver.api.core.uuid.Uuids; |
19 | 19 | import com.google.common.util.concurrent.ListenableFuture; |
20 | 20 | import lombok.extern.slf4j.Slf4j; |
21 | +import org.apache.commons.lang3.StringUtils; | |
21 | 22 | import org.springframework.beans.factory.annotation.Autowired; |
22 | 23 | import org.springframework.data.repository.CrudRepository; |
23 | 24 | import org.springframework.stereotype.Component; |
24 | 25 | import org.thingsboard.server.common.data.edge.EdgeEvent; |
25 | 26 | import org.thingsboard.server.common.data.id.EdgeEventId; |
26 | 27 | import org.thingsboard.server.common.data.id.EdgeId; |
28 | +import org.thingsboard.server.common.data.id.EventId; | |
27 | 29 | import org.thingsboard.server.common.data.page.PageData; |
28 | 30 | import org.thingsboard.server.common.data.page.TimePageLink; |
29 | 31 | import org.thingsboard.server.dao.DaoUtil; |
30 | 32 | import org.thingsboard.server.dao.edge.EdgeEventDao; |
31 | 33 | import org.thingsboard.server.dao.model.sql.EdgeEventEntity; |
34 | +import org.thingsboard.server.dao.model.sql.EventEntity; | |
32 | 35 | import org.thingsboard.server.dao.sql.JpaAbstractSearchTextDao; |
33 | 36 | |
34 | 37 | import java.util.Optional; |
... | ... | @@ -59,27 +62,45 @@ public class JpaBaseEdgeEventDao extends JpaAbstractSearchTextDao<EdgeEventEntit |
59 | 62 | public ListenableFuture<EdgeEvent> saveAsync(EdgeEvent edgeEvent) { |
60 | 63 | log.debug("Save edge event [{}] ", edgeEvent); |
61 | 64 | if (edgeEvent.getId() == null) { |
62 | - edgeEvent.setId(new EdgeEventId(Uuids.timeBased())); | |
65 | + UUID timeBased = Uuids.timeBased(); | |
66 | + edgeEvent.setId(new EdgeEventId(timeBased)); | |
67 | + edgeEvent.setCreatedTime(Uuids.unixTimestamp(timeBased)); | |
68 | + } else if (edgeEvent.getCreatedTime() == 0L) { | |
69 | + UUID eventId = edgeEvent.getId().getId(); | |
70 | + if (eventId.version() == 1) { | |
71 | + edgeEvent.setCreatedTime(Uuids.unixTimestamp(eventId)); | |
72 | + } else { | |
73 | + edgeEvent.setCreatedTime(System.currentTimeMillis()); | |
74 | + } | |
75 | + } | |
76 | + if (StringUtils.isEmpty(edgeEvent.getUid())) { | |
77 | + edgeEvent.setUid(edgeEvent.getId().toString()); | |
63 | 78 | } |
64 | 79 | return service.submit(() -> save(new EdgeEventEntity(edgeEvent)).orElse(null)); |
65 | 80 | } |
66 | 81 | |
67 | 82 | @Override |
68 | 83 | public PageData<EdgeEvent> findEdgeEvents(UUID tenantId, EdgeId edgeId, TimePageLink pageLink, boolean withTsUpdate) { |
69 | - return DaoUtil.toPageData( | |
70 | - edgeEventRepository | |
71 | - .findEdgeEventsByTenantIdAndEdgeId( | |
72 | - tenantId, | |
73 | - edgeId.getId(), | |
74 | - pageLink.getStartTime(), | |
75 | - pageLink.getEndTime(), | |
76 | - DaoUtil.toPageable(pageLink))); | |
77 | - // TODO: voba | |
78 | -// Specification<EdgeEventEntity> timeSearchSpec = JpaAbstractSearchTimeDao.getTimeSearchPageSpec(pageLink, "id"); | |
79 | -// Specification<EdgeEventEntity> fieldsSpec = getEntityFieldsSpec(tenantId, edgeId, withTsUpdate); | |
80 | -// Sort.Direction sortDirection = pageLink.isAscOrder() ? Sort.Direction.ASC : Sort.Direction.DESC; | |
81 | -// Pageable pageable = PageRequest.of(0, pageLink.getLimit(), sortDirection, ID_PROPERTY); | |
82 | -// return DaoUtil.convertDataList(edgeEventRepository.findAll(Specification.where(timeSearchSpec).and(fieldsSpec), pageable).getContent()); | |
84 | + if (withTsUpdate) { | |
85 | + return DaoUtil.toPageData( | |
86 | + edgeEventRepository | |
87 | + .findEdgeEventsByTenantIdAndEdgeId( | |
88 | + tenantId, | |
89 | + edgeId.getId(), | |
90 | + pageLink.getStartTime(), | |
91 | + pageLink.getEndTime(), | |
92 | + DaoUtil.toPageable(pageLink))); | |
93 | + } else { | |
94 | + return DaoUtil.toPageData( | |
95 | + edgeEventRepository | |
96 | + .findEdgeEventsByTenantIdAndEdgeIdWithoutTimeseriesUpdated( | |
97 | + tenantId, | |
98 | + edgeId.getId(), | |
99 | + pageLink.getStartTime(), | |
100 | + pageLink.getEndTime(), | |
101 | + DaoUtil.toPageable(pageLink))); | |
102 | + | |
103 | + } | |
83 | 104 | } |
84 | 105 | |
85 | 106 | public Optional<EdgeEvent> save(EdgeEventEntity entity) { |
... | ... | @@ -94,23 +115,4 @@ public class JpaBaseEdgeEventDao extends JpaAbstractSearchTextDao<EdgeEventEntit |
94 | 115 | return Optional.of(DaoUtil.getData(edgeEventRepository.save(entity))); |
95 | 116 | } |
96 | 117 | |
97 | - // TODO: voba | |
98 | -// private Specification<EdgeEventEntity> getEntityFieldsSpec(UUID tenantId, EdgeId edgeId, boolean withTsUpdate) { | |
99 | -// return (root, criteriaQuery, criteriaBuilder) -> { | |
100 | -// List<Predicate> predicates = new ArrayList<>(); | |
101 | -// if (tenantId != null) { | |
102 | -// Predicate tenantIdPredicate = criteriaBuilder.equal(root.get("tenantId"), UUIDConverter.fromTimeUUID(tenantId)); | |
103 | -// predicates.add(tenantIdPredicate); | |
104 | -// } | |
105 | -// if (edgeId != null) { | |
106 | -// Predicate entityIdPredicate = criteriaBuilder.equal(root.get("edgeId"), UUIDConverter.fromTimeUUID(edgeId.getId())); | |
107 | -// predicates.add(entityIdPredicate); | |
108 | -// } | |
109 | -// if (!withTsUpdate) { | |
110 | -// Predicate edgeEventActionPredicate = criteriaBuilder.notEqual(root.get("edgeEventAction"), ActionType.TIMESERIES_UPDATED.name()); | |
111 | -// predicates.add(edgeEventActionPredicate); | |
112 | -// } | |
113 | -// return criteriaBuilder.and(predicates.toArray(new Predicate[]{})); | |
114 | -// }; | |
115 | -// } | |
116 | 118 | } | ... | ... |
... | ... | @@ -122,7 +122,7 @@ public interface EntityViewRepository extends PagingAndSortingRepository<EntityV |
122 | 122 | List<String> findTenantEntityViewTypes(@Param("tenantId") UUID tenantId); |
123 | 123 | |
124 | 124 | @Query("SELECT ev FROM EntityViewEntity ev, RelationEntity re WHERE ev.tenantId = :tenantId " + |
125 | - "AND ev.id = re.toId AND re.toType = 'ASSET' AND re.relationTypeGroup = 'EDGE' " + | |
125 | + "AND ev.id = re.toId AND re.toType = 'ENTITY_VIEW' AND re.relationTypeGroup = 'EDGE' " + | |
126 | 126 | "AND re.relationType = 'Contains' AND re.fromId = :edgeId AND re.fromType = 'EDGE' " + |
127 | 127 | "AND LOWER(ev.searchText) LIKE LOWER(CONCAT(:searchText, '%'))") |
128 | 128 | Page<EntityViewEntity> findByTenantIdAndEdgeId(@Param("tenantId") UUID tenantId, | ... | ... |
... | ... | @@ -334,6 +334,7 @@ CREATE TABLE IF NOT EXISTS ts_kv_dictionary ( |
334 | 334 | ); |
335 | 335 | CREATE TABLE IF NOT EXISTS edge ( |
336 | 336 | id uuid NOT NULL CONSTRAINT edge_pkey PRIMARY KEY, |
337 | + created_time bigint NOT NULL, | |
337 | 338 | additional_info varchar, |
338 | 339 | customer_id uuid, |
339 | 340 | root_rule_chain_id uuid, |
... | ... | @@ -353,6 +354,7 @@ CREATE TABLE IF NOT EXISTS edge ( |
353 | 354 | |
354 | 355 | CREATE TABLE IF NOT EXISTS edge_event ( |
355 | 356 | id uuid NOT NULL CONSTRAINT edge_event_pkey PRIMARY KEY, |
357 | + created_time bigint NOT NULL, | |
356 | 358 | edge_id uuid, |
357 | 359 | edge_event_type varchar(255), |
358 | 360 | edge_event_uid varchar(255), | ... | ... |
... | ... | @@ -362,6 +362,7 @@ CREATE TABLE IF NOT EXISTS ts_kv_dictionary |
362 | 362 | |
363 | 363 | CREATE TABLE IF NOT EXISTS edge ( |
364 | 364 | id uuid NOT NULL CONSTRAINT edge_pkey PRIMARY KEY, |
365 | + created_time bigint NOT NULL, | |
365 | 366 | additional_info varchar, |
366 | 367 | customer_id uuid, |
367 | 368 | root_rule_chain_id uuid, |
... | ... | @@ -381,6 +382,7 @@ CREATE TABLE IF NOT EXISTS edge ( |
381 | 382 | |
382 | 383 | CREATE TABLE IF NOT EXISTS edge_event ( |
383 | 384 | id uuid NOT NULL CONSTRAINT edge_event_pkey PRIMARY KEY, |
385 | + created_time bigint NOT NULL, | |
384 | 386 | edge_id uuid, |
385 | 387 | edge_event_type varchar(255), |
386 | 388 | edge_event_uid varchar(255), | ... | ... |