Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.graylog2.inputs.Input;
import org.graylog2.inputs.InputService;
import org.graylog2.plugin.configuration.ConfigurationException;
import org.graylog2.plugin.configuration.ConfigurationRequest;
import org.graylog2.plugin.database.ValidationException;
import org.graylog2.plugin.inputs.transports.NettyTransport;
import org.graylog2.rest.models.system.inputs.requests.InputCreateRequest;
Expand Down Expand Up @@ -85,16 +86,7 @@ public void createInput(Subject subject, String userName, int port) throws Valid
throw new NotFoundException(f("No input of type %s registered", inputType));
}

// Start with the default configuration values for the input type and then only override the port
final var inputConfig = inputDescription.getConfigurationRequest().getFields().entrySet().stream()
.filter(entry -> entry.getValue().getDefaultValue() != null)
.collect(Collectors.toMap(
Map.Entry::getKey,
entry -> entry.getValue().getDefaultValue(),
(a, b) -> b,
HashMap::new
));
inputConfig.put(NettyTransport.CK_PORT, port);
final var inputConfig = getHttpIngestInputConfig(port, inputDescription.getConfigurationRequest());

try {
final var inputCreateRequest = InputCreateRequest.create(
Expand All @@ -120,4 +112,18 @@ public void createInput(Subject subject, String userName, int port) throws Valid
throw new BadRequestException("Invalid input configuration", e);
}
}

public static Map<String, Object> getHttpIngestInputConfig(int port, ConfigurationRequest configurationRequest) {
// Start with the default configuration values for the input type and then only override the port
final var inputConfig = configurationRequest.getFields().entrySet().stream()
.filter(entry -> entry.getValue().getDefaultValue() != null)
.collect(Collectors.toMap(
Map.Entry::getKey,
entry -> entry.getValue().getDefaultValue(),
(a, b) -> b,
HashMap::new
));
inputConfig.put(NettyTransport.CK_PORT, port);
return inputConfig;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package org.graylog.collectors;

import jakarta.inject.Inject;
import org.graylog.collectors.opamp.auth.EnrollmentTokenService;

public class CollectorsInitializer {
private final CollectorCaService caService;
private final CollectorLogsDestinationService logsDestinationService;
private final EnrollmentTokenService enrollmentTokenService;
private final CollectorsConfigService configService;

@Inject
public CollectorsInitializer(CollectorCaService caService,
CollectorLogsDestinationService logsDestinationService,
EnrollmentTokenService enrollmentTokenService,
CollectorsConfigService configService) {
this.caService = caService;
this.logsDestinationService = logsDestinationService;
this.enrollmentTokenService = enrollmentTokenService;
this.configService = configService;
}

public CollectorsConfig initialize(CollectorsConfig config) {
final var caHierarchy = caService.initializeCa();
final var tokenSigningKey = createTokenSigningKey();

logsDestinationService.ensureExists();

return config.toBuilder()
.caCertId(caHierarchy.caCert().id())
.signingCertId(caHierarchy.signingCert().id())
.tokenSigningKey(tokenSigningKey)
.otlpServerCertId(caHierarchy.otlpServerCert().id())
.build();
}

private TokenSigningKey createTokenSigningKey() {
try {
return enrollmentTokenService.createTokenSigningKey();
} catch (Exception e) {
throw new IllegalStateException("Could not create token signing key", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.inject.Scopes;
import com.google.inject.multibindings.MapBinder;
import com.google.inject.multibindings.Multibinder;
import org.graylog.collectors.cloud.CloudCollectorIngestService;
import org.graylog.collectors.config.receiver.FilelogReceiverConfig;
import org.graylog.collectors.config.receiver.JournaldReceiverConfig;
import org.graylog.collectors.config.receiver.WindowsEventLogReceiverConfig;
Expand Down Expand Up @@ -46,6 +47,7 @@
import org.graylog.collectors.rest.CollectorsConfigResource;
import org.graylog.collectors.rest.FleetResource;
import org.graylog.collectors.rest.SourceResource;
import org.graylog2.Configuration;
import org.graylog2.database.SequenceTopics;
import org.graylog2.featureflag.FeatureFlags;
import org.graylog2.indexer.template.IndexTemplateProvider;
Expand All @@ -57,10 +59,12 @@ public class CollectorsModule extends PluginModule {

private final boolean collectorsEnabled;
private final boolean otlpDumpEnabled;
private final boolean isCloud;

public CollectorsModule(FeatureFlags featureFlags) {
public CollectorsModule(FeatureFlags featureFlags, Configuration configuration) {
this.collectorsEnabled = featureFlags.isOn(COLLECTORS_FLAG);
this.otlpDumpEnabled = featureFlags.isOn(OTLP_DUMP_FLAG);
this.isCloud = configuration.isCloud();
}

@Override
Expand All @@ -84,6 +88,10 @@ protected void configure() {
addTransport(CollectorIngestHttpTransport.NAME, CollectorIngestHttpTransport.class);
addCodec(CollectorIngestCodec.NAME, CollectorIngestCodec.class);

if (isCloud) {
serviceBinder().addBinding().to(CloudCollectorIngestService.class).in(Scopes.SINGLETON);
}

final var logRecordProcessorBinder = MapBinder.newMapBinder(binder(), String.class, LogRecordProcessor.class);

logRecordProcessorBinder.addBinding(FilelogReceiverConfig.RECEIVER_TYPE).to(FilelogRecordProcessor.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package org.graylog.collectors.cloud;

import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategies;
import com.github.rholder.retry.WaitStrategies;
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import com.google.common.util.concurrent.AbstractExecutionThreadService;
import com.google.common.util.concurrent.Uninterruptibles;
import jakarta.inject.Inject;
import org.graylog.collectors.CollectorsConfig;
import org.graylog.collectors.CollectorsConfigService;
import org.graylog.collectors.input.CollectorIngestHttpInput;
import org.graylog2.cluster.ClusterConfigChangedEvent;
import org.graylog2.inputs.ReservedInputIds;
import org.graylog2.plugin.IOState;
import org.graylog2.plugin.InputFailureRecorder;
import org.graylog2.plugin.buffers.InputBuffer;
import org.graylog2.plugin.configuration.Configuration;
import org.graylog2.plugin.inputs.MisfireException;
import org.slf4j.Logger;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

import static org.graylog.collectors.CollectorIngestInputService.getHttpIngestInputConfig;
import static org.slf4j.LoggerFactory.getLogger;

public class CloudCollectorIngestService extends AbstractExecutionThreadService {
private static final Logger LOG = getLogger(CloudCollectorIngestService.class);

private final InputBuffer inputBuffer;
private final EventBus eventBus;
private final CollectorsConfigService collectorsConfigService;
private final CollectorIngestHttpInput.Factory httpInputFactory;
private final Semaphore configChanged = new Semaphore(0);

private volatile Thread executionThread;
private CollectorIngestHttpInput input;
private final CountDownLatch shutdownLatch = new CountDownLatch(1);

@Inject
public CloudCollectorIngestService(InputBuffer inputBuffer,
EventBus eventBus,
CollectorsConfigService collectorsConfigService,
CollectorIngestHttpInput.Factory httpInputFactory) {
this.inputBuffer = inputBuffer;
this.eventBus = eventBus;
this.collectorsConfigService = collectorsConfigService;
this.httpInputFactory = httpInputFactory;
}

@Subscribe
public void handleClusterConfigChanged(ClusterConfigChangedEvent event) {
// At the moment we only care about the initial config being present, which means the collectors features
// was enabled. If we need to react to actual changes, the code below needs to be adjusted.
if (CollectorsConfig.class.getCanonicalName().equals(event.type())) {
configChanged.release();
}
}

@Override
protected void startUp() throws Exception {
this.executionThread = Thread.currentThread();
eventBus.register(this);
}

@Override
protected void run() throws Exception {
final var config = waitForCollectorConfig();
if (config == null) {
return; // shutdown requested before config became available
}

final var retryer = RetryerBuilder.<Void>newBuilder()
.retryIfException(t -> t instanceof MisfireException)
.withStopStrategy(StopStrategies.neverStop())
.withWaitStrategy(WaitStrategies.exponentialWait(1, TimeUnit.MINUTES))
.build();

try {
retryer.call(() -> {
launchInput(config);
LOG.info("Collector Ingest on [{}:{}] launched successfully.", config.http().hostname(),
config.http().port());
return null;
});
} catch (Exception e) {
if (!isRunning()) {
return; // shutdown requested
}
throw new RuntimeException("Failed to launch Collector Ingest Input", e);
}

Uninterruptibles.awaitUninterruptibly(shutdownLatch);
}

@Override
protected void triggerShutdown() {
shutdownLatch.countDown();
if (executionThread != null) {
executionThread.interrupt();
}
}

@Override
protected void shutDown() throws Exception {
if (input != null) {
input.stop();
}
try {
eventBus.unregister(this);
} catch (Exception e) {
// Ignore. We might have already unregistered after receiving the initial config.
}
}

private CollectorsConfig waitForCollectorConfig() {
while (isRunning()) {
final var config = collectorsConfigService.get();
if (config.isPresent()) {
eventBus.unregister(this);
return config.get();
}
try {
configChanged.acquire();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null; // shutting down
}
configChanged.drainPermits();
}
return null;
}

private void launchInput(CollectorsConfig collectorsConfig) throws MisfireException {
final var input = createInput(collectorsConfig);
// A failure recorder that won't propagate state to the global event bus, because it has its own private copy
final var sideEffectFreeFailureRecorder = new InputFailureRecorder(new IOState<>(new EventBus(), input));

input.initialize();
input.launch(inputBuffer, sideEffectFreeFailureRecorder);

this.input = input;
}

private CollectorIngestHttpInput createInput(CollectorsConfig collectorsConfig) {
final var inputConfig = new Configuration(getHttpIngestInputConfig(
collectorsConfig.http().port(), httpInputFactory.getConfig().combinedRequestedConfiguration()));
final var input = httpInputFactory.create(inputConfig);
input.setPersistId(ReservedInputIds.EPHEMERAL_COLLECTOR_INGEST);
input.setTitle("Managed Collector Ingest");
return input;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.graylog2.rest.resources.entities.EntityDefaults;
import org.graylog2.rest.resources.entities.Sorting;
import org.graylog2.search.SearchQueryField;
import org.graylog2.shared.rest.PublicCloudAPI;
import org.graylog2.shared.rest.resources.RestResource;

import java.util.ArrayList;
Expand All @@ -79,6 +80,7 @@
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
@RequiresAuthentication
@PublicCloudAPI
public class EnrollmentTokenResource extends RestResource {

private static final String DEFAULT_SORT_FIELD = EnrollmentTokenDTO.FIELD_CREATED_AT;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.graylog.collectors.rest.RecentActivityResponse.ActivityDetails;
import org.graylog.collectors.rest.RecentActivityResponse.FleetReassignedDetails;
import org.graylog.collectors.rest.RecentActivityResponse.TargetInfo;
import org.graylog2.shared.rest.PublicCloudAPI;
import org.graylog2.shared.rest.resources.RestResource;
import org.graylog2.shared.security.RestPermissions;
import org.graylog2.shared.users.UserService;
Expand All @@ -59,6 +60,7 @@
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
@RequiresAuthentication
@PublicCloudAPI
public class CollectorsActivityResource extends RestResource {

private static final int RECENT_ACTIVITY_LIMIT = 20;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.annotation.Nullable;
import jakarta.validation.constraints.NotNull;
import org.graylog.collectors.CollectorsConfig;
import org.graylog.collectors.IngestEndpointConfig;

import java.time.Duration;
Expand All @@ -44,4 +45,18 @@ public IngestEndpointConfig toConfig() {
return new IngestEndpointConfig(hostname(), port());
}
}

public CollectorsConfig.Builder applyTo(CollectorsConfig.Builder configBuilder) {
configBuilder.http(http.toConfig());
if (collectorOfflineThreshold() != null) {
configBuilder.collectorOfflineThreshold(collectorOfflineThreshold());
}
if (collectorDefaultVisibilityThreshold() != null) {
configBuilder.collectorDefaultVisibilityThreshold(collectorDefaultVisibilityThreshold());
}
if (collectorExpirationThreshold() != null) {
configBuilder.collectorExpirationThreshold(collectorExpirationThreshold());
}
return configBuilder;
}
}
Loading
Loading