Java source code examples: org.elasticsearch.cluster.routing.ShardRouting
Example 1
/**
 * Move started shards that can not be allocated to a node anymore
 *
 * For each shard to be moved this function executes a move operation
 * to the minimal eligible node with respect to the
 * weight function. If a shard is moved the shard will be set to
 * {@link ShardRoutingState#RELOCATING} and a shadow instance of this
 * shard is created with an incremented version in the state
 * {@link ShardRoutingState#INITIALIZING}.
 */
public void moveShards() {
    // Iterate over the started shards interleaving between nodes, and check if they can remain. In the presence of throttling
    // shard movements, the goal of this iteration order is to achieve a fairer movement of shards from the nodes that are
    // offloading the shards.
    for (Iterator<ShardRouting> it = allocation.routingNodes().nodeInterleavedShardIterator(); it.hasNext(); ) {
        ShardRouting shardRouting = it.next();
        final MoveDecision moveDecision = decideMove(shardRouting);
        if (moveDecision.isDecisionTaken() && moveDecision.forceMove()) {
            final ModelNode sourceNode = nodes.get(shardRouting.currentNodeId());
            final ModelNode targetNode = nodes.get(moveDecision.getTargetNode().getId());
            sourceNode.removeShard(shardRouting);
            Tuple<ShardRouting, ShardRouting> relocatingShards = routingNodes.relocateShard(shardRouting, targetNode.getNodeId(),
                allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE), allocation.changes());
            targetNode.addShard(relocatingShards.v2());
            if (logger.isTraceEnabled()) {
                logger.trace("Moved shard [{}] to node [{}]", shardRouting, targetNode.getRoutingNode());
            }
        } else if (moveDecision.isDecisionTaken() && moveDecision.canRemain() == false) {
            logger.trace("[{}][{}] can't move", shardRouting.index(), shardRouting.id());
        }
    }
}
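As the Javadoc above notes, relocating a started shard leaves two routing entries behind: the source copy switches to RELOCATING and a new target copy starts out INITIALIZING. A short hypothetical sketch of how the pair returned by relocateShard maps onto those two states (variable names are illustrative, not from the original class):

    // Hypothetical sketch: relocateShard returns both routing entries involved in the move.
    Tuple<ShardRouting, ShardRouting> relocated = routingNodes.relocateShard(
        shardRouting, targetNodeId, expectedShardSize, allocation.changes());
    ShardRouting relocatingSource = relocated.v1();   // original copy, now in state RELOCATING
    ShardRouting initializingTarget = relocated.v2(); // new copy on the target node, state INITIALIZING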
Example 2
@Override
protected ShardUpgradeStatus shardOperation(UpgradeStatusRequest request, ShardRouting shardRouting) {
    IndexService indexService = indicesService.indexServiceSafe(shardRouting.shardId().getIndex());
    IndexShard indexShard = indexService.shardSafe(shardRouting.shardId().id());
    List<Segment> segments = indexShard.engine().segments(false);
    long total_bytes = 0;
    long to_upgrade_bytes = 0;
    long to_upgrade_bytes_ancient = 0;
    for (Segment seg : segments) {
        total_bytes += seg.sizeInBytes;
        if (seg.version.major != Version.CURRENT.luceneVersion.major) {
            to_upgrade_bytes_ancient += seg.sizeInBytes;
            to_upgrade_bytes += seg.sizeInBytes;
        } else if (seg.version.minor != Version.CURRENT.luceneVersion.minor) {
            // TODO: this comparison is bogus! it would cause us to upgrade even with the same format
            // instead, we should check if the codec has changed
            to_upgrade_bytes += seg.sizeInBytes;
        }
    }
    return new ShardUpgradeStatus(indexShard.routingEntry(), total_bytes, to_upgrade_bytes, to_upgrade_bytes_ancient);
}
Example 3
void performFirstPhase(final int shardIndex, final ShardIterator shardIt, final ShardRouting shard) {
    if (shard == null) {
        // no more active shards... (we should not really get here, but just for safety)
        onFirstPhaseResult(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId()));
    } else {
        final DiscoveryNode node = nodes.get(shard.currentNodeId());
        if (node == null) {
            onFirstPhaseResult(shardIndex, shard, null, shardIt, new NoShardAvailableActionException(shardIt.shardId()));
        } else {
            String[] filteringAliases = indexNameExpressionResolver.filteringAliases(clusterState, shard.index(), request.indices());
            sendExecuteFirstPhase(node, internalSearchRequest(shard, shardsIts.size(), request, filteringAliases, startTime()), new ActionListener<FirstResult>() {
                @Override
                public void onResponse(FirstResult result) {
                    onFirstPhaseResult(shardIndex, shard, result, shardIt);
                }
                @Override
                public void onFailure(Throwable t) {
                    onFirstPhaseResult(shardIndex, shard, node.id(), shardIt, t);
                }
            });
        }
    }
}
Example 4
private void addTrackingInfo(IndexShardRoutingTable indexShardRoutingTable, ShardRouting primaryShard, Set<String> trackedShards,
                             Set<String> untrackedShards) {
    for (ShardRouting shr : indexShardRoutingTable.shards()) {
        if (shr.unassigned() == false) {
            if (shr.initializing()) {
                if (randomBoolean()) {
                    trackedShards.add(shr.allocationId().getId());
                } else {
                    untrackedShards.add(shr.allocationId().getId());
                }
            } else {
                trackedShards.add(shr.allocationId().getId());
                if (shr.relocating()) {
                    if (primaryShard == shr.getTargetRelocatingShard() || randomBoolean()) {
                        trackedShards.add(shr.getTargetRelocatingShard().allocationId().getId());
                    } else {
                        untrackedShards.add(shr.getTargetRelocatingShard().allocationId().getId());
                    }
                }
            }
        }
    }
}
Example 5
private void createShard(DiscoveryNodes nodes, RoutingTable routingTable, ShardRouting shardRouting, ClusterState state) {
    assert shardRouting.initializing() : "only allow shard creation for initializing shard but was " + shardRouting;
    DiscoveryNode sourceNode = null;
    if (shardRouting.recoverySource().getType() == Type.PEER) {
        sourceNode = findSourceNodeForPeerRecovery(LOGGER, routingTable, nodes, shardRouting);
        if (sourceNode == null) {
            LOGGER.trace("ignoring initializing shard {} - no source node can be found.", shardRouting.shardId());
            return;
        }
    }
    try {
        LOGGER.debug("{} creating shard", shardRouting.shardId());
        RecoveryState recoveryState = new RecoveryState(shardRouting, nodes.getLocalNode(), sourceNode);
        indicesService.createShard(shardRouting, recoveryState, recoveryTargetService, new RecoveryListener(shardRouting),
            repositoriesService, failedShardHandler, globalCheckpointSyncer);
    } catch (Exception e) {
        failAndRemoveShard(shardRouting, true, "failed to create shard", e, state);
    }
}
Example 6
/**
 * Retrieves the routing for sys.shards
 * <p>
 * This routing contains ALL shards of ALL indices.
 * Any shards that are not yet assigned to a node will have a NEGATIVE shard id (see {@link UnassignedShard})
 */
public static Routing getRouting(ClusterState clusterState, RoutingProvider routingProvider, SessionContext sessionContext) {
    String[] concreteIndices = Arrays.stream(clusterState.metaData().getConcreteAllOpenIndices())
        .filter(index -> !IndexParts.isDangling(index))
        .toArray(String[]::new);
    User user = sessionContext != null ? sessionContext.user() : null;
    if (user != null) {
        List<String> accessibleTables = new ArrayList<>(concreteIndices.length);
        for (String indexName : concreteIndices) {
            String tableName = RelationName.fqnFromIndexName(indexName);
            if (user.hasAnyPrivilege(Privilege.Clazz.TABLE, tableName)) {
                accessibleTables.add(indexName);
            }
        }
        concreteIndices = accessibleTables.toArray(new String[0]);
    }
    Map<String, Map<String, IntIndexedContainer>> locations = new TreeMap<>();
    GroupShardsIterator<ShardIterator> groupShardsIterator =
        clusterState.getRoutingTable().allAssignedShardsGrouped(concreteIndices, true, true);
    for (final ShardIterator shardIt : groupShardsIterator) {
        final ShardRouting shardRouting = shardIt.nextOrNull();
        processShardRouting(clusterState.getNodes().getLocalNodeId(), locations, shardRouting, shardIt.shardId());
    }
    return new Routing(locations);
}
Example 7
static void buildShardLevelInfo(ESLogger logger, ShardStats[] stats, HashMap<String, Long> newShardSizes, HashMap<ShardRouting, String> newShardRoutingToDataPath, ClusterState state) {
    MetaData meta = state.getMetaData();
    for (ShardStats s : stats) {
        IndexMetaData indexMeta = meta.index(s.getShardRouting().index());
        Settings indexSettings = indexMeta == null ? null : indexMeta.getSettings();
        newShardRoutingToDataPath.put(s.getShardRouting(), s.getDataPath());
        long size = s.getStats().getStore().sizeInBytes();
        String sid = ClusterInfo.shardIdentifierFromRouting(s.getShardRouting());
        if (logger.isTraceEnabled()) {
            logger.trace("shard: {} size: {}", sid, size);
        }
        if (indexSettings != null && IndexMetaData.isIndexUsingShadowReplicas(indexSettings)) {
            // Shards on a shared filesystem should be considered of size 0
            if (logger.isTraceEnabled()) {
                logger.trace("shard: {} is using shadow replicas and will be treated as size 0", sid);
            }
            size = 0;
        }
        newShardSizes.put(sid, size);
    }
}
Example 8
private Decision decideSameNode(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation,
                                Iterable<ShardRouting> assignedShards) {
    for (ShardRouting assignedShard : assignedShards) {
        if (node.nodeId().equals(assignedShard.currentNodeId())) {
            if (assignedShard.isSameAllocation(shardRouting)) {
                return allocation.decision(Decision.NO, NAME,
                    "the shard cannot be allocated to the node on which it already exists [%s]",
                    shardRouting.toString());
            } else {
                return allocation.decision(Decision.NO, NAME,
                    "the shard cannot be allocated to the same node on which a copy of the shard already exists [%s]",
                    assignedShard.toString());
            }
        }
    }
    return allocation.decision(Decision.YES, NAME, "the shard does not exist on the same node");
}
Example 9
@Override
public final int compare(ShardRouting o1, ShardRouting o2) {
    final String o1Index = o1.getIndexName();
    final String o2Index = o2.getIndexName();
    int cmp = 0;
    if (o1Index.equals(o2Index) == false) {
        final Settings settingsO1 = getIndexSettings(o1.index());
        final Settings settingsO2 = getIndexSettings(o2.index());
        cmp = Long.compare(priority(settingsO2), priority(settingsO1));
        if (cmp == 0) {
            cmp = Long.compare(timeCreated(settingsO2), timeCreated(settingsO1));
            if (cmp == 0) {
                cmp = o2Index.compareTo(o1Index);
            }
        }
    }
    return cmp;
}
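This comparator orders shards by descending index priority, then by descending index creation time, then by reversed index-name order, so shards of higher-priority and newer indices come first. A minimal usage sketch, assuming the Elasticsearch dependency is on the classpath; the helper name and the comparator instance are hypothetical:

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;
    import org.elasticsearch.cluster.routing.ShardRouting;

    // Hypothetical helper: returns a copy of the given shards ordered by a priority comparator
    // such as the one above, so higher-priority / more recently created indices go first.
    static List<ShardRouting> orderByPriority(List<ShardRouting> shards, Comparator<ShardRouting> priorityComparator) {
        List<ShardRouting> ordered = new ArrayList<>(shards);
        ordered.sort(priorityComparator);
        return ordered;
    }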
Example 10
private Decision canMove(ShardRouting shardRouting, RoutingAllocation allocation) {
    if (!enableRelocation && shardRouting.primary()) {
        // Only primary shards are snapshotted
        SnapshotsInProgress snapshotsInProgress = allocation.routingNodes().custom(SnapshotsInProgress.TYPE);
        if (snapshotsInProgress == null) {
            // Snapshots are not running
            return allocation.decision(Decision.YES, NAME, "no snapshots are currently running");
        }
        for (SnapshotsInProgress.Entry snapshot : snapshotsInProgress.entries()) {
            SnapshotsInProgress.ShardSnapshotStatus shardSnapshotStatus = snapshot.shards().get(shardRouting.shardId());
            if (shardSnapshotStatus != null && !shardSnapshotStatus.state().completed() && shardSnapshotStatus.nodeId() != null && shardSnapshotStatus.nodeId().equals(shardRouting.currentNodeId())) {
                logger.trace("Preventing snapshotted shard [{}] to be moved from node [{}]", shardRouting.shardId(), shardSnapshotStatus.nodeId());
                return allocation.decision(Decision.NO, NAME, "snapshot for shard [%s] is currently running on node [%s]",
                    shardRouting.shardId(), shardSnapshotStatus.nodeId());
            }
        }
    }
    return allocation.decision(Decision.YES, NAME, "shard not primary or relocation disabled");
}
Example 11
public ImmutableMap<ShardRouting, ShardStats> asMap() {
    if (shardStatsMap == null) {
        ImmutableMap.Builder<ShardRouting, ShardStats> mb = ImmutableMap.builder();
        for (ShardStats ss : shards) {
            mb.put(ss.getShardRouting(), ss);
        }
        shardStatsMap = mb.build();
    }
    return shardStatsMap;
}
Example 12
/**
 * Retrieves the routing for sys.shards
 *
 * This routing contains ALL shards of ALL indices.
 * Any shards that are not yet assigned to a node will have a NEGATIVE shard id (see {@link UnassignedShard})
 */
@Override
public Routing getRouting(WhereClause whereClause, @Nullable String preference) {
    // TODO: filter on whereClause
    Map<String, Map<String, List<Integer>>> locations = new TreeMap<>();
    ClusterState state = service.state();
    String[] concreteIndices = state.metaData().concreteAllIndices();
    GroupShardsIterator groupShardsIterator = state.getRoutingTable().allAssignedShardsGrouped(concreteIndices, true, true);
    for (final ShardIterator shardIt : groupShardsIterator) {
        final ShardRouting shardRouting = shardIt.nextOrNull();
        processShardRouting(locations, shardRouting, shardIt.shardId());
    }
    return new Routing(locations);
}
Example 13
@Override
public void readFrom(StreamInput in) throws IOException {
    super.readFrom(in);
    indicesLevelRequest = readRequestFrom(in);
    int size = in.readVInt();
    shards = new ArrayList<>(size);
    for (int i = 0; i < size; i++) {
        shards.add(ShardRouting.readShardRoutingEntry(in));
    }
    nodeId = in.readString();
}
Example 14
@Override
protected ShardUpgradeResult shardOperation(UpgradeRequest request, ShardRouting shardRouting) throws IOException {
    IndexShard indexShard = indicesService.indexServiceSafe(shardRouting.shardId().getIndex()).shardSafe(shardRouting.shardId().id());
    org.apache.lucene.util.Version oldestLuceneSegment = indexShard.upgrade(request);
    // We are using the current version of Elasticsearch as upgrade version since we update mapping to match the current version
    return new ShardUpgradeResult(shardRouting.shardId(), indexShard.routingEntry().primary(), Version.CURRENT, oldestLuceneSegment);
}
Example 15
private long getExpectedShardSize(ShardRouting shardRouting, long defaultSize, RoutingAllocation allocation) {
    return DiskThresholdDecider.getExpectedShardSize(
        shardRouting,
        defaultSize,
        allocation.clusterInfo(),
        allocation.metaData(),
        allocation.routingTable()
    );
}
Example 16
/**
 * success constructor
 */
public ShardsSyncedFlushResult(ShardId shardId, String syncId, int totalShards, Map<ShardRouting, SyncedFlushService.ShardSyncedFlushResponse> shardResponses) {
    this.failureReason = null;
    ImmutableMap.Builder<ShardRouting, SyncedFlushService.ShardSyncedFlushResponse> builder = ImmutableMap.builder();
    this.shardResponses = builder.putAll(shardResponses).build();
    this.syncId = syncId;
    this.totalShards = totalShards;
    this.shardId = shardId;
}
Example 17
public ClusterInfo(StreamInput in) throws IOException {
    Map<String, DiskUsage> leastMap = in.readMap(StreamInput::readString, DiskUsage::new);
    Map<String, DiskUsage> mostMap = in.readMap(StreamInput::readString, DiskUsage::new);
    Map<String, Long> sizeMap = in.readMap(StreamInput::readString, StreamInput::readLong);
    final Map<ShardRouting, String> routingMap = in.readMap(ShardRouting::new, StreamInput::readString);
    ImmutableOpenMap.Builder<String, DiskUsage> leastBuilder = ImmutableOpenMap.builder();
    this.leastAvailableSpaceUsage = leastBuilder.putAll(leastMap).build();
    ImmutableOpenMap.Builder<String, DiskUsage> mostBuilder = ImmutableOpenMap.builder();
    this.mostAvailableSpaceUsage = mostBuilder.putAll(mostMap).build();
    ImmutableOpenMap.Builder<String, Long> sizeBuilder = ImmutableOpenMap.builder();
    this.shardSizes = sizeBuilder.putAll(sizeMap).build();
    ImmutableOpenMap.Builder<ShardRouting, String> routingBuilder = ImmutableOpenMap.builder();
    this.routingToDataPath = routingBuilder.putAll(routingMap).build();
}
Example 18
@Override
public void readFrom(StreamInput in) throws IOException {
    failureReason = in.readOptionalString();
    int numResponses = in.readInt();
    shardResponses = new HashMap<>();
    for (int i = 0; i < numResponses; i++) {
        ShardRouting shardRouting = ShardRouting.readShardRoutingEntry(in);
        SyncedFlushService.ShardSyncedFlushResponse response = SyncedFlushService.ShardSyncedFlushResponse.readSyncedFlushResponse(in);
        shardResponses.put(shardRouting, response);
    }
    syncId = in.readOptionalString();
    shardId = ShardId.readShardId(in);
    totalShards = in.readInt();
}
Example 19
@Override
public void writeTo(StreamOutput out) throws IOException {
    out.writeOptionalString(failureReason);
    out.writeInt(shardResponses.size());
    for (Map.Entry<ShardRouting, SyncedFlushService.ShardSyncedFlushResponse> entry : shardResponses.entrySet()) {
        entry.getKey().writeTo(out);
        entry.getValue().writeTo(out);
    }
    out.writeOptionalString(syncId);
    shardId.writeTo(out);
    out.writeInt(totalShards);
}
Example 20
private void contDownAndSendResponseIfDone(String syncId, List<ShardRouting> shards, ShardId shardId, int totalShards,
                                           ActionListener<ShardsSyncedFlushResult> listener, CountDown countDown,
                                           Map<ShardRouting, ShardSyncedFlushResponse> results) {
    if (countDown.countDown()) {
        assert results.size() == shards.size();
        listener.onResponse(new ShardsSyncedFlushResult(shardId, syncId, totalShards, results));
    }
}
Example 21
/**
 * Creates a new initializing shard. The shard will have its own unique data path.
 *
 * @param primary       indicates whether to create a primary shard (ready to recover from an empty store) or a replica (ready to
 *                      recover from another shard)
 * @param settings      the settings to use for this shard
 * @param engineFactory the engine factory to use for this shard
 */
protected IndexShard newShard(boolean primary, Settings settings, EngineFactory engineFactory) throws IOException {
    final RecoverySource recoverySource =
        primary ? RecoverySource.EmptyStoreRecoverySource.INSTANCE : RecoverySource.PeerRecoverySource.INSTANCE;
    final ShardRouting shardRouting =
        TestShardRouting.newShardRouting(
            new ShardId("index", "_na_", 0), randomAlphaOfLength(10), primary, ShardRoutingState.INITIALIZING, recoverySource);
    return newShard(shardRouting, settings, engineFactory);
}
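A hedged usage sketch of the test helper above, as it might appear inside a test that extends the same base class; Settings.EMPTY and InternalEngineFactory are existing Elasticsearch classes, but the surrounding test body is hypothetical:

    // Hypothetical test body, assuming newShard(...) and closeShards(...) from this base class.
    IndexShard primaryShard = newShard(true, Settings.EMPTY, new InternalEngineFactory());
    try {
        // the returned shard is INITIALIZING with an empty-store recovery source
        assert primaryShard.routingEntry().primary();
    } finally {
        closeShards(primaryShard);
    }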
Example 22
private IndicesShardStoresResponse.StoreStatus.Allocation getAllocation(String index, int shardID, DiscoveryNode node) {
    for (ShardRouting shardRouting : routingNodes.node(node.id())) {
        ShardId shardId = shardRouting.shardId();
        if (shardId.id() == shardID && shardId.getIndex().equals(index)) {
            if (shardRouting.primary()) {
                return IndicesShardStoresResponse.StoreStatus.Allocation.PRIMARY;
            } else if (shardRouting.assignedToNode()) {
                return IndicesShardStoresResponse.StoreStatus.Allocation.REPLICA;
            } else {
                return IndicesShardStoresResponse.StoreStatus.Allocation.UNUSED;
            }
        }
    }
    return IndicesShardStoresResponse.StoreStatus.Allocation.UNUSED;
}
Example 23
@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
    final UnassignedInfo unassignedInfo = shardRouting.unassignedInfo();
    if (unassignedInfo != null && shardRouting.recoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS) {
        // we only make decisions here if we have an unassigned info and we have to recover from another index ie. split / shrink
        final IndexMetaData indexMetaData = allocation.metaData().getIndexSafe(shardRouting.index());
        Index resizeSourceIndex = indexMetaData.getResizeSourceIndex();
        assert resizeSourceIndex != null;
        if (allocation.metaData().index(resizeSourceIndex) == null) {
            return allocation.decision(Decision.NO, NAME, "resize source index [%s] doesn't exist", resizeSourceIndex.toString());
        }
        IndexMetaData sourceIndexMetaData = allocation.metaData().getIndexSafe(resizeSourceIndex);
        if (indexMetaData.getNumberOfShards() < sourceIndexMetaData.getNumberOfShards()) {
            // this only handles splits so far.
            return Decision.ALWAYS;
        }
        ShardId shardId = IndexMetaData.selectSplitShard(shardRouting.id(), sourceIndexMetaData, indexMetaData.getNumberOfShards());
        ShardRouting sourceShardRouting = allocation.routingNodes().activePrimary(shardId);
        if (sourceShardRouting == null) {
            return allocation.decision(Decision.NO, NAME, "source primary shard [%s] is not active", shardId);
        }
        if (node != null) { // we might get called from the 2 param canAllocate method..
            if (sourceShardRouting.currentNodeId().equals(node.nodeId())) {
                return allocation.decision(Decision.YES, NAME, "source primary is allocated on this node");
            } else {
                return allocation.decision(Decision.NO, NAME, "source primary is allocated on another node");
            }
        } else {
            return allocation.decision(Decision.YES, NAME, "source primary is active");
        }
    }
    return super.canAllocate(shardRouting, node, allocation);
}
Example 24
ShardRoutingEntry(ShardRouting shardRouting, String indexUUID, String message, String finderNodeId, @Nullable Throwable failure) {
    this.shardRouting = shardRouting;
    this.indexUUID = indexUUID;
    this.message = message;
    this.finderNodeId = finderNodeId;
    this.failure = failure;
}
Example 25
private void sendNodeRequest(final DiscoveryNode node, List<ShardRouting> shards, final int nodeIndex) {
    try {
        NodeRequest nodeRequest = new NodeRequest(node.getId(), request, shards);
        if (task != null) {
            nodeRequest.setParentTask(clusterService.localNode().id(), task.getId());
            taskManager.registerChildTask(task, node.getId());
        }
        transportService.sendRequest(node, transportNodeBroadcastAction, nodeRequest, new BaseTransportResponseHandler<NodeResponse>() {
            @Override
            public NodeResponse newInstance() {
                return new NodeResponse();
            }
            @Override
            public void handleResponse(NodeResponse response) {
                onNodeResponse(node, nodeIndex, response);
            }
            @Override
            public void handleException(TransportException exp) {
                onNodeFailure(node, nodeIndex, exp);
            }
            @Override
            public String executor() {
                return ThreadPool.Names.SAME;
            }
        });
    } catch (Throwable e) {
        onNodeFailure(node, nodeIndex, e);
    }
}
Example 26
private String executionFailureMsg(@Nullable ShardRouting shard, final ShardIterator shardIt, SearchRequest request, boolean lastShard) {
    if (shard != null) {
        return shard.shortSummary() + ": Failed to execute [" + request + "] lastShard [" + lastShard + "]";
    } else {
        return shardIt.shardId() + ": Failed to execute [" + request + "] lastShard [" + lastShard + "]";
    }
}
Example 27
private boolean isEnoughAllocationsFound(ShardRouting shard, IndexMetaData indexMetaData, NodesAndVersions nodesAndVersions) {
    // check if the count meets the minimum set
    int requiredAllocation = 1;
    // if we restore from a repository one copy is more than enough
    if (shard.restoreSource() == null) {
        try {
            String initialShards = indexMetaData.getSettings().get(INDEX_RECOVERY_INITIAL_SHARDS, settings.get(INDEX_RECOVERY_INITIAL_SHARDS, this.initialShards));
            if ("quorum".equals(initialShards)) {
                if (indexMetaData.getNumberOfReplicas() > 1) {
                    requiredAllocation = ((1 + indexMetaData.getNumberOfReplicas()) / 2) + 1;
                }
            } else if ("quorum-1".equals(initialShards) || "half".equals(initialShards)) {
                if (indexMetaData.getNumberOfReplicas() > 2) {
                    requiredAllocation = ((1 + indexMetaData.getNumberOfReplicas()) / 2);
                }
            } else if ("one".equals(initialShards)) {
                requiredAllocation = 1;
            } else if ("full".equals(initialShards) || "all".equals(initialShards)) {
                requiredAllocation = indexMetaData.getNumberOfReplicas() + 1;
            } else if ("full-1".equals(initialShards) || "all-1".equals(initialShards)) {
                if (indexMetaData.getNumberOfReplicas() > 1) {
                    requiredAllocation = indexMetaData.getNumberOfReplicas();
                }
            } else {
                requiredAllocation = Integer.parseInt(initialShards);
            }
        } catch (Exception e) {
            logger.warn("[{}][{}] failed to derive initial_shards from value {}, ignore allocation for {}", shard.index(), shard.id(), initialShards, shard);
        }
    }
    return nodesAndVersions.allocationsFound >= requiredAllocation;
}
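The "quorum" branch above requires a majority of the configured copies before primary allocation proceeds: with 2 replicas (3 copies in total) it needs (1 + 2) / 2 + 1 = 2 found copies, and with 4 replicas it needs 3. A minimal sketch isolating that arithmetic in a hypothetical helper (not part of the original class):

    // Hypothetical helper mirroring the "quorum" branch above.
    // numberOfReplicas = 2  ->  (1 + 2) / 2 + 1 = 2 required copies
    // numberOfReplicas = 4  ->  (1 + 4) / 2 + 1 = 3 required copies
    static int quorumRequiredAllocations(int numberOfReplicas) {
        if (numberOfReplicas > 1) {
            return ((1 + numberOfReplicas) / 2) + 1;
        }
        return 1; // with 0 or 1 replicas a single found copy is enough
    }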
Example 28
public ShardsSyncedFlushResult(StreamInput in) throws IOException {
    failureReason = in.readOptionalString();
    int numResponses = in.readInt();
    shardResponses = new HashMap<>();
    for (int i = 0; i < numResponses; i++) {
        ShardRouting shardRouting = new ShardRouting(in);
        SyncedFlushService.ShardSyncedFlushResponse response = new SyncedFlushService.ShardSyncedFlushResponse(in);
        shardResponses.put(shardRouting, response);
    }
    syncId = in.readOptionalString();
    shardId = new ShardId(in);
    totalShards = in.readInt();
}
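This constructor reads its fields in exactly the order that writeTo in Example 19 writes them, which is what makes a serialization round trip work. A minimal round-trip sketch in a hypothetical helper, assuming an already-populated ShardsSyncedFlushResult and Elasticsearch's BytesStreamOutput:

    // Hypothetical helper: serialize with writeTo (Example 19) and re-read with the constructor above.
    static ShardsSyncedFlushResult copyViaStream(ShardsSyncedFlushResult result) throws IOException {
        try (BytesStreamOutput out = new BytesStreamOutput()) {
            result.writeTo(out);                                 // failureReason, responses, syncId, shardId, totalShards
            try (StreamInput in = out.bytes().streamInput()) {
                return new ShardsSyncedFlushResult(in);          // reads the same fields in the same order
            }
        }
    }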
Example 29
/**
 * Takes an existing shard, closes it and starts a new initializing shard at the same location
 *
 * @param routing   the shard routing to use for the newly created shard.
 * @param listeners new listeners to use for the newly created shard
 */
protected IndexShard reinitShard(IndexShard current, ShardRouting routing, IndexingOperationListener... listeners) throws IOException {
    closeShards(current);
    return newShard(
        routing,
        current.shardPath(),
        current.indexSettings().getIndexMetaData(),
        null,
        null,
        current.engineFactory,
        current.getGlobalCheckpointSyncer(),
        EMPTY_EVENT_LISTENER, listeners);
}
Example 30
private CompletableFuture<List<Row>> retrieveRows(ShardRouting activePrimaryRouting,
                                                  RoutedCollectPhase collectPhase,
                                                  CollectTask collectTask,
                                                  ShardCollectorProviderFactory shardCollectorProviderFactory) {
    Collector<Row, ?, List<Object[]>> listCollector = Collectors.mapping(Row::materialize, Collectors.toList());
    CollectingRowConsumer<?, List<Object[]>> consumer = new CollectingRowConsumer<>(listCollector);
    String nodeId = activePrimaryRouting.currentNodeId();
    String localNodeId = clusterService.localNode().getId();
    if (localNodeId.equalsIgnoreCase(nodeId)) {
        var indexShard = indicesService.indexServiceSafe(activePrimaryRouting.index())
            .getShard(activePrimaryRouting.shardId().id());
        var collectorProvider = shardCollectorProviderFactory.create(indexShard);
        BatchIterator<Row> it;
        try {
            it = collectorProvider.getIterator(collectPhase, consumer.requiresScroll(), collectTask);
        } catch (Exception e) {
            return Exceptions.rethrowRuntimeException(e);
        }
        consumer.accept(it, null);
    } else {
        UUID childJobId = UUID.randomUUID();
        RemoteCollector remoteCollector = new RemoteCollector(
            childJobId,
            collectTask.txnCtx().sessionSettings(),
            localNodeId,
            nodeId,
            transportActionProvider.transportJobInitAction(),
            transportActionProvider.transportKillJobsNodeAction(),
            searchTp,
            tasksService,
            collectTask.getRamAccounting(),
            consumer,
            createRemoteCollectPhase(childJobId, collectPhase, activePrimaryRouting.shardId(), nodeId)
        );
        remoteCollector.doCollect();
    }
    return consumer
        .completionFuture()
        .thenApply(rows -> LazyMapList.of(rows, Buckets.arrayToSharedRow()));
}