Commit 0851a2fd5783e6eb8cf0eda788b17bb344e6b518

Authored by Igor Kulikov
Committed by GitHub
2 parents 5cf3268d 7f1651c8

Merge pull request #616 from volodymyr-babak/feature/audit-elastic

Added elastic sink. audit log service
... ... @@ -317,3 +317,19 @@ audit_log:
317 317 "user": "${AUDIT_LOG_MASK_USER:W}"
318 318 "rule": "${AUDIT_LOG_MASK_RULE:W}"
319 319 "plugin": "${AUDIT_LOG_MASK_PLUGIN:W}"
  320 + sink:
  321 + # Type of external sink. possible options: none, elasticsearch
  322 + type: "${AUDIT_LOG_SINK_TYPE:none}"
  323 + # Name of the index where audit logs stored
  324 + # Index name could contain next placeholders (not mandatory):
  325 + # @{TENANT} - substituted by tenant ID
  326 + # @{DATE} - substituted by current date in format provided in audit_log.sink.date_format
  327 + index_pattern: "${AUDIT_LOG_SINK_INDEX_PATTERN:@{TENANT}_AUDIT_LOG_@{DATE}}"
  328 + # Date format. Details of the pattern could be found here:
  329 + # https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html
  330 + date_format: "${AUDIT_LOG_SINK_DATE_FORMAT:YYYY.MM.DD}"
  331 + scheme_name: "${AUDIT_LOG_SINK_SCHEME_NAME:http}" # http or https
  332 + host: "${AUDIT_LOG_SINK_HOST:localhost}"
  333 + port: "${AUDIT_LOG_SINK_POST:9200}"
  334 + user_name: "${AUDIT_LOG_SINK_USER_NAME:}"
  335 + password: "${AUDIT_LOG_SINK_PASSWORD:}"
\ No newline at end of file
... ...
... ... @@ -103,7 +103,12 @@
103 103 <dependency>
104 104 <groupId>org.springframework</groupId>
105 105 <artifactId>spring-tx</artifactId>
106   - </dependency>
  106 + </dependency>
  107 + <dependency>
  108 + <groupId>org.springframework</groupId>
  109 + <artifactId>spring-web</artifactId>
  110 + <scope>provided</scope>
  111 + </dependency>
107 112 <dependency>
108 113 <groupId>com.datastax.cassandra</groupId>
109 114 <artifactId>cassandra-driver-core</artifactId>
... ... @@ -190,6 +195,10 @@
190 195 <groupId>redis.clients</groupId>
191 196 <artifactId>jedis</artifactId>
192 197 </dependency>
  198 + <dependency>
  199 + <groupId>org.elasticsearch.client</groupId>
  200 + <artifactId>rest</artifactId>
  201 + </dependency>
193 202 </dependencies>
194 203 <build>
195 204 <plugins>
... ...
... ... @@ -39,6 +39,7 @@ import org.thingsboard.server.common.data.kv.AttributeKvEntry;
39 39 import org.thingsboard.server.common.data.page.TimePageData;
40 40 import org.thingsboard.server.common.data.page.TimePageLink;
41 41 import org.thingsboard.server.common.data.security.DeviceCredentials;
  42 +import org.thingsboard.server.dao.audit.sink.AuditLogSink;
42 43 import org.thingsboard.server.dao.entity.EntityService;
43 44 import org.thingsboard.server.dao.exception.DataValidationException;
44 45 import org.thingsboard.server.dao.service.DataValidator;
... ... @@ -69,6 +70,9 @@ public class AuditLogServiceImpl implements AuditLogService {
69 70 @Autowired
70 71 private EntityService entityService;
71 72
  73 + @Autowired
  74 + private AuditLogSink auditLogSink;
  75 +
72 76 @Override
73 77 public TimePageData<AuditLog> findAuditLogsByTenantIdAndCustomerId(TenantId tenantId, CustomerId customerId, TimePageLink pageLink) {
74 78 log.trace("Executing findAuditLogsByTenantIdAndCustomerId [{}], [{}], [{}]", tenantId, customerId, pageLink);
... ... @@ -295,6 +299,9 @@ public class AuditLogServiceImpl implements AuditLogService {
295 299 futures.add(auditLogDao.saveByTenantIdAndEntityId(auditLogEntry));
296 300 futures.add(auditLogDao.saveByTenantIdAndCustomerId(auditLogEntry));
297 301 futures.add(auditLogDao.saveByTenantIdAndUserId(auditLogEntry));
  302 +
  303 + auditLogSink.logAction(auditLogEntry);
  304 +
298 305 return Futures.allAsList(futures);
299 306 }
300 307
... ...
... ... @@ -15,23 +15,20 @@
15 15 */
16 16 package org.thingsboard.server.dao.audit;
17 17
18   -import com.fasterxml.jackson.databind.JsonNode;
19   -import com.google.common.util.concurrent.Futures;
20 18 import com.google.common.util.concurrent.ListenableFuture;
21 19 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
  20 +import org.springframework.stereotype.Service;
22 21 import org.thingsboard.server.common.data.BaseData;
23 22 import org.thingsboard.server.common.data.HasName;
24   -import org.thingsboard.server.common.data.User;
25   -import org.thingsboard.server.common.data.audit.ActionStatus;
26 23 import org.thingsboard.server.common.data.audit.ActionType;
27 24 import org.thingsboard.server.common.data.audit.AuditLog;
28 25 import org.thingsboard.server.common.data.id.*;
29 26 import org.thingsboard.server.common.data.page.TimePageData;
30 27 import org.thingsboard.server.common.data.page.TimePageLink;
31 28
32   -import java.util.Collections;
33 29 import java.util.List;
34 30
  31 +@Service
35 32 @ConditionalOnProperty(prefix = "audit_log", value = "enabled", havingValue = "false")
36 33 public class DummyAuditLogServiceImpl implements AuditLogService {
37 34
... ...
  1 +/**
  2 + * Copyright © 2016-2017 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.dao.audit.sink;
  17 +
  18 +import org.thingsboard.server.common.data.audit.AuditLog;
  19 +
  20 +public interface AuditLogSink {
  21 +
  22 + void logAction(AuditLog auditLogEntry);
  23 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2017 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.dao.audit.sink;
  17 +
  18 +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
  19 +import org.springframework.stereotype.Component;
  20 +import org.thingsboard.server.common.data.audit.AuditLog;
  21 +
  22 +@Component
  23 +@ConditionalOnProperty(prefix = "audit_log.sink", value = "type", havingValue = "none")
  24 +public class DummyAuditLogSink implements AuditLogSink {
  25 +
  26 + @Override
  27 + public void logAction(AuditLog auditLogEntry) {
  28 + }
  29 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2017 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.dao.audit.sink;
  17 +
  18 +import com.fasterxml.jackson.databind.ObjectMapper;
  19 +import com.fasterxml.jackson.databind.node.ObjectNode;
  20 +import lombok.extern.slf4j.Slf4j;
  21 +import org.apache.commons.lang3.StringUtils;
  22 +import org.apache.http.HttpEntity;
  23 +import org.apache.http.HttpHost;
  24 +import org.apache.http.auth.AuthScope;
  25 +import org.apache.http.auth.UsernamePasswordCredentials;
  26 +import org.apache.http.client.CredentialsProvider;
  27 +import org.apache.http.entity.ContentType;
  28 +import org.apache.http.impl.client.BasicCredentialsProvider;
  29 +import org.apache.http.nio.entity.NStringEntity;
  30 +import org.elasticsearch.client.Response;
  31 +import org.elasticsearch.client.ResponseListener;
  32 +import org.elasticsearch.client.RestClient;
  33 +import org.elasticsearch.client.RestClientBuilder;
  34 +import org.springframework.beans.factory.annotation.Value;
  35 +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
  36 +import org.springframework.http.HttpMethod;
  37 +import org.springframework.stereotype.Component;
  38 +import org.thingsboard.server.common.data.audit.AuditLog;
  39 +import org.thingsboard.server.common.data.id.TenantId;
  40 +
  41 +import javax.annotation.PostConstruct;
  42 +import java.time.LocalDateTime;
  43 +import java.time.format.DateTimeFormatter;
  44 +import java.util.Collections;
  45 +
  46 +@Component
  47 +@ConditionalOnProperty(prefix = "audit_log.sink", value = "type", havingValue = "elasticsearch")
  48 +@Slf4j
  49 +public class ElasticsearchAuditLogSink implements AuditLogSink {
  50 +
  51 + private static final String TENANT_PLACEHOLDER = "@{TENANT}";
  52 + private static final String DATE_PLACEHOLDER = "@{DATE}";
  53 + private static final String INDEX_TYPE = "audit_log";
  54 +
  55 + private final ObjectMapper mapper = new ObjectMapper();
  56 +
  57 + @Value("${audit_log.sink.index_pattern}")
  58 + private String indexPattern;
  59 + @Value("${audit_log.sink.scheme_name}")
  60 + private String schemeName;
  61 + @Value("${audit_log.sink.host}")
  62 + private String host;
  63 + @Value("${audit_log.sink.port}")
  64 + private int port;
  65 + @Value("${audit_log.sink.user_name}")
  66 + private String userName;
  67 + @Value("${audit_log.sink.password}")
  68 + private String password;
  69 + @Value("${audit_log.sink.date_format}")
  70 + private String dateFormat;
  71 +
  72 + private RestClient restClient;
  73 +
  74 + @PostConstruct
  75 + public void init() {
  76 + try {
  77 + log.trace("Adding elastic rest endpoint... host [{}], port [{}], scheme name [{}]",
  78 + host, port, schemeName);
  79 + RestClientBuilder builder = RestClient.builder(
  80 + new HttpHost(host, port, schemeName));
  81 +
  82 + if (StringUtils.isNotEmpty(userName) &&
  83 + StringUtils.isNotEmpty(password)) {
  84 + log.trace("...using username [{}] and password ***", userName);
  85 + final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
  86 + credentialsProvider.setCredentials(AuthScope.ANY,
  87 + new UsernamePasswordCredentials(userName, password));
  88 + builder.setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
  89 + }
  90 +
  91 + this.restClient = builder.build();
  92 + } catch (Exception e) {
  93 + log.error("Sink init failed!", e);
  94 + throw new RuntimeException(e.getMessage(), e);
  95 + }
  96 + }
  97 +
  98 + @Override
  99 + public void logAction(AuditLog auditLogEntry) {
  100 + String jsonContent = createElasticJsonRecord(auditLogEntry);
  101 +
  102 + HttpEntity entity = new NStringEntity(
  103 + jsonContent,
  104 + ContentType.APPLICATION_JSON);
  105 +
  106 + restClient.performRequestAsync(
  107 + HttpMethod.POST.name(),
  108 + String.format("/%s/%s", getIndexName(auditLogEntry.getTenantId()), INDEX_TYPE),
  109 + Collections.emptyMap(),
  110 + entity,
  111 + responseListener);
  112 + }
  113 +
  114 + private String createElasticJsonRecord(AuditLog auditLog) {
  115 + ObjectNode auditLogNode = mapper.createObjectNode();
  116 + auditLogNode.put("postDate", LocalDateTime.now().toString());
  117 + auditLogNode.put("id", auditLog.getId().getId().toString());
  118 + auditLogNode.put("entityName", auditLog.getEntityName());
  119 + auditLogNode.put("tenantId", auditLog.getTenantId().getId().toString());
  120 + if (auditLog.getCustomerId() != null) {
  121 + auditLogNode.put("customerId", auditLog.getCustomerId().getId().toString());
  122 + }
  123 + auditLogNode.put("entityId", auditLog.getEntityId().getId().toString());
  124 + auditLogNode.put("entityType", auditLog.getEntityId().getEntityType().name());
  125 + auditLogNode.put("userId", auditLog.getUserId().getId().toString());
  126 + auditLogNode.put("userName", auditLog.getUserName());
  127 + auditLogNode.put("actionType", auditLog.getActionType().name());
  128 + if (auditLog.getActionData() != null) {
  129 + auditLogNode.put("actionData", auditLog.getActionData().toString());
  130 + }
  131 + auditLogNode.put("actionStatus", auditLog.getActionStatus().name());
  132 + auditLogNode.put("actionFailureDetails", auditLog.getActionFailureDetails());
  133 + return auditLogNode.toString();
  134 + }
  135 +
  136 + private ResponseListener responseListener = new ResponseListener() {
  137 + @Override
  138 + public void onSuccess(Response response) {
  139 + log.trace("Elasticsearch sink log action method succeeded. Response result [{}]!", response);
  140 + }
  141 +
  142 + @Override
  143 + public void onFailure(Exception exception) {
  144 + log.warn("Elasticsearch sink log action method failed!", exception);
  145 + }
  146 + };
  147 +
  148 + private String getIndexName(TenantId tenantId) {
  149 + String indexName = indexPattern;
  150 + if (indexName.contains(TENANT_PLACEHOLDER) && tenantId != null) {
  151 + indexName = indexName.replace(TENANT_PLACEHOLDER, tenantId.getId().toString());
  152 + }
  153 + if (indexName.contains(DATE_PLACEHOLDER)) {
  154 + LocalDateTime now = LocalDateTime.now();
  155 + DateTimeFormatter formatter = DateTimeFormatter.ofPattern(dateFormat);
  156 + indexName = indexName.replace(DATE_PLACEHOLDER, now.format(formatter));
  157 + }
  158 + return indexName.toLowerCase();
  159 + }
  160 +}
... ...
... ... @@ -7,6 +7,7 @@ updates.enabled=false
7 7 audit_log.enabled=true
8 8 audit_log.by_tenant_partitioning=MONTHS
9 9 audit_log.default_query_period=30
  10 +audit_log.sink.type=none
10 11
11 12 cache.type=caffeine
12 13 #cache.type=redis
... ...
... ... @@ -80,6 +80,7 @@
80 80 <spring-test-dbunit.version>1.2.1</spring-test-dbunit.version>
81 81 <postgresql.driver.version>9.4.1211</postgresql.driver.version>
82 82 <sonar.exclusions>org/thingsboard/server/gen/**/*, org/thingsboard/server/extensions/core/plugin/telemetry/gen/**/*</sonar.exclusions>
  83 + <elasticsearch.version>5.0.2</elasticsearch.version>
83 84 </properties>
84 85
85 86 <modules>
... ... @@ -803,6 +804,11 @@
803 804 <type>exe</type>
804 805 <scope>provided</scope>
805 806 </dependency>
  807 + <dependency>
  808 + <groupId>org.elasticsearch.client</groupId>
  809 + <artifactId>rest</artifactId>
  810 + <version>${elasticsearch.version}</version>
  811 + </dependency>
806 812 </dependencies>
807 813 </dependencyManagement>
808 814
... ...