Java源码示例:org.apache.flink.metrics.reporter.MetricReporter

示例1
@Test
public void testScopeCachingForMultipleReporters() throws Exception {
	Configuration config = new Configuration();
	config.setString(MetricOptions.SCOPE_NAMING_TM, "A.B.C.D");
	config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter1.class.getName());
	config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "-");
	config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter2.class.getName());
	config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "!");

	MetricRegistryImpl testRegistry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));
	try {
		MetricGroup tmGroup = new TaskManagerMetricGroup(testRegistry, "host", "id");
		tmGroup.counter("1");
		assertEquals("Reporters were not properly instantiated", 2, testRegistry.getReporters().size());
		for (MetricReporter reporter : testRegistry.getReporters()) {
			ScopeCheckingTestReporter typedReporter = (ScopeCheckingTestReporter) reporter;
			if (typedReporter.failureCause != null) {
				throw typedReporter.failureCause;
			}
		}
	} finally {
		testRegistry.shutdown().get();
	}
}
 
示例2
@Test
public void testLogicalScopeCachingForMultipleReporters() throws Exception {
	Configuration config = new Configuration();
	config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, LogicalScopeReporter1.class.getName());
	config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, LogicalScopeReporter2.class.getName());

	MetricRegistryImpl testRegistry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));
	try {
		MetricGroup tmGroup = new TaskManagerMetricGroup(testRegistry, "host", "id")
			.addGroup("B")
			.addGroup("C");
		tmGroup.counter("1");
		assertEquals("Reporters were not properly instantiated", 2, testRegistry.getReporters().size());
		for (MetricReporter reporter : testRegistry.getReporters()) {
			ScopeCheckingTestReporter typedReporter = (ScopeCheckingTestReporter) reporter;
			if (typedReporter.failureCause != null) {
				throw typedReporter.failureCause;
			}
		}
	} finally {
		testRegistry.shutdown().get();
	}
}
 
示例3
@Test
public void testConfigurableDelimiterForReportersInGroup() throws Exception {
	Configuration config = new Configuration();
	config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "_");
	config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter8.class.getName());
	config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "-");
	config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter8.class.getName());
	config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test3." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "AA");
	config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test3." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter8.class.getName());
	config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test4." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter8.class.getName());
	config.setString(MetricOptions.SCOPE_NAMING_TM, "A.B");

	MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));
	List<MetricReporter> reporters = registry.getReporters();
	((TestReporter8) reporters.get(0)).expectedDelimiter = '_'; //test1  reporter
	((TestReporter8) reporters.get(1)).expectedDelimiter = '-'; //test2 reporter
	((TestReporter8) reporters.get(2)).expectedDelimiter = GLOBAL_DEFAULT_DELIMITER; //test3 reporter, because 'AA' - not correct delimiter
	((TestReporter8) reporters.get(3)).expectedDelimiter = GLOBAL_DEFAULT_DELIMITER; //for test4 reporter use global delimiter

	TaskManagerMetricGroup group = new TaskManagerMetricGroup(registry, "host", "id");
	group.counter("C");
	group.close();
	registry.shutdown().get();
	assertEquals(4, TestReporter8.numCorrectDelimitersForRegister);
	assertEquals(4, TestReporter8.numCorrectDelimitersForUnregister);
}
 
示例4
private static Optional<MetricReporter> loadReporter(
		final String reporterName,
		final Configuration reporterConfig,
		final Map<String, MetricReporterFactory> reporterFactories)
		throws ClassNotFoundException, IllegalAccessException, InstantiationException {

	final String reporterClassName = reporterConfig.getString(ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, null);
	final String factoryClassName = reporterConfig.getString(ConfigConstants.METRICS_REPORTER_FACTORY_CLASS_SUFFIX, null);

	if (factoryClassName != null) {
		return loadViaFactory(factoryClassName, reporterName, reporterConfig, reporterFactories);
	}

	if (reporterClassName != null) {
		return loadViaReflection(reporterClassName, reporterName, reporterConfig, reporterFactories);
	}

	LOG.warn("No reporter class nor factory set for reporter {}. Metrics might not be exposed/reported.", reporterName);
	return Optional.empty();
}
 
示例5
private static Optional<MetricReporter> loadViaFactory(
		final String factoryClassName,
		final String reporterName,
		final Configuration reporterConfig,
		final Map<String, MetricReporterFactory> reporterFactories) {

	MetricReporterFactory factory = reporterFactories.get(factoryClassName);

	if (factory == null) {
		LOG.warn("The reporter factory ({}) could not be found for reporter {}. Available factories: ", factoryClassName, reporterName, reporterFactories.keySet());
		return Optional.empty();
	} else {
		final MetricConfig metricConfig = new MetricConfig();
		reporterConfig.addAllToProperties(metricConfig);

		return Optional.of(factory.createMetricReporter(metricConfig));
	}
}
 
示例6
private static Optional<MetricReporter> loadViaReflection(
		final String reporterClassName,
		final String reporterName,
		final Configuration reporterConfig,
		final Map<String, MetricReporterFactory> reporterFactories) throws ClassNotFoundException, IllegalAccessException, InstantiationException {

	final Class<?> reporterClass = Class.forName(reporterClassName);

	final InstantiateViaFactory alternativeFactoryAnnotation = reporterClass.getAnnotation(InstantiateViaFactory.class);
	if (alternativeFactoryAnnotation != null) {
		final String alternativeFactoryClassName = alternativeFactoryAnnotation.factoryClassName();
		LOG.info("The reporter configuration of {} is out-dated (but still supported)." +
				" Please configure a factory class instead: '{}{}.{}: {}' to ensure that the configuration" +
				" continues to work with future versions.",
			reporterName,
			ConfigConstants.METRICS_REPORTER_PREFIX,
			reporterName,
			ConfigConstants.METRICS_REPORTER_FACTORY_CLASS_SUFFIX,
			alternativeFactoryClassName);
		return loadViaFactory(alternativeFactoryClassName, reporterName, reporterConfig, reporterFactories);
	}

	return Optional.of((MetricReporter) reporterClass.newInstance());
}
 
示例7
@Test
public void testLogicalScopeCachingForMultipleReporters() throws Exception {
	MetricRegistryImpl testRegistry = new MetricRegistryImpl(
		MetricRegistryConfiguration.defaultMetricRegistryConfiguration(),
		Arrays.asList(
			ReporterSetup.forReporter("test1", new LogicalScopeReporter1()),
			ReporterSetup.forReporter("test2", new LogicalScopeReporter2())));
	try {
		MetricGroup tmGroup = new TaskManagerMetricGroup(testRegistry, "host", "id")
			.addGroup("B")
			.addGroup("C");
		tmGroup.counter("1");
		assertEquals("Reporters were not properly instantiated", 2, testRegistry.getReporters().size());
		for (MetricReporter reporter : testRegistry.getReporters()) {
			ScopeCheckingTestReporter typedReporter = (ScopeCheckingTestReporter) reporter;
			if (typedReporter.failureCause != null) {
				throw typedReporter.failureCause;
			}
		}
	} finally {
		testRegistry.shutdown().get();
	}
}
 
示例8
private static List<ReporterSetup> setupReporters(Map<String, MetricReporterFactory> reporterFactories, List<Tuple2<String, Configuration>> reporterConfigurations) {
	List<ReporterSetup> reporterSetups = new ArrayList<>(reporterConfigurations.size());
	for (Tuple2<String, Configuration> reporterConfiguration: reporterConfigurations) {
		String reporterName = reporterConfiguration.f0;
		Configuration reporterConfig = reporterConfiguration.f1;

		try {
			Optional<MetricReporter> metricReporterOptional = loadReporter(reporterName, reporterConfig, reporterFactories);
			metricReporterOptional.ifPresent(reporter -> {
				MetricConfig metricConfig = new MetricConfig();
				reporterConfig.addAllToProperties(metricConfig);
				reporterSetups.add(createReporterSetup(reporterName, metricConfig, reporter));
			});
		}
		catch (Throwable t) {
			LOG.error("Could not instantiate metrics reporter {}. Metrics might not be exposed/reported.", reporterName, t);
		}
	}
	return reporterSetups;
}
 
示例9
private static Optional<MetricReporter> loadViaReflection(
		final String reporterClassName,
		final String reporterName,
		final Configuration reporterConfig,
		final Map<String, MetricReporterFactory> reporterFactories) throws ClassNotFoundException, IllegalAccessException, InstantiationException {

	final Class<?> reporterClass = Class.forName(reporterClassName);

	final InstantiateViaFactory alternativeFactoryAnnotation = reporterClass.getAnnotation(InstantiateViaFactory.class);
	if (alternativeFactoryAnnotation != null) {
		final String alternativeFactoryClassName = alternativeFactoryAnnotation.factoryClassName();
		LOG.info("The reporter configuration of {} is out-dated (but still supported)." +
				" Please configure a factory class instead: '{}{}.{}: {}' to ensure that the configuration" +
				" continues to work with future versions.",
			reporterName,
			ConfigConstants.METRICS_REPORTER_PREFIX,
			reporterName,
			ConfigConstants.METRICS_REPORTER_FACTORY_CLASS_SUFFIX,
			alternativeFactoryClassName);
		return loadViaFactory(alternativeFactoryClassName, reporterName, reporterConfig, reporterFactories);
	}

	return Optional.of((MetricReporter) reporterClass.newInstance());
}
 
示例10
@Test
public void testLogicalScopeCachingForMultipleReporters() throws Exception {
	MetricRegistryImpl testRegistry = new MetricRegistryImpl(
		MetricRegistryConfiguration.defaultMetricRegistryConfiguration(),
		Arrays.asList(
			ReporterSetup.forReporter("test1", new LogicalScopeReporter1()),
			ReporterSetup.forReporter("test2", new LogicalScopeReporter2())));
	try {
		MetricGroup tmGroup = new TaskManagerMetricGroup(testRegistry, "host", "id")
			.addGroup("B")
			.addGroup("C");
		tmGroup.counter("1");
		assertEquals("Reporters were not properly instantiated", 2, testRegistry.getReporters().size());
		for (MetricReporter reporter : testRegistry.getReporters()) {
			ScopeCheckingTestReporter typedReporter = (ScopeCheckingTestReporter) reporter;
			if (typedReporter.failureCause != null) {
				throw typedReporter.failureCause;
			}
		}
	} finally {
		testRegistry.shutdown().get();
	}
}
 
示例11
@Test
public void testReporterRegistration() throws Exception {
	MetricRegistryImpl metricRegistry = createMetricRegistry();
	try {
		assertEquals(1, metricRegistry.getReporters().size());
		MetricReporter reporter = metricRegistry.getReporters().get(0);
		assertTrue(reporter instanceof InfluxdbReporter);
	} finally {
		metricRegistry.shutdown().get();
	}
}
 
示例12
@Test
public void testScopeCachingForMultipleReporters() throws Exception {
	Configuration config = new Configuration();
	config.setString(MetricOptions.SCOPE_NAMING_TM, "A.B.C.D");

	MetricConfig metricConfig1 = new MetricConfig();
	metricConfig1.setProperty(ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "-");

	MetricConfig metricConfig2 = new MetricConfig();
	metricConfig2.setProperty(ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "!");

	config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter1.class.getName());
	config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "-");
	config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter2.class.getName());
	config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "!");

	MetricRegistryImpl testRegistry = new MetricRegistryImpl(
		MetricRegistryConfiguration.fromConfiguration(config),
		Arrays.asList(
			ReporterSetup.forReporter("test1", metricConfig1, new TestReporter1()),
			ReporterSetup.forReporter("test2", metricConfig2, new TestReporter2())));
	try {
		MetricGroup tmGroup = new TaskManagerMetricGroup(testRegistry, "host", "id");
		tmGroup.counter("1");
		assertEquals("Reporters were not properly instantiated", 2, testRegistry.getReporters().size());
		for (MetricReporter reporter : testRegistry.getReporters()) {
			ScopeCheckingTestReporter typedReporter = (ScopeCheckingTestReporter) reporter;
			if (typedReporter.failureCause != null) {
				throw typedReporter.failureCause;
			}
		}
	} finally {
		testRegistry.shutdown().get();
	}
}
 
示例13
@Test
public void testReporterSetupSupplier() throws Exception {
	final Configuration config = new Configuration();

	config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "reporter1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter1.class.getName());

	final List<ReporterSetup> reporterSetups = ReporterSetup.fromConfiguration(config);

	Assert.assertEquals(1, reporterSetups.size());

	final ReporterSetup reporterSetup = reporterSetups.get(0);
	final MetricReporter metricReporter = reporterSetup.getReporter();
	Assert.assertThat(metricReporter, instanceOf(TestReporter1.class));
}
 
示例14
@Test
public void testReporterRegistration() throws Exception {
	MetricRegistryImpl metricRegistry = createMetricRegistry(InfluxdbReporterOptions.RETENTION_POLICY.defaultValue());
	try {
		assertEquals(1, metricRegistry.getReporters().size());
		MetricReporter reporter = metricRegistry.getReporters().get(0);
		assertTrue(reporter instanceof InfluxdbReporter);
	} finally {
		metricRegistry.shutdown().get();
	}
}
 
示例15
private static Optional<MetricReporter> loadReporter(
		final String reporterName,
		final Configuration reporterConfig,
		final Map<String, MetricReporterFactory> reporterFactories)
		throws ClassNotFoundException, IllegalAccessException, InstantiationException {

	final String reporterClassName = reporterConfig.getString(ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, null);
	final String factoryClassName = reporterConfig.getString(ConfigConstants.METRICS_REPORTER_FACTORY_CLASS_SUFFIX, null);

	if (factoryClassName != null) {
		return loadViaFactory(factoryClassName, reporterName, reporterConfig, reporterFactories);
	}

	if (reporterClassName != null) {
		final Optional<MetricReporterFactory> interceptingFactory = reporterFactories.values().stream()
			.filter(factory -> {
				InterceptInstantiationViaReflection annotation = factory.getClass().getAnnotation(InterceptInstantiationViaReflection.class);
				return annotation != null && annotation.reporterClassName().equals(reporterClassName);
			})
			.findAny();

		if (interceptingFactory.isPresent()) {
			return loadViaFactory(reporterConfig, interceptingFactory.get());
		}

		return loadViaReflection(reporterClassName, reporterName, reporterConfig, reporterFactories);
	}

	LOG.warn("No reporter class nor factory set for reporter {}. Metrics might not be exposed/reported.", reporterName);
	return Optional.empty();
}
 
示例16
private static Optional<MetricReporter> loadViaFactory(
		final String factoryClassName,
		final String reporterName,
		final Configuration reporterConfig,
		final Map<String, MetricReporterFactory> reporterFactories) {

	MetricReporterFactory factory = reporterFactories.get(factoryClassName);

	if (factory == null) {
		LOG.warn("The reporter factory ({}) could not be found for reporter {}. Available factories: {}.", factoryClassName, reporterName, reporterFactories.keySet());
		return Optional.empty();
	} else {
		return loadViaFactory(reporterConfig, factory);
	}
}
 
示例17
private static Optional<MetricReporter> loadViaFactory(
	final Configuration reporterConfig,
	final MetricReporterFactory factory) {

	final MetricConfig metricConfig = new MetricConfig();
	reporterConfig.addAllToProperties(metricConfig);

	return Optional.of(factory.createMetricReporter(metricConfig));
}
 
示例18
@Test
public void testScopeCachingForMultipleReporters() throws Exception {
	Configuration config = new Configuration();
	config.setString(MetricOptions.SCOPE_NAMING_TM, "A.B.C.D");

	MetricConfig metricConfig1 = new MetricConfig();
	metricConfig1.setProperty(ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "-");

	MetricConfig metricConfig2 = new MetricConfig();
	metricConfig2.setProperty(ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "!");

	config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter1.class.getName());
	config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "-");
	config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter2.class.getName());
	config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "!");

	MetricRegistryImpl testRegistry = new MetricRegistryImpl(
		MetricRegistryConfiguration.fromConfiguration(config),
		Arrays.asList(
			ReporterSetup.forReporter("test1", metricConfig1, new TestReporter1()),
			ReporterSetup.forReporter("test2", metricConfig2, new TestReporter2())));
	try {
		MetricGroup tmGroup = new TaskManagerMetricGroup(testRegistry, "host", "id");
		tmGroup.counter("1");
		assertEquals("Reporters were not properly instantiated", 2, testRegistry.getReporters().size());
		for (MetricReporter reporter : testRegistry.getReporters()) {
			ScopeCheckingTestReporter typedReporter = (ScopeCheckingTestReporter) reporter;
			if (typedReporter.failureCause != null) {
				throw typedReporter.failureCause;
			}
		}
	} finally {
		testRegistry.shutdown().get();
	}
}
 
示例19
@Test
public void testReporterSetupSupplier() throws Exception {
	final Configuration config = new Configuration();

	config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "reporter1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter1.class.getName());

	final List<ReporterSetup> reporterSetups = ReporterSetup.fromConfiguration(config, null);

	Assert.assertEquals(1, reporterSetups.size());

	final ReporterSetup reporterSetup = reporterSetups.get(0);
	final MetricReporter metricReporter = reporterSetup.getReporter();
	Assert.assertThat(metricReporter, instanceOf(TestReporter1.class));
}
 
示例20
@Test
public void testReporterRegistration() throws Exception {
	MetricRegistryImpl metricRegistry = createMetricRegistry(InfluxdbReporterOptions.RETENTION_POLICY.defaultValue(),
		InfluxdbReporterOptions.CONSISTENCY.defaultValue());
	try {
		assertEquals(1, metricRegistry.getReporters().size());
		MetricReporter reporter = metricRegistry.getReporters().get(0);
		assertTrue(reporter instanceof InfluxdbReporter);
	} finally {
		metricRegistry.shutdown().get();
	}
}
 
示例21
/**
 * Creates a new MetricRegistry and starts the configured reporter.
 */
public MetricRegistryImpl(MetricRegistryConfiguration config) {
	this.maximumFramesize = config.getQueryServiceMessageSizeLimit();
	this.scopeFormats = config.getScopeFormats();
	this.globalDelimiter = config.getDelimiter();
	this.delimiters = new ArrayList<>(10);
	this.terminationFuture = new CompletableFuture<>();
	this.isShutdown = false;

	// second, instantiate any custom configured reporters
	this.reporters = new ArrayList<>(4);

	List<Tuple2<String, Configuration>> reporterConfigurations = config.getReporterConfigurations();

	this.executor = Executors.newSingleThreadScheduledExecutor(new ExecutorThreadFactory("Flink-MetricRegistry"));

	this.queryService = null;
	this.metricQueryServicePath = null;

	if (reporterConfigurations.isEmpty()) {
		// no reporters defined
		// by default, don't report anything
		LOG.info("No metrics reporter configured, no metrics will be exposed/reported.");
	} else {
		// we have some reporters so
		for (Tuple2<String, Configuration> reporterConfiguration: reporterConfigurations) {
			String namedReporter = reporterConfiguration.f0;
			Configuration reporterConfig = reporterConfiguration.f1;

			final String className = reporterConfig.getString(ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, null);
			if (className == null) {
				LOG.error("No reporter class set for reporter " + namedReporter + ". Metrics might not be exposed/reported.");
				continue;
			}

			try {
				String configuredPeriod = reporterConfig.getString(ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, null);
				TimeUnit timeunit = TimeUnit.SECONDS;
				long period = 10;

				if (configuredPeriod != null) {
					try {
						String[] interval = configuredPeriod.split(" ");
						period = Long.parseLong(interval[0]);
						timeunit = TimeUnit.valueOf(interval[1]);
					}
					catch (Exception e) {
						LOG.error("Cannot parse report interval from config: " + configuredPeriod +
								" - please use values like '10 SECONDS' or '500 MILLISECONDS'. " +
								"Using default reporting interval.");
					}
				}

				Class<?> reporterClass = Class.forName(className);
				MetricReporter reporterInstance = (MetricReporter) reporterClass.newInstance();

				MetricConfig metricConfig = new MetricConfig();
				reporterConfig.addAllToProperties(metricConfig);
				LOG.info("Configuring {} with {}.", namedReporter, metricConfig);
				reporterInstance.open(metricConfig);

				if (reporterInstance instanceof Scheduled) {
					LOG.info("Periodically reporting metrics in intervals of {} {} for reporter {} of type {}.", period, timeunit.name(), namedReporter, className);

					executor.scheduleWithFixedDelay(
							new MetricRegistryImpl.ReporterTask((Scheduled) reporterInstance), period, period, timeunit);
				} else {
					LOG.info("Reporting metrics for reporter {} of type {}.", namedReporter, className);
				}
				reporters.add(reporterInstance);

				String delimiterForReporter = reporterConfig.getString(ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, String.valueOf(globalDelimiter));
				if (delimiterForReporter.length() != 1) {
					LOG.warn("Failed to parse delimiter '{}' for reporter '{}', using global delimiter '{}'.", delimiterForReporter, namedReporter, globalDelimiter);
					delimiterForReporter = String.valueOf(globalDelimiter);
				}
				this.delimiters.add(delimiterForReporter.charAt(0));
			}
			catch (Throwable t) {
				LOG.error("Could not instantiate metrics reporter {}. Metrics might not be exposed/reported.", namedReporter, t);
			}
		}
	}
}
 
示例22
@VisibleForTesting
public List<MetricReporter> getReporters() {
	return reporters;
}
 
示例23
/**
 * Shuts down this registry and the associated {@link MetricReporter}.
 *
 * <p>NOTE: This operation is asynchronous and returns a future which is completed
 * once the shutdown operation has been completed.
 *
 * @return Future which is completed once the {@link MetricRegistryImpl}
 * is shut down.
 */
public CompletableFuture<Void> shutdown() {
	synchronized (lock) {
		if (isShutdown) {
			return terminationFuture;
		} else {
			isShutdown = true;
			final Collection<CompletableFuture<Void>> terminationFutures = new ArrayList<>(3);
			final Time gracePeriod = Time.seconds(1L);

			if (queryService != null) {
				final CompletableFuture<Void> queryServiceTerminationFuture = ActorUtils.nonBlockingShutDown(
					gracePeriod.toMilliseconds(),
					TimeUnit.MILLISECONDS,
					queryService);

				terminationFutures.add(queryServiceTerminationFuture);
			}

			Throwable throwable = null;
			for (MetricReporter reporter : reporters) {
				try {
					reporter.close();
				} catch (Throwable t) {
					throwable = ExceptionUtils.firstOrSuppressed(t, throwable);
				}
			}
			reporters.clear();

			if (throwable != null) {
				terminationFutures.add(
					FutureUtils.completedExceptionally(
						new FlinkException("Could not shut down the metric reporters properly.", throwable)));
			}

			final CompletableFuture<Void> executorShutdownFuture = ExecutorUtils.nonBlockingShutdown(
				gracePeriod.toMilliseconds(),
				TimeUnit.MILLISECONDS,
				executor);

			terminationFutures.add(executorShutdownFuture);

			FutureUtils
				.completeAll(terminationFutures)
				.whenComplete(
					(Void ignored, Throwable error) -> {
						if (error != null) {
							terminationFuture.completeExceptionally(error);
						} else {
							terminationFuture.complete(null);
						}
					});

			return terminationFuture;
		}
	}
}
 
示例24
/**
 * Tests that the registered metrics' names don't contain invalid characters.
 */
@Test
public void testAddingMetrics() throws Exception {
	Configuration configuration = new Configuration();
	String taskName = "test\"Ta\"..sk";
	String jobName = "testJ\"ob:-!ax..?";
	String hostname = "loc<>al\"::host\".:";
	String taskManagerId = "tas:kMana::ger";
	String counterName = "testCounter";

	configuration.setString(
			ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX,
			"org.apache.flink.dropwizard.ScheduledDropwizardReporterTest$TestingScheduledDropwizardReporter");

	configuration.setString(MetricOptions.SCOPE_NAMING_TASK, "<host>.<tm_id>.<job_name>");
	configuration.setString(MetricOptions.SCOPE_DELIMITER, "_");

	MetricRegistryConfiguration metricRegistryConfiguration = MetricRegistryConfiguration.fromConfiguration(configuration);

	MetricRegistryImpl metricRegistry = new MetricRegistryImpl(metricRegistryConfiguration);

	char delimiter = metricRegistry.getDelimiter();

	TaskManagerMetricGroup tmMetricGroup = new TaskManagerMetricGroup(metricRegistry, hostname, taskManagerId);
	TaskManagerJobMetricGroup tmJobMetricGroup = new TaskManagerJobMetricGroup(metricRegistry, tmMetricGroup, new JobID(), jobName);
	TaskMetricGroup taskMetricGroup = new TaskMetricGroup(metricRegistry, tmJobMetricGroup, new JobVertexID(), new AbstractID(), taskName, 0, 0);

	SimpleCounter myCounter = new SimpleCounter();
	com.codahale.metrics.Meter dropwizardMeter = new com.codahale.metrics.Meter();
	DropwizardMeterWrapper meterWrapper = new DropwizardMeterWrapper(dropwizardMeter);

	taskMetricGroup.counter(counterName, myCounter);
	taskMetricGroup.meter("meter", meterWrapper);

	List<MetricReporter> reporters = metricRegistry.getReporters();

	assertTrue(reporters.size() == 1);
	MetricReporter metricReporter = reporters.get(0);

	assertTrue("Reporter should be of type ScheduledDropwizardReporter", metricReporter instanceof ScheduledDropwizardReporter);

	TestingScheduledDropwizardReporter reporter = (TestingScheduledDropwizardReporter) metricReporter;

	Map<Counter, String> counters = reporter.getCounters();
	assertTrue(counters.containsKey(myCounter));

	Map<Meter, String> meters = reporter.getMeters();
	assertTrue(meters.containsKey(meterWrapper));

	String expectedCounterName = reporter.filterCharacters(hostname)
		+ delimiter
		+ reporter.filterCharacters(taskManagerId)
		+ delimiter
		+ reporter.filterCharacters(jobName)
		+ delimiter
		+ reporter.filterCharacters(counterName);

	assertEquals(expectedCounterName, counters.get(myCounter));

	metricRegistry.shutdown().get();
}
 
示例25
/**
 * Tests that the registered metrics' names don't contain invalid characters.
 */
@Test
public void testAddingMetrics() throws Exception {
	Configuration configuration = new Configuration();
	String taskName = "testTask";
	String jobName = "testJob:-!ax..?";
	String hostname = "local::host:";
	String taskManagerId = "tas:kMana::ger";
	String counterName = "testCounter";

	configuration.setString(
			ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX,
			"org.apache.flink.metrics.statsd.StatsDReporterTest$TestingStatsDReporter");

	configuration.setString(MetricOptions.SCOPE_NAMING_TASK, "<host>.<tm_id>.<job_name>");
	configuration.setString(MetricOptions.SCOPE_DELIMITER, "_");

	MetricRegistryImpl metricRegistry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(configuration));

	char delimiter = metricRegistry.getDelimiter();

	TaskManagerMetricGroup tmMetricGroup = new TaskManagerMetricGroup(metricRegistry, hostname, taskManagerId);
	TaskManagerJobMetricGroup tmJobMetricGroup = new TaskManagerJobMetricGroup(metricRegistry, tmMetricGroup, new JobID(), jobName);
	TaskMetricGroup taskMetricGroup = new TaskMetricGroup(metricRegistry, tmJobMetricGroup, new JobVertexID(), new AbstractID(), taskName, 0, 0);

	SimpleCounter myCounter = new SimpleCounter();

	taskMetricGroup.counter(counterName, myCounter);

	List<MetricReporter> reporters = metricRegistry.getReporters();

	assertTrue(reporters.size() == 1);

	MetricReporter metricReporter = reporters.get(0);

	assertTrue("Reporter should be of type StatsDReporter", metricReporter instanceof StatsDReporter);

	TestingStatsDReporter reporter = (TestingStatsDReporter) metricReporter;

	Map<Counter, String> counters = reporter.getCounters();

	assertTrue(counters.containsKey(myCounter));

	String expectedCounterName = reporter.filterCharacters(hostname)
		+ delimiter
		+ reporter.filterCharacters(taskManagerId)
		+ delimiter
		+ reporter.filterCharacters(jobName)
		+ delimiter
		+ reporter.filterCharacters(counterName);

	assertEquals(expectedCounterName, counters.get(myCounter));

	metricRegistry.shutdown().get();
}
 
示例26
/**
 * Verifies that multiple JMXReporters can be started on the same machine and register metrics at the MBeanServer.
 *
 * @throws Exception if the attribute/mbean could not be found or the test is broken
 */
@Test
public void testPortConflictHandling() throws Exception {
	Configuration cfg = new Configuration();

	cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
	cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1.port", "9020-9035");

	cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
	cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2.port", "9020-9035");

	MetricRegistryImpl reg = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(cfg));

	TaskManagerMetricGroup mg = new TaskManagerMetricGroup(reg, "host", "tm");

	List<MetricReporter> reporters = reg.getReporters();

	assertTrue(reporters.size() == 2);

	MetricReporter rep1 = reporters.get(0);
	MetricReporter rep2 = reporters.get(1);

	Gauge<Integer> g1 = new Gauge<Integer>() {
		@Override
		public Integer getValue() {
			return 1;
		}
	};
	Gauge<Integer> g2 = new Gauge<Integer>() {
		@Override
		public Integer getValue() {
			return 2;
		}
	};

	rep1.notifyOfAddedMetric(g1, "rep1", new FrontMetricGroup<>(0, new TaskManagerMetricGroup(reg, "host", "tm")));
	rep2.notifyOfAddedMetric(g2, "rep2", new FrontMetricGroup<>(0, new TaskManagerMetricGroup(reg, "host", "tm")));

	MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();

	ObjectName objectName1 = new ObjectName(JMX_DOMAIN_PREFIX + "taskmanager.rep1", JMXReporter.generateJmxTable(mg.getAllVariables()));
	ObjectName objectName2 = new ObjectName(JMX_DOMAIN_PREFIX + "taskmanager.rep2", JMXReporter.generateJmxTable(mg.getAllVariables()));

	assertEquals(1, mBeanServer.getAttribute(objectName1, "Value"));
	assertEquals(2, mBeanServer.getAttribute(objectName2, "Value"));

	rep1.notifyOfRemovedMetric(g1, "rep1", null);
	rep1.notifyOfRemovedMetric(g2, "rep2", null);

	mg.close();
	reg.shutdown().get();
}
 
示例27
/**
 * Verifies that we can connect to multiple JMXReporters running on the same machine.
 *
 * @throws Exception
 */
@Test
public void testJMXAvailability() throws Exception {
	Configuration cfg = new Configuration();
	cfg.setString(ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter.class.getName());

	cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
	cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1.port", "9040-9055");

	cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
	cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2.port", "9040-9055");

	MetricRegistryImpl reg = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(cfg));

	TaskManagerMetricGroup mg = new TaskManagerMetricGroup(reg, "host", "tm");

	List<MetricReporter> reporters = reg.getReporters();

	assertTrue(reporters.size() == 2);

	MetricReporter rep1 = reporters.get(0);
	MetricReporter rep2 = reporters.get(1);

	Gauge<Integer> g1 = new Gauge<Integer>() {
		@Override
		public Integer getValue() {
			return 1;
		}
	};
	Gauge<Integer> g2 = new Gauge<Integer>() {
		@Override
		public Integer getValue() {
			return 2;
		}
	};

	rep1.notifyOfAddedMetric(g1, "rep1", new FrontMetricGroup<>(0, new TaskManagerMetricGroup(reg, "host", "tm")));

	rep2.notifyOfAddedMetric(g2, "rep2", new FrontMetricGroup<>(1, new TaskManagerMetricGroup(reg, "host", "tm")));

	ObjectName objectName1 = new ObjectName(JMX_DOMAIN_PREFIX + "taskmanager.rep1", JMXReporter.generateJmxTable(mg.getAllVariables()));
	ObjectName objectName2 = new ObjectName(JMX_DOMAIN_PREFIX + "taskmanager.rep2", JMXReporter.generateJmxTable(mg.getAllVariables()));

	JMXServiceURL url1 = new JMXServiceURL("service:jmx:rmi://localhost:" + ((JMXReporter) rep1).getPort() + "/jndi/rmi://localhost:" + ((JMXReporter) rep1).getPort() + "/jmxrmi");
	JMXConnector jmxCon1 = JMXConnectorFactory.connect(url1);
	MBeanServerConnection mCon1 = jmxCon1.getMBeanServerConnection();

	assertEquals(1, mCon1.getAttribute(objectName1, "Value"));
	assertEquals(2, mCon1.getAttribute(objectName2, "Value"));

	jmxCon1.close();

	JMXServiceURL url2 = new JMXServiceURL("service:jmx:rmi://localhost:" + ((JMXReporter) rep2).getPort() + "/jndi/rmi://localhost:" + ((JMXReporter) rep2).getPort() + "/jmxrmi");
	JMXConnector jmxCon2 = JMXConnectorFactory.connect(url2);
	MBeanServerConnection mCon2 = jmxCon2.getMBeanServerConnection();

	assertEquals(1, mCon2.getAttribute(objectName1, "Value"));
	assertEquals(2, mCon2.getAttribute(objectName2, "Value"));

	rep1.notifyOfRemovedMetric(g1, "rep1", null);
	rep1.notifyOfRemovedMetric(g2, "rep2", null);

	jmxCon2.close();

	rep1.close();
	rep2.close();
	mg.close();
	reg.shutdown().get();
}
 
示例28
public ReporterSetup(final String name, final MetricConfig configuration, MetricReporter reporter) {
	this.name = name;
	this.configuration = configuration;
	this.reporter = reporter;
}
 
示例29
public MetricReporter getReporter() {
	return reporter;
}
 
示例30
@VisibleForTesting
public static ReporterSetup forReporter(String reporterName, MetricReporter reporter) {
	return createReporterSetup(reporterName, new MetricConfig(), reporter);
}