
avro.encode

Encodes a record's field into the Avro format.

Description

The processor takes a record's field and encodes it using a schema into the Avro format. It provides two strategies for determining the schema:

  • preRegistered (recommended): This strategy downloads an existing schema from the schema registry and uses it to encode the record. The schema must already be registered in the schema registry. It is downloaded only once and cached locally.

  • autoRegister (for development purposes): This strategy infers the schema by inspecting the structured data and registers it in the schema registry. If the record schema is known in advance, it's recommended to use the preRegistered strategy and register the schema manually, as autoRegister comes with limitations.

    The strategy uses reflection to traverse the structured data of each record and determine the type of each field. If a field is set to nil, the processor doesn't have enough information to determine its type and defaults to a nullable string. Because of this, two records with the same structure are not guaranteed to produce the same schema, or even a backwards compatible one. The processor registers each inferred schema in the schema registry under the same subject, so schema compatibility checks need to be disabled for this subject to prevent failures. If the schema subject does not exist before the processor runs, the processor automatically sets the correct compatibility settings in the schema registry.
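The effect of a nil field on schema inference can be illustrated with a minimal sketch. This is not the processor's implementation: the function name `infer_schema` and the rule that every dictionary becomes a record with fields sorted by name are assumptions made for illustration only.

```python
from typing import Any


def infer_schema(value: Any, name: str = "record") -> Any:
    """Infer an Avro schema (as a JSON-like Python value) from structured data."""
    if value is None:
        # No type information is available, so fall back to a nullable string.
        return ["null", "string"]
    # bool must be checked before int because bool is a subclass of int.
    if isinstance(value, bool):
        return "boolean"
    if isinstance(value, int):
        return "int"
    if isinstance(value, float):
        return "double"
    if isinstance(value, str):
        return "string"
    if isinstance(value, dict):
        # Assumption: every dict becomes a record with fields sorted by name.
        fields = [
            {"name": key, "type": infer_schema(value[key], name=key)}
            for key in sorted(value)
        ]
        return {"type": "record", "name": name, "fields": fields}
    raise TypeError(f"unsupported type: {type(value).__name__}")


# Two records with the same structure, one of which has a nil field.
with_value = infer_schema({"myInt": 1, "myString": "bar"})
with_nil = infer_schema({"myInt": None, "myString": "bar"})

print(with_value["fields"][0])  # {'name': 'myInt', 'type': 'int'}
print(with_nil["fields"][0])    # {'name': 'myInt', 'type': ['null', 'string']}
```

The two inferred schemas disagree on the type of myInt, which is why a registry with compatibility checks enabled could reject the second one.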

This processor is the counterpart to avro.decode.

Configuration parameters

Name: auth.basic.password
Type: string
Default: null
Description: The password to use with basic authentication. This option is required if auth.basic.username contains a value. If both auth.basic.username and auth.basic.password are empty, basic authentication is disabled.

Name: auth.basic.username
Type: string
Default: null
Description: The username to use with basic authentication. This option is required if auth.basic.password contains a value. If both auth.basic.username and auth.basic.password are empty, basic authentication is disabled.

Name: field
Type: string
Default: .Payload.After
Description: The field that will be encoded. For more information about the format, see Referencing fields.

Name: schema.autoRegister.subject
Type: string
Default: null
Description: The subject name under which the inferred schema will be registered in the schema registry.

Name: schema.preRegistered.subject
Type: string
Default: null
Description: The subject of the schema in the schema registry used to encode the record.

Name: schema.preRegistered.version
Type: int
Default: null
Description: The version of the schema in the schema registry used to encode the record.

Name: schema.strategy
Type: string
Default: null
Description: Strategy to use to determine the schema for the record. Available strategies are:

  • preRegistered (recommended) - Download an existing schema from the schema registry. This strategy is further configured with options starting with schema.preRegistered.*.
  • autoRegister (for development purposes) - Infer the schema from the record and register it in the schema registry. This strategy is further configured with options starting with schema.autoRegister.*.

For more information about the behavior of each strategy, see the processor description above.

Name: tls.ca.cert
Type: string
Default: null
Description: The path to a file containing PEM encoded CA certificates. If this option is empty, Conduit falls back to using the host's root CA set.

Name: tls.client.cert
Type: string
Default: null
Description: The path to a file containing a PEM encoded certificate. This option is required if tls.client.key contains a value. If both tls.client.cert and tls.client.key are empty, TLS is disabled.

Name: tls.client.key
Type: string
Default: null
Description: The path to a file containing a PEM encoded private key. This option is required if tls.client.cert contains a value. If both tls.client.cert and tls.client.key are empty, TLS is disabled.

Name: url
Type: string
Default: null
Description: URL of the schema registry (e.g. http://localhost:8085)
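
The pairing rules stated in the parameter descriptions (username with password, client certificate with client key) and the two allowed strategy names can be checked before a pipeline is started. The sketch below is a hypothetical helper, not part of Conduit: the function name `validate_config` and the error messages are made up, and it only covers the rules spelled out above.

```python
from typing import Dict, List, Optional

Config = Dict[str, Optional[str]]


def validate_config(cfg: Config) -> List[str]:
    """Return a list of problems found in an avro.encode configuration."""
    errors: List[str] = []

    def is_set(key: str) -> bool:
        # A missing key, None, and an empty string all count as "empty".
        return bool(cfg.get(key))

    # Each pair must be set together or left empty together.
    pairs = [
        ("auth.basic.username", "auth.basic.password"),
        ("tls.client.cert", "tls.client.key"),
    ]
    for first, second in pairs:
        if is_set(first) != is_set(second):
            errors.append(f"{first} and {second} must be set together")

    strategy = cfg.get("schema.strategy")
    if strategy not in ("preRegistered", "autoRegister"):
        errors.append(f"unknown schema.strategy: {strategy!r}")

    return errors


cfg = {
    "url": "http://127.0.0.1:54322",
    "schema.strategy": "autoRegister",
    "schema.autoRegister.subject": "example-autoRegister",
    "auth.basic.username": "user",  # password is missing on purpose
}
print(validate_config(cfg))
# ['auth.basic.username and auth.basic.password must be set together']
```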

Examples

Auto-register schema

This example shows the usage of the avro.encode processor with the autoRegister schema strategy. The processor encodes the record's .Payload.After field using the schema that is extracted from the data and registered on the fly under the subject example-autoRegister.

Configuration parameters

Name                          Value
auth.basic.password           null
auth.basic.username           null
field                         .Payload.After
schema.autoRegister.subject   example-autoRegister
schema.preRegistered.subject  null
schema.preRegistered.version  null
schema.strategy               autoRegister
tls.ca.cert                   null
tls.client.cert               null
tls.client.key                null
url                           http://127.0.0.1:54322

Record difference

Lines removed from the record are prefixed with -, lines added are prefixed with +.

  {
    "position": "dGVzdC1wb3NpdGlvbg==",
    "operation": "create",
    "metadata": {
      "key1": "val1"
    },
    "key": null,
    "payload": {
      "before": null,
-     "after": {
-       "myFloat": 2.3,
-       "myInt": 1,
-       "myMap": {
-         "bar": 2.2,
-         "foo": true
-       },
-       "myString": "bar",
-       "myStruct": {
-         "bar": false,
-         "foo": 1
-       }
-     }
+     "after": "\u0000\u0000\u0000\u0000\u0001ffffff\u0002@\u0002������\u0001@\u0001\u0006bar\u0000\u0002"
    }
  }

Pre-register schema

This example shows the usage of the avro.encode processor with the preRegistered schema strategy. When using this strategy, the schema has to be manually pre-registered. In this example we use the following schema:

{
  "type": "record",
  "name": "record",
  "fields": [
    {"name": "myString", "type": "string"},
    {"name": "myInt", "type": "int"}
  ]
}

The processor encodes the record's .Key field using the above schema.
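
One way to pre-register the schema is through the registry's REST API. The sketch below assumes a registry that implements the Confluent Schema Registry API, where a schema is registered with a POST to /subjects/{subject}/versions. The helper name `build_register_request` is hypothetical, and the snippet only builds the request without sending it.

```python
import json
import urllib.parse
import urllib.request


def build_register_request(
    registry_url: str, subject: str, schema: dict
) -> urllib.request.Request:
    """Build (but do not send) a request that registers schema under subject."""
    # The API expects the Avro schema as a JSON string inside a JSON object.
    body = json.dumps({"schema": json.dumps(schema)}).encode("utf-8")
    quoted_subject = urllib.parse.quote(subject, safe="")
    return urllib.request.Request(
        url=f"{registry_url.rstrip('/')}/subjects/{quoted_subject}/versions",
        data=body,
        headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
        method="POST",
    )


schema = {
    "type": "record",
    "name": "record",
    "fields": [
        {"name": "myString", "type": "string"},
        {"name": "myInt", "type": "int"},
    ],
}
request = build_register_request(
    "http://127.0.0.1:54322", "example-preRegistered", schema
)
print(request.get_method(), request.full_url)
# To actually register the schema, send it with urllib.request.urlopen(request).
```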

Configuration parameters

Name                          Value
auth.basic.password           null
auth.basic.username           null
field                         .Key
schema.autoRegister.subject   null
schema.preRegistered.subject  example-preRegistered
schema.preRegistered.version  1
schema.strategy               preRegistered
tls.ca.cert                   null
tls.client.cert               null
tls.client.key                null
url                           http://127.0.0.1:54322

Record difference

Lines removed from the record are prefixed with -, lines added are prefixed with +.

  {
    "position": "dGVzdC1wb3NpdGlvbg==",
    "operation": "create",
    "metadata": {
      "key1": "val1"
    },
-   "key": {
-     "myInt": 1,
-     "myString": "bar"
-   },
+   "key": "\u0000\u0000\u0000\u0000\u0001\u0006bar\u0002",
    "payload": {
      "before": null,
      "after": null
    }
  }
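
To make the encoded key above easier to read, the following sketch decodes it by hand. It assumes the bytes are framed in the Confluent wire format (one zero byte, a 4-byte big-endian schema ID, then the Avro binary data), which this page does not state explicitly. The function names are hypothetical and the decoder is hard-coded to the example schema (myString followed by myInt).

```python
import struct
from typing import Tuple


def read_long(buf: bytes, pos: int) -> Tuple[int, int]:
    """Read one zigzag-encoded variable-length Avro int/long starting at pos."""
    shift = 0
    raw = 0
    while True:
        byte = buf[pos]
        pos += 1
        raw |= (byte & 0x7F) << shift
        if not byte & 0x80:  # high bit clear marks the last byte
            break
        shift += 7
    # Undo zigzag encoding: even values are non-negative, odd are negative.
    return (raw >> 1) ^ -(raw & 1), pos


def read_string(buf: bytes, pos: int) -> Tuple[str, int]:
    """Read an Avro string: a length followed by that many UTF-8 bytes."""
    length, pos = read_long(buf, pos)
    return buf[pos:pos + length].decode("utf-8"), pos + length


def decode_key(data: bytes) -> dict:
    """Decode the example key, assuming Confluent wire format framing."""
    magic, schema_id = struct.unpack(">bI", data[:5])
    if magic != 0:
        raise ValueError(f"unexpected magic byte: {magic}")
    # Fields are encoded in schema order: myString, then myInt.
    my_string, pos = read_string(data, 5)
    my_int, pos = read_long(data, pos)
    return {"schema_id": schema_id, "myString": my_string, "myInt": my_int}


encoded = b"\x00\x00\x00\x00\x01\x06bar\x02"
print(decode_key(encoded))
# {'schema_id': 1, 'myString': 'bar', 'myInt': 1}
```

Under that assumption, the first five bytes identify the schema in the registry and the remaining bytes carry the field values in schema order.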