Asked by: 小点点

Unsupported source data type: STRUCT error in the JDBC Postgres Sink Connector when consuming from a Kafka topic


I get this error when I try to connect a JDBC sink connector to a topic in Kafka, and I cannot fix it. The data is serialized and deserialized successfully with the schema, and it is validated against the schema before being sent to the topic. A plain consumer of the topic also receives the messages sent to it, but the JDBC connector cannot map the values in those messages to the table. I have tried auto.create=true and auto.evolve=true without success. It is worth mentioning that I have a very similar table, schema, and JDBC connector for a different topic that inserts values into the database successfully; that table also has an auto-increment key named id that does not need to be specified in the message for it to work (in case your first thought is the id field missing from the Avro schema).

This is the error I get:

Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Unsupported source data type: STRUCT (org.apache.kafka.connect.runtime.WorkerSinkTask)

org.apache.kafka.connect.errors.ConnectException: Unsupported source data type: STRUCT
    at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.bindField(GenericDatabaseDialect.java:1627)
    at io.confluent.connect.jdbc.dialect.DatabaseDialect.bindField(DatabaseDialect.java:608)
    at io.confluent.connect.jdbc.sink.PreparedStatementBinder.bindField(PreparedStatementBinder.java:186)
    at io.confluent.connect.jdbc.sink.PreparedStatementBinder.bindKeyFields(PreparedStatementBinder.java:154)
    at io.confluent.connect.jdbc.sink.PreparedStatementBinder.bindRecord(PreparedStatementBinder.java:102)
    at io.confluent.connect.jdbc.sink.BufferedRecords.flush(BufferedRecords.java:184)
    at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:80)
    at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:84)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)

[2022-03-17 00:55:12,913] ERROR WorkerSinkTask{id=sink_postgres_filing-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:631)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.ConnectException: Unsupported source data type: STRUCT
    at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.bindField(GenericDatabaseDialect.java:1627)
    at io.confluent.connect.jdbc.dialect.DatabaseDialect.bindField(DatabaseDialect.java:608)
    at io.confluent.connect.jdbc.sink.PreparedStatementBinder.bindField(PreparedStatementBinder.java:186)
    at io.confluent.connect.jdbc.sink.PreparedStatementBinder.bindKeyFields(PreparedStatementBinder.java:154)
    at io.confluent.connect.jdbc.sink.PreparedStatementBinder.bindRecord(PreparedStatementBinder.java:102)
    at io.confluent.connect.jdbc.sink.BufferedRecords.flush(BufferedRecords.java:184)
    at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:80)
    at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:84)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601)
    ... 10 more

When auto.create=true and auto.evolve=true are set, the error message becomes more specific and says that the io.confluent.connect.avro.Union (STRUCT) type has no mapping to a SQL database column type.
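For reference, io.confluent.connect.avro.Union is the name the Avro converter gives to the Connect STRUCT it builds for Avro union types that it cannot unwrap into a single optional type. In my Avro schema below, filing_number is declared as the one-element union ["long"] rather than a plain "long", so it would presumably reach the sink as such a struct. A rough sketch of the two field declarations (the second form is only a guess at what the sink could bind, not something verified):

One-element union, as in the schema below (converted to an io.confluent.connect.avro.Union STRUCT):

    {"name": "filing_number", "type": ["long"]}

Plain type (converted to an ordinary Connect INT64):

    {"name": "filing_number", "type": "long"}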

This is my JDBC sink connector configuration:

curl -i -X PUT http://localhost:8083/connectors/sink_postgres_filing/config \
     -H "Content-Type: application/json" \
     -d '{
            "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
            "value.converter.schema.registry.url": "http://schema-registry:8081",
            "value.converter.schema.enable": true,
            "value.converter": "io.confluent.connect.avro.AvroConverter",
            "key.converter.schema.registry.url": "http://schema-registry:8081",
            "key.converter": "org.apache.kafka.connect.storage.StringConverter",
            "connection.url": "jdbc:postgresql://host.docker.internal:5432/filing-api",
            "connection.user": "postgres",
            "connection.password": "12345",
            "insert.mode": "upsert",
            "pk.mode": "record_value",
            "pk.fields": "filing_number",
            "topics": "filing",
            "errors.log.enable":true,
            "errors.log.include.messages":true
         }'

This is my Avro schema:

curl -i -X POST http://localhost:8081/subjects/filing-value/versions/ \
      -H "Content-Type: application/vnd.schemaregistry.v1+json" \
      -d '{
             "schema": "{\"namespace\":\"io.Filing\",\"type\":\"record\",\"name\":\"Filing\",\"fields\":[{\"name\":\"filing_number\",\"type\":[\"long\"]},{\"name\":\"field1\",\"type\":[\"string\", \"null\"]},{\"name\":\"field2\",\"type\":[\"string\", \"null\"]},{\"name\":\"field3\",\"type\":[\"string\", \"null\"]},{\"name\":\"field4\",\"type\":[\"string\", \"null\"]},{\"name\":\"field5\",\"type\":[\"long\", \"null\"]},{\"name\":\"field6\",\"type\":[\"long\", \"null\"]},{\"name\":\"field7\",\"type\":[\"string\", \"null\"]},{\"name\":\"field8\",\"type\":\"string\"}]}",
             "schemaType": "AVRO"
          }'

This is my PostgreSQL table:

CREATE TABLE IF NOT EXISTS filing
(
    id bigserial PRIMARY KEY,
    filing_number bigint NOT NULL UNIQUE,
    field1 character varying,
    field2 character varying,
    field3 character varying,
    field4 character varying,
    field5 bigint,
    field6 bigint,
    field7 character varying,
    field8 character varying NOT NULL
);

1 answer

Anonymous user

I have solved the problem, although honestly I am not sure what did it. I changed the JDBC connector settings to auto.create=true, auto.evolve=true, insert.mode=insert, and pk.mode=none, and removed the pk.fields property.
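Roughly, the connector configuration ended up looking like the following. This is only a sketch based on the settings listed above, reusing the rest of the config from the question:

curl -i -X PUT http://localhost:8083/connectors/sink_postgres_filing/config \
     -H "Content-Type: application/json" \
     -d '{
            "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
            "value.converter.schema.registry.url": "http://schema-registry:8081",
            "value.converter.schema.enable": true,
            "value.converter": "io.confluent.connect.avro.AvroConverter",
            "key.converter.schema.registry.url": "http://schema-registry:8081",
            "key.converter": "org.apache.kafka.connect.storage.StringConverter",
            "connection.url": "jdbc:postgresql://host.docker.internal:5432/filing-api",
            "connection.user": "postgres",
            "connection.password": "12345",
            "insert.mode": "insert",
            "pk.mode": "none",
            "auto.create": true,
            "auto.evolve": true,
            "topics": "filing",
            "errors.log.enable": true,
            "errors.log.include.messages": true
         }'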

Essentially, I wanted the connector to dump the data into the database its own way, without any interference from me, to see whether it would work at all. Once it did, I changed the settings back one by one and it kept working.

I had previously produced some messages to the topic in a different format, so maybe those were the messages causing the problem, but I am not entirely sure, because I also deleted the Docker container hosting my Kafka broker to clear the earlier messages before switching to the settings I mentioned, and I also changed the error-handling settings.