
We have a data replication solution built on Kafka Connect. Data is read by the Debezium SqlServerConnector into multiple topics and then written to PostgreSQL by JdbcSinkConnector. Each topic/table has a dedicated sink.
One of the sinks is now failing with this error: org.postgresql.util.PSQLException: ERROR: invalid byte sequence for encoding "UTF8": 0x00

One of the replicated records has non-printable characters in a text column. The message is written to the Kafka topic successfully, but the sink connector fails when writing it to the target table.
Target tables are created automatically; we are using "auto.create": "true".
Source field (SQL Server): NVARCHAR(400)
Target field (PostgreSQL): TEXT

Now we need to fix it. I see two options:

  • (preferred) find a way to write the data as is, including the non-printable characters
  • remove the non-printable characters, either in the source connector or in the sink

I was looking for a transformation that we could use, but no luck so far.
Any suggestions?

    Hi @rgurskis ,

    Which converters (key.converter and value.converter) are you using in the SqlServerConnector and JdbcSinkConnector configs (or in the worker config, if they are not set on the connectors)? For this kind of error, I’d recommend starting with these and making sure that source and sink agree.

    E.g., to use Avro for both key and value, both sides would use the same converters (along with any auth settings needed for Schema Registry):

    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "<SR endpoint>",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "<SR endpoint>",

    Hope this helps!

    Thanks @dtroiano ,

    We are using io.confluent.connect.avro.AvroConverter for both key and value, on both source and sink.
    It looks to me like either Postgres cannot accept the 0x00 character, or JdbcSinkConnector cannot produce a correct INSERT statement for such a value.
    BTW, I have found an open issue in JdbcSinkConnector that might be related: 0x00 never supported in Postgres, automatic stripping · Issue #1216 · confluentinc/kafka-connect-jdbc · GitHub
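
For context on that issue: PostgreSQL's text types cannot store the NUL character (code point zero), even though the byte 0x00 is valid UTF-8, so writing the value "as is" into a TEXT column is not possible on the Postgres side. Below is a minimal Python sketch of what "stripping" could mean for a record. It is not Kafka Connect code; `strip_nul` and the sample record are hypothetical names used only for illustration, and in a real pipeline this logic would have to live somewhere such as a custom transformation or the source query.

```python
def strip_nul(value):
    """Return a copy of value with every U+0000 removed from strings.

    Walks dicts and lists recursively; other types are returned unchanged.
    """
    if isinstance(value, str):
        return value.replace("\x00", "")
    if isinstance(value, dict):
        return {key: strip_nul(item) for key, item in value.items()}
    if isinstance(value, list):
        return [strip_nul(item) for item in value]
    return value


# Hypothetical record resembling the failing row in this thread.
record = {"id": 7, "exceptionmsg": "Failed to evaluate expression\x00\x00 at line 3"}
cleaned = strip_nul(record)

# The NUL character encodes to the single byte 0x00 named in the error message.
nul_as_utf8 = "\x00".encode("utf-8")
```

The sketch only removes NUL. Other non-printable characters are left in place, since the error in this thread is specifically about 0x00.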

    OK, so I managed to solve it with a workaround. Instead of “fixing” the string values that cause the error, I’m replacing them with a constant value that is close enough. The field in question ( exceptionmsg ) holds an exception message. We don’t need the exact message as long as the type of exception is known.
    The solution involves two sink connectors:
    connector 1 - original
    This connector handles all records except the ones that cause the problem. I’m using a regular expression to filter on values that start with specific text and contain non-printable characters. Part of the config looks like this:

    "transforms": "..., filterRecords",
    "transforms.filterRecords.filter.condition": "$[?(@.exceptionmsg =~ /Failed to evaluate expression.*[^[:print:]].*/)]",
    "transforms.filterRecords.filter.type": "exclude",
    "transforms.filterRecords.type": "io.confluent.connect.transforms.Filter$Value",

    connector 2 - handles the problem records
    This connector reads the same topic as the original connector, but processes only the records that might cause problems on write.
    Config:

    "transforms": "... , filterRecords, exceptionmsgReplace",
    "transforms.exceptionmsgReplace.fields": "exceptionmsg",
    "transforms.exceptionmsgReplace.replacement": "Failed to evaluate expression (full exception truncated by replication, see source database)",
    "transforms.exceptionmsgReplace.type": "org.apache.kafka.connect.transforms.MaskField$Value",
    "transforms.filterRecords.filter.condition": "$[?(@.exceptionmsg =~ /Failed to evaluate expression.*[^[:print:]].*/)]",
    "transforms.filterRecords.filter.type": "include",
    "transforms.filterRecords.type": "io.confluent.connect.transforms.Filter$Value",

    To filter on the field value I’m using Confluent’s Filter transformation. It is not included in Kafka Connect out of the box, but it can be installed from here: connect-transformations | Confluent Hub
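
The way the two connectors split the topic between them can be sketched in plain Python. This is only an illustration of the include/exclude logic, not Kafka Connect code: `connector1`, `connector2`, and `matches` are hypothetical names, and because Python's `re` module has no POSIX classes, `[^[:print:]]` is approximated here as "anything outside printable ASCII (0x20 to 0x7E)". The thread does not confirm how the Filter transformation's regex engine interprets `[[:print:]]`, so it is worth testing the condition against real records.

```python
import re

# Assumption: approximates the thread's condition
# /Failed to evaluate expression.*[^[:print:]].*/ using an ASCII range.
PROBLEM = re.compile(r"Failed to evaluate expression.*[^\x20-\x7e]", re.DOTALL)

REPLACEMENT = (
    "Failed to evaluate expression "
    "(full exception truncated by replication, see source database)"
)


def matches(record):
    """True if exceptionmsg starts with the known prefix and has a non-printable char."""
    msg = record.get("exceptionmsg")
    return isinstance(msg, str) and PROBLEM.match(msg) is not None


def connector1(records):
    """Mirrors filter.type = exclude: pass through everything that does not match."""
    return [r for r in records if not matches(r)]


def connector2(records):
    """Mirrors filter.type = include followed by the MaskField replacement."""
    return [{**r, "exceptionmsg": REPLACEMENT} for r in records if matches(r)]


# Hypothetical sample records.
records = [
    {"id": 1, "exceptionmsg": "Failed to evaluate expression\x00abc"},
    {"id": 2, "exceptionmsg": "Failed to evaluate expression: x > 1"},
    {"id": 3, "exceptionmsg": "Timeout"},
    {"id": 4, "exceptionmsg": None},
]
written_by_1 = connector1(records)
written_by_2 = connector2(records)
```

Because both connectors use the same condition with opposite filter types, every record is written by exactly one of them, and only the matching records lose their original message.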