提问者:小点点

更新Pivotal GemFire中的单个列


据我所知,没有选择使用gemfire中的查询来更新单个列。要更新单个列,我目前正在获取整个旧对象并修改更改的值并将其存储。如果有人实现了更新单个列的任何东西,请分享。

@Region("tracking")
public class Tracking implements Serializable {
public String id;
public String status;
public String program;
}



@Region("tracking")
public interface TrackingQueryRepository extends CrudRepository<Tracking, String> {
}

我是Delta传播实施的新手。我已经阅读了用户指南并尝试实施并收到了下面给出的例外情况。你能分享你对此的想法吗?

Another.java-域名类

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.Serializable;
import org.springframework.data.annotation.Id;
import org.springframework.data.gemfire.mapping.Region;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.gemstone.gemfire.Delta;
import com.gemstone.gemfire.InvalidDeltaException;


@Region("delta")
public class Another implements Delta, Serializable {

    private static final long serialVersionUID = 1L;

    @Id
    private String anotherId;

    @JsonProperty("anotherProgramId")
    private String anotherProgramId;

    public Another() {
    }

    public Another(String anotherId, String anotherProgramId) {
        this.anotherId = anotherId;
        this.anotherProgramId = anotherProgramId;
    }

    public String getAnotherId() {
        return anotherId;
    }

    public void setAnotherId(String anotherId) {
        this.anotherIdChd = true;
        this.anotherId = anotherId;
    }

    public String getAnotherProgramId() {
        return anotherProgramId;
    }

    public void setAnotherProgramId(String anotherProgramId) {
        this.anotherProgramIdChd = true;
        this.anotherProgramId = anotherProgramId;
    }

    private transient boolean anotherIdChd = false;
    private transient boolean anotherProgramIdChd = false;

    @Override
    public String toString() {
        return "Another [anotherId=" + anotherId + ", anotherProgramId=" + anotherProgramId + "]";
    }

    @Override
    public void fromDelta(DataInput in) throws IOException, InvalidDeltaException {

        if (in.readBoolean()) {
            // Read the change and apply it to the object
            this.anotherId = in.toString();
            System.out.println(" Applied delta to field 'anotherId' = " + this.anotherId);
        }
        if (in.readBoolean()) {
            this.anotherProgramId = in.toString();
            System.out.println(" Applied delta to field 'anotherProgramId' = " + this.anotherProgramId);
        }
    }

    @Override
    public boolean hasDelta() {
        return this.anotherIdChd || this.anotherProgramIdChd;

    }

    @Override
    public void toDelta(DataOutput out) throws IOException {
        System.out.println("Extracting delta from " + this.toString());
        out.writeBoolean(anotherIdChd);
        if (anotherIdChd) {
            // Write just the changes into the data stream

            out.writeUTF(this.anotherId);
            // Once the delta information is written, reset the delta status
            // field
            this.anotherIdChd = false;
            System.out.println(" Extracted delta from field 'anotherId' = " + this.anotherId);
        }
        out.writeBoolean(anotherProgramIdChd);
        if (anotherProgramIdChd) {
            out.writeUTF(this.anotherProgramId);
            this.anotherProgramIdChd = false;
            System.out.println(" Extracted delta from field 'anotherProgramId' = " + this.anotherProgramId);
        }

    }

}

client-cache. xml

<pdx>
        <pdx-serializer>
            <class-name>com.gemstone.gemfire.pdx.ReflectionBasedAutoSerializer</class-name>
            <parameter name="classes">
                <string>com\.rs\.main\..+</string>
            </parameter>
        </pdx-serializer>
    </pdx>

SpringXML命名空间

<util:properties id="gemfire-props">
<prop key="delta-propagation">true</prop>
</util:properties>
<gfe:client-cache pool-name="serverPool" cache-xml-location="classpath:client-cache.xml" properties-ref="gemfire-props"/>
<gfe:client-region id="delta" pool-name="serverPool" shortcut="PROXY" cloning-enabled="true">

本地gefire实例版本-pivotal-gemfire-9.0.1

区域创建创建区域-name=delta-type=REPLICATE

例外:

2017-05-08 22:17:12.370 ERROR 14696 --- [nio-8080-exec-1] o.a.c.c.C.[.[.[/].[dispatcherServlet]    : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is org.springframework.dao.DataAccessResourceFailureException: remote server on 10.148.210.249(:loner):53784:e10627eb: com.gemstone.gemfire.pdx.PdxSerializationException: Could not create an instance of a class com.rs.main.Another; nested exception is com.gemstone.gemfire.cache.client.ServerOperationException: remote server on 10.148.210.249(:loner):53784:e10627eb: com.gemstone.gemfire.pdx.PdxSerializationException: Could not create an instance of a class com.rs.main.Another] with root cause

java.lang.ClassNotFoundException: com.rs.main.Another
    at org.apache.geode.internal.ClassPathLoader.forName(ClassPathLoader.java:437) ~[na:na]
    at org.apache.geode.internal.InternalDataSerializer.getCachedClass(InternalDataSerializer.java:4010) ~[na:na]
    at org.apache.geode.pdx.internal.PdxType.getPdxClass(PdxType.java:235) ~[na:na]
    at org.apache.geode.pdx.internal.PdxReaderImpl.basicGetObject(PdxReaderImpl.java:687) ~[na:na]
    at org.apache.geode.pdx.internal.PdxReaderImpl.getObject(PdxReaderImpl.java:682) ~[na:na]
    at org.apache.geode.internal.InternalDataSerializer.readPdxSerializable(InternalDataSerializer.java:3218) ~[na:na]
    at org.apache.geode.internal.InternalDataSerializer.basicReadObject(InternalDataSerializer.java:3005) ~[na:na]
    at org.apache.geode.DataSerializer.readObject(DataSerializer.java:2897) ~[na:na]
    at org.apache.geode.internal.util.BlobHelper.deserializeBlob(BlobHelper.java:90) ~[na:na]
    at org.apache.geode.internal.cache.EntryEventImpl.deserialize(EntryEventImpl.java:1891) ~[na:na]
    at org.apache.geode.internal.cache.EntryEventImpl.deserialize(EntryEventImpl.java:1884) ~[na:na]
    at org.apache.geode.internal.cache.VMCachedDeserializable.getDeserializedValue(VMCachedDeserializable.java:134) ~[na:na]
    at org.apache.geode.internal.cache.EntryEventImpl.processDeltaBytes(EntryEventImpl.java:1687) ~[na:na]
    at org.apache.geode.internal.cache.EntryEventImpl.setNewValueInRegion(EntryEventImpl.java:1558) ~[na:na]
    at org.apache.geode.internal.cache.EntryEventImpl.putExistingEntry(EntryEventImpl.java:1504) ~[na:na]
    at org.apache.geode.internal.cache.AbstractRegionMap.updateEntry(AbstractRegionMap.java:2959) ~[na:na]
    at org.apache.geode.internal.cache.AbstractRegionMap.basicPut(AbstractRegionMap.java:2782) ~[na:na]
    at org.apache.geode.internal.cache.LocalRegion.virtualPut(LocalRegion.java:5750) ~[na:na]
    at org.apache.geode.internal.cache.DistributedRegion.virtualPut(DistributedRegion.java:337) ~[na:na]
    at org.apache.geode.internal.cache.LocalRegionDataView.putEntry(LocalRegionDataView.java:151) ~[na:na]
    at org.apache.geode.internal.cache.LocalRegion.basicUpdate(LocalRegion.java:5730) ~[na:na]
    at org.apache.geode.internal.cache.LocalRegion.basicBridgePut(LocalRegion.java:5374) ~[na:na]
    at org.apache.geode.internal.cache.tier.sockets.command.Put65.cmdExecute(Put65.java:381) ~[na:na]
    at org.apache.geode.internal.cache.tier.sockets.BaseCommand.execute(BaseCommand.java:141) ~[na:na]
    at org.apache.geode.internal.cache.tier.sockets.ServerConnection.doNormalMsg(ServerConnection.java:776) ~[na:na]
    at org.apache.geode.internal.cache.tier.sockets.ServerConnection.doOneMessage(ServerConnection.java:904) ~[na:na]
    at org.apache.geode.internal.cache.tier.sockets.ServerConnection.run(ServerConnection.java:1160) ~[na:na]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0_121]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0_121]
    at org.apache.geode.internal.cache.tier.sockets.AcceptorImpl$1$1.run(AcceptorImpl.java:519) ~[na:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_121]

共3个答案

匿名用户

嗨(再次)Vigneshwaran-

对,所以GemFire的Query功能(通过QueryService)严格用于运行查询(即SELECT语句)。GemFire OQL中没有用于UPDATES和DELETES的等价物。GemFire是一个Key/Value存储,具有类似于Map的操作(例如get(key)put(key, value)等),您通常在其中处理整个应用程序域对象。但是,无论您的应用程序是对等缓存(即集群的成员)还是缓存客户端,GemFire的一些功能都可以为您提供帮助。通常,应用程序是缓存客户端,并具有/使用ClientCache,其中集群是独立的,客户端连接到集群很像RDBMS。

我还要说,虽然Function服务很有用,但它不是唯一的选择,实际上在代码方面可能会有更多的开销。

正如Wes在上面提到的,使用PARTITION Region是非常典型的,尤其是对于“事务性”数据(注意:复制区域更适用于不经常更改的参考数据)。

在“Function”可以帮助您的地方,您可以编写Function以获取对应用程序域对象的更新。“update”可以在Function的“参数”中传递。要调用Function,您可以使用GemFire的FunctionService来执行,使用目标方法之一(例如[onRegion(“跟踪”)][7])。

注意:其他目标方法(即on会员onServer)分别特定于您的应用程序是“对等体”还是“客户端”。例如,如果您的应用程序是客户端,则不能调用on会员,因为它假定您的应用程序是“对等体”。同样,如果您的应用程序是对等体,则不能调用onServer(s),因为它假定您的应用程序是“客户端”。onRegion(…)无论应用程序是对等体还是客户端都有效。虽然您可能会想为什么不一直使用onRegion,但根据您的UC(例如,考虑服务器组和路由)使用其他形式的定位具有技术优势。

当Region是一个PARTITION时,您还可以设置Function的[OptimizeForWrite()][8],这意味着Function将更新Region数据,因此,当使用Wes上面描述的过滤选项指定键时,将被路由到PARTITION的键主存储桶。

PARTITION Region的一致性来自这样一个事实,即所有更新都首先被路由并写入“主”(无论哪个服务器接收客户端的更新,也可能是一个甚至不托管该Region或相关数据/密钥的服务器;即不同的分片)。在主服务器更新后,然后数据更改被传播(分布)到集群中托管分区/分片数据集的辅助节点的其他节点。这就是Wes上面提到的“事务”一致性。

注意:PARTITION只是数据分片的另一个词,其中数据均匀分布在可用节点的集群中。当添加/删除节点时,数据会重新平衡。PARTITION也可以具有冗余。这些被称为辅助。PARTITION区域有助于延迟和吞吐量,因为数据被划分(默认分为113个桶),其中每个桶都有主副本,可能有1个或多个副本(辅助,用于冗余;HA),从而提高读写吞吐量。

此外,如果数据必须保留,那么您还可以设置Function的HA属性。这将允许在失败的情况下重试。

然而,尽管有所有这些优势,您仍然必须处理服务器上Function中更新应用程序域对象的“操作方法”。您还必须处理“映射”,因为像GemFire这样的Key/Value存储中确实没有等效的ORM。当然,这并不难,但也许有更好的方法。

还有另一个功能,称为Delta传播。本质上,每当您进行更新时,您总是会获取并更新GemFire中的完整值。

注意:可以像投影一样查询对象的选择字段,但它不是代理或与实际对象相关。

当您利用GemFire的序列化功能时,您可以利用Delta传播。

当实现“增量”时,只有应用程序域对象中的差异被实际序列化,通过线路发送,无论是在客户端和服务器之间,还是在坚持冗余策略时在对等方之间。这对你来说是完全无缝的。你得到你的对象(客户端),更新它,然后把它放进去。GemFire为你处理发送“增量”的逻辑。

此外,在集群中的服务器上使用客户端/服务器拓扑和PARTITION区域时,您可以启用单跳访问,这会有效地将数据路由到包含“主”存储桶的服务器,从而避免额外的网络跃点,这将影响您对每次操作的感知延迟。

因此,在Deltas和Single-Hop之间,您最终会得到一个非常高性能的解决方案,并且仍然可以利用面向对象的方法,如您所期望的那样使用您的应用程序域对象API。

但是,请注意使用Deltas的陷阱。

不管怎样,思考的食粮。你通常有不止一种方法来完成一项任务,但是更好的方法并不总是显而易见的,直到你根据你的UC/目标来衡量和评估预期的效果。

干杯约翰

匿名用户

您不能使用查询服务更新列。我建议您考虑Function服务以实现事务一致性。使区域分区并使用. onRegion().withFilter(key).withArgs(列和值映射)调用函数。

您的函数将读取对象、应用更新并放置。

通过这种方式,您的读取和更新将发生在服务器上的单个线程中,确保事务一致性,而不是读取客户端上的对象,更改值,执行放置操作,并希望没有其他人在您下面溜进来。

匿名用户

我们实现相同目标的另一种方法是使用自定义函数,该函数在获取分布式锁后更新值(BeanUtils)。

这可能会增加性能开销,但保证数据完整性。这就是权衡。

见下面的伪代码

 try{

 //this can be regionName 
 dls = DistributedLockService.getServiceNamed(arbitrary-lock-name) 
 //the key is normally the object @Id
 dls.lock(some-key, waitTimeOut, leaseTimeOut)

 row = region.get(id)
 //Here we copy the desired value (input to function) to the latest value
 BeanUtils.copyProperty(row, key, value);      
 //Insert the modified record to Gemfire - now this becomes equivalent of update <region> set value =  for a specific property.
 region.put(id, row) 

 } finally{

 dls.unlock(some-key);

 }