提问者:小点点

Ignite:简单的流到服务器问题


我有以下两个脚本,它们几乎完全遵循流示例。代码的来源如下:https://github.com/apache/ignite/blob/master/examples/src/main/java/org/apache/ignite/examples/streaming/StreamTransformerExample.java

然而,尽管客户端很好地连接到服务器,但在运行流时,我在服务器端遇到了可怕的错误——我是否未能正确配置某些东西?

[12:41:43] (err) Failed to execute compound future reducer: GridCompoundFuture [rdc=null, initFlag=1, lsnrCalls=0, done=false, cancelled=false, err=null, futs=[true, true, true, true, false]]class org.apache.ignite.IgniteCheckedException: DataStreamer request failed [node=befcb4b8-3262-4d16-be65-c9377b033245]
    at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl$Buffer.onResponse(DataStreamerImpl.java:1857)
    at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl$3.onMessage(DataStreamerImpl.java:336)
    at org.apache.ignite.internal.managers.communication.GridIoManager.invokeListener(GridIoManager.java:1555)
    at org.apache.ignite.internal.managers.communication.GridIoManager.processRegularMessage0(GridIoManager.java:1183)
    at org.apache.ignite.internal.managers.communication.GridIoManager.access$4200(GridIoManager.java:126)
    at org.apache.ignite.internal.managers.communication.GridIoManager$9.run(GridIoManager.java:1090)
    at org.apache.ignite.internal.util.StripedExecutor$Stripe.run(StripedExecutor.java:505)
    at java.lang.Thread.run(Thread.java:748)
Caused by: class org.apache.ignite.IgniteCheckedException: Failed to unmarshal object with optimized marshaller
    at org.apache.ignite.internal.util.IgniteUtils.unmarshal(IgniteUtils.java:9801)
    at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.processRequest(DataStreamProcessor.java:289)
    at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.access$000(DataStreamProcessor.java:59)
    at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor$1.onMessage(DataStreamProcessor.java:89)
    ... 6 more
Caused by: class org.apache.ignite.binary.BinaryObjectException: Failed to unmarshal object with optimized marshaller
    at org.apache.ignite.internal.binary.BinaryUtils.doReadOptimized(BinaryUtils.java:1786)
    at org.apache.ignite.internal.binary.BinaryReaderExImpl.deserialize0(BinaryReaderExImpl.java:1962)
    at org.apache.ignite.internal.binary.BinaryReaderExImpl.deserialize(BinaryReaderExImpl.java:1714)
    at org.apache.ignite.internal.binary.GridBinaryMarshaller.deserialize(GridBinaryMarshaller.java:310)
    at org.apache.ignite.internal.binary.BinaryMarshaller.unmarshal0(BinaryMarshaller.java:99)
    at org.apache.ignite.marshaller.AbstractNodeNameAwareMarshaller.unmarshal(AbstractNodeNameAwareMarshaller.java:82)
    at org.apache.ignite.internal.util.IgniteUtils.unmarshal(IgniteUtils.java:9795)
    ... 9 more
Caused by: class org.apache.ignite.IgniteCheckedException: Failed to deserialize object with given class loader: [clsLdr=sun.misc.Launcher$AppClassLoader@18b4aac2, err=java.lang.reflect.InvocationTargetException]
    at org.apache.ignite.internal.marshaller.optimized.OptimizedMarshaller.unmarshal0(OptimizedMarshaller.java:235)
    at org.apache.ignite.marshaller.AbstractNodeNameAwareMarshaller.unmarshal(AbstractNodeNameAwareMarshaller.java:94)
    at org.apache.ignite.internal.binary.BinaryUtils.doReadOptimized(BinaryUtils.java:1783)
    ... 15 more
Caused by: java.io.IOException: java.lang.reflect.InvocationTargetException
    at org.apache.ignite.internal.marshaller.optimized.OptimizedObjectInputStream.readSerializable(OptimizedObjectInputStream.java:611)
    at org.apache.ignite.internal.marshaller.optimized.OptimizedClassDescriptor.read(OptimizedClassDescriptor.java:927)
    at org.apache.ignite.internal.marshaller.optimized.OptimizedObjectInputStream.readObject0(OptimizedObjectInputStream.java:346)
    at org.apache.ignite.internal.marshaller.optimized.OptimizedObjectInputStream.readObjectOverride(OptimizedObjectInputStream.java:199)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:421)
    at org.apache.ignite.internal.marshaller.optimized.OptimizedObjectInputStream.readFields(OptimizedObjectInputStream.java:513)
    at org.apache.ignite.internal.marshaller.optimized.OptimizedObjectInputStream.readSerializable(OptimizedObjectInputStream.java:601)
    at org.apache.ignite.internal.marshaller.optimized.OptimizedClassDescriptor.read(OptimizedClassDescriptor.java:927)
    at org.apache.ignite.internal.marshaller.optimized.OptimizedObjectInputStream.readObject0(OptimizedObjectInputStream.java:346)
    at org.apache.ignite.internal.marshaller.optimized.OptimizedObjectInputStream.readObjectOverride(OptimizedObjectInputStream.java:199)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:421)
    at org.apache.ignite.internal.marshaller.optimized.OptimizedMarshaller.unmarshal0(OptimizedMarshaller.java:227)
    ... 17 more
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.ignite.internal.marshaller.optimized.OptimizedObjectInputStream.readSerializable(OptimizedObjectInputStream.java:606)
    ... 28 more
Caused by: java.lang.NoSuchMethodException: Test.$deserializeLambda$(java.lang.invoke.SerializedLambda)
    at java.lang.Class.getDeclaredMethod(Class.java:2130)
    at java.lang.invoke.SerializedLambda$1.run(SerializedLambda.java:224)
    at java.lang.invoke.SerializedLambda$1.run(SerializedLambda.java:221)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:221)
    ... 32 more

服务器:

import org.apache.ignite.Ignition;
import org.apache.ignite.Ignite;

public class Test {
    public static void main(String[] args)
    {
        Ignite ignite = Ignition.start();
    }
}

客户:

import org.apache.ignite.Ignition;
import org.apache.ignite.Ignite;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
import java.util.List;
import java.util.Random;

import org.apache.ignite.stream.StreamTransformer;

public class Test {
    private static final String CACHE_NAME = "randomNumbers";
    private static final Random RAND = new Random();
    private static final int RANGE = 1000;
    public static void main(String[] args)
    {
        Ignition.setClientMode(true);
        try(Ignite ignite = Ignition.start()) {
            CacheConfiguration<Integer, Long> cfg = new CacheConfiguration<>(CACHE_NAME);
            cfg.setIndexedTypes(Integer.class, Long.class);

            try (IgniteCache<Integer, Long> stmCache = ignite.getOrCreateCache(cfg)) {
                try (IgniteDataStreamer<Integer, Long> stmr = ignite.dataStreamer(stmCache.getName())) {
                    stmr.allowOverwrite(true);

                    stmr.receiver(StreamTransformer.from((e, arg) -> {
                        Long val = e.getValue();
                        e.setValue(val == null ? 1L : val + 1);
                        return null;
                    }));

                    // Stream 10 million of random numbers into the streamer cache.
                    for (int i = 1; i <= 1_000; i++) {
                        stmr.addData(RAND.nextInt(RANGE), 1L);

                        if (i % 500_000 == 0)
                            System.out.println("Number of tuples streamed into Ignite: " + i);
}}}}}}

共1个答案

匿名用户

要使其工作,您需要用静态嵌套类替换lambda。

在示例中,服务器和客户端共享相同的类路径,因此服务器可以访问反序列化所需的测试类,更多详细信息可以在以下线程中找到:无法反序列化lambda