avro.encode
Encodes a record's field into the Avro format.
Description
The processor takes a record's field and encodes it using a schema into the Avro format. It provides two strategies for determining the schema:
-
preRegistered (recommended) This strategy downloads an existing schema from the schema registry and uses it to encode the record. This requires the schema to already be registered in the schema registry. The schema is downloaded only once and cached locally.
-
autoRegister (for development purposes) This strategy infers the schema by inspecting the structured data and registers it in the schema registry. If the record schema is known in advance it's recommended to use the preRegistered strategy and manually register the schema, as this strategy comes with limitations.
The strategy uses reflection to traverse the structured data of each record and determine the type of each field. If a specific field is set to nil the processor won't have enough information to determine the type and will default to a nullable string. Because of this it is not guaranteed that two records with the same structure produce the same schema or even a backwards compatible schema. The processor registers each inferred schema in the schema registry with the same subject, therefore the schema compatibility checks need to be disabled for this schema to prevent failures. If the schema subject does not exist before running this processor, it will automatically set the correct compatibility settings in the schema registry.
This processor is the counterpart to avro.decode
.
Configuration parameters
Name | Type | Default | Description |
---|---|---|---|
auth.basic.password | string | null | The password to use with basic authentication. This option is required if auth.basic.username contains a value. If both auth.basic.username and auth.basic.password are empty basic authentication is disabled. |
auth.basic.username | string | null | The username to use with basic authentication. This option is required if auth.basic.password contains a value. If both auth.basic.username and auth.basic.password are empty basic authentication is disabled. |
field | string | .Payload.After | The field that will be encoded. For more information about the format, see Referencing fields. |
schema.autoRegister.subject | string | null | The subject name under which the inferred schema will be registered in the schema registry. |
schema.preRegistered.subject | string | null | The subject of the schema in the schema registry used to encode the record. |
schema.preRegistered.version | int | null | The version of the schema in the schema registry used to encode the record. |
schema.strategy | string | null | Strategy to use to determine the schema for the record. Available strategies are:
For more information about the behavior of each strategy read the main processor description. |
tls.ca.cert | string | null | The path to a file containing PEM encoded CA certificates. If this option is empty, Conduit falls back to using the host's root CA set. |
tls.client.cert | string | null | The path to a file containing a PEM encoded certificate. This option is required if tls.client.key contains a value. If both tls.client.cert and tls.client.key are empty TLS is disabled. |
tls.client.key | string | null | The path to a file containing a PEM encoded private key. This option is required if tls.client.cert contains a value. If both tls.client.cert and tls.client.key are empty TLS is disabled. |
url | string | null | URL of the schema registry (e.g. http://localhost:8085) |
Examples
Auto-register schema
This example shows the usage of the avro.encode
processor
with the autoRegister
schema strategy. The processor encodes the record's
.Payload.After
field using the schema that is extracted from the data
and registered on the fly under the subject example-autoRegister
.
Configuration parameters
Name | Value |
---|---|
auth.basic.password | null |
auth.basic.username | null |
field | .Payload.After |
schema.autoRegister.subject | example-autoRegister |
schema.preRegistered.subject | null |
schema.preRegistered.version | null |
schema.strategy | autoRegister |
tls.ca.cert | null |
tls.client.cert | null |
tls.client.key | null |
url | http://127.0.0.1:54322 |
Record difference
Before | After | ||||
1 | { | 1 | { | ||
2 | "position": "dGVzdC1wb3NpdGlvbg==", | 2 | "position": "dGVzdC1wb3NpdGlvbg==", | ||
3 | "operation": "create", | 3 | "operation": "create", | ||
4 | "metadata": { | 4 | "metadata": { | ||
5 | "key1": "val1" | 5 | "key1": "val1" | ||
6 | }, | 6 | }, | ||
7 | "key": null, | 7 | "key": null, | ||
8 | "payload": { | 8 | "payload": { | ||
9 | "before": null, | 9 | "before": null, | ||
10 | - | "after": { | 10 | + | "after": "\u0000\u0000\u0000\u0000\u0001ffffff\u0002@\u0002������\u0001@\u0001\u0006bar\u0000\u0002" |
11 | - | "myFloat": 2.3, | |||
12 | - | "myInt": 1, | |||
13 | - | "myMap": { | |||
14 | - | "bar": 2.2, | |||
15 | - | "foo": true | |||
16 | - | }, | |||
17 | - | "myString": "bar", | |||
18 | - | "myStruct": { | |||
19 | - | "bar": false, | |||
20 | - | "foo": 1 | |||
21 | - | } | |||
22 | - | } | |||
23 | } | 11 | } | ||
24 | } | 12 | } |
Pre-register schema
This example shows the usage of the avro.encode
processor
with the preRegistered
schema strategy. When using this strategy, the
schema has to be manually pre-registered. In this example we use the following schema:
{
"type":"record",
"name":"record",
"fields":[
{"name":"myString","type":"string"},
{"name":"myInt","type":"int"}
]
}
The processor encodes the record's.Key
field using the above schema.
Configuration parameters
Name | Value |
---|---|
auth.basic.password | null |
auth.basic.username | null |
field | .Key |
schema.autoRegister.subject | null |
schema.preRegistered.subject | example-preRegistered |
schema.preRegistered.version | 1 |
schema.strategy | preRegistered |
tls.ca.cert | null |
tls.client.cert | null |
tls.client.key | null |
url | http://127.0.0.1:54322 |
Record difference
Before | After | ||||
1 | { | 1 | { | ||
2 | "position": "dGVzdC1wb3NpdGlvbg==", | 2 | "position": "dGVzdC1wb3NpdGlvbg==", | ||
3 | "operation": "create", | 3 | "operation": "create", | ||
4 | "metadata": { | 4 | "metadata": { | ||
5 | "key1": "val1" | 5 | "key1": "val1" | ||
6 | }, | 6 | }, | ||
7 | - | "key": { | 7 | + | "key": "\u0000\u0000\u0000\u0000\u0001\u0006bar\u0002", |
8 | - | "myInt": 1, | |||
9 | - | "myString": "bar" | |||
10 | - | }, | |||
11 | "payload": { | 8 | "payload": { | ||
12 | "before": null, | 9 | "before": null, | ||
13 | "after": null | 10 | "after": null | ||
14 | } | 11 | } | ||
15 | } | 12 | } |