Commit 2dd2e6f67aba00c26ed377fad7763dde6d211343
Committed by
Andrew Shvayka
1 parent
6fb040a4
Fixed violations on event primary and unique key constraints & code improvement (#1921)
* init commit * fix-violation-of-primary-key-constraint * revert thingsboard.yml changes * remove @Slf4j annotation * update code * update Events Dao * code improvements * revert changes in logback.xml * cleaned code * attributes/on-update-set-null-values
Showing
7 changed files
with
212 additions
and
46 deletions
@@ -15,7 +15,6 @@ | @@ -15,7 +15,6 @@ | ||
15 | */ | 15 | */ |
16 | package org.thingsboard.server.dao.sql.attributes; | 16 | package org.thingsboard.server.dao.sql.attributes; |
17 | 17 | ||
18 | -import lombok.extern.slf4j.Slf4j; | ||
19 | import org.springframework.data.jpa.repository.Modifying; | 18 | import org.springframework.data.jpa.repository.Modifying; |
20 | import org.springframework.stereotype.Repository; | 19 | import org.springframework.stereotype.Repository; |
21 | import org.thingsboard.server.dao.model.sql.AttributeKvEntity; | 20 | import org.thingsboard.server.dao.model.sql.AttributeKvEntity; |
@@ -24,7 +23,6 @@ import org.thingsboard.server.dao.util.SqlDao; | @@ -24,7 +23,6 @@ import org.thingsboard.server.dao.util.SqlDao; | ||
24 | import javax.persistence.EntityManager; | 23 | import javax.persistence.EntityManager; |
25 | import javax.persistence.PersistenceContext; | 24 | import javax.persistence.PersistenceContext; |
26 | 25 | ||
27 | -@Slf4j | ||
28 | @SqlDao | 26 | @SqlDao |
29 | @Repository | 27 | @Repository |
30 | public abstract class AttributeKvInsertRepository { | 28 | public abstract class AttributeKvInsertRepository { |
dao/src/main/java/org/thingsboard/server/dao/sql/attributes/HsqlAttributesInsertRepository.java
0 → 100644
1 | +/** | ||
2 | + * Copyright © 2016-2019 The Thingsboard Authors | ||
3 | + * | ||
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | ||
5 | + * you may not use this file except in compliance with the License. | ||
6 | + * You may obtain a copy of the License at | ||
7 | + * | ||
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | ||
9 | + * | ||
10 | + * Unless required by applicable law or agreed to in writing, software | ||
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | ||
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
13 | + * See the License for the specific language governing permissions and | ||
14 | + * limitations under the License. | ||
15 | + */ | ||
16 | +package org.thingsboard.server.dao.sql.attributes; | ||
17 | + | ||
18 | +import org.springframework.stereotype.Repository; | ||
19 | +import org.springframework.transaction.annotation.Transactional; | ||
20 | +import org.thingsboard.server.dao.model.sql.AttributeKvEntity; | ||
21 | +import org.thingsboard.server.dao.util.HsqlDao; | ||
22 | +import org.thingsboard.server.dao.util.SqlDao; | ||
23 | + | ||
24 | +@SqlDao | ||
25 | +@HsqlDao | ||
26 | +@Repository | ||
27 | +@Transactional | ||
28 | +public class HsqlAttributesInsertRepository extends AttributeKvInsertRepository { | ||
29 | + | ||
30 | + private static final String ON_BOOL_VALUE_UPDATE_SET_NULLS = " attribute_kv.str_v = null, attribute_kv.long_v = null, attribute_kv.dbl_v = null "; | ||
31 | + private static final String ON_STR_VALUE_UPDATE_SET_NULLS = " attribute_kv.bool_v = null, attribute_kv.long_v = null, attribute_kv.dbl_v = null "; | ||
32 | + private static final String ON_LONG_VALUE_UPDATE_SET_NULLS = " attribute_kv.str_v = null, attribute_kv.bool_v = null, attribute_kv.dbl_v = null "; | ||
33 | + private static final String ON_DBL_VALUE_UPDATE_SET_NULLS = " attribute_kv.str_v = null, attribute_kv.long_v = null, attribute_kv.bool_v = null "; | ||
34 | + | ||
35 | + private static final String INSERT_BOOL_STATEMENT = getInsertOrUpdateString(BOOL_V, ON_BOOL_VALUE_UPDATE_SET_NULLS); | ||
36 | + private static final String INSERT_STR_STATEMENT = getInsertOrUpdateString(STR_V, ON_STR_VALUE_UPDATE_SET_NULLS); | ||
37 | + private static final String INSERT_LONG_STATEMENT = getInsertOrUpdateString(LONG_V, ON_LONG_VALUE_UPDATE_SET_NULLS); | ||
38 | + private static final String INSERT_DBL_STATEMENT = getInsertOrUpdateString(DBL_V, ON_DBL_VALUE_UPDATE_SET_NULLS); | ||
39 | + | ||
40 | + @Override | ||
41 | + public void saveOrUpdate(AttributeKvEntity entity) { | ||
42 | + processSaveOrUpdate(entity, INSERT_BOOL_STATEMENT, INSERT_STR_STATEMENT, INSERT_LONG_STATEMENT, INSERT_DBL_STATEMENT); | ||
43 | + } | ||
44 | + | ||
45 | + private static String getInsertOrUpdateString(String value, String nullValues) { | ||
46 | + return "MERGE INTO attribute_kv USING(VALUES :entity_type, :entity_id, :attribute_type, :attribute_key, :" + value + ", :last_update_ts) A (entity_type, entity_id, attribute_type, attribute_key, " + value + ", last_update_ts) ON (attribute_kv.entity_type=A.entity_type AND attribute_kv.entity_id=A.entity_id AND attribute_kv.attribute_type=A.attribute_type AND attribute_kv.attribute_key=A.attribute_key) WHEN MATCHED THEN UPDATE SET attribute_kv." + value + " = A." + value + ", attribute_kv.last_update_ts = A.last_update_ts," + nullValues + "WHEN NOT MATCHED THEN INSERT (entity_type, entity_id, attribute_type, attribute_key, " + value + ", last_update_ts) VALUES (A.entity_type, A.entity_id, A.attribute_type, A.attribute_key, A." + value + ", A.last_update_ts)"; | ||
47 | + } | ||
48 | +} |
dao/src/main/java/org/thingsboard/server/dao/sql/attributes/PsqlAttributesInsertRepository.java
renamed from
dao/src/main/java/org/thingsboard/server/dao/sql/attributes/PsqlInsertRepository.java
@@ -25,19 +25,24 @@ import org.thingsboard.server.dao.util.SqlDao; | @@ -25,19 +25,24 @@ import org.thingsboard.server.dao.util.SqlDao; | ||
25 | @PsqlDao | 25 | @PsqlDao |
26 | @Repository | 26 | @Repository |
27 | @Transactional | 27 | @Transactional |
28 | -public class PsqlInsertRepository extends AttributeKvInsertRepository { | 28 | +public class PsqlAttributesInsertRepository extends AttributeKvInsertRepository { |
29 | 29 | ||
30 | - private static final String INSERT_OR_UPDATE_BOOL_STATEMENT = getInsertOrUpdateString(BOOL_V); | ||
31 | - private static final String INSERT_OR_UPDATE_STR_STATEMENT = getInsertOrUpdateString(STR_V); | ||
32 | - private static final String INSERT_OR_UPDATE_LONG_STATEMENT = getInsertOrUpdateString(LONG_V); | ||
33 | - private static final String INSERT_OR_UPDATE_DBL_STATEMENT = getInsertOrUpdateString(DBL_V); | 30 | + private static final String ON_BOOL_VALUE_UPDATE_SET_NULLS = "str_v = null, long_v = null, dbl_v = null"; |
31 | + private static final String ON_STR_VALUE_UPDATE_SET_NULLS = "bool_v = null, long_v = null, dbl_v = null"; | ||
32 | + private static final String ON_LONG_VALUE_UPDATE_SET_NULLS = "str_v = null, bool_v = null, dbl_v = null"; | ||
33 | + private static final String ON_DBL_VALUE_UPDATE_SET_NULLS = "str_v = null, long_v = null, bool_v = null"; | ||
34 | + | ||
35 | + private static final String INSERT_OR_UPDATE_BOOL_STATEMENT = getInsertOrUpdateString(BOOL_V, ON_BOOL_VALUE_UPDATE_SET_NULLS); | ||
36 | + private static final String INSERT_OR_UPDATE_STR_STATEMENT = getInsertOrUpdateString(STR_V, ON_STR_VALUE_UPDATE_SET_NULLS); | ||
37 | + private static final String INSERT_OR_UPDATE_LONG_STATEMENT = getInsertOrUpdateString(LONG_V , ON_LONG_VALUE_UPDATE_SET_NULLS); | ||
38 | + private static final String INSERT_OR_UPDATE_DBL_STATEMENT = getInsertOrUpdateString(DBL_V, ON_DBL_VALUE_UPDATE_SET_NULLS); | ||
34 | 39 | ||
35 | @Override | 40 | @Override |
36 | public void saveOrUpdate(AttributeKvEntity entity) { | 41 | public void saveOrUpdate(AttributeKvEntity entity) { |
37 | processSaveOrUpdate(entity, INSERT_OR_UPDATE_BOOL_STATEMENT, INSERT_OR_UPDATE_STR_STATEMENT, INSERT_OR_UPDATE_LONG_STATEMENT, INSERT_OR_UPDATE_DBL_STATEMENT); | 42 | processSaveOrUpdate(entity, INSERT_OR_UPDATE_BOOL_STATEMENT, INSERT_OR_UPDATE_STR_STATEMENT, INSERT_OR_UPDATE_LONG_STATEMENT, INSERT_OR_UPDATE_DBL_STATEMENT); |
38 | } | 43 | } |
39 | 44 | ||
40 | - private static String getInsertOrUpdateString(String value) { | ||
41 | - return "INSERT INTO attribute_kv (entity_type, entity_id, attribute_type, attribute_key, " + value + ", last_update_ts) VALUES (:entity_type, :entity_id, :attribute_type, :attribute_key, :" + value + ", :last_update_ts) ON CONFLICT (entity_type, entity_id, attribute_type, attribute_key) DO UPDATE SET " + value + " = :" + value + ", last_update_ts = :last_update_ts"; | 45 | + private static String getInsertOrUpdateString(String value, String nullValues) { |
46 | + return "INSERT INTO attribute_kv (entity_type, entity_id, attribute_type, attribute_key, " + value + ", last_update_ts) VALUES (:entity_type, :entity_id, :attribute_type, :attribute_key, :" + value + ", :last_update_ts) ON CONFLICT (entity_type, entity_id, attribute_type, attribute_key) DO UPDATE SET " + value + " = :" + value + ", last_update_ts = :last_update_ts," + nullValues; | ||
42 | } | 47 | } |
43 | } | 48 | } |
dao/src/main/java/org/thingsboard/server/dao/sql/event/EventInsertRepository.java
renamed from
dao/src/main/java/org/thingsboard/server/dao/sql/attributes/HsqlInsertRepository.java
@@ -13,75 +13,84 @@ | @@ -13,75 +13,84 @@ | ||
13 | * See the License for the specific language governing permissions and | 13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. | 14 | * limitations under the License. |
15 | */ | 15 | */ |
16 | -package org.thingsboard.server.dao.sql.attributes; | 16 | +package org.thingsboard.server.dao.sql.event; |
17 | 17 | ||
18 | import lombok.extern.slf4j.Slf4j; | 18 | import lombok.extern.slf4j.Slf4j; |
19 | import org.hibernate.exception.ConstraintViolationException; | 19 | import org.hibernate.exception.ConstraintViolationException; |
20 | import org.springframework.beans.factory.annotation.Autowired; | 20 | import org.springframework.beans.factory.annotation.Autowired; |
21 | +import org.springframework.data.jpa.repository.Modifying; | ||
21 | import org.springframework.stereotype.Repository; | 22 | import org.springframework.stereotype.Repository; |
22 | import org.springframework.transaction.PlatformTransactionManager; | 23 | import org.springframework.transaction.PlatformTransactionManager; |
23 | import org.springframework.transaction.TransactionDefinition; | 24 | import org.springframework.transaction.TransactionDefinition; |
24 | import org.springframework.transaction.TransactionStatus; | 25 | import org.springframework.transaction.TransactionStatus; |
25 | import org.springframework.transaction.support.DefaultTransactionDefinition; | 26 | import org.springframework.transaction.support.DefaultTransactionDefinition; |
26 | import org.thingsboard.server.common.data.UUIDConverter; | 27 | import org.thingsboard.server.common.data.UUIDConverter; |
27 | -import org.thingsboard.server.dao.model.sql.AttributeKvEntity; | ||
28 | -import org.thingsboard.server.dao.util.HsqlDao; | 28 | +import org.thingsboard.server.dao.model.sql.EventEntity; |
29 | import org.thingsboard.server.dao.util.SqlDao; | 29 | import org.thingsboard.server.dao.util.SqlDao; |
30 | 30 | ||
31 | +import javax.persistence.EntityManager; | ||
32 | +import javax.persistence.PersistenceContext; | ||
33 | +import javax.persistence.Query; | ||
34 | + | ||
31 | @Slf4j | 35 | @Slf4j |
32 | @SqlDao | 36 | @SqlDao |
33 | -@HsqlDao | ||
34 | @Repository | 37 | @Repository |
35 | -public class HsqlInsertRepository extends AttributeKvInsertRepository { | ||
36 | - | ||
37 | - @Autowired | ||
38 | - private PlatformTransactionManager transactionManager; | 38 | +public abstract class EventInsertRepository { |
39 | 39 | ||
40 | - private static final String INSERT_BOOL_STATEMENT = getInsertString(BOOL_V); | ||
41 | - private static final String INSERT_STR_STATEMENT = getInsertString(STR_V); | ||
42 | - private static final String INSERT_LONG_STATEMENT = getInsertString(LONG_V); | ||
43 | - private static final String INSERT_DBL_STATEMENT = getInsertString(DBL_V); | 40 | + @PersistenceContext |
41 | + protected EntityManager entityManager; | ||
44 | 42 | ||
45 | - private static final String WHERE_STATEMENT = " WHERE entity_type = :entity_type AND entity_id = :entity_id AND attribute_type = :attribute_type AND attribute_key = :attribute_key"; | 43 | + @Autowired |
44 | + protected PlatformTransactionManager transactionManager; | ||
46 | 45 | ||
47 | - private static final String UPDATE_BOOL_STATEMENT = getUpdateString(BOOL_V); | ||
48 | - private static final String UPDATE_STR_STATEMENT = getUpdateString(STR_V); | ||
49 | - private static final String UPDATE_LONG_STATEMENT = getUpdateString(LONG_V); | ||
50 | - private static final String UPDATE_DBL_STATEMENT = getUpdateString(DBL_V); | 46 | + public abstract EventEntity saveOrUpdate(EventEntity entity); |
51 | 47 | ||
52 | - @Override | ||
53 | - public void saveOrUpdate(AttributeKvEntity entity) { | ||
54 | - DefaultTransactionDefinition insertDefinition = new DefaultTransactionDefinition(); | ||
55 | - insertDefinition.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED); | ||
56 | - TransactionStatus insertTransaction = transactionManager.getTransaction(insertDefinition); | 48 | + protected EventEntity saveAndGet(EventEntity entity, String insertOrUpdateOnPrimaryKeyConflict, String insertOrUpdateOnUniqueKeyConflict) { |
49 | + EventEntity eventEntity = null; | ||
50 | + TransactionStatus insertTransaction = getTransactionStatus(TransactionDefinition.PROPAGATION_REQUIRED); | ||
57 | try { | 51 | try { |
58 | - processSaveOrUpdate(entity, INSERT_BOOL_STATEMENT, INSERT_STR_STATEMENT, INSERT_LONG_STATEMENT, INSERT_DBL_STATEMENT); | 52 | + eventEntity = processSaveOrUpdate(entity, insertOrUpdateOnPrimaryKeyConflict); |
59 | transactionManager.commit(insertTransaction); | 53 | transactionManager.commit(insertTransaction); |
60 | - } catch (Throwable e) { | 54 | + } catch (Throwable throwable) { |
61 | transactionManager.rollback(insertTransaction); | 55 | transactionManager.rollback(insertTransaction); |
62 | - if (e.getCause() instanceof ConstraintViolationException) { | ||
63 | - log.trace("Insert request leaded in a violation of a defined integrity constraint {} for Entity with entityId {} and entityType {}", e.getMessage(), UUIDConverter.fromString(entity.getId().getEntityId()), entity.getId().getEntityType()); | ||
64 | - DefaultTransactionDefinition definition = new DefaultTransactionDefinition(); | ||
65 | - definition.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); | ||
66 | - TransactionStatus transaction = transactionManager.getTransaction(definition); | 56 | + if (throwable.getCause() instanceof ConstraintViolationException) { |
57 | + log.trace("Insert request leaded in a violation of a defined integrity constraint {} for Entity with entityId {} and entityType {}", throwable.getMessage(), entity.getEventUid(), entity.getEventType()); | ||
58 | + TransactionStatus transaction = getTransactionStatus(TransactionDefinition.PROPAGATION_REQUIRES_NEW); | ||
67 | try { | 59 | try { |
68 | - processSaveOrUpdate(entity, UPDATE_BOOL_STATEMENT, UPDATE_STR_STATEMENT, UPDATE_LONG_STATEMENT, UPDATE_DBL_STATEMENT); | 60 | + eventEntity = processSaveOrUpdate(entity, insertOrUpdateOnUniqueKeyConflict); |
69 | } catch (Throwable th) { | 61 | } catch (Throwable th) { |
70 | - log.trace("Could not execute the update statement for Entity with entityId {} and entityType {}", UUIDConverter.fromString(entity.getId().getEntityId()), entity.getId().getEntityType()); | 62 | + log.trace("Could not execute the update statement for Entity with entityId {} and entityType {}", entity.getEventUid(), entity.getEventType()); |
71 | transactionManager.rollback(transaction); | 63 | transactionManager.rollback(transaction); |
72 | } | 64 | } |
73 | transactionManager.commit(transaction); | 65 | transactionManager.commit(transaction); |
74 | } else { | 66 | } else { |
75 | - log.trace("Could not execute the insert statement for Entity with entityId {} and entityType {}", UUIDConverter.fromString(entity.getId().getEntityId()), entity.getId().getEntityType()); | 67 | + log.trace("Could not execute the insert statement for Entity with entityId {} and entityType {}", entity.getEventUid(), entity.getEventType()); |
76 | } | 68 | } |
77 | } | 69 | } |
70 | + return eventEntity; | ||
78 | } | 71 | } |
79 | 72 | ||
80 | - private static String getInsertString(String value) { | ||
81 | - return "INSERT INTO attribute_kv (entity_type, entity_id, attribute_type, attribute_key, " + value + ", last_update_ts) VALUES (:entity_type, :entity_id, :attribute_type, :attribute_key, :" + value + ", :last_update_ts)"; | 73 | + @Modifying |
74 | + protected abstract EventEntity doProcessSaveOrUpdate(EventEntity entity, String query); | ||
75 | + | ||
76 | + protected Query getQuery(EventEntity entity, String query) { | ||
77 | + return entityManager.createNativeQuery(query, EventEntity.class) | ||
78 | + .setParameter("id", UUIDConverter.fromTimeUUID(entity.getId())) | ||
79 | + .setParameter("body", entity.getBody().toString()) | ||
80 | + .setParameter("entity_id", entity.getEntityId()) | ||
81 | + .setParameter("entity_type", entity.getEntityType().name()) | ||
82 | + .setParameter("event_type", entity.getEventType()) | ||
83 | + .setParameter("event_uid", entity.getEventUid()) | ||
84 | + .setParameter("tenant_id", entity.getTenantId()); | ||
85 | + } | ||
86 | + | ||
87 | + private EventEntity processSaveOrUpdate(EventEntity entity, String query) { | ||
88 | + return doProcessSaveOrUpdate(entity, query); | ||
82 | } | 89 | } |
83 | 90 | ||
84 | - private static String getUpdateString(String value) { | ||
85 | - return "UPDATE attribute_kv SET " + value + " = :" + value + ", last_update_ts = :last_update_ts" + WHERE_STATEMENT; | 91 | + private TransactionStatus getTransactionStatus(int propagationRequired) { |
92 | + DefaultTransactionDefinition insertDefinition = new DefaultTransactionDefinition(); | ||
93 | + insertDefinition.setPropagationBehavior(propagationRequired); | ||
94 | + return transactionManager.getTransaction(insertDefinition); | ||
86 | } | 95 | } |
87 | } | 96 | } |
1 | +/** | ||
2 | + * Copyright © 2016-2019 The Thingsboard Authors | ||
3 | + * | ||
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | ||
5 | + * you may not use this file except in compliance with the License. | ||
6 | + * You may obtain a copy of the License at | ||
7 | + * | ||
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | ||
9 | + * | ||
10 | + * Unless required by applicable law or agreed to in writing, software | ||
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | ||
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
13 | + * See the License for the specific language governing permissions and | ||
14 | + * limitations under the License. | ||
15 | + */ | ||
16 | +package org.thingsboard.server.dao.sql.event; | ||
17 | + | ||
18 | +import org.springframework.stereotype.Repository; | ||
19 | +import org.thingsboard.server.common.data.UUIDConverter; | ||
20 | +import org.thingsboard.server.dao.model.sql.EventEntity; | ||
21 | +import org.thingsboard.server.dao.util.HsqlDao; | ||
22 | +import org.thingsboard.server.dao.util.SqlDao; | ||
23 | + | ||
24 | +@SqlDao | ||
25 | +@HsqlDao | ||
26 | +@Repository | ||
27 | +public class HsqlEventInsertRepository extends EventInsertRepository { | ||
28 | + | ||
29 | + private static final String P_KEY_CONFLICT_STATEMENT = "(event.id=I.id)"; | ||
30 | + private static final String UNQ_KEY_CONFLICT_STATEMENT = "(event.tenant_id=I.tenant_id AND event.entity_type=I.entity_type AND event.entity_id=I.entity_id AND event.event_type=I.event_type AND event.event_uid=I.event_uid)"; | ||
31 | + | ||
32 | + private static final String INSERT_OR_UPDATE_ON_P_KEY_CONFLICT = getInsertString(P_KEY_CONFLICT_STATEMENT); | ||
33 | + private static final String INSERT_OR_UPDATE_ON_UNQ_KEY_CONFLICT = getInsertString(UNQ_KEY_CONFLICT_STATEMENT); | ||
34 | + | ||
35 | + @Override | ||
36 | + public EventEntity saveOrUpdate(EventEntity entity) { | ||
37 | + return saveAndGet(entity, INSERT_OR_UPDATE_ON_P_KEY_CONFLICT, INSERT_OR_UPDATE_ON_UNQ_KEY_CONFLICT); | ||
38 | + } | ||
39 | + | ||
40 | + @Override | ||
41 | + protected EventEntity doProcessSaveOrUpdate(EventEntity entity, String query) { | ||
42 | + getQuery(entity, query).executeUpdate(); | ||
43 | + return entityManager.find(EventEntity.class, UUIDConverter.fromTimeUUID(entity.getId())); | ||
44 | + } | ||
45 | + | ||
46 | + private static String getInsertString(String conflictStatement) { | ||
47 | + return "MERGE INTO event USING (VALUES :id, :body, :entity_id, :entity_type, :event_type, :event_uid, :tenant_id) I (id, body, entity_id, entity_type, event_type, event_uid, tenant_id) ON " + conflictStatement + " WHEN MATCHED THEN UPDATE SET event.id = I.id, event.body = I.body, event.entity_id = I.entity_id, event.entity_type = I.entity_type, event.event_type = I.event_type, event.event_uid = I.event_uid, event.tenant_id = I.tenant_id" + | ||
48 | + " WHEN NOT MATCHED THEN INSERT (id, body, entity_id, entity_type, event_type, event_uid, tenant_id) VALUES (I.id, I.body, I.entity_id, I.entity_type, I.event_type, I.event_uid, I.tenant_id)"; | ||
49 | + } | ||
50 | +} |
@@ -61,6 +61,9 @@ public class JpaBaseEventDao extends JpaAbstractSearchTimeDao<EventEntity, Event | @@ -61,6 +61,9 @@ public class JpaBaseEventDao extends JpaAbstractSearchTimeDao<EventEntity, Event | ||
61 | @Autowired | 61 | @Autowired |
62 | private EventRepository eventRepository; | 62 | private EventRepository eventRepository; |
63 | 63 | ||
64 | + @Autowired | ||
65 | + private EventInsertRepository eventInsertRepository; | ||
66 | + | ||
64 | @Override | 67 | @Override |
65 | protected Class<EventEntity> getEntityClass() { | 68 | protected Class<EventEntity> getEntityClass() { |
66 | return EventEntity.class; | 69 | return EventEntity.class; |
@@ -147,7 +150,7 @@ public class JpaBaseEventDao extends JpaAbstractSearchTimeDao<EventEntity, Event | @@ -147,7 +150,7 @@ public class JpaBaseEventDao extends JpaAbstractSearchTimeDao<EventEntity, Event | ||
147 | eventRepository.findByTenantIdAndEntityTypeAndEntityId(entity.getTenantId(), entity.getEntityType(), entity.getEntityId()) != null) { | 150 | eventRepository.findByTenantIdAndEntityTypeAndEntityId(entity.getTenantId(), entity.getEntityType(), entity.getEntityId()) != null) { |
148 | return Optional.empty(); | 151 | return Optional.empty(); |
149 | } | 152 | } |
150 | - return Optional.of(DaoUtil.getData(eventRepository.save(entity))); | 153 | + return Optional.of(DaoUtil.getData(eventInsertRepository.saveOrUpdate(entity))); |
151 | } | 154 | } |
152 | 155 | ||
153 | private Specification<EventEntity> getEntityFieldsSpec(UUID tenantId, EntityId entityId, String eventType) { | 156 | private Specification<EventEntity> getEntityFieldsSpec(UUID tenantId, EntityId entityId, String eventType) { |
1 | +/** | ||
2 | + * Copyright © 2016-2019 The Thingsboard Authors | ||
3 | + * | ||
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | ||
5 | + * you may not use this file except in compliance with the License. | ||
6 | + * You may obtain a copy of the License at | ||
7 | + * | ||
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | ||
9 | + * | ||
10 | + * Unless required by applicable law or agreed to in writing, software | ||
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | ||
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
13 | + * See the License for the specific language governing permissions and | ||
14 | + * limitations under the License. | ||
15 | + */ | ||
16 | +package org.thingsboard.server.dao.sql.event; | ||
17 | + | ||
18 | +import lombok.extern.slf4j.Slf4j; | ||
19 | +import org.springframework.stereotype.Repository; | ||
20 | +import org.thingsboard.server.dao.model.sql.EventEntity; | ||
21 | +import org.thingsboard.server.dao.util.PsqlDao; | ||
22 | +import org.thingsboard.server.dao.util.SqlDao; | ||
23 | + | ||
24 | +@Slf4j | ||
25 | +@SqlDao | ||
26 | +@PsqlDao | ||
27 | +@Repository | ||
28 | +public class PsqlEventInsertRepository extends EventInsertRepository { | ||
29 | + | ||
30 | + private static final String P_KEY_CONFLICT_STATEMENT = "(id)"; | ||
31 | + private static final String UNQ_KEY_CONFLICT_STATEMENT = "(tenant_id, entity_type, entity_id, event_type, event_uid)"; | ||
32 | + | ||
33 | + private static final String UPDATE_P_KEY_STATEMENT = "id = :id"; | ||
34 | + private static final String UPDATE_UNQ_KEY_STATEMENT = "tenant_id = :tenant_id, entity_type = :entity_type, entity_id = :entity_id, event_type = :event_type, event_uid = :event_uid"; | ||
35 | + | ||
36 | + private static final String INSERT_OR_UPDATE_ON_P_KEY_CONFLICT = getInsertOrUpdateString(P_KEY_CONFLICT_STATEMENT, UPDATE_UNQ_KEY_STATEMENT); | ||
37 | + private static final String INSERT_OR_UPDATE_ON_UNQ_KEY_CONFLICT = getInsertOrUpdateString(UNQ_KEY_CONFLICT_STATEMENT, UPDATE_P_KEY_STATEMENT); | ||
38 | + | ||
39 | + @Override | ||
40 | + public EventEntity saveOrUpdate(EventEntity entity) { | ||
41 | + return saveAndGet(entity, INSERT_OR_UPDATE_ON_P_KEY_CONFLICT, INSERT_OR_UPDATE_ON_UNQ_KEY_CONFLICT); | ||
42 | + } | ||
43 | + | ||
44 | + @Override | ||
45 | + protected EventEntity doProcessSaveOrUpdate(EventEntity entity, String query) { | ||
46 | + return (EventEntity) getQuery(entity, query).getSingleResult(); | ||
47 | + | ||
48 | + } | ||
49 | + | ||
50 | + private static String getInsertOrUpdateString(String eventKeyStatement, String updateKeyStatement) { | ||
51 | + return "INSERT INTO event (id, body, entity_id, entity_type, event_type, event_uid, tenant_id) VALUES (:id, :body, :entity_id, :entity_type, :event_type, :event_uid, :tenant_id) ON CONFLICT " + eventKeyStatement + " DO UPDATE SET body = :body, " + updateKeyStatement + " returning *"; | ||
52 | + } | ||
53 | +} |