OpenCDC record
An OpenCDC record in Conduit aims to standardize the format of data records exchanged between different connectors within a data processing pipeline. The primary objective is to ensure compatibility between various combinations of source and destination connectors.
Benefits
- Support for Operations: The format should support representing records for
create,update,delete, andsnapshotoperations. - Standard Metadata Fields: Definition of standard metadata fields to provide essential information about each record. These can vary depending on the record. See Metadata Fields for more information.
- Integration with Data Tools: We believe that being strict about the record format Conduit consumes and produces will make it easier to integrate with other data processing tools.
Fields
.Positionuniquely represents the position of record. This is used to track the position of a record in a source connector, enabling Conduit to resume a stopped pipeline..Operationdefines what triggered the creation of a record. There are four possibilities:create,update,deleteorsnapshot. The first three operations are encountered during normal CDC operation, whilesnapshotis meant to represent records during an initial load. Depending on the operation, the record will contain either the payload before the change, after the change, both or none (see fields.Payload.Beforeand.Payload.After)..Keyrepresents a value that should identify the entity (e.g. database row)..Metadatacontains additional information regarding the record..Payload.Beforeholds the payload before the operation ocurred. These could be present in operations such asupdateanddelete..Payload.Afterholds the payload after the operation ocurred. These could be present in operations such ascreate,snapshotorupdate.
We're indicating .Position, and not .position as defined in its Record message, to show its Go template notation as used by the Go representation of an OpenCDC record. This field is public and must start with an uppercase letter.
Representation
Conduit relies on Protocol Buffers (protobuf) when it comes to defining an OpenCDC record to benefit from the several advantages that it provides. Its definition can be found in the Buf Schema Registry.
When processing records in Conduit, you can always expect a similar structure to the following:
{
"position": "c3RhbmRpbmc=",
"operation": "update",
"metadata": {
"file.path": "./example.in",
"opencdc.readAt": "1663858188836816000",
"opencdc.version": "v1"
},
"key": "cGFkbG9jay1rZXk=",
"payload": {
"before": "eWVsbG93",
"after": {
"bool": true,
"float32": 1.2,
"float64": 1.2,
"int": 1,
"int32": 1,
"int64": 1,
"string": "orange"
}
}
}
.Position, .Key, and .Payload.Before are represented as Base64 encoded in the example above because these will be a byte slice when represented as JSON.
Metadata fields
As part of an OpenCDC record, there will be a set of fields provided that will vary depending on the connector. These fields can be common to all OpenCDC records as part of our standard, some related to Conduit, and others that will be provided by each Connector implementation independently. These fields can be useful to define conventions that will be then used by Conduit to expand its functionality. Notice that all these fields use a dot notation syntax to indicate what they refer to, preventing accidental clashes. Here are the ones you can find:
OpenCDC
opencdc.createdAtcan contain the time when the record was created in the 3rd party system. The expected format is a Unix timestamp in nanoseconds.opencdc.readAtcan contain the time when the record was read from the 3rd party system. The expected format is a Unix timestamp in nanoseconds.opencdc.versioncontains the version of the OpenCDC format (e.g., "v1"). This field exists to ensure the OpenCDC format version can be easily identified in case the record gets marshaled into a different untyped format (e.g. JSON).
{
...
"metadata": {
"opencdc.createdAt": "1663858188836816000",
"opencdc.readAt": "1663858188836816000",
"opencdc.version": "v1",
...
},
...
}
Conduit
Only available in records once they are read by a source plugin:
conduit.source.plugin.nameis the name of the source plugin that created the record.conduit.source.plugin.versionis the version of the source plugin that created the record.conduit.source.connector.idis the ID of the source connector that received the record.
{
...
"metadata": {
"conduit.source.connector.id": "connectorID",
"conduit.source.plugin.name": "example",
"conduit.source.plugin.version": "v1",
...
},
...
}
Only available in records once they are written by a destination plugin:
conduit.destination.plugin.nameis the name of the destination plugin that has written the record.conduit.destination.plugin.versionis the version of the destination plugin that has written the record.
{
...
"metadata": {
"conduit.destination.plugin.name": "example",
"conduit.destination.plugin.version": "v1",
...
},
...
}
When a record is sent to the Dead-Letter Queue (DLQ), you'll also see these extra fields that will give you an insight into why the record landed in the DLQ.
conduit.dlq.nack.errorcontains the error that caused a record to be nacked and pushed to the dead-letter queue.conduit.dlq.nack.node.idis the ID of the internal node that nacked the record.
Connector
These metadata fields will be provided by each connector implementation allowing them to add any necessary metadata. As previously mentioned, to avoid unintended conflicts of metadata keys, the convention these will follow are the same as before, indicating first the connector name that's adding them.
Taking the same previous record example, you'll notice there is a metadata key named file.path, which would indicate this field was added by a file plugin.
{
...
"metadata": {
"file.path": "./example.in",
...
},
...
}