unwrap.kafkaconnect
Unwraps a Kafka Connect record from an OpenCDC record.
Description
This processor unwraps a Kafka Connect record from the input OpenCDC record.
The input record's payload is replaced with the Kafka Connect record.
This is useful in cases where Conduit acts as an intermediary between a Debezium source and a Debezium destination. In such cases, the Debezium record is set as the OpenCDC record's payload, and needs to be unwrapped for further usage.
Configuration parameters
Name | Type | Default | Description |
---|---|---|---|
field | string | .Payload.After | Field is a reference to the field that contains the Kafka Connect record. For more information about the format, see Referencing fields. |
Examples
Unwrap a Kafka Connect record
This example shows how to unwrap a Kafka Connect record.
The Kafka Connect record is serialized as a JSON string in the .Payload.After
field (raw data).
The Kafka Connect record's payload will replace the OpenCDC record's payload.
We also see how the key is unwrapped too. In this case, the key comes in as structured data.
Configuration parameters
Name | Value |
---|---|
field | .Payload.After |
Record difference
Before | After | ||||
1 | { | 1 | { | ||
2 | "position": "dGVzdCBwb3NpdGlvbg==", | 2 | "position": "dGVzdCBwb3NpdGlvbg==", | ||
3 | "operation": "create", | 3 | "operation": "create", | ||
4 | "metadata": { | 4 | "metadata": { | ||
5 | "metadata-key": "metadata-value" | 5 | "metadata-key": "metadata-value" | ||
6 | }, | 6 | }, | ||
7 | "key": { | 7 | "key": { | ||
8 | - | "payload": { | 8 | + | "id": 27 |
9 | - | "id": 27 | |||
10 | - | }, | |||
11 | - | "schema": {} | |||
12 | }, | 9 | }, | ||
13 | "payload": { | 10 | "payload": { | ||
14 | "before": null, | 11 | "before": null, | ||
15 | - | "after": "{\n\"payload\": {\n \"description\": \"test2\"\n},\n\"schema\": {}\n}" | 12 | + | "after": { |
13 | + | "description": "test2" | |||
14 | + | } | |||
16 | } | 15 | } | ||
17 | } | 16 | } |