Asked by: 小点点

Dataflow/Beam streaming inserts into BigQuery cause SSL errors


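Our Dataflow pipeline, which streams data into BigQuery (using dynamic destinations), shows many exceptions in the job logs. All of them are javax.net.ssl.SSLException: Connection reset exceptions. I have included the stack trace below, but would like to know whether this can lead to data loss.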

There are a lot of these errors (hundreds per day), and they seem to coincide with the following worker log entry:

Execution of work for computation 'P6' on key '-REDACTED-' failed with uncaught 
exception. Work will be retried locally.

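Can I conclude that the work was successfully retried locally on the worker and that no data was lost, even though the exception still shows up in the job logs?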

I find it very confusing to work out, just by looking at the different logs, which errors actually result in data loss.

Example stack trace:

Error message from worker: java.lang.RuntimeException: javax.net.ssl.SSLException: Connection reset
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:954) 
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:994) 
org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite.flushRows(BatchedStreamingWrite.java:375) 
org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite.access$800(BatchedStreamingWrite.java:69) 
org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite$BatchAndInsertElements.finishBundle(BatchedStreamingWrite.java:271) 

Caused by: javax.net.ssl.SSLException: Connection reset
java.base/sun.security.ssl.Alert.createSSLException(Alert.java:127) 
java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:350) 
java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:293) 
java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:288) 
java.base/sun.security.ssl.SSLSocketImpl.handleException(SSLSocketImpl.java:1581) 
java.base/sun.security.ssl.SSLSocketImpl$AppInputStream.read(SSLSocketImpl.java:979) 
java.base/java.io.BufferedInputStream.fill(BufferedInputStream.java:252) 
java.base/java.io.BufferedInputStream.read1(BufferedInputStream.java:292) 
java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:351) 
java.base/sun.net.www.http.ChunkedInputStream.readAheadBlocking(ChunkedInputStream.java:552) 
java.base/sun.net.www.http.ChunkedInputStream.readAhead(ChunkedInputStream.java:609) 
java.base/sun.net.www.http.ChunkedInputStream.read(ChunkedInputStream.java:696) 
java.base/java.io.FilterInputStream.read(FilterInputStream.java:133) 
java.base/sun.net.www.protocol.http.HttpURLConnection$HttpInputStream.read(HttpURLConnection.java:3510) 
com.google.api.client.http.javanet.NetHttpResponse$SizeValidatingInputStream.read(NetHttpResponse.java:164) 
java.base/java.io.FilterInputStream.read(FilterInputStream.java:133) 
java.base/java.io.FilterInputStream.read(FilterInputStream.java:107) 
com.google.common.io.ByteStreams.exhaust(ByteStreams.java:274) 
com.google.api.client.http.ConsumingInputStream.close(ConsumingInputStream.java:40) 
java.base/java.util.zip.InflaterInputStream.close(InflaterInputStream.java:232) 
java.base/java.util.zip.GZIPInputStream.close(GZIPInputStream.java:137) 
com.fasterxml.jackson.core.json.UTF8StreamJsonParser._closeInput(UTF8StreamJsonParser.java:252) 
com.fasterxml.jackson.core.base.ParserBase.close(ParserBase.java:369) 
com.google.api.client.json.jackson2.JacksonParser.close(JacksonParser.java:48) 
com.google.api.client.json.JsonParser.parse(JsonParser.java:363) 
com.google.api.client.json.JsonParser.parse(JsonParser.java:335) 
com.google.api.client.json.JsonObjectParser.parseAndClose(JsonObjectParser.java:79) 
com.google.api.client.json.JsonObjectParser.parseAndClose(JsonObjectParser.java:73) 
com.google.api.client.http.HttpResponse.parseAs(HttpResponse.java:456) 
com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:591) 
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.lambda$insertAll$1(BigQueryServicesImpl.java:878) 
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$BoundedExecutorService$SemaphoreCallable.call(BigQueryServicesImpl.java:1342) 
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) 
java.base/java.lang.Thread.run(Thread.java:834) 

Caused by: java.net.SocketException: Connection reset
java.base/java.net.SocketInputStream.read(SocketInputStream.java:186) 
java.base/java.net.SocketInputStream.read(SocketInputStream.java:140) 
java.base/sun.security.ssl.SSLSocketInputRecord.read(SSLSocketInputRecord.java:476) 
java.base/sun.security.ssl.SSLSocketInputRecord.readHeader(SSLSocketInputRecord.java:470) 
java.base/sun.security.ssl.SSLSocketInputRecord.bytesInCompletePacket(SSLSocketInputRecord.java:70) 
java.base/sun.security.ssl.SSLSocketImpl.readApplicationRecord(SSLSocketImpl.java:1354) 
java.base/sun.security.ssl.SSLSocketImpl$AppInputStream.read(SSLSocketImpl.java:963) 
java.base/java.io.BufferedInputStream.fill(BufferedInputStream.java:252) 
java.base/java.io.BufferedInputStream.read1(BufferedInputStream.java:292) 
java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:351) 
java.base/sun.net.www.http.ChunkedInputStream.readAheadBlocking(ChunkedInputStream.java:552) 
java.base/sun.net.www.http.ChunkedInputStream.readAhead(ChunkedInputStream.java:609) 
java.base/sun.net.www.http.ChunkedInputStream.read(ChunkedInputStream.java:696) 
java.base/java.io.FilterInputStream.read(FilterInputStream.java:133) 
java.base/sun.net.www.protocol.http.HttpURLConnection$HttpInputStream.read(HttpURLConnection.java:3510) 
com.google.api.client.http.javanet.NetHttpResponse$SizeValidatingInputStream.read(NetHttpResponse.java:164) 
java.base/java.io.FilterInputStream.read(FilterInputStream.java:133) 
java.base/java.io.FilterInputStream.read(FilterInputStream.java:107) 
com.google.common.io.ByteStreams.exhaust(ByteStreams.java:274) 
com.google.api.client.http.ConsumingInputStream.close(ConsumingInputStream.java:40) 
java.base/java.util.zip.InflaterInputStream.close(InflaterInputStream.java:232) 
java.base/java.util.zip.GZIPInputStream.close(GZIPInputStream.java:137) 
com.fasterxml.jackson.core.json.UTF8StreamJsonParser._closeInput(UTF8StreamJsonParser.java:252) 
com.fasterxml.jackson.core.base.ParserBase.close(ParserBase.java:369) 
com.google.api.client.json.jackson2.JacksonParser.close(JacksonParser.java:48) 
com.google.api.client.json.JsonParser.parse(JsonParser.java:363) 
com.google.api.client.json.JsonParser.parse(JsonParser.java:335) 
com.google.api.client.json.JsonObjectParser.parseAndClose(JsonObjectParser.java:79) 
com.google.api.client.json.JsonObjectParser.parseAndClose(JsonObjectParser.java:73) 
com.google.api.client.http.HttpResponse.parseAs(HttpResponse.java:456) 
com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:591) 
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.lambda$insertAll$1(BigQueryServicesImpl.java:878) 
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$BoundedExecutorService$SemaphoreCallable.call(BigQueryServicesImpl.java:1342) 
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) 
java.base/java.lang.Thread.run(Thread.java:834)

1 answer

Anonymous user

These errors should not cause data loss, because the bundle is retried when an exception is thrown.
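
To make the retry argument concrete, here is a minimal, self-contained sketch of the idea. It is a toy simulation, not Beam or BigQuery code: `FlakySink`, `processBundleWithRetry`, and the insert IDs are hypothetical names invented for illustration. It assumes the connection drops while the response is being read (which is what the stack trace in the question suggests), so the rows may already have been accepted, and it assumes the sink deduplicates on a stable per-row insert ID. Real streaming-insert deduplication is best effort, so treat this as an illustration of the principle only.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import javax.net.ssl.SSLException;

// Toy simulation only: none of these classes are Beam or BigQuery APIs.
class BundleRetrySketch {

    /** Hypothetical sink that stores rows by insert ID and "loses" some responses. */
    static final class FlakySink {
        private final Map<String, String> rowsByInsertId = new LinkedHashMap<>();
        private int failuresLeft;

        FlakySink(int failuresToInject) {
            this.failuresLeft = failuresToInject;
        }

        void insertAll(Map<String, String> batch) throws SSLException {
            // Rows are stored first; a repeated insert ID overwrites instead of duplicating.
            rowsByInsertId.putAll(batch);
            if (failuresLeft > 0) {
                failuresLeft--;
                // The response is lost, so the caller cannot tell that the write succeeded.
                throw new SSLException("Connection reset");
            }
        }

        int rowCount() {
            return rowsByInsertId.size();
        }
    }

    /** Re-sends the whole bundle until one attempt succeeds; returns the attempt number. */
    static int processBundleWithRetry(FlakySink sink, Map<String, String> bundle, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                sink.insertAll(bundle);
                return attempt;
            } catch (SSLException e) {
                // A runner would log the exception at this point and retry the work.
                System.err.println("Attempt " + attempt + " failed: " + e.getMessage());
            }
        }
        throw new IllegalStateException("Bundle still failing after " + maxAttempts + " attempts");
    }

    public static void main(String[] args) {
        Map<String, String> bundle = new LinkedHashMap<>();
        bundle.put("insert-id-1", "row A");
        bundle.put("insert-id-2", "row B");

        FlakySink sink = new FlakySink(2); // the first two attempts throw
        int attempts = processBundleWithRetry(sink, bundle, 5);
        System.out.println("attempts=" + attempts + ", rows stored=" + sink.rowCount());
    }
}
```

Running `main` logs two failed attempts and then prints `attempts=3, rows stored=2`: every failure is visible in the log, yet the bundle ends up fully written and the retries create no extra rows. The sketch caps the number of attempts only so that it terminates; that cap is a choice made here, not a statement about how the runner behaves.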