We have a data replication solution using Kafka Connect. Data is read by the Debezium SqlServerConnector into multiple topics, and then written to PostgreSQL by JdbcSinkConnector. Each topic/table has a dedicated sink.
Now one of the sinks is failing with this error:
org.postgresql.util.PSQLException: ERROR: invalid byte sequence for encoding "UTF8": 0x00
One of the replicated records has some non-printable characters in a text column. The message is placed in the Kafka topic successfully, but the sink connector fails when writing it into the target table.
Target tables are created automatically; we are using "auto.create": "true".
Source field (SQL Server): NVARCHAR(400)
Target field (PostgreSQL): TEXT
Now we need to fix it. I see two options:
(preferred) find a way to write the data as is, including non-printable characters
remove non-printable characters, either in the source connector or in the sink.
I was looking for some transformation that we could use, but no luck so far.
Any suggestions?
Hi @rgurskis,
What output format (key.converter and value.converter) are you using in the SqlServerConnector and JdbcSinkConnector configs (or worker if not specified in the connector config)? For this kind of error, I’d recommend starting with these and ensuring that source and sink are in agreement.
E.g., to use Avro for both key and value, both sides would use the same converters (along with any auth settings needed for Schema Registry):
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "<SR endpoint>",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "<SR endpoint>",
Hope this helps!
Thanks @dtroiano,
We are using io.confluent.connect.avro.AvroConverter for both key and value, on both source and sink.
It looks to me that either Postgres is unable to accept the 0x00 character, or JdbcSinkConnector is unable to produce a correct INSERT statement with such a value.
BTW, I have found an open issue in JdbcSinkConnector that might be related:
0x00 never supported in Postgres, automatic stripping · Issue #1216 · confluentinc/kafka-connect-jdbc · GitHub
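For context, PostgreSQL text values cannot contain the NUL character (U+0000), which is the 0x00 byte named in the error. The second option from my first post, removing the offending characters, would boil down to something like the sketch below. It is plain Python for illustration only, not a Kafka Connect transformation; the function name strip_nul and the sample record are made up.

```python
def strip_nul(record: dict) -> dict:
    """Return a copy of the record with U+0000 removed from every string value."""
    return {
        key: value.replace("\x00", "") if isinstance(value, str) else value
        for key, value in record.items()
    }


# Hypothetical record resembling a row with an exceptionmsg column.
sample = {"id": 7, "exceptionmsg": "Failed to evaluate expression\x00 at step 3"}
cleaned = strip_nul(sample)
```

Only the NUL character is dropped here; other non-printable characters such as tabs or newlines are left alone, since PostgreSQL accepts those in a TEXT column.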
OK, so I managed to solve it with a workaround. Instead of “fixing” the string values that cause the error, I’m replacing them with a constant value that is close enough. The field in question (exceptionmsg) holds an exception message. We don’t need the exact message as long as the type of exception is known.
The solution involves two sink connectors:
connector 1 - original
This connector handles all records except the ones that cause the problem. I’m using a regular expression to filter on values that start with specific text and contain non-printable characters. Part of the config looks like this:
"transforms": "..., filterRecords",
"transforms.filterRecords.filter.condition": "$[?(@.exceptionmsg =~ /Failed to evaluate expression.*[^[:print:]].*/)]",
"transforms.filterRecords.filter.type": "exclude",
"transforms.filterRecords.type": "io.confluent.connect.transforms.Filter$Value",
connector 2 - handle issues
This connector reads the same topic as the original connector, but processes only the records that might cause problems on write.
Config:
"transforms": "... , filterRecords, exceptionmsgReplace",
"transforms.exceptionmsgReplace.fields": "exceptionmsg",
"transforms.exceptionmsgReplace.replacement": "Failed to evaluate expression (full exception truncated by replication, see source database)",
"transforms.exceptionmsgReplace.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.filterRecords.filter.condition": "$[?(@.exceptionmsg =~ /Failed to evaluate expression.*[^[:print:]].*/)]",
"transforms.filterRecords.filter.type": "include",
"transforms.filterRecords.type": "io.confluent.connect.transforms.Filter$Value",
To filter on a field value I’m using Confluent’s Filter transformation. It is not available out of the box in Kafka Connect, but can be installed from here:
connect-transformations | Confluent Hub
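To make the split between the two connectors concrete, here is a rough Python sketch of the routing logic. It only mimics the idea and is not how Kafka Connect evaluates the transforms: the pattern uses [^\x20-\x7e] as an ASCII-only stand-in for [^[:print:]], a whole-value match is assumed, and the names PATTERN, REPLACEMENT, is_problematic, connector_1 and connector_2 are made up for illustration.

```python
import re

# ASCII-only approximation of the filter condition from the configs above;
# [^\x20-\x7e] stands in for the POSIX class [^[:print:]].
PATTERN = re.compile(r"Failed to evaluate expression.*[^\x20-\x7e].*")

REPLACEMENT = (
    "Failed to evaluate expression "
    "(full exception truncated by replication, see source database)"
)


def is_problematic(record: dict) -> bool:
    """True when exceptionmsg matches the filter condition (whole value assumed)."""
    msg = record.get("exceptionmsg")
    return isinstance(msg, str) and PATTERN.fullmatch(msg) is not None


def connector_1(record: dict):
    """Original sink: 'exclude' drops matching records and passes the rest as is."""
    return None if is_problematic(record) else record


def connector_2(record: dict):
    """Second sink: 'include' keeps only matching records, then masks the field."""
    if not is_problematic(record):
        return None
    return {**record, "exceptionmsg": REPLACEMENT}


# Hypothetical records: one with an embedded NUL, one clean.
bad = {"id": 1, "exceptionmsg": "Failed to evaluate expression: bad\x00value"}
good = {"id": 2, "exceptionmsg": "Timeout while connecting"}
```

Because both connectors use the same condition with opposite filter types, every record is written by exactly one of them: connector_1(good) returns the record unchanged, while connector_2(bad) returns it with exceptionmsg replaced by the constant text.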