Java源码示例:org.apache.flink.metrics.reporter.MetricReporter
示例1
@Test
public void testScopeCachingForMultipleReporters() throws Exception {
Configuration config = new Configuration();
config.setString(MetricOptions.SCOPE_NAMING_TM, "A.B.C.D");
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter1.class.getName());
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "-");
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter2.class.getName());
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "!");
MetricRegistryImpl testRegistry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));
try {
MetricGroup tmGroup = new TaskManagerMetricGroup(testRegistry, "host", "id");
tmGroup.counter("1");
assertEquals("Reporters were not properly instantiated", 2, testRegistry.getReporters().size());
for (MetricReporter reporter : testRegistry.getReporters()) {
ScopeCheckingTestReporter typedReporter = (ScopeCheckingTestReporter) reporter;
if (typedReporter.failureCause != null) {
throw typedReporter.failureCause;
}
}
} finally {
testRegistry.shutdown().get();
}
}
示例2
@Test
public void testLogicalScopeCachingForMultipleReporters() throws Exception {
Configuration config = new Configuration();
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, LogicalScopeReporter1.class.getName());
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, LogicalScopeReporter2.class.getName());
MetricRegistryImpl testRegistry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));
try {
MetricGroup tmGroup = new TaskManagerMetricGroup(testRegistry, "host", "id")
.addGroup("B")
.addGroup("C");
tmGroup.counter("1");
assertEquals("Reporters were not properly instantiated", 2, testRegistry.getReporters().size());
for (MetricReporter reporter : testRegistry.getReporters()) {
ScopeCheckingTestReporter typedReporter = (ScopeCheckingTestReporter) reporter;
if (typedReporter.failureCause != null) {
throw typedReporter.failureCause;
}
}
} finally {
testRegistry.shutdown().get();
}
}
示例3
@Test
public void testConfigurableDelimiterForReportersInGroup() throws Exception {
Configuration config = new Configuration();
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "_");
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter8.class.getName());
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "-");
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter8.class.getName());
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test3." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "AA");
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test3." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter8.class.getName());
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test4." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter8.class.getName());
config.setString(MetricOptions.SCOPE_NAMING_TM, "A.B");
MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));
List<MetricReporter> reporters = registry.getReporters();
((TestReporter8) reporters.get(0)).expectedDelimiter = '_'; //test1 reporter
((TestReporter8) reporters.get(1)).expectedDelimiter = '-'; //test2 reporter
((TestReporter8) reporters.get(2)).expectedDelimiter = GLOBAL_DEFAULT_DELIMITER; //test3 reporter, because 'AA' - not correct delimiter
((TestReporter8) reporters.get(3)).expectedDelimiter = GLOBAL_DEFAULT_DELIMITER; //for test4 reporter use global delimiter
TaskManagerMetricGroup group = new TaskManagerMetricGroup(registry, "host", "id");
group.counter("C");
group.close();
registry.shutdown().get();
assertEquals(4, TestReporter8.numCorrectDelimitersForRegister);
assertEquals(4, TestReporter8.numCorrectDelimitersForUnregister);
}
示例4
private static Optional<MetricReporter> loadReporter(
final String reporterName,
final Configuration reporterConfig,
final Map<String, MetricReporterFactory> reporterFactories)
throws ClassNotFoundException, IllegalAccessException, InstantiationException {
final String reporterClassName = reporterConfig.getString(ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, null);
final String factoryClassName = reporterConfig.getString(ConfigConstants.METRICS_REPORTER_FACTORY_CLASS_SUFFIX, null);
if (factoryClassName != null) {
return loadViaFactory(factoryClassName, reporterName, reporterConfig, reporterFactories);
}
if (reporterClassName != null) {
return loadViaReflection(reporterClassName, reporterName, reporterConfig, reporterFactories);
}
LOG.warn("No reporter class nor factory set for reporter {}. Metrics might not be exposed/reported.", reporterName);
return Optional.empty();
}
示例5
private static Optional<MetricReporter> loadViaFactory(
final String factoryClassName,
final String reporterName,
final Configuration reporterConfig,
final Map<String, MetricReporterFactory> reporterFactories) {
MetricReporterFactory factory = reporterFactories.get(factoryClassName);
if (factory == null) {
LOG.warn("The reporter factory ({}) could not be found for reporter {}. Available factories: ", factoryClassName, reporterName, reporterFactories.keySet());
return Optional.empty();
} else {
final MetricConfig metricConfig = new MetricConfig();
reporterConfig.addAllToProperties(metricConfig);
return Optional.of(factory.createMetricReporter(metricConfig));
}
}
示例6
private static Optional<MetricReporter> loadViaReflection(
final String reporterClassName,
final String reporterName,
final Configuration reporterConfig,
final Map<String, MetricReporterFactory> reporterFactories) throws ClassNotFoundException, IllegalAccessException, InstantiationException {
final Class<?> reporterClass = Class.forName(reporterClassName);
final InstantiateViaFactory alternativeFactoryAnnotation = reporterClass.getAnnotation(InstantiateViaFactory.class);
if (alternativeFactoryAnnotation != null) {
final String alternativeFactoryClassName = alternativeFactoryAnnotation.factoryClassName();
LOG.info("The reporter configuration of {} is out-dated (but still supported)." +
" Please configure a factory class instead: '{}{}.{}: {}' to ensure that the configuration" +
" continues to work with future versions.",
reporterName,
ConfigConstants.METRICS_REPORTER_PREFIX,
reporterName,
ConfigConstants.METRICS_REPORTER_FACTORY_CLASS_SUFFIX,
alternativeFactoryClassName);
return loadViaFactory(alternativeFactoryClassName, reporterName, reporterConfig, reporterFactories);
}
return Optional.of((MetricReporter) reporterClass.newInstance());
}
示例7
@Test
public void testLogicalScopeCachingForMultipleReporters() throws Exception {
MetricRegistryImpl testRegistry = new MetricRegistryImpl(
MetricRegistryConfiguration.defaultMetricRegistryConfiguration(),
Arrays.asList(
ReporterSetup.forReporter("test1", new LogicalScopeReporter1()),
ReporterSetup.forReporter("test2", new LogicalScopeReporter2())));
try {
MetricGroup tmGroup = new TaskManagerMetricGroup(testRegistry, "host", "id")
.addGroup("B")
.addGroup("C");
tmGroup.counter("1");
assertEquals("Reporters were not properly instantiated", 2, testRegistry.getReporters().size());
for (MetricReporter reporter : testRegistry.getReporters()) {
ScopeCheckingTestReporter typedReporter = (ScopeCheckingTestReporter) reporter;
if (typedReporter.failureCause != null) {
throw typedReporter.failureCause;
}
}
} finally {
testRegistry.shutdown().get();
}
}
示例8
private static List<ReporterSetup> setupReporters(Map<String, MetricReporterFactory> reporterFactories, List<Tuple2<String, Configuration>> reporterConfigurations) {
List<ReporterSetup> reporterSetups = new ArrayList<>(reporterConfigurations.size());
for (Tuple2<String, Configuration> reporterConfiguration: reporterConfigurations) {
String reporterName = reporterConfiguration.f0;
Configuration reporterConfig = reporterConfiguration.f1;
try {
Optional<MetricReporter> metricReporterOptional = loadReporter(reporterName, reporterConfig, reporterFactories);
metricReporterOptional.ifPresent(reporter -> {
MetricConfig metricConfig = new MetricConfig();
reporterConfig.addAllToProperties(metricConfig);
reporterSetups.add(createReporterSetup(reporterName, metricConfig, reporter));
});
}
catch (Throwable t) {
LOG.error("Could not instantiate metrics reporter {}. Metrics might not be exposed/reported.", reporterName, t);
}
}
return reporterSetups;
}
示例9
private static Optional<MetricReporter> loadViaReflection(
final String reporterClassName,
final String reporterName,
final Configuration reporterConfig,
final Map<String, MetricReporterFactory> reporterFactories) throws ClassNotFoundException, IllegalAccessException, InstantiationException {
final Class<?> reporterClass = Class.forName(reporterClassName);
final InstantiateViaFactory alternativeFactoryAnnotation = reporterClass.getAnnotation(InstantiateViaFactory.class);
if (alternativeFactoryAnnotation != null) {
final String alternativeFactoryClassName = alternativeFactoryAnnotation.factoryClassName();
LOG.info("The reporter configuration of {} is out-dated (but still supported)." +
" Please configure a factory class instead: '{}{}.{}: {}' to ensure that the configuration" +
" continues to work with future versions.",
reporterName,
ConfigConstants.METRICS_REPORTER_PREFIX,
reporterName,
ConfigConstants.METRICS_REPORTER_FACTORY_CLASS_SUFFIX,
alternativeFactoryClassName);
return loadViaFactory(alternativeFactoryClassName, reporterName, reporterConfig, reporterFactories);
}
return Optional.of((MetricReporter) reporterClass.newInstance());
}
示例10
@Test
public void testLogicalScopeCachingForMultipleReporters() throws Exception {
MetricRegistryImpl testRegistry = new MetricRegistryImpl(
MetricRegistryConfiguration.defaultMetricRegistryConfiguration(),
Arrays.asList(
ReporterSetup.forReporter("test1", new LogicalScopeReporter1()),
ReporterSetup.forReporter("test2", new LogicalScopeReporter2())));
try {
MetricGroup tmGroup = new TaskManagerMetricGroup(testRegistry, "host", "id")
.addGroup("B")
.addGroup("C");
tmGroup.counter("1");
assertEquals("Reporters were not properly instantiated", 2, testRegistry.getReporters().size());
for (MetricReporter reporter : testRegistry.getReporters()) {
ScopeCheckingTestReporter typedReporter = (ScopeCheckingTestReporter) reporter;
if (typedReporter.failureCause != null) {
throw typedReporter.failureCause;
}
}
} finally {
testRegistry.shutdown().get();
}
}
示例11
@Test
public void testReporterRegistration() throws Exception {
MetricRegistryImpl metricRegistry = createMetricRegistry();
try {
assertEquals(1, metricRegistry.getReporters().size());
MetricReporter reporter = metricRegistry.getReporters().get(0);
assertTrue(reporter instanceof InfluxdbReporter);
} finally {
metricRegistry.shutdown().get();
}
}
示例12
@Test
public void testScopeCachingForMultipleReporters() throws Exception {
Configuration config = new Configuration();
config.setString(MetricOptions.SCOPE_NAMING_TM, "A.B.C.D");
MetricConfig metricConfig1 = new MetricConfig();
metricConfig1.setProperty(ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "-");
MetricConfig metricConfig2 = new MetricConfig();
metricConfig2.setProperty(ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "!");
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter1.class.getName());
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "-");
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter2.class.getName());
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "!");
MetricRegistryImpl testRegistry = new MetricRegistryImpl(
MetricRegistryConfiguration.fromConfiguration(config),
Arrays.asList(
ReporterSetup.forReporter("test1", metricConfig1, new TestReporter1()),
ReporterSetup.forReporter("test2", metricConfig2, new TestReporter2())));
try {
MetricGroup tmGroup = new TaskManagerMetricGroup(testRegistry, "host", "id");
tmGroup.counter("1");
assertEquals("Reporters were not properly instantiated", 2, testRegistry.getReporters().size());
for (MetricReporter reporter : testRegistry.getReporters()) {
ScopeCheckingTestReporter typedReporter = (ScopeCheckingTestReporter) reporter;
if (typedReporter.failureCause != null) {
throw typedReporter.failureCause;
}
}
} finally {
testRegistry.shutdown().get();
}
}
示例13
@Test
public void testReporterSetupSupplier() throws Exception {
final Configuration config = new Configuration();
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "reporter1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter1.class.getName());
final List<ReporterSetup> reporterSetups = ReporterSetup.fromConfiguration(config);
Assert.assertEquals(1, reporterSetups.size());
final ReporterSetup reporterSetup = reporterSetups.get(0);
final MetricReporter metricReporter = reporterSetup.getReporter();
Assert.assertThat(metricReporter, instanceOf(TestReporter1.class));
}
示例14
@Test
public void testReporterRegistration() throws Exception {
MetricRegistryImpl metricRegistry = createMetricRegistry(InfluxdbReporterOptions.RETENTION_POLICY.defaultValue());
try {
assertEquals(1, metricRegistry.getReporters().size());
MetricReporter reporter = metricRegistry.getReporters().get(0);
assertTrue(reporter instanceof InfluxdbReporter);
} finally {
metricRegistry.shutdown().get();
}
}
示例15
private static Optional<MetricReporter> loadReporter(
final String reporterName,
final Configuration reporterConfig,
final Map<String, MetricReporterFactory> reporterFactories)
throws ClassNotFoundException, IllegalAccessException, InstantiationException {
final String reporterClassName = reporterConfig.getString(ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, null);
final String factoryClassName = reporterConfig.getString(ConfigConstants.METRICS_REPORTER_FACTORY_CLASS_SUFFIX, null);
if (factoryClassName != null) {
return loadViaFactory(factoryClassName, reporterName, reporterConfig, reporterFactories);
}
if (reporterClassName != null) {
final Optional<MetricReporterFactory> interceptingFactory = reporterFactories.values().stream()
.filter(factory -> {
InterceptInstantiationViaReflection annotation = factory.getClass().getAnnotation(InterceptInstantiationViaReflection.class);
return annotation != null && annotation.reporterClassName().equals(reporterClassName);
})
.findAny();
if (interceptingFactory.isPresent()) {
return loadViaFactory(reporterConfig, interceptingFactory.get());
}
return loadViaReflection(reporterClassName, reporterName, reporterConfig, reporterFactories);
}
LOG.warn("No reporter class nor factory set for reporter {}. Metrics might not be exposed/reported.", reporterName);
return Optional.empty();
}
示例16
private static Optional<MetricReporter> loadViaFactory(
final String factoryClassName,
final String reporterName,
final Configuration reporterConfig,
final Map<String, MetricReporterFactory> reporterFactories) {
MetricReporterFactory factory = reporterFactories.get(factoryClassName);
if (factory == null) {
LOG.warn("The reporter factory ({}) could not be found for reporter {}. Available factories: {}.", factoryClassName, reporterName, reporterFactories.keySet());
return Optional.empty();
} else {
return loadViaFactory(reporterConfig, factory);
}
}
示例17
private static Optional<MetricReporter> loadViaFactory(
final Configuration reporterConfig,
final MetricReporterFactory factory) {
final MetricConfig metricConfig = new MetricConfig();
reporterConfig.addAllToProperties(metricConfig);
return Optional.of(factory.createMetricReporter(metricConfig));
}
示例18
@Test
public void testScopeCachingForMultipleReporters() throws Exception {
Configuration config = new Configuration();
config.setString(MetricOptions.SCOPE_NAMING_TM, "A.B.C.D");
MetricConfig metricConfig1 = new MetricConfig();
metricConfig1.setProperty(ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "-");
MetricConfig metricConfig2 = new MetricConfig();
metricConfig2.setProperty(ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "!");
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter1.class.getName());
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "-");
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter2.class.getName());
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "!");
MetricRegistryImpl testRegistry = new MetricRegistryImpl(
MetricRegistryConfiguration.fromConfiguration(config),
Arrays.asList(
ReporterSetup.forReporter("test1", metricConfig1, new TestReporter1()),
ReporterSetup.forReporter("test2", metricConfig2, new TestReporter2())));
try {
MetricGroup tmGroup = new TaskManagerMetricGroup(testRegistry, "host", "id");
tmGroup.counter("1");
assertEquals("Reporters were not properly instantiated", 2, testRegistry.getReporters().size());
for (MetricReporter reporter : testRegistry.getReporters()) {
ScopeCheckingTestReporter typedReporter = (ScopeCheckingTestReporter) reporter;
if (typedReporter.failureCause != null) {
throw typedReporter.failureCause;
}
}
} finally {
testRegistry.shutdown().get();
}
}
示例19
@Test
public void testReporterSetupSupplier() throws Exception {
final Configuration config = new Configuration();
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "reporter1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter1.class.getName());
final List<ReporterSetup> reporterSetups = ReporterSetup.fromConfiguration(config, null);
Assert.assertEquals(1, reporterSetups.size());
final ReporterSetup reporterSetup = reporterSetups.get(0);
final MetricReporter metricReporter = reporterSetup.getReporter();
Assert.assertThat(metricReporter, instanceOf(TestReporter1.class));
}
示例20
@Test
public void testReporterRegistration() throws Exception {
MetricRegistryImpl metricRegistry = createMetricRegistry(InfluxdbReporterOptions.RETENTION_POLICY.defaultValue(),
InfluxdbReporterOptions.CONSISTENCY.defaultValue());
try {
assertEquals(1, metricRegistry.getReporters().size());
MetricReporter reporter = metricRegistry.getReporters().get(0);
assertTrue(reporter instanceof InfluxdbReporter);
} finally {
metricRegistry.shutdown().get();
}
}
示例21
/**
* Creates a new MetricRegistry and starts the configured reporter.
*/
public MetricRegistryImpl(MetricRegistryConfiguration config) {
this.maximumFramesize = config.getQueryServiceMessageSizeLimit();
this.scopeFormats = config.getScopeFormats();
this.globalDelimiter = config.getDelimiter();
this.delimiters = new ArrayList<>(10);
this.terminationFuture = new CompletableFuture<>();
this.isShutdown = false;
// second, instantiate any custom configured reporters
this.reporters = new ArrayList<>(4);
List<Tuple2<String, Configuration>> reporterConfigurations = config.getReporterConfigurations();
this.executor = Executors.newSingleThreadScheduledExecutor(new ExecutorThreadFactory("Flink-MetricRegistry"));
this.queryService = null;
this.metricQueryServicePath = null;
if (reporterConfigurations.isEmpty()) {
// no reporters defined
// by default, don't report anything
LOG.info("No metrics reporter configured, no metrics will be exposed/reported.");
} else {
// we have some reporters so
for (Tuple2<String, Configuration> reporterConfiguration: reporterConfigurations) {
String namedReporter = reporterConfiguration.f0;
Configuration reporterConfig = reporterConfiguration.f1;
final String className = reporterConfig.getString(ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, null);
if (className == null) {
LOG.error("No reporter class set for reporter " + namedReporter + ". Metrics might not be exposed/reported.");
continue;
}
try {
String configuredPeriod = reporterConfig.getString(ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, null);
TimeUnit timeunit = TimeUnit.SECONDS;
long period = 10;
if (configuredPeriod != null) {
try {
String[] interval = configuredPeriod.split(" ");
period = Long.parseLong(interval[0]);
timeunit = TimeUnit.valueOf(interval[1]);
}
catch (Exception e) {
LOG.error("Cannot parse report interval from config: " + configuredPeriod +
" - please use values like '10 SECONDS' or '500 MILLISECONDS'. " +
"Using default reporting interval.");
}
}
Class<?> reporterClass = Class.forName(className);
MetricReporter reporterInstance = (MetricReporter) reporterClass.newInstance();
MetricConfig metricConfig = new MetricConfig();
reporterConfig.addAllToProperties(metricConfig);
LOG.info("Configuring {} with {}.", namedReporter, metricConfig);
reporterInstance.open(metricConfig);
if (reporterInstance instanceof Scheduled) {
LOG.info("Periodically reporting metrics in intervals of {} {} for reporter {} of type {}.", period, timeunit.name(), namedReporter, className);
executor.scheduleWithFixedDelay(
new MetricRegistryImpl.ReporterTask((Scheduled) reporterInstance), period, period, timeunit);
} else {
LOG.info("Reporting metrics for reporter {} of type {}.", namedReporter, className);
}
reporters.add(reporterInstance);
String delimiterForReporter = reporterConfig.getString(ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, String.valueOf(globalDelimiter));
if (delimiterForReporter.length() != 1) {
LOG.warn("Failed to parse delimiter '{}' for reporter '{}', using global delimiter '{}'.", delimiterForReporter, namedReporter, globalDelimiter);
delimiterForReporter = String.valueOf(globalDelimiter);
}
this.delimiters.add(delimiterForReporter.charAt(0));
}
catch (Throwable t) {
LOG.error("Could not instantiate metrics reporter {}. Metrics might not be exposed/reported.", namedReporter, t);
}
}
}
}
示例22
@VisibleForTesting
public List<MetricReporter> getReporters() {
return reporters;
}
示例23
/**
* Shuts down this registry and the associated {@link MetricReporter}.
*
* <p>NOTE: This operation is asynchronous and returns a future which is completed
* once the shutdown operation has been completed.
*
* @return Future which is completed once the {@link MetricRegistryImpl}
* is shut down.
*/
public CompletableFuture<Void> shutdown() {
synchronized (lock) {
if (isShutdown) {
return terminationFuture;
} else {
isShutdown = true;
final Collection<CompletableFuture<Void>> terminationFutures = new ArrayList<>(3);
final Time gracePeriod = Time.seconds(1L);
if (queryService != null) {
final CompletableFuture<Void> queryServiceTerminationFuture = ActorUtils.nonBlockingShutDown(
gracePeriod.toMilliseconds(),
TimeUnit.MILLISECONDS,
queryService);
terminationFutures.add(queryServiceTerminationFuture);
}
Throwable throwable = null;
for (MetricReporter reporter : reporters) {
try {
reporter.close();
} catch (Throwable t) {
throwable = ExceptionUtils.firstOrSuppressed(t, throwable);
}
}
reporters.clear();
if (throwable != null) {
terminationFutures.add(
FutureUtils.completedExceptionally(
new FlinkException("Could not shut down the metric reporters properly.", throwable)));
}
final CompletableFuture<Void> executorShutdownFuture = ExecutorUtils.nonBlockingShutdown(
gracePeriod.toMilliseconds(),
TimeUnit.MILLISECONDS,
executor);
terminationFutures.add(executorShutdownFuture);
FutureUtils
.completeAll(terminationFutures)
.whenComplete(
(Void ignored, Throwable error) -> {
if (error != null) {
terminationFuture.completeExceptionally(error);
} else {
terminationFuture.complete(null);
}
});
return terminationFuture;
}
}
}
示例24
/**
* Tests that the registered metrics' names don't contain invalid characters.
*/
@Test
public void testAddingMetrics() throws Exception {
Configuration configuration = new Configuration();
String taskName = "test\"Ta\"..sk";
String jobName = "testJ\"ob:-!ax..?";
String hostname = "loc<>al\"::host\".:";
String taskManagerId = "tas:kMana::ger";
String counterName = "testCounter";
configuration.setString(
ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX,
"org.apache.flink.dropwizard.ScheduledDropwizardReporterTest$TestingScheduledDropwizardReporter");
configuration.setString(MetricOptions.SCOPE_NAMING_TASK, "<host>.<tm_id>.<job_name>");
configuration.setString(MetricOptions.SCOPE_DELIMITER, "_");
MetricRegistryConfiguration metricRegistryConfiguration = MetricRegistryConfiguration.fromConfiguration(configuration);
MetricRegistryImpl metricRegistry = new MetricRegistryImpl(metricRegistryConfiguration);
char delimiter = metricRegistry.getDelimiter();
TaskManagerMetricGroup tmMetricGroup = new TaskManagerMetricGroup(metricRegistry, hostname, taskManagerId);
TaskManagerJobMetricGroup tmJobMetricGroup = new TaskManagerJobMetricGroup(metricRegistry, tmMetricGroup, new JobID(), jobName);
TaskMetricGroup taskMetricGroup = new TaskMetricGroup(metricRegistry, tmJobMetricGroup, new JobVertexID(), new AbstractID(), taskName, 0, 0);
SimpleCounter myCounter = new SimpleCounter();
com.codahale.metrics.Meter dropwizardMeter = new com.codahale.metrics.Meter();
DropwizardMeterWrapper meterWrapper = new DropwizardMeterWrapper(dropwizardMeter);
taskMetricGroup.counter(counterName, myCounter);
taskMetricGroup.meter("meter", meterWrapper);
List<MetricReporter> reporters = metricRegistry.getReporters();
assertTrue(reporters.size() == 1);
MetricReporter metricReporter = reporters.get(0);
assertTrue("Reporter should be of type ScheduledDropwizardReporter", metricReporter instanceof ScheduledDropwizardReporter);
TestingScheduledDropwizardReporter reporter = (TestingScheduledDropwizardReporter) metricReporter;
Map<Counter, String> counters = reporter.getCounters();
assertTrue(counters.containsKey(myCounter));
Map<Meter, String> meters = reporter.getMeters();
assertTrue(meters.containsKey(meterWrapper));
String expectedCounterName = reporter.filterCharacters(hostname)
+ delimiter
+ reporter.filterCharacters(taskManagerId)
+ delimiter
+ reporter.filterCharacters(jobName)
+ delimiter
+ reporter.filterCharacters(counterName);
assertEquals(expectedCounterName, counters.get(myCounter));
metricRegistry.shutdown().get();
}
示例25
/**
* Tests that the registered metrics' names don't contain invalid characters.
*/
@Test
public void testAddingMetrics() throws Exception {
Configuration configuration = new Configuration();
String taskName = "testTask";
String jobName = "testJob:-!ax..?";
String hostname = "local::host:";
String taskManagerId = "tas:kMana::ger";
String counterName = "testCounter";
configuration.setString(
ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX,
"org.apache.flink.metrics.statsd.StatsDReporterTest$TestingStatsDReporter");
configuration.setString(MetricOptions.SCOPE_NAMING_TASK, "<host>.<tm_id>.<job_name>");
configuration.setString(MetricOptions.SCOPE_DELIMITER, "_");
MetricRegistryImpl metricRegistry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(configuration));
char delimiter = metricRegistry.getDelimiter();
TaskManagerMetricGroup tmMetricGroup = new TaskManagerMetricGroup(metricRegistry, hostname, taskManagerId);
TaskManagerJobMetricGroup tmJobMetricGroup = new TaskManagerJobMetricGroup(metricRegistry, tmMetricGroup, new JobID(), jobName);
TaskMetricGroup taskMetricGroup = new TaskMetricGroup(metricRegistry, tmJobMetricGroup, new JobVertexID(), new AbstractID(), taskName, 0, 0);
SimpleCounter myCounter = new SimpleCounter();
taskMetricGroup.counter(counterName, myCounter);
List<MetricReporter> reporters = metricRegistry.getReporters();
assertTrue(reporters.size() == 1);
MetricReporter metricReporter = reporters.get(0);
assertTrue("Reporter should be of type StatsDReporter", metricReporter instanceof StatsDReporter);
TestingStatsDReporter reporter = (TestingStatsDReporter) metricReporter;
Map<Counter, String> counters = reporter.getCounters();
assertTrue(counters.containsKey(myCounter));
String expectedCounterName = reporter.filterCharacters(hostname)
+ delimiter
+ reporter.filterCharacters(taskManagerId)
+ delimiter
+ reporter.filterCharacters(jobName)
+ delimiter
+ reporter.filterCharacters(counterName);
assertEquals(expectedCounterName, counters.get(myCounter));
metricRegistry.shutdown().get();
}
示例26
/**
* Verifies that multiple JMXReporters can be started on the same machine and register metrics at the MBeanServer.
*
* @throws Exception if the attribute/mbean could not be found or the test is broken
*/
@Test
public void testPortConflictHandling() throws Exception {
Configuration cfg = new Configuration();
cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1.port", "9020-9035");
cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2.port", "9020-9035");
MetricRegistryImpl reg = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(cfg));
TaskManagerMetricGroup mg = new TaskManagerMetricGroup(reg, "host", "tm");
List<MetricReporter> reporters = reg.getReporters();
assertTrue(reporters.size() == 2);
MetricReporter rep1 = reporters.get(0);
MetricReporter rep2 = reporters.get(1);
Gauge<Integer> g1 = new Gauge<Integer>() {
@Override
public Integer getValue() {
return 1;
}
};
Gauge<Integer> g2 = new Gauge<Integer>() {
@Override
public Integer getValue() {
return 2;
}
};
rep1.notifyOfAddedMetric(g1, "rep1", new FrontMetricGroup<>(0, new TaskManagerMetricGroup(reg, "host", "tm")));
rep2.notifyOfAddedMetric(g2, "rep2", new FrontMetricGroup<>(0, new TaskManagerMetricGroup(reg, "host", "tm")));
MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
ObjectName objectName1 = new ObjectName(JMX_DOMAIN_PREFIX + "taskmanager.rep1", JMXReporter.generateJmxTable(mg.getAllVariables()));
ObjectName objectName2 = new ObjectName(JMX_DOMAIN_PREFIX + "taskmanager.rep2", JMXReporter.generateJmxTable(mg.getAllVariables()));
assertEquals(1, mBeanServer.getAttribute(objectName1, "Value"));
assertEquals(2, mBeanServer.getAttribute(objectName2, "Value"));
rep1.notifyOfRemovedMetric(g1, "rep1", null);
rep1.notifyOfRemovedMetric(g2, "rep2", null);
mg.close();
reg.shutdown().get();
}
示例27
/**
* Verifies that we can connect to multiple JMXReporters running on the same machine.
*
* @throws Exception
*/
@Test
public void testJMXAvailability() throws Exception {
Configuration cfg = new Configuration();
cfg.setString(ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter.class.getName());
cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1.port", "9040-9055");
cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2.port", "9040-9055");
MetricRegistryImpl reg = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(cfg));
TaskManagerMetricGroup mg = new TaskManagerMetricGroup(reg, "host", "tm");
List<MetricReporter> reporters = reg.getReporters();
assertTrue(reporters.size() == 2);
MetricReporter rep1 = reporters.get(0);
MetricReporter rep2 = reporters.get(1);
Gauge<Integer> g1 = new Gauge<Integer>() {
@Override
public Integer getValue() {
return 1;
}
};
Gauge<Integer> g2 = new Gauge<Integer>() {
@Override
public Integer getValue() {
return 2;
}
};
rep1.notifyOfAddedMetric(g1, "rep1", new FrontMetricGroup<>(0, new TaskManagerMetricGroup(reg, "host", "tm")));
rep2.notifyOfAddedMetric(g2, "rep2", new FrontMetricGroup<>(1, new TaskManagerMetricGroup(reg, "host", "tm")));
ObjectName objectName1 = new ObjectName(JMX_DOMAIN_PREFIX + "taskmanager.rep1", JMXReporter.generateJmxTable(mg.getAllVariables()));
ObjectName objectName2 = new ObjectName(JMX_DOMAIN_PREFIX + "taskmanager.rep2", JMXReporter.generateJmxTable(mg.getAllVariables()));
JMXServiceURL url1 = new JMXServiceURL("service:jmx:rmi://localhost:" + ((JMXReporter) rep1).getPort() + "/jndi/rmi://localhost:" + ((JMXReporter) rep1).getPort() + "/jmxrmi");
JMXConnector jmxCon1 = JMXConnectorFactory.connect(url1);
MBeanServerConnection mCon1 = jmxCon1.getMBeanServerConnection();
assertEquals(1, mCon1.getAttribute(objectName1, "Value"));
assertEquals(2, mCon1.getAttribute(objectName2, "Value"));
jmxCon1.close();
JMXServiceURL url2 = new JMXServiceURL("service:jmx:rmi://localhost:" + ((JMXReporter) rep2).getPort() + "/jndi/rmi://localhost:" + ((JMXReporter) rep2).getPort() + "/jmxrmi");
JMXConnector jmxCon2 = JMXConnectorFactory.connect(url2);
MBeanServerConnection mCon2 = jmxCon2.getMBeanServerConnection();
assertEquals(1, mCon2.getAttribute(objectName1, "Value"));
assertEquals(2, mCon2.getAttribute(objectName2, "Value"));
rep1.notifyOfRemovedMetric(g1, "rep1", null);
rep1.notifyOfRemovedMetric(g2, "rep2", null);
jmxCon2.close();
rep1.close();
rep2.close();
mg.close();
reg.shutdown().get();
}
示例28
public ReporterSetup(final String name, final MetricConfig configuration, MetricReporter reporter) {
this.name = name;
this.configuration = configuration;
this.reporter = reporter;
}
示例29
public MetricReporter getReporter() {
return reporter;
}
示例30
@VisibleForTesting
public static ReporterSetup forReporter(String reporterName, MetricReporter reporter) {
return createReporterSetup(reporterName, new MetricConfig(), reporter);
}