Java源码示例:org.apache.flume.source.AvroSource
示例1
/**
 * Builds the test fixture: a {@link MemoryChannel} with an empty configuration and an
 * {@link AvroSource} configured with {@code bind=localhost} and the port obtained from
 * {@code NetworkUtils.getRandomPort()}, connected through a
 * {@link ReplicatingChannelSelector}; the source is started before returning.
 */
@Before
public void setUp() throws Exception {
  port = NetworkUtils.getRandomPort();
  source = new AvroSource();

  // The channel is configured from an empty context.
  ch = new MemoryChannel();
  Configurables.configure(ch, new Context());

  Context sourceContext = new Context();
  sourceContext.put("port", String.valueOf(port));
  sourceContext.put("bind", "localhost");
  Configurables.configure(source, sourceContext);

  // The selector is handed exactly one channel.
  List<Channel> targetChannels = new ArrayList<>();
  targetChannels.add(ch);
  ChannelSelector selector = new ReplicatingChannelSelector();
  selector.setChannels(targetChannels);

  ChannelProcessor processor = new ChannelProcessor(selector);
  source.setChannelProcessor(processor);
  source.start();
}
示例2
/**
 * Prepares the fixture: configures a {@link MemoryChannel} and an {@link AvroSource}
 * ({@code bind=localhost}, {@code port=25430}) and loads
 * {@code flume-log4jtest.properties} from the classpath into {@code props}.
 *
 * <p>The source is only configured here; this method neither attaches a channel
 * processor nor starts it.
 *
 * @throws Exception if the properties file cannot be opened or read
 */
@Before
public void initiate() throws Exception {
  int port = 25430;
  source = new AvroSource();
  ch = new MemoryChannel();
  Configurables.configure(ch, new Context());

  Context context = new Context();
  context.put("port", String.valueOf(port));
  context.put("bind", "localhost");
  Configurables.configure(source, context);

  File testFile = new File(
      TestLog4jAppender.class.getClassLoader()
          .getResource("flume-log4jtest.properties").getFile());
  // try-with-resources guarantees the reader is closed even when load() throws.
  try (FileReader reader = new FileReader(testFile)) {
    props = new Properties();
    props.load(reader);
  }
}
示例3
/**
 * Copies the classpath resource {@code myrecord.avsc} to {@code /tmp/myrecord.avsc}, then
 * configures and starts an {@link AvroSource} ({@code bind=localhost}, {@code port=25430})
 * that feeds a single {@link MemoryChannel} through a {@link ReplicatingChannelSelector}.
 */
@Before
public void setUp() throws Exception {
  // Stage the schema file at a fixed location on disk.
  URL schemaLocation = getClass().getClassLoader().getResource("myrecord.avsc");
  File schemaTarget = new File("/tmp/myrecord.avsc");
  Files.copy(Resources.newInputStreamSupplier(schemaLocation), schemaTarget);

  int port = 25430;
  source = new AvroSource();
  ch = new MemoryChannel();
  Configurables.configure(ch, new Context());

  Context sourceContext = new Context();
  sourceContext.put("port", Integer.toString(port));
  sourceContext.put("bind", "localhost");
  Configurables.configure(source, sourceContext);

  List<Channel> targetChannels = new ArrayList<Channel>();
  targetChannels.add(ch);
  ChannelSelector selector = new ReplicatingChannelSelector();
  selector.setChannels(targetChannels);

  ChannelProcessor processor = new ChannelProcessor(selector);
  source.setChannelProcessor(processor);
  source.start();
}
示例4
/**
 * Test fixture setup. First writes the {@code myrecord.avsc} classpath resource to
 * {@code /tmp/myrecord.avsc}; afterwards creates a {@link MemoryChannel} and an
 * {@link AvroSource} configured with {@code bind=localhost} / {@code port=25430}, links
 * them with a {@link ReplicatingChannelSelector}, and starts the source.
 */
@Before
public void setUp() throws Exception {
  URL avscResource = getClass().getClassLoader().getResource("myrecord.avsc");
  Files.copy(
      Resources.newInputStreamSupplier(avscResource),
      new File("/tmp/myrecord.avsc"));

  int listenPort = 25430;
  source = new AvroSource();
  ch = new MemoryChannel();
  // Channel is configured from an empty context.
  Configurables.configure(ch, new Context());

  Context avroContext = new Context();
  avroContext.put("port", String.valueOf(listenPort));
  avroContext.put("bind", "localhost");
  Configurables.configure(source, avroContext);

  List<Channel> channelList = new ArrayList<Channel>();
  channelList.add(ch);

  ChannelSelector replicating = new ReplicatingChannelSelector();
  replicating.setChannels(channelList);
  source.setChannelProcessor(new ChannelProcessor(replicating));

  source.start();
}
示例5
/**
 * Creates {@code NUM_HOSTS} source/channel pairs. Each {@link AvroSource} is configured
 * with {@code bind=localhost} and the port at the same index in {@code ports}, wired to
 * its own {@link MemoryChannel} via a {@link ReplicatingChannelSelector}, recorded in
 * {@code sources} / {@code chs}, and then started.
 */
@Before
public void setUp() {
  sources = new ArrayList<>(NUM_HOSTS);
  chs = new ArrayList<>(NUM_HOSTS);

  for (int hostIndex = 0; hostIndex < NUM_HOSTS; hostIndex++) {
    AvroSource avroSource = new AvroSource();
    Channel memoryChannel = new MemoryChannel();
    Configurables.configure(memoryChannel, new Context());

    Context sourceContext = new Context();
    sourceContext.put("port", String.valueOf(ports.get(hostIndex)));
    sourceContext.put("bind", "localhost");
    Configurables.configure(avroSource, sourceContext);

    // One dedicated channel per source.
    List<Channel> targetChannels = new ArrayList<>();
    targetChannels.add(memoryChannel);
    ChannelSelector selector = new ReplicatingChannelSelector();
    selector.setChannels(targetChannels);
    ChannelProcessor processor = new ChannelProcessor(selector);
    avroSource.setChannelProcessor(processor);

    // Register the pair before starting the source, as the fixture lists track every pair.
    sources.add(avroSource);
    chs.add(memoryChannel);
    avroSource.start();
  }
}
示例6
/**
 * Starts an {@link AvroSource} configured with {@code bind=0.0.0.0} and a port taken from
 * {@code AvailablePortFinder.getNextAvailable()}, backed by a single {@link MemoryChannel},
 * and asserts that the source reaches {@code LifecycleState.START}. Also looks up the
 * {@code "avrologger"} logger and passes it to {@code removeAppenders}.
 */
@Before
public void setUp() throws Exception {
  eventSource = new AvroSource();
  channel = new MemoryChannel();
  Configurables.configure(channel, new Context());

  avroLogger = (Logger) LogManager.getLogger("avrologger");
  // Strip every other appender from this logger so that only the Avro appender
  // is exercised.
  removeAppenders(avroLogger);

  final Context sourceContext = new Context();
  testPort = String.valueOf(AvailablePortFinder.getNextAvailable());
  sourceContext.put("port", testPort);
  sourceContext.put("bind", "0.0.0.0");
  Configurables.configure(eventSource, sourceContext);

  final List<Channel> targetChannels = new ArrayList<>();
  targetChannels.add(channel);
  final ChannelSelector selector = new ReplicatingChannelSelector();
  selector.setChannels(targetChannels);
  final ChannelProcessor processor = new ChannelProcessor(selector);
  eventSource.setChannelProcessor(processor);

  eventSource.start();

  // Wait for the lifecycle to settle, then verify it settled on START.
  final boolean settled =
      LifecycleController.waitForOneOf(eventSource, LifecycleState.START_OR_ERROR);
  Assert.assertTrue("Reached start or error", settled);
  Assert.assertEquals(
      "Server is started", LifecycleState.START, eventSource.getLifecycleState());
}
示例7
/**
 * Prepares the fixture: configures and starts an {@link AvroSource}
 * ({@code bind=localhost}, {@code port=25430}) feeding a single {@link MemoryChannel}
 * through a {@link ReplicatingChannelSelector}, then loads
 * {@code flume-log4jtest.properties} from the classpath into {@code props}.
 *
 * @throws Exception if the properties file cannot be opened or read
 */
@Before
public void initiate() throws Exception {
  int port = 25430;
  source = new AvroSource();
  ch = new MemoryChannel();
  Configurables.configure(ch, new Context());

  Context context = new Context();
  context.put("port", String.valueOf(port));
  context.put("bind", "localhost");
  Configurables.configure(source, context);

  List<Channel> channels = new ArrayList<Channel>();
  channels.add(ch);
  ChannelSelector rcs = new ReplicatingChannelSelector();
  rcs.setChannels(channels);
  source.setChannelProcessor(new ChannelProcessor(rcs));
  source.start();

  File testFile = new File(
      TestLog4jAppender.class.getClassLoader()
          .getResource("flume-log4jtest.properties").getFile());
  // try-with-resources guarantees the reader is closed even when load() throws.
  try (FileReader reader = new FileReader(testFile)) {
    props = new Properties();
    props.load(reader);
  }
}
示例8
/**
 * Creates, configures and starts {@code avroSource} ({@code bind=localhost},
 * {@code port=44444}) with a {@link MultiplexingChannelSelector} keyed on the
 * {@code State} header. The selector is given the Elasticsearch, HDFS, Spark-Avro and
 * HBase channels; the {@code VIEWED} and {@code FAVOURITE} mappings list all four, while
 * the default mapping lists the HDFS, Spark-Avro and HBase channels.
 */
private void createAvroSourceWithSelectorHDFSAndESSinks() {
  Channel esChannel = flumeESSinkService.getChannel();
  Channel hdfsChannel = flumeHDFSSinkService.getChannel();
  Channel hbaseChannel = flumeHbaseSinkService.getChannel();

  // Source configuration.
  final Map<String, String> sourceSettings = new HashMap<String, String>();
  sourceSettings.put("type", "avro");
  sourceSettings.put("bind", "localhost");
  sourceSettings.put("port", "44444");

  avroSource = new AvroSource();
  avroSource.setName("AvroSource-" + UUID.randomUUID());
  avroSource.configure(new Context(sourceSettings));

  // Selector wiring: the channels must be set before the selector is configured.
  ChannelSelector multiplexer = new MultiplexingChannelSelector();
  List<Channel> targetChannels = new ArrayList<>();
  targetChannels.add(esChannel);
  targetChannels.add(hdfsChannel);
  targetChannels.add(sparkAvroChannel);
  targetChannels.add(hbaseChannel);
  multiplexer.setChannels(targetChannels);

  final Map<String, String> selectorSettings = new HashMap<String, String>();
  selectorSettings.put("type", "multiplexing");
  selectorSettings.put("header", "State");

  // Channel lists are space-separated channel names. This variant includes the
  // Spark-Avro channel (and HBase) in every mapping.
  String hdfsName = hdfsChannel.getName();
  String esName = esChannel.getName();
  String sparkName = sparkAvroChannel.getName();
  String hbaseName = hbaseChannel.getName();
  String everyChannel = hdfsName + " " + esName + " " + sparkName + " " + hbaseName;

  selectorSettings.put("mapping.VIEWED", everyChannel);
  selectorSettings.put("mapping.FAVOURITE", everyChannel);
  selectorSettings.put("default", hdfsName + " " + sparkName + " " + hbaseName);

  multiplexer.configure(new Context(selectorSettings));

  ChannelProcessor processor = new ChannelProcessor(multiplexer);
  avroSource.setChannelProcessor(processor);
  avroSource.start();
}
示例9
/**
 * Creates and starts a {@link RollingFileSink} writing to {@code target/flumefilelog}, a
 * uniquely named {@link MemoryChannel}, and {@code avroSource} ({@code bind=localhost},
 * {@code port=44444}) whose {@link MultiplexingChannelSelector} uses that channel as its
 * default channel.
 */
@SuppressWarnings("unused")
private void createAvroSourceWithLocalFileRollingSink() {
  channel = new MemoryChannel();
  String channelName = "AvroSourceMemoryChannel-" + UUID.randomUUID();
  channel.setName(channelName);

  sink = new RollingFileSink();
  sink.setName("RollingFileSink-" + UUID.randomUUID());
  Map<String, String> parameters = new HashMap<>();
  parameters.put("type", "file_roll");
  parameters.put("sink.directory", "target/flumefilelog");
  Context sinkContext = new Context(parameters);
  sink.configure(sinkContext);
  // NOTE(review): the channel is configured with the sink's context — confirm intended.
  Configurables.configure(channel, sinkContext);
  sink.setChannel(channel);

  final Map<String, String> properties = new HashMap<String, String>();
  properties.put("type", "avro");
  properties.put("bind", "localhost");
  properties.put("port", "44444");
  properties.put("selector.type", "multiplexing");
  properties.put("selector.header", "State");
  properties.put("selector.mapping.VIEWED", channelName);
  properties.put("selector.mapping.default", channelName);

  avroSource = new AvroSource();
  avroSource.setName("AvroSource-" + UUID.randomUUID());
  Context sourceContext = new Context(properties);
  avroSource.configure(sourceContext);

  ChannelSelector selector = new MultiplexingChannelSelector();
  List<Channel> channels = new ArrayList<>();
  channels.add(channel);
  selector.setChannels(channels);

  final Map<String, String> selectorProperties = new HashMap<String, String>();
  // The default channel must go into the selector's own settings. Previously it was put
  // into the source properties after their context had already been consumed, so the
  // selector was configured from an empty context.
  selectorProperties.put("default", channelName);
  Context selectorContext = new Context(selectorProperties);
  selector.configure(selectorContext);

  ChannelProcessor cp = new ChannelProcessor(selector);
  avroSource.setChannelProcessor(cp);

  sink.start();
  channel.start();
  avroSource.start();
}