Java源码示例:org.apache.flume.source.AvroSource

示例1
/**
 * Test fixture: creates an {@link AvroSource} bound to {@code localhost} on
 * the port returned by {@code NetworkUtils.getRandomPort()}, attaches a
 * single {@link MemoryChannel} through a replicating selector, and starts
 * the source.
 *
 * @throws Exception if any step of the fixture setup fails
 */
@Before
public void setUp() throws Exception {
  port = NetworkUtils.getRandomPort();
  source = new AvroSource();

  // Memory channel configured with an empty context.
  ch = new MemoryChannel();
  Configurables.configure(ch, new Context());

  // Source settings: listen address and port.
  Context sourceSettings = new Context();
  sourceSettings.put("port", String.valueOf(port));
  sourceSettings.put("bind", "localhost");
  Configurables.configure(source, sourceSettings);

  // The selector receives a one-element channel list.
  List<Channel> targets = new ArrayList<>();
  targets.add(ch);
  ChannelSelector selector = new ReplicatingChannelSelector();
  selector.setChannels(targets);

  ChannelProcessor processor = new ChannelProcessor(selector);
  source.setChannelProcessor(processor);
  source.start();
}
 
示例2
/**
 * Test fixture: configures (but does not start) an {@link AvroSource} on
 * {@code localhost:25430} plus a {@link MemoryChannel}, then loads
 * {@code flume-log4jtest.properties} from the classpath into {@code props}.
 *
 * @throws Exception if configuration fails or the properties file cannot
 *         be located or read
 */
@Before
public void initiate() throws Exception{
  int port = 25430;
  source = new AvroSource();
  ch = new MemoryChannel();
  Configurables.configure(ch, new Context());

  Context context = new Context();
  context.put("port", String.valueOf(port));
  context.put("bind", "localhost");
  Configurables.configure(source, context);

  File testFile = new File(
      TestLog4jAppender.class.getClassLoader()
          .getResource("flume-log4jtest.properties").getFile());
  // try-with-resources guarantees the reader is closed even when load()
  // throws, which the manual close() after load() did not.
  try (FileReader reader = new FileReader(testFile)) {
    props = new Properties();
    props.load(reader);
  }
}
 
示例3
/**
 * Test fixture: copies the classpath resource {@code myrecord.avsc} to
 * {@code /tmp/myrecord.avsc}, then starts an {@link AvroSource} on
 * {@code localhost:25430} feeding a single {@link MemoryChannel} through a
 * replicating selector.
 *
 * @throws Exception if the schema copy or the source setup fails
 */
@Before
public void setUp() throws Exception {
  // Make the Avro schema available at a fixed filesystem location.
  URL schemaUrl = getClass().getClassLoader().getResource("myrecord.avsc");
  File schemaCopy = new File("/tmp/myrecord.avsc");
  Files.copy(Resources.newInputStreamSupplier(schemaUrl), schemaCopy);

  int port = 25430;
  source = new AvroSource();

  ch = new MemoryChannel();
  Configurables.configure(ch, new Context());

  Context sourceSettings = new Context();
  sourceSettings.put("port", String.valueOf(port));
  sourceSettings.put("bind", "localhost");
  Configurables.configure(source, sourceSettings);

  List<Channel> targets = new ArrayList<Channel>();
  targets.add(ch);
  ChannelSelector selector = new ReplicatingChannelSelector();
  selector.setChannels(targets);

  ChannelProcessor processor = new ChannelProcessor(selector);
  source.setChannelProcessor(processor);
  source.start();
}
 
示例4
/**
 * Prepares the test environment in two steps: first the {@code myrecord.avsc}
 * schema resource is copied to {@code /tmp/myrecord.avsc}; then an
 * {@link AvroSource} listening on {@code localhost:25430} is wired to one
 * {@link MemoryChannel} via a {@link ReplicatingChannelSelector} and started.
 *
 * @throws Exception if copying the schema or bringing up the source fails
 */
@Before
public void setUp() throws Exception {
  URL avscLocation = getClass().getClassLoader().getResource("myrecord.avsc");
  Files.copy(
      Resources.newInputStreamSupplier(avscLocation),
      new File("/tmp/myrecord.avsc"));

  // --- channel ---
  ch = new MemoryChannel();
  Configurables.configure(ch, new Context());

  // --- source ---
  int port = 25430;
  Context avroContext = new Context();
  avroContext.put("port", String.valueOf(port));
  avroContext.put("bind", "localhost");
  source = new AvroSource();
  Configurables.configure(source, avroContext);

  // --- wiring ---
  List<Channel> sinkChannels = new ArrayList<Channel>();
  sinkChannels.add(ch);
  ChannelSelector replicating = new ReplicatingChannelSelector();
  replicating.setChannels(sinkChannels);
  source.setChannelProcessor(new ChannelProcessor(replicating));

  source.start();
}
 
示例5
/**
 * Test fixture: for each of the {@code NUM_HOSTS} hosts, creates an
 * {@link AvroSource} on {@code localhost} at the matching entry of
 * {@code ports}, gives it its own {@link MemoryChannel} behind a replicating
 * selector, records both in {@code sources} / {@code chs}, and starts it.
 */
@Before
public void setUp() {
  sources = new ArrayList<>(NUM_HOSTS);
  chs = new ArrayList<>(NUM_HOSTS);

  for (int host = 0; host < NUM_HOSTS; host++) {
    // Per-host channel configured with an empty context.
    Channel hostChannel = new MemoryChannel();
    AvroSource hostSource = new AvroSource();
    Configurables.configure(hostChannel, new Context());

    // Per-host source settings; the port comes from the shared list.
    Context sourceSettings = new Context();
    sourceSettings.put("port", String.valueOf(ports.get(host)));
    sourceSettings.put("bind", "localhost");
    Configurables.configure(hostSource, sourceSettings);

    List<Channel> targets = new ArrayList<>();
    targets.add(hostChannel);
    ChannelSelector selector = new ReplicatingChannelSelector();
    selector.setChannels(targets);
    ChannelProcessor processor = new ChannelProcessor(selector);
    hostSource.setChannelProcessor(processor);

    // Register before starting, as the original ordering does.
    sources.add(hostSource);
    chs.add(hostChannel);

    hostSource.start();
  }

}
 
示例6
/**
 * Test fixture: starts an {@link AvroSource} bound to {@code 0.0.0.0} on the
 * port returned by {@code AvailablePortFinder.getNextAvailable()}, backed by
 * a single {@link MemoryChannel}, and asserts that the source reaches the
 * {@code START} lifecycle state. The {@code "avrologger"} logger is stripped
 * of its appenders via {@code removeAppenders} beforehand.
 *
 * @throws Exception if setup fails or the lifecycle wait is interrupted
 */
@Before
public void setUp() throws Exception {
    eventSource = new AvroSource();

    channel = new MemoryChannel();
    Configurables.configure(channel, new Context());

    avroLogger = (Logger) LogManager.getLogger("avrologger");
    // Drop every appender already attached to this logger so that only the
    // Avro appender is exercised.
    removeAppenders(avroLogger);

    final Context sourceSettings = new Context();
    testPort = String.valueOf(AvailablePortFinder.getNextAvailable());
    sourceSettings.put("port", testPort);
    sourceSettings.put("bind", "0.0.0.0");
    Configurables.configure(eventSource, sourceSettings);

    final List<Channel> targets = new ArrayList<>();
    targets.add(channel);
    final ChannelSelector selector = new ReplicatingChannelSelector();
    selector.setChannels(targets);
    final ChannelProcessor processor = new ChannelProcessor(selector);
    eventSource.setChannelProcessor(processor);

    eventSource.start();

    // Wait until the source has either started or failed, then require START.
    final boolean settled = LifecycleController
            .waitForOneOf(eventSource, LifecycleState.START_OR_ERROR);
    Assert.assertTrue("Reached start or error", settled);
    Assert.assertEquals("Server is started", LifecycleState.START,
            eventSource.getLifecycleState());
}
 
示例7
/**
 * Test fixture: starts an {@link AvroSource} on {@code localhost:25430}
 * feeding a single {@link MemoryChannel} through a replicating selector,
 * then loads {@code flume-log4jtest.properties} from the classpath into
 * {@code props}.
 *
 * @throws Exception if the source cannot be set up or the properties file
 *         cannot be located or read
 */
@Before
public void initiate() throws Exception{
  int port = 25430;
  source = new AvroSource();
  ch = new MemoryChannel();
  Configurables.configure(ch, new Context());

  Context context = new Context();
  context.put("port", String.valueOf(port));
  context.put("bind", "localhost");
  Configurables.configure(source, context);

  List<Channel> channels = new ArrayList<Channel>();
  channels.add(ch);

  ChannelSelector rcs = new ReplicatingChannelSelector();
  rcs.setChannels(channels);

  source.setChannelProcessor(new ChannelProcessor(rcs));

  source.start();

  File testFile = new File(
      TestLog4jAppender.class.getClassLoader()
          .getResource("flume-log4jtest.properties").getFile());
  // try-with-resources guarantees the reader is closed even when load()
  // throws, which the manual close() after load() did not.
  try (FileReader reader = new FileReader(testFile)) {
    props = new Properties();
    props.load(reader);
  }
}
 
示例8
/**
 * Creates, configures and starts {@code avroSource} on
 * {@code localhost:44444} with a {@link MultiplexingChannelSelector} keyed
 * on the {@code State} header. Events whose header is {@code VIEWED} or
 * {@code FAVOURITE} are mapped to the HDFS, ES, Spark-Avro and HBase
 * channels; the default mapping lists the HDFS, Spark-Avro and HBase
 * channels.
 */
private void createAvroSourceWithSelectorHDFSAndESSinks() {
	Channel esChannel = flumeESSinkService.getChannel();
	Channel hdfsChannel = flumeHDFSSinkService.getChannel();
	Channel hbaseChannel = flumeHbaseSinkService.getChannel();

	// Source: Avro listener settings.
	final Map<String, String> sourceSettings = new HashMap<String, String>();
	sourceSettings.put("type", "avro");
	sourceSettings.put("bind", "localhost");
	sourceSettings.put("port", "44444");

	avroSource = new AvroSource();
	avroSource.setName("AvroSource-" + UUID.randomUUID());
	avroSource.configure(new Context(sourceSettings));

	// Selector: must know its channels before it is configured below.
	ChannelSelector selector = new MultiplexingChannelSelector();
	List<Channel> targets = new ArrayList<>();
	targets.add(esChannel);
	targets.add(hdfsChannel);
	targets.add(sparkAvroChannel);
	targets.add(hbaseChannel);
	selector.setChannels(targets);

	// Space-separated channel-name lists used as mapping values.
	String hdfsName = hdfsChannel.getName();
	String sparkName = sparkAvroChannel.getName();
	String hbaseName = hbaseChannel.getName();
	String allChannelNames = hdfsName + " " + esChannel.getName() + " "
			+ sparkName + " " + hbaseName;
	String defaultChannelNames = hdfsName + " " + sparkName + " " + hbaseName;

	final Map<String, String> routing = new HashMap<String, String>();
	routing.put("type", "multiplexing");
	routing.put("header", "State");
	routing.put("mapping.VIEWED", allChannelNames);
	routing.put("mapping.FAVOURITE", allChannelNames);
	routing.put("default", defaultChannelNames);
	selector.configure(new Context(routing));

	avroSource.setChannelProcessor(new ChannelProcessor(selector));

	avroSource.start();
}
 
示例9
/**
 * Creates and starts a local pipeline: an {@link AvroSource} on
 * {@code localhost:44444} whose {@link MultiplexingChannelSelector} sends
 * events to a single {@link MemoryChannel}, drained by a
 * {@link RollingFileSink} writing to {@code target/flumefilelog}.
 * The sink, channel and source are started in that order.
 */
@SuppressWarnings("unused")
private void createAvroSourceWithLocalFileRollingSink() {
	channel = new MemoryChannel();
	String channelName = "AvroSourceMemoryChannel-" + UUID.randomUUID();
	channel.setName(channelName);

	sink = new RollingFileSink();
	sink.setName("RollingFileSink-" + UUID.randomUUID());
	Map<String, String> sinkParameters = new HashMap<>();
	sinkParameters.put("type", "file_roll");
	sinkParameters.put("sink.directory", "target/flumefilelog");
	Context sinkContext = new Context(sinkParameters);
	sink.configure(sinkContext);
	// NOTE(review): the channel is configured with the sink's context, as in
	// the original code — confirm this is intended.
	Configurables.configure(channel, sinkContext);
	sink.setChannel(channel);

	final Map<String, String> properties = new HashMap<String, String>();
	properties.put("type", "avro");
	properties.put("bind", "localhost");
	properties.put("port", "44444");
	properties.put("selector.type", "multiplexing");
	properties.put("selector.header", "State");
	properties.put("selector.mapping.VIEWED", channelName);
	properties.put("selector.mapping.default", channelName);

	avroSource = new AvroSource();
	avroSource.setName("AvroSource-" + UUID.randomUUID());
	Context sourceContext = new Context(properties);
	avroSource.configure(sourceContext);
	ChannelSelector selector = new MultiplexingChannelSelector();
	List<Channel> channels = new ArrayList<>();
	channels.add(channel);
	selector.setChannels(channels);
	final Map<String, String> selectorProperties = new HashMap<String, String>();
	// The selector is configured solely from selectorContext, so the default
	// route has to be placed in selectorProperties; writing it to the source's
	// property map would leave the selector with an empty configuration.
	selectorProperties.put("default", channelName);
	Context selectorContext = new Context(selectorProperties);
	selector.configure(selectorContext);
	ChannelProcessor cp = new ChannelProcessor(selector);
	avroSource.setChannelProcessor(cp);

	sink.start();
	channel.start();
	avroSource.start();
}